How to Monitor Kafka with JMX
Please consider supporting the author's new books, 《深入理解Kafka:核心設計與實踐原理》 and 《RabbitMQ實戰指南》, and following the author's WeChat public account: 朱小廝的博客.
 
The original version of this article is available at: https://honeypps.com/mq/how-to-monitor-kafka-with-jmx/
When using Kafka as message-queue middleware, monitoring its performance in real time inevitably means pulling internal broker data over JMX, whether you build your own monitoring system for the Kafka cluster or use an open-source product such as Yahoo's Kafka Manager; either way, the sensitive metrics are read through JMX. The official documentation (http://kafka.apache.org/082/documentation.html#monitoring) puts it this way:
Kafka uses Yammer Metrics for metrics reporting in both the server and the client. This can be configured to report stats using pluggable stats reporters to hook up to your monitoring system.
The easiest way to see the available metrics is to fire up jconsole and point it at a running kafka client or server; this will allow browsing all metrics with JMX.
Clearly the Kafka project itself encourages JMX-based monitoring and exposes its metrics over JMX for users.
This post shows how to monitor Kafka through JMX by reading a few of these metric attributes.
For background on using JMX itself, see:
- Playing with JMX from Scratch (1): Introduction and Standard MBeans
- Playing with JMX from Scratch (2): Condition
- Playing with JMX from Scratch (3): Model MBeans
- Playing with JMX from Scratch (4): Apache Commons Modeler & Dynamic MBeans
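As a warm-up, the query pattern used throughout this post (`queryNames` with a wildcard, then `getAttribute`) can be tried against the in-process platform MBean server, which needs no Kafka broker at all. This is only an illustrative sketch: against a broker the same calls would go through a remote MBeanServerConnection instead.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class JmxBasics {
    public static void main(String[] args) throws Exception {
        // Locally, the platform MBean server plays the role that a remote
        // MBeanServerConnection plays when talking to a Kafka broker.
        MBeanServer conn = ManagementFactory.getPlatformMBeanServer();

        // queryNames with a wildcard pattern, analogous to listing all
        // per-partition Log MBeans of a topic.
        Set<ObjectName> names = conn.queryNames(new ObjectName("java.lang:*"), null);
        System.out.println("java.lang MBeans: " + names.size());

        // getAttribute on a concrete ObjectName, analogous to reading the
        // "Count" or "Value" attribute of a Kafka metric.
        Object uptime = conn.getAttribute(new ObjectName("java.lang:type=Runtime"), "Uptime");
        System.out.println("JVM uptime (ms): " + uptime);
    }
}
```

The broker-side code later in this post uses exactly these two calls, only over an RMI connector.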
Before using JMX, make sure the broker was started with JMX enabled by adding JMX_PORT=9999 on startup, i.e.:

```shell
JMX_PORT=9999 bin/kafka-server-start.sh config/server.properties &
```

For this post I set up a small Kafka cluster with only two nodes, holding one topic (name=default_channel_kafka_zzh_demo) divided into 5 partitions (0 1 2 3 4).
The Kafka versions discussed here are 0.8.1.x and 0.8.2.x, and they differ when monitored over JMX: the difference shows up in the ObjectName. Readers familiar with Kafka know it has the concepts of topic and partition, a topic being split into a number of partitions according to some strategy; the per-partition log end offset makes a good example.
In 0.8.1.x the ObjectName (a String) for this attribute is:

```
"kafka.log":type="Log",name="default_channel_kafka_zzh_demo-*-LogEndOffset"
```

while in 0.8.2.x it is:

```
kafka.log:type=Log,name=LogEndOffset,topic=default_channel_kafka_zzh_demo,partition=0
```
So the two versions must be handled differently in code.
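To make the version difference concrete, here is a small standalone sketch (not from the original post; the helper names are mine) that builds the per-partition LogEndOffset ObjectName in both styles and recovers the partition id from it:

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

public class KafkaObjectNames {
    // Builds the LogEndOffset ObjectName string for one partition,
    // switching on the Kafka version (0.8.2.x vs 0.8.1.x).
    public static String logEndOffsetName(boolean newKafkaVersion, String topic, int partition) {
        if (newKafkaVersion) {
            return "kafka.log:type=Log,name=LogEndOffset,topic=" + topic + ",partition=" + partition;
        }
        return "\"kafka.log\":type=\"Log\",name=\"" + topic + "-" + partition + "-LogEndOffset\"";
    }

    // Recovers the partition id from an ObjectName, per version.
    public static int partitionOf(boolean newKafkaVersion, ObjectName objName) {
        if (newKafkaVersion) {
            // 0.8.2.x carries the partition as its own key property.
            return Integer.parseInt(objName.getKeyProperty("partition"));
        }
        // 0.8.1.x buries it in name="<topic>-<partition>-LogEndOffset";
        // getKeyProperty returns the quoted value, so strip the quotes first.
        String s = objName.getKeyProperty("name").replace("\"", "");
        int to = s.lastIndexOf("-LogEndOffset");
        int from = s.lastIndexOf('-', to - 1) + 1;
        return Integer.parseInt(s.substring(from, to));
    }

    public static void main(String[] args) throws MalformedObjectNameException {
        String topic = "default_channel_kafka_zzh_demo";
        ObjectName newStyle = new ObjectName(logEndOffsetName(true, topic, 3));
        ObjectName oldStyle = new ObjectName(logEndOffsetName(false, topic, 3));
        System.out.println(partitionOf(true, newStyle));   // 3
        System.out.println(partitionOf(false, oldStyle));  // 3
    }
}
```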
Three monitoring items are used below to demonstrate JMX monitoring. First, the code for a single Kafka broker:
```java
package kafka.jmx;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.management.*;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Created by hidden on 2016/12/8.
 */
public class JmxConnection {
    private static Logger log = LoggerFactory.getLogger(JmxConnection.class);

    private MBeanServerConnection conn;
    private String jmxURL;
    private String ipAndPort = "localhost:9999";
    private int port = 9999;
    private boolean newKafkaVersion = false;

    public JmxConnection(Boolean newKafkaVersion, String ipAndPort) {
        this.newKafkaVersion = newKafkaVersion;
        this.ipAndPort = ipAndPort;
    }

    public boolean init() {
        jmxURL = "service:jmx:rmi:///jndi/rmi://" + ipAndPort + "/jmxrmi";
        log.info("init jmx, jmxUrl: {}, and begin to connect it", jmxURL);
        try {
            JMXServiceURL serviceURL = new JMXServiceURL(jmxURL);
            JMXConnector connector = JMXConnectorFactory.connect(serviceURL, null);
            conn = connector.getMBeanServerConnection();
            if (conn == null) {
                log.error("get connection return null!");
                return false;
            }
        } catch (MalformedURLException e) {
            e.printStackTrace();
            return false;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    public String getTopicName(String topicName) {
        String s;
        if (newKafkaVersion) {
            s = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=" + topicName;
        } else {
            s = "\"kafka.server\":type=\"BrokerTopicMetrics\",name=\"" + topicName + "-MessagesInPerSec\"";
        }
        return s;
    }

    /**
     * @param topicName topic name, e.g. default_channel_kafka_zzh_demo
     * @return the message-in count of this broker; to get a topic's total,
     *         sum this value over every broker in the cluster
     */
    public long getMsgInCountPerSec(String topicName) {
        String objectName = getTopicName(topicName);
        Object val = getAttribute(objectName, "Count");
        String debugInfo = "jmxUrl:" + jmxURL + ",objectName=" + objectName;
        if (val != null) {
            log.info("{}, Count:{}", debugInfo, (long) val);
            return (long) val;
        }
        return 0;
    }

    /**
     * @param topicName topic name, e.g. default_channel_kafka_zzh_demo
     * @return the message-in TPS of this broker; as with the count, a topic's
     *         TPS is the sum of this value over every broker in the cluster
     */
    public double getMsgInTpsPerSec(String topicName) {
        String objectName = getTopicName(topicName);
        Object val = getAttribute(objectName, "OneMinuteRate");
        if (val != null) {
            double dVal = ((Double) val).doubleValue();
            return dVal;
        }
        return 0;
    }

    private Object getAttribute(String objName, String objAttr) {
        ObjectName objectName = null;
        try {
            objectName = new ObjectName(objName);
        } catch (MalformedObjectNameException e) {
            e.printStackTrace();
            return null;
        }
        return getAttribute(objectName, objAttr);
    }

    private Object getAttribute(ObjectName objName, String objAttr) {
        if (conn == null) {
            log.error("jmx connection is null");
            return null;
        }
        try {
            return conn.getAttribute(objName, objAttr);
        } catch (MBeanException e) {
            e.printStackTrace();
            return null;
        } catch (AttributeNotFoundException e) {
            e.printStackTrace();
            return null;
        } catch (InstanceNotFoundException e) {
            e.printStackTrace();
            return null;
        } catch (ReflectionException e) {
            e.printStackTrace();
            return null;
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * @param topicName topic name
     * @return the logSize (i.e. log end offset) of each partition of topicName
     */
    public Map<Integer, Long> getTopicEndOffset(String topicName) {
        Set<ObjectName> objs = getEndOffsetObjects(topicName);
        if (objs == null) {
            return null;
        }
        Map<Integer, Long> map = new HashMap<>();
        for (ObjectName objName : objs) {
            int partId = getParId(objName);
            Object val = getAttribute(objName, "Value");
            if (val != null) {
                map.put(partId, (Long) val);
            }
        }
        return map;
    }

    private int getParId(ObjectName objName) {
        if (newKafkaVersion) {
            String s = objName.getKeyProperty("partition");
            return Integer.parseInt(s);
        } else {
            String s = objName.getKeyProperty("name");
            int to = s.lastIndexOf("-LogEndOffset");
            String s1 = s.substring(0, to);
            int from = s1.lastIndexOf("-") + 1;
            String ss = s.substring(from, to);
            return Integer.parseInt(ss);
        }
    }

    private Set<ObjectName> getEndOffsetObjects(String topicName) {
        String objectName;
        if (newKafkaVersion) {
            objectName = "kafka.log:type=Log,name=LogEndOffset,topic=" + topicName + ",partition=*";
        } else {
            objectName = "\"kafka.log\":type=\"Log\",name=\"" + topicName + "-*-LogEndOffset\"";
        }
        ObjectName objName = null;
        Set<ObjectName> objectNames = null;
        try {
            objName = new ObjectName(objectName);
            objectNames = conn.queryNames(objName, null);
        } catch (MalformedObjectNameException e) {
            e.printStackTrace();
            return null;
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
        return objectNames;
    }
}
```

Note how the code treats the two Kafka versions differently. The methods corresponding to the three monitoring items mentioned above are:
```java
public Map<Integer, Long> getTopicEndOffset(String topicName)
public long getMsgInCountPerSec(String topicName)
public double getMsgInTpsPerSec(String topicName)
```

Handling the cluster as a whole takes another class; in essence it accumulates the corresponding value over every broker in the cluster:
```java
package kafka.jmx;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Created by hidden on 2016/12/8.
 */
public class JmxMgr {
    private static Logger log = LoggerFactory.getLogger(JmxMgr.class);
    private static List<JmxConnection> conns = new ArrayList<>();

    public static boolean init(List<String> ipPortList, boolean newKafkaVersion) {
        for (String ipPort : ipPortList) {
            log.info("init jmxConnection [{}]", ipPort);
            JmxConnection conn = new JmxConnection(newKafkaVersion, ipPort);
            boolean bRet = conn.init();
            if (!bRet) {
                log.error("init jmxConnection error");
                return false;
            }
            conns.add(conn);
        }
        return true;
    }

    public static long getMsgInCountPerSec(String topicName) {
        long val = 0;
        for (JmxConnection conn : conns) {
            long temp = conn.getMsgInCountPerSec(topicName);
            val += temp;
        }
        return val;
    }

    public static double getMsgInTpsPerSec(String topicName) {
        double val = 0;
        for (JmxConnection conn : conns) {
            double temp = conn.getMsgInTpsPerSec(topicName);
            val += temp;
        }
        return val;
    }

    public static Map<Integer, Long> getEndOffset(String topicName) {
        Map<Integer, Long> map = new HashMap<>();
        for (JmxConnection conn : conns) {
            Map<Integer, Long> tmp = conn.getTopicEndOffset(topicName);
            if (tmp == null) {
                log.warn("get topic endoffset return null, topic {}", topicName);
                continue;
            }
            for (Integer parId : tmp.keySet()) { // keep the bigger value
                if (!map.containsKey(parId) || (map.containsKey(parId) && (tmp.get(parId) > map.get(parId)))) {
                    map.put(parId, tmp.get(parId));
                }
            }
        }
        return map;
    }

    public static void main(String[] args) {
        List<String> ipPortList = new ArrayList<>();
        ipPortList.add("xx.101.130.1:9999");
        ipPortList.add("xx.101.130.2:9999");
        JmxMgr.init(ipPortList, true);
        String topicName = "default_channel_kafka_zzh_demo";
        System.out.println(getMsgInCountPerSec(topicName));
        System.out.println(getMsgInTpsPerSec(topicName));
        System.out.println(getEndOffset(topicName));
    }
}
```

The output:
```
2016-12-08 19:25:32 -[INFO] - [init jmxConnection [xx.101.130.1:9999]] - [kafka.jmx.JmxMgr:20]
2016-12-08 19:25:32 -[INFO] - [init jmx, jmxUrl: service:jmx:rmi:///jndi/rmi://xx.101.130.1:9999/jmxrmi, and begin to connect it] - [kafka.jmx.JmxConnection:35]
2016-12-08 19:25:33 -[INFO] - [init jmxConnection [xx.101.130.2:9999]] - [kafka.jmx.JmxMgr:20]
2016-12-08 19:25:33 -[INFO] - [init jmx, jmxUrl: service:jmx:rmi:///jndi/rmi://xx.101.130.2:9999/jmxrmi, and begin to connect it] - [kafka.jmx.JmxConnection:35]
2016-12-08 20:45:15 -[INFO] - [jmxUrl:service:jmx:rmi:///jndi/rmi://xx.101.130.1:9999/jmxrmi,objectName=kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=default_channel_kafka_zzh_demo, Count:6000] - [kafka.jmx.JmxConnection:73]
2016-12-08 20:45:15 -[INFO] - [jmxUrl:service:jmx:rmi:///jndi/rmi://xx.101.130.2:9999/jmxrmi,objectName=kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=default_channel_kafka_zzh_demo, Count:4384] - [kafka.jmx.JmxConnection:73]
10384
3.915592283987704E-65
{0=2072, 1=2084, 2=2073, 3=2083, 4=2072}
```

Looking at the output, note that 6000 + 4384 = 10384 = 2072 + 2084 + 2073 + 2083 + 2072; readers may enjoy working out why.
You can also connect jconsole to service:jmx:rmi:///jndi/rmi://xx.101.130.1:9999/jmxrmi or service:jmx:rmi:///jndi/rmi://xx.101.130.2:9999/jmxrmi to inspect the same values, as shown in the figure below:
 
You can also query a single value from the command line, with the help of a jar: cmdline-jmxclient-0.xx.3.jar, which is easy to find online.
Put the jar in a directory of your choice (here /root/util on a Linux host). Taking the offset as the example:
0.8.1.x: read the Value of topic=default_channel_kafka_zzh_demo, partition=0 (the command follows the 0.8.1.x ObjectName pattern shown earlier):

```shell
java -jar cmdline-jmxclient-0.10.3.jar - xx.101.130.1:9999 '"kafka.log":type="Log",name="default_channel_kafka_zzh_demo-0-LogEndOffset"'
```
0.8.2.x: read the Value of topic=default_channel_kafka_zzh_demo, partition=0:

```shell
java -jar cmdline-jmxclient-0.10.3.jar - xx.101.130.1:9999 kafka.log:type=Log,name=LogEndOffset,topic=default_channel_kafka_zzh_demo,partition=0
```

See the pattern? If not, here is one more tip: open the attribute in jconsole and hover the mouse over it; jconsole pops up a tooltip showing how to spell that attribute's ObjectName.
 