ServiceMix部署自定义开发程序(ActiveMQ和Kafka实现)
The previous post covered setting up the ServiceMix environment. This post shows how to develop a program for the ServiceMix platform and deploy it into the Karaf container, how to send and receive messages with the ActiveMQ broker that ships with Karaf, and how to use Kafka, a widely adopted distributed, high-throughput messaging system, from inside the Karaf container.
First, a look at where the program is deployed: the jar file built by Maven goes into the deploy directory under the ServiceMix installation. If configuration files are needed, they go into the etc directory, which corresponds to the etc directory in your Maven project.
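If you do use a file in etc, a minimal sketch for loading it could look like the following. The file name `my-bundle.cfg` is a placeholder; `karaf.etc` and `karaf.base` are the system properties Karaf uses to expose its directories:

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

public class EtcConfig {

    // Loads etc/my-bundle.cfg from the container (the file name is hypothetical).
    public static Properties load() throws IOException {
        // Karaf publishes its directories as system properties: karaf.etc points at the etc
        // folder, karaf.base at the installation root.
        String etcDir = System.getProperty("karaf.etc", System.getProperty("karaf.base") + "/etc");
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream(etcDir + "/my-bundle.cfg")) {
            props.load(in);
        }
        return props;
    }
}
```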
1. Create a Maven project in IntelliJ and edit the pom: define the project coordinates, configure the Apache Felix maven-bundle-plugin, and add the Kafka dependency (see the source files linked at the end of the post for the complete pom).
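The post does not reproduce the maven-bundle-plugin section, so the following is only a sketch of a typical configuration. The Bundle-Activator value and the package patterns are assumptions based on the Activator created in step 5 and should be adjusted to your actual package layout; the commented-out Embed-Dependency line is one alternative to copying the Kafka client jars into the deploy directory in step 6:

```xml
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.felix</groupId>
            <artifactId>maven-bundle-plugin</artifactId>
            <extensions>true</extensions>
            <configuration>
                <instructions>
                    <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
                    <!-- Class that Karaf calls on bundle start/stop (see step 5) -->
                    <Bundle-Activator>com.danejiang.Activator</Bundle-Activator>
                    <Export-Package>com.danejiang.*</Export-Package>
                    <Import-Package>*</Import-Package>
                    <!-- Optional: embed the client jars instead of dropping them into deploy
                    <Embed-Dependency>kafka-clients,jmdns</Embed-Dependency> -->
                </instructions>
            </configuration>
        </plugin>
    </plugins>
</build>
```

The coordinate and dependency excerpt from the pom follows.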
```xml
<groupId>com.danejiang</groupId>
<artifactId>BundleTest</artifactId>
<version>1.0.0</version>
<packaging>bundle</packaging>
<name>DaneJiang BundleTest</name>
<description>DaneJiang BundleTest</description>

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>javax.jmdns</groupId>
        <artifactId>jmdns</artifactId>
        <version>3.4.1</version>
    </dependency>
</dependencies>
```

2. Create activemq.java, which implements sending messages as an ActiveMQ producer, receiving and processing messages as a consumer, and stopping the consumer. The producer code is as follows:
```java
public static boolean send(String topicType, String topicName, String topicMessage) {
    try {
        // Create the connection factory
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://0.0.0.0:61616");
        // Create the JMS connection and start it
        Connection connection = factory.createConnection("smx", "smx");
        connection.start();
        // Create a non-transacted session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Create the destination and producer, depending on the destination type
        MessageProducer producer = null;
        if (topicType.toLowerCase().equals("queue")) {
            // Create the queue
            Queue queue = session.createQueue(topicName);
            // Create the producer
            producer = session.createProducer(queue);
        } else if (topicType.toLowerCase().equals("topic")) {
            // Create the topic
            Topic topic = session.createTopic(topicName);
            // Create the producer
            producer = session.createProducer(topic);
        } else {
            logger.info("Send MQ Message error:not set topic type.");
            return false;
        }
        // Messages are persistent by default; uncomment the next line to disable persistence
        // producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        // Create a text message (other message types can be used as well)
        TextMessage message = session.createTextMessage(topicMessage);
        // Send the message: non-persistent messages are sent asynchronously by default, persistent ones synchronously
        producer.send(message);
        // Close the producer, session and connection
        producer.close();
        session.close();
        connection.close();
        logger.info("Send MQ Message:" + topicName);
        return true;
    } catch (Exception e) {
        logger.info("Send MQ Message error:" + e.toString());
        return false;
    }
}
```
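A quick usage sketch of the send() method above, using the topic name that the consumer subscribes to later in this post (the destination type comparison is case-insensitive):

```java
// Publish a text message to the "DaneJiang" topic
activemq.send("topic", "DaneJiang", "Hello");

// The same method can target a queue instead
activemq.send("queue", "DaneJiang", "Hello");
```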
The ActiveMQ consumer code is as follows:

```java
private static Connection rConnection = null;
private static Session rSession = null;
private static MessageConsumer rMessageConsumer = null;

public static boolean receive(String topicType, String topicName) {
    try {
        // Create the connection factory
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://0.0.0.0:61616");
        // Create the JMS connection and start it
        rConnection = connectionFactory.createConnection("smx", "smx");
        rConnection.start();
        // Create a non-transacted session
        rSession = rConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Create the destination and consumer, depending on the destination type
        if (topicType.toLowerCase().equals("queue")) {
            // Create the queue
            Queue queue = rSession.createQueue(topicName);
            // Create the consumer
            rMessageConsumer = rSession.createConsumer(queue);
        } else if (topicType.toLowerCase().equals("topic")) {
            // Create the topic
            Topic topic = rSession.createTopic(topicName);
            // Create the consumer
            rMessageConsumer = rSession.createConsumer(topic);
        } else {
            logger.info("Start MQ Message error:not set topic type.");
            return false;
        }
        // Consume asynchronously via a message listener
        rMessageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage mess = (TextMessage) message;
                try {
                    // Handle the message
                    logger.info("Receive MQ Message:" + topicName + ",Result:" + mainService.doMQ(topicName, mess.getText()));
                } catch (JMSException e) {
                    logger.info("Receive MQ Message error:" + e.toString());
                }
            }
        });
        logger.info("Started receive MQ Message:" + topicName);
        return true;
    } catch (Exception e) {
        logger.info("Start receive MQ Message error:" + e.toString());
        return false;
    }
}
```
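The listener above pushes messages to the bundle asynchronously. If a blocking, pull-style consumer fits better, JMS also offers a synchronous receive; here is a minimal sketch reusing the fields above. Note that a consumer may use either a message listener or synchronous receive, not both, so this would replace the setMessageListener call:

```java
// Block for up to five seconds waiting for the next message on the destination
public static String receiveOnce() throws JMSException {
    Message message = rMessageConsumer.receive(5000);
    if (message instanceof TextMessage) {
        return ((TextMessage) message).getText();
    }
    return null; // timed out, or not a text message
}
```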
The following method stops the consumer's asynchronous listening:

```java
public static void stop() {
    try {
        // Close the consumer, session and connection
        if (rMessageConsumer != null) rMessageConsumer.close();
        if (rSession != null) rSession.close();
        if (rConnection != null) rConnection.close();
        logger.info("Stopped MQ Message.");
    } catch (Exception e) {
        logger.info("Stop MQ Message error:" + e.toString());
    }
}
```

3. Create kafka.java, which implements sending messages as a Kafka producer, receiving and processing messages as a consumer, and stopping the consumer. The Kafka producer code is as follows:
```java
public static boolean send(String topicName, String topicMessage) {
    try {
        // Clear the thread context class loader so the Kafka client resolves its classes correctly inside OSGi
        Thread.currentThread().setContextClassLoader(null);
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop01:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = null;
        try {
            producer = new KafkaProducer<>(props);
            producer.send(new ProducerRecord<String, String>(topicName, topicMessage));
        } catch (Exception e) {
            logger.info("Send Kafka Message error:" + e.toString());
            return false;
        } finally {
            // Guard against the constructor itself having failed
            if (producer != null) producer.close();
        }
        logger.info("Send Kafka Message:" + topicName);
        return true;
    } catch (Exception ex) {
        logger.info("Send Kafka Message error:" + ex.toString());
        return false;
    }
}
```
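The method above builds and closes a KafkaProducer for every message, which is relatively expensive; the producer is thread-safe and designed to be reused. A hedged sketch of a shared producer follows; the class name kafkaShared is illustrative, and the broker address is carried over from the code above:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class kafkaShared {

    // One producer instance shared by the whole bundle; KafkaProducer is thread-safe
    private static volatile Producer<String, String> producer;

    private static Producer<String, String> producer() {
        if (producer == null) {
            synchronized (kafkaShared.class) {
                if (producer == null) {
                    // Same OSGi class-loader workaround as in send() above
                    Thread.currentThread().setContextClassLoader(null);
                    Properties props = new Properties();
                    props.put("bootstrap.servers", "hadoop01:9092");
                    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                    producer = new KafkaProducer<>(props);
                }
            }
        }
        return producer;
    }

    public static void send(String topicName, String topicMessage) {
        producer().send(new ProducerRecord<String, String>(topicName, topicMessage));
    }

    // Call from the bundle's stop handler to flush buffered records and release connections
    public static void close() {
        if (producer != null) {
            producer.close();
        }
    }
}
```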
The Kafka consumer code is as follows:

```java
private static KafkaConsumer<String, String> kafkaConsumer = null;

public static boolean receive() {
    try {
        // Clear the thread context class loader so the Kafka client resolves its classes correctly inside OSGi
        Thread.currentThread().setContextClassLoader(null);
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop01:9092");
        props.put("group.id", "Group-1");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaConsumer = new KafkaConsumer<>(props);
        kafkaConsumer.subscribe(Collections.singletonList("test"));
        SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHHmmssSSS");
        // Poll in a background thread (the thread name is just a timestamp)
        new Thread(df.format(new Date())) {
            public void run() {
                while (true) {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        logger.info("Receive Kafka Message:" + record.topic() + ",Result:" + mainService.doKafka(record.topic(), record.value()));
                    }
                }
            }
        }.start();
        return true;
    } catch (Exception e) {
        logger.info("Start receive Kafka Message error:" + e.toString());
        return false;
    }
}
```

The following method stops the consumer:
```java
public static void stop() {
    try {
        // Close the consumer
        if (kafkaConsumer != null) kafkaConsumer.close();
        logger.info("Stop Kafka Message.");
    } catch (Exception e) {
        logger.info("Stop Kafka Message error:" + e.toString());
    }
}
```
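One thing to be aware of: KafkaConsumer is not thread-safe, so calling close() here while the polling thread started in receive() is still inside poll() can fail with a ConcurrentModificationException. The client's supported way to interrupt a poll from another thread is wakeup(). A hedged sketch of that pattern, reusing the kafkaConsumer field above (the running flag and method names are illustrative):

```java
// requires: import org.apache.kafka.common.errors.WakeupException;
private static volatile boolean running = true;

// Body of the polling thread started in receive()
private static void pollLoop() {
    try {
        while (running) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                logger.info("Receive Kafka Message:" + record.topic()
                        + ",Result:" + mainService.doKafka(record.topic(), record.value()));
            }
        }
    } catch (WakeupException e) {
        // Expected when stop() calls wakeup(); fall through and close on the polling thread
    } finally {
        kafkaConsumer.close();
    }
}

// Called from the bundle's stop handler on a different thread
public static void stop() {
    running = false;
    if (kafkaConsumer != null) {
        kafkaConsumer.wakeup();
    }
}
```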
4. Create the main functions that drive the start and stop of the messaging components and the message handling. The start function starts the consumer listeners and the stop function stops them; in step 5 these two functions are bound to the bundle's start and stop events in Karaf:

```java
public static void start() {
    try {
        // Start the ActiveMQ consumer
        activemq.receive("Topic", "DaneJiang");
        // Start the Kafka consumer
        kafka.receive();
        logger.info("MainService start success.");
    } catch (Exception ex) {
        logger.info("MainService start error:" + ex.toString());
    }
}

public static void stop() {
    try {
        // Stop ActiveMQ
        activemq.stop();
        // Stop Kafka
        kafka.stop();
    } catch (Exception ex) {
        logger.info("MainService stop error:" + ex.toString());
    }
}
```

The following methods handle the messages once they arrive; adjust the logic to your own needs:
```java
public static String doMQ(String topicName, String topicMessage) {
    try {
        String result = "";
        switch (topicName.toUpperCase()) {
            case "DANEJIANG":
                result = topicMessage + " World!";
                break;
            default:
                result = "Receive Error Type:Type=" + topicName + ",Message=" + topicMessage;
                break;
        }
        return result;
    } catch (Exception ex) {
        logger.info("doMQ error:" + ex.toString());
        return ex.toString();
    }
}

public static String doKafka(String topicName, String topicMessage) {
    try {
        String result = "";
        switch (topicName.toUpperCase()) {
            case "DANEJIANG":
                result = topicMessage + " World!";
                break;
            default:
                result = "Receive Error Type:Type=" + topicName + ",Message=" + topicMessage;
                break;
        }
        return result;
    } catch (Exception ex) {
        logger.info("doKafka error:" + ex.toString());
        return ex.toString();
    }
}
```
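As a quick check of the handler above, a message arriving on the DaneJiang topic with the body "Hello" produces:

```java
doMQ("DaneJiang", "Hello");  // returns "Hello World!"
doMQ("Other", "Hello");      // returns "Receive Error Type:Type=Other,Message=Hello"
```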
5. Finally, create Activator.java, which triggers the corresponding ActiveMQ and Kafka start and stop handlers when the bundle starts and stops:

```java
import com.danejiang.service.mainService;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;

public class Activator implements BundleActivator {
    @Override
    public void start(BundleContext arg0) throws Exception {
        mainService.start();
        System.out.println("start bundle!");
    }

    @Override
    public void stop(BundleContext arg0) throws Exception {
        mainService.stop();
        System.out.println("stop bundle!");
    }
}
```

6. Once the code is done, build the jar with Maven and upload it, together with jmdns-3.4.1.jar, kafka-clients-2.1.0.jar and lz4-java-1.5.0.jar from the project's lib directory, to the deploy directory under ServiceMix.
Use bin/client to enter the Karaf console. Running bundle:list shows that the bundle is active, and log:display shows how the components started up. 7. Finally, here is the source code for this test; my skills are limited, so corrections are welcome, thanks! github.com/danejiang/S…
Original article: danejiang.top/2019/04/24/…