Integrating Flume with Mosquitto
Source: http://www.cnblogs.com/hark0623/p/4173714.html (please credit the source when reposting)

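Because of a business requirement, Flume needs to collect data from MQTT (Mosquitto). The approach is to write a custom Flume source that subscribes to the MQTT broker.

The Java code for the Flume source is as follows: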
package com.yhx.sensor.flume.source;

import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;

public class MQTTSource extends AbstractSource implements EventDrivenSource, Configurable {

    /**
     * The initialization method for the Source. The context contains all the
     * Flume configuration info, and can be used to retrieve any configuration
     * values necessary to set up the Source.
     */
    @Override
    public void configure(Context arg0) {
        // TODO Auto-generated method stub
    }

    SimpleMqttClient client = null;

    /**
     * Start any dependent systems and begin processing events.
     */
    @Override
    public void start() {
        // TODO Auto-generated method stub
        // super.start();
        client = new SimpleMqttClient();
        client.runClient();
    }

    /**
     * Stop processing events and shut any dependent systems down.
     */
    @Override
    public void stop() {
        // TODO Auto-generated method stub
        // super.stop();
        if (client != null) {
            client.closeConn();
        }
    }

    // public static void main(String[] args) {
    //     SimpleMqttClient smc = new SimpleMqttClient();
    //     smc.runClient();
    // }

    public class SimpleMqttClient implements MqttCallback {

        MqttClient myClient;
        MqttConnectOptions connOpt;

        String BROKER_URL = "tcp://192.168.116.128:1883";
        String M2MIO_DOMAIN = "192.168.116.128";
        String M2MIO_STUFF = "yhx";
        String M2MIO_THING = "yhx_flume";
        // String M2MIO_USERNAME = "<m2m.io username>";
        // String M2MIO_PASSWORD_MD5 =
        //     "<m2m.io password (MD5 sum of password)>";

        Boolean subscriber = true;
        Boolean publisher = false;

        /**
         * connectionLost This callback is invoked upon losing the MQTT
         * connection.
         */
        @Override
        public void connectionLost(Throwable t) {
            System.out.println("Connection lost!");
            // code to reconnect to the broker would go here if desired
        }

        public void closeConn() {
            if (myClient != null) {
                if (myClient.isConnected()) {
                    try {
                        myClient.disconnect();
                    } catch (MqttException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }
        }

        /**
         * deliveryComplete This callback is invoked when a message published by
         * this client is successfully received by the broker.
         */
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            // System.out.println("Pub complete" + new
            //     String(token.getMessage().getPayload()));
        }

        /**
         * messageArrived This callback is invoked when a message is received on
         * a subscribed topic.
         */
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            // System.out.println("-------------------------------------------------");
            // // System.out.println("| Topic:" + topic.getName());
            // System.out.println("| Topic:" + topic);
            // System.out.println("| Message: " + new String(message.getPayload()));
            // System.out.println("-------------------------------------------------");

            Map<String, String> headers = new HashMap<String, String>();
            // headers.put("curDate", df.format(new Date()));

            // Wrap the MQTT payload in a Flume event and hand it to the channel
            Event flumeEvent = EventBuilder.withBody(message.getPayload(), headers);
            try {
                getChannelProcessor().processEvent(flumeEvent);
            } catch (Exception e) {
                // TODO: handle exception
                e.printStackTrace();
            }
        }

        /**
         * runClient The main functionality of this simple example. Create a
         * MQTT client, connect to broker, pub/sub, disconnect.
         */
        public void runClient() {
            // setup MQTT Client
            String clientID = M2MIO_THING;
            connOpt = new MqttConnectOptions();
            connOpt.setCleanSession(true);
            connOpt.setKeepAliveInterval(3000);
            // connOpt.setUserName(M2MIO_USERNAME);
            // connOpt.setPassword(M2MIO_PASSWORD_MD5.toCharArray());

            // Connect to Broker
            try {
                myClient = new MqttClient(BROKER_URL, clientID);
                myClient.setCallback(this);
                myClient.connect(connOpt);
            } catch (MqttException e) {
                e.printStackTrace();
                System.exit(-1);
            }

            System.out.println("Connected to " + BROKER_URL);

            // setup topic
            // topics on m2m.io are in the form <domain>/<stuff>/<thing>
            String myTopic = M2MIO_DOMAIN + "/" + M2MIO_STUFF + "/" + M2MIO_THING;
            System.out.println("myTopic:" + myTopic);
            MqttTopic topic = myClient.getTopic(myTopic);

            // subscribe to topic if subscriber
            if (subscriber) {
                try {
                    int subQoS = 0;
                    myClient.subscribe(myTopic, subQoS);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            // publish messages if publisher
            if (publisher) {
                for (int i = 1; i <= 10; i++) {
                    String pubMsg = "{\"pubmsg\":" + i + "}";
                    int pubQoS = 0;
                    MqttMessage message = new MqttMessage(pubMsg.getBytes());
                    message.setQos(pubQoS);
                    message.setRetained(false);

                    // Publish the message
                    System.out.println("Publishing to topic \"" + topic + "\" qos " + pubQoS);
                    MqttDeliveryToken token = null;
                    try {
                        // publish message to broker
                        token = topic.publish(message);
                        // Wait until the message has been delivered to the broker
                        token.waitForCompletion();
                        Thread.sleep(100);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }

            // disconnect
            try {
                // wait to ensure subscribed messages are delivered
                if (subscriber) {
                    while (true) {
                        Thread.sleep(5000);
                    }
                }
                // myClient.disconnect();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
            }
        }
    }
}

When building the JAR, be sure to include Class-Path in the manifest, as follows:
Manifest-Version: 1.0
Class-Path: flume-ng-configuration-1.5.2.jar flume-ng-core-1.5.2.jar flume-ng-node-1.5.2.jar flume-ng-sdk-1.5.2.jar org.eclipse.paho.client.mqttv3-1.0.0.jar
Put the built JAR in Flume's lib directory. Note that every JAR listed in Class-Path must be present in lib; if one is missing, copy it there.
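Class-Path entries in a manifest are resolved relative to the location of the JAR that declares them, so a quick check before starting Flume can catch a missing dependency. The following is a minimal sketch that is not part of the original post: the class name ClassPathCheck and its method names are hypothetical, and it relies only on java.util.jar from the JDK.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

// Hypothetical helper, not from the original article.
class ClassPathCheck {

    /** Returns the whitespace-separated Class-Path entries that are not files in libDir. */
    static List<String> missingEntries(String classPathValue, File libDir) {
        List<String> missing = new ArrayList<>();
        if (classPathValue == null || classPathValue.trim().isEmpty()) {
            return missing;
        }
        for (String entry : classPathValue.trim().split("\\s+")) {
            if (!new File(libDir, entry).isFile()) {
                missing.add(entry);
            }
        }
        return missing;
    }

    /** Reads the Class-Path main attribute of a JAR, or null if there is none. */
    static String readClassPath(File jar) throws IOException {
        try (JarFile jarFile = new JarFile(jar)) {
            Manifest manifest = jarFile.getManifest();
            if (manifest == null) {
                return null;
            }
            return manifest.getMainAttributes().getValue(Attributes.Name.CLASS_PATH);
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.out.println("usage: ClassPathCheck <source.jar> <lib-dir>");
            return;
        }
        File jar = new File(args[0]);
        File libDir = new File(args[1]);
        for (String entry : missingEntries(readClassPath(jar), libDir)) {
            System.out.println("missing from lib: " + entry);
        }
    }
}
```

It could be run from the Flume directory with the source JAR and the lib directory as arguments, for example `java ClassPathCheck lib/mqtt-source.jar lib`, where mqtt-source.jar is a placeholder for whatever the built JAR is called.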
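Next, modify Flume's configuration file as follows. The part to look at is sourceMqtt; the UDP entries are there because this agent also listens on UDP at the same time: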
a1.sources = sourceMqtt sourceUdp
a1.sinks = sinkMqtt sinkUdp
a1.channels = channelMqtt channelUdp

# Describe/configure the source
a1.sources.sourceMqtt.type = com.yhx.sensor.flume.source.MQTTSource

# Describe the sink
a1.sinks.sinkMqtt.type = logger

# Use a channel which buffers events in memory
a1.channels.channelMqtt.type = memory
a1.channels.channelMqtt.capacity = 1000
a1.channels.channelMqtt.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.sourceMqtt.channels = channelMqtt
a1.sinks.sinkMqtt.channel = channelMqtt

# a2.sources = sourceUdp
# a2.sinks = sinkUdp
# a2.channels = channelUdp

# Describe/configure the source
a1.sources.sourceUdp.type = syslogudp
a1.sources.sourceUdp.host = 0.0.0.0
a1.sources.sourceUdp.port = 12459
a1.sources.sourceUdp.interceptors=interceptorUdp
a1.sources.sourceUdp.interceptors.interceptorUdp.type=com.yhx.sensor.flume.intercepter.UDPIntercepter$Builder

# Describe the sink
a1.sinks.sinkUdp.type = logger

# Use a channel which buffers events in memory
a1.channels.channelUdp.type = memory
a1.channels.channelUdp.capacity = 1000
a1.channels.channelUdp.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.sourceUdp.channels = channelUdp
a1.sinks.sinkUdp.channel = channelUdp
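Flume agent configuration files use the Java properties format, so the source, sink and channel bindings in a file like this one can be checked mechanically. The sketch below is an illustration only: FlumeConfCheck and undeclaredChannels are hypothetical names, and the check is limited to confirming that every `channels` or `channel` value refers to a channel declared for the agent.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

// Hypothetical helper, not from the original article.
class FlumeConfCheck {

    /** Splits a whitespace-separated property value into names, keeping declaration order. */
    static Set<String> names(Properties props, String key) {
        String value = props.getProperty(key, "").trim();
        if (value.isEmpty()) {
            return new LinkedHashSet<>();
        }
        return new LinkedHashSet<>(Arrays.asList(value.split("\\s+")));
    }

    /** Lists sources and sinks of the given agent that are bound to an undeclared channel. */
    static List<String> undeclaredChannels(String confText, String agent) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(confText));
        } catch (IOException e) {
            // Not expected when reading from an in-memory string
            throw new IllegalStateException(e);
        }

        Set<String> channels = names(props, agent + ".channels");
        List<String> problems = new ArrayList<>();

        // A source may list several channels
        for (String source : names(props, agent + ".sources")) {
            for (String channel : names(props, agent + ".sources." + source + ".channels")) {
                if (!channels.contains(channel)) {
                    problems.add("source " + source + " -> " + channel);
                }
            }
        }

        // A sink reads from exactly one channel
        for (String sink : names(props, agent + ".sinks")) {
            String channel = props.getProperty(agent + ".sinks." + sink + ".channel", "").trim();
            if (!channels.contains(channel)) {
                problems.add("sink " + sink + " -> " + (channel.isEmpty() ? "(none)" : channel));
            }
        }
        return problems;
    }
}
```

Calling `FlumeConfCheck.undeclaredChannels(confText, "a1")` with the text of the configuration above should yield an empty list, since both sources and both sinks point at channelMqtt or channelUdp, which are declared in a1.channels.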
Save the configuration file as flume.conf in the conf directory under the Flume installation.
Then start Flume with the following command:
bin/flume-ng agent --conf conf --conf-file conf/flume.conf --name a1
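The source in this post hard-codes the broker URL, topic and client ID, and leaves configure(Context) empty. One possible refinement, sketched here under stated assumptions, is to read those values from the agent configuration. The property names brokerUrl, topic, clientId and qos are hypothetical (they would appear as, say, a1.sources.sourceMqtt.brokerUrl in flume.conf), the class MqttSettings is not part of the original code, and the defaults are simply the values used in the post. The sketch assumes Java 8 or later for Map.getOrDefault.

```java
import java.util.Map;

// Hypothetical settings holder, not from the original article.
final class MqttSettings {
    final String brokerUrl;
    final String topic;
    final String clientId;
    final int qos;

    private MqttSettings(String brokerUrl, String topic, String clientId, int qos) {
        this.brokerUrl = brokerUrl;
        this.topic = topic;
        this.clientId = clientId;
        this.qos = qos;
    }

    /** Builds settings from source parameters, falling back to the values used in the post. */
    static MqttSettings from(Map<String, String> params) {
        String brokerUrl = params.getOrDefault("brokerUrl", "tcp://192.168.116.128:1883");
        String topic = params.getOrDefault("topic", "192.168.116.128/yhx/yhx_flume");
        String clientId = params.getOrDefault("clientId", "yhx_flume");
        int qos = Integer.parseInt(params.getOrDefault("qos", "0"));
        if (qos < 0 || qos > 2) {
            // MQTT defines only QoS levels 0, 1 and 2
            throw new IllegalArgumentException("qos must be 0, 1 or 2: " + qos);
        }
        return new MqttSettings(brokerUrl, topic, clientId, qos);
    }
}
```

Inside the source, configure(Context context) could then call `MqttSettings.from(context.getParameters())` and pass the result to SimpleMqttClient, so that moving the broker only requires editing flume.conf rather than rebuilding the JAR.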
