kafka的简单使用
生活随笔
收集整理的這篇文章主要介紹了
kafka的简单使用
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
為什么80%的碼農都做不了架構師?>>>
在eclipse中新建kafka-demo的maven項目,pom.xml依賴如下
ProducerDemo.java
package com.leech.kafka.demo;

import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Minimal producer example for the Kafka 0.8 Java API.
 *
 * <p>Sends a fixed number of synthetic "page visit" messages to the
 * {@code page_visits} topic, keyed by a fake client IP, and prints the
 * elapsed time in milliseconds.
 */
public class ProducerDemo {

    public static void main(String[] args) {
        int events = 100;

        // Producer configuration.
        Properties props = new Properties();
        // For a multi-broker cluster list every broker, e.g.
        // "172.168.63.221:9092,172.168.63.233:9092,172.168.63.234:9092".
        props.put("metadata.broker.list", "192.168.1.82:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // key.serializer.class defaults to serializer.class; set explicitly for clarity.
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // Optional: when not configured, the default partitioner is used.
        props.put("partitioner.class", "com.leech.kafka.demo.PartitionerDemo");
        // Request acknowledgements; without this the producer is fire-and-forget
        // and data may be lost. Accepted values are 0, 1 and -1, see
        // http://kafka.apache.org/08/configuration.html
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);

        // Create the producer.
        Producer<String, String> producer = new Producer<String, String>(config);

        // Generate and send the messages. The producer is closed in a finally
        // block so its resources are released even when a send fails.
        try {
            long start = System.currentTimeMillis();
            for (long i = 0; i < events; i++) {
                long runtime = new Date().getTime();
                String ip = "192.168.2." + i;
                String msg = runtime + ",www.example.com," + ip;
                // Per the original note, a missing topic is created automatically
                // using the broker's defaults (depends on broker configuration).
                KeyedMessage<String, String> data =
                        new KeyedMessage<String, String>("page_visits", ip, msg);
                producer.send(data);
            }
            // "耗时" means "elapsed time"; the value is in milliseconds.
            System.out.println("耗时:" + (System.currentTimeMillis() - start));
        } finally {
            // Close the producer.
            producer.close();
        }
    }
}
// Next listing: ConsumerDemo.java
package com.leech.kafka.demo;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * High-level consumer example: opens a number of streams on one topic and
 * hands each stream to a {@link ConsumerMsgTask} running on a thread pool.
 *
 * <p>For details see:
 * https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
 *
 * @author Fung
 */
public class ConsumerDemo {
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    /**
     * Creates the consumer connector.
     *
     * @param a_zookeeper ZooKeeper connection string, e.g. {@code host:2181}
     * @param a_groupId   consumer group id
     * @param a_topic     topic to consume
     */
    public ConsumerDemo(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    /** Shuts down the consumer connector and, if it was started, the thread pool. */
    public void shutdown() {
        if (consumer != null) {
            consumer.shutdown();
        }
        if (executor != null) {
            executor.shutdown();
        }
    }

    /**
     * Requests {@code numThreads} streams for the topic and submits one
     * {@link ConsumerMsgTask} per returned stream to a fixed-size thread pool.
     *
     * @param numThreads number of streams to request and size of the thread pool
     */
    public void run(int numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, Integer.valueOf(numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // Launch the worker pool.
        executor = Executors.newFixedThreadPool(numThreads);

        // Create one task per stream; threadNumber only labels the output.
        int threadNumber = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new ConsumerMsgTask(stream, threadNumber));
            threadNumber++;
        }
    }

    /** Builds the consumer configuration from the ZooKeeper address and group id. */
    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    /**
     * Runs the demo for ten seconds with hard-coded settings.
     *
     * @param arg ignored; the settings below are used instead
     */
    public static void main(String[] arg) {
        String[] args = { "192.168.1.82:2181", "group-1", "page_visits", "12" };
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);

        ConsumerDemo demo = new ConsumerDemo(zooKeeper, groupId, topic);
        demo.run(threads);

        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
            // Restore the interrupt flag instead of swallowing it, then fall
            // through so the consumer is still shut down.
            Thread.currentThread().interrupt();
        }
        demo.shutdown();
    }
}
// Next listing: ConsumerMsgTask.java
package com.leech.kafka.demo;

import java.nio.charset.Charset;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

/**
 * Worker that drains one Kafka stream and prints every message to stdout,
 * prefixed with the thread number it was created with.
 */
public class ConsumerMsgTask implements Runnable {

    // Decode with an explicit charset rather than the platform default, so the
    // output does not depend on the JVM's locale settings.
    // NOTE(review): UTF-8 is chosen to match the StringEncoder used by the
    // producer listing — confirm if a different encoder is configured.
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    private final KafkaStream<byte[], byte[]> stream;
    private final int threadNumber;

    /**
     * @param stream       stream to consume
     * @param threadNumber label used in the printed output
     */
    public ConsumerMsgTask(KafkaStream<byte[], byte[]> stream, int threadNumber) {
        this.threadNumber = threadNumber;
        this.stream = stream;
    }

    /** Prints each message until the iterator reports no more, then a shutdown notice. */
    @Override
    public void run() {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println("Thread " + threadNumber + ": "
                    + new String(it.next().message(), UTF_8));
        }
        System.out.println("Shutting down Thread: " + threadNumber);
    }
}
// Next listing: PartitionerDemo.java
package?com.leech.kafka.demo;import?kafka.producer.Partitioner; import?kafka.utils.VerifiableProperties;public?class?PartitionerDemo?implements?Partitioner?{public?PartitionerDemo(VerifiableProperties?props)?{}@Overridepublic?int?partition(Object?obj,?int?numPartitions)?{int?partition?=?0;if?(obj?instanceof?String)?{String?key=(String)obj;int?offset?=?key.lastIndexOf('.');if?(offset?>?0)?{partition?=?Integer.parseInt(key.substring(offset?+?1))?%?numPartitions;}}else{partition?=?obj.toString().length()?%?numPartitions;}return?partition;}}轉(zhuǎn)載于:https://my.oschina.net/chaun/blog/408511
總結
以上是生活随笔為你收集整理的kafka的简单使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 关于wcf三大工具的使用(wsdl.ex
- 下一篇: Javascript网页摇一摇