kafaka生产者消费者demo(简易上手demo)
kafaka生產(chǎn)者消費(fèi)者demo(簡(jiǎn)易上手demo)
文章目錄
- kafaka生產(chǎn)者消費(fèi)者demo(簡(jiǎn)易上手demo)
- 導(dǎo)包
- kafka官方client
- spring官方template
- spring官方springcloud stream starter
- kafka官方client使用
- 生產(chǎn)者Demo
- 消費(fèi)者Demo
- 簡(jiǎn)易的多線(xiàn)程生產(chǎn)者
- 生產(chǎn)
- 消費(fèi)
- 使用線(xiàn)程池優(yōu)化生產(chǎn)者
- ProducerThreadPool
- 測(cè)試使用
- 測(cè)試結(jié)果
- spring官方template使用
- 配置
- 生產(chǎn)者Demo
- 消費(fèi)者Demo
- spring官方springcloud stream starter使用
- 配置
- 啟動(dòng)類(lèi)
- 生產(chǎn)者Demo
- 消費(fèi)者Demo
- 測(cè)試類(lèi)
- 結(jié)果
導(dǎo)包
kafka官方client
kafka官方提供的Java client jar包
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.1.0</version></dependency>spring官方template
也可以使用spring官方提供的kafaka template
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --> <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.4</version> </dependency>spring官方springcloud stream starter
使用spring-cloud-starter-stream-kafka可以整合kafka進(jìn)入到spring項(xiàng)目中
<!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-stream-kafka --> <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>3.2.2</version> </dependency>kafka官方client使用
生產(chǎn)者Demo
使用KafkaProducer做生產(chǎn)者,可以使用多線(xiàn)程模擬多個(gè)生產(chǎn)者,這里提供簡(jiǎn)單的test來(lái)供以參考。
- bootstrap.servers: kafka服務(wù)器的地址。
- acks:消息的確認(rèn)機(jī)制,默認(rèn)值是0。
- acks=0:如果設(shè)置為0,生產(chǎn)者不會(huì)等待kafka的響應(yīng)。
- acks=1:這個(gè)配置意味著kafka會(huì)把這條消息寫(xiě)到本地日志文件中,但是不會(huì)等待集群中其他機(jī)器的成功響應(yīng)。
- acks=all:這個(gè)配置意味著leader會(huì)等待所有的follower同步完成。這個(gè)確保消息不會(huì)丟失,除非kafka集群中所有機(jī)器掛掉。這是最強(qiáng)的可用性保證。
- retries:配置為大于0的值的話(huà),客戶(hù)端會(huì)在消息發(fā)送失敗時(shí)重新發(fā)送。(允許重發(fā)的情況)
- batch.size:當(dāng)多條消息需要發(fā)送到同一個(gè)分區(qū)時(shí),生產(chǎn)者會(huì)嘗試合并網(wǎng)絡(luò)請(qǐng)求。這會(huì)提高client和生產(chǎn)者的效率。
- key.serializer: 鍵序列化,默認(rèn)org.apache.kafka.common.serialization.StringDeserializer。
- value.deserializer:值序列化,默認(rèn)org.apache.kafka.common.serialization.StringDeserializer。
消費(fèi)者Demo
使用KafkaConsumer做消費(fèi)者client API,可以通過(guò)多線(xiàn)程模擬生產(chǎn)訂閱關(guān)系。這里給一個(gè)簡(jiǎn)單的消費(fèi)者demo。
- bootstrap.servers: kafka的地址。
- group.id:組名,不同組名可以重復(fù)消費(fèi)。(同組重復(fù)消費(fèi)會(huì)拋異常)
- enable.auto.commit:是否自動(dòng)提交,默認(rèn)為true。
- auto.commit.interval.ms: 從poll(拉)的回話(huà)處理時(shí)長(zhǎng)。
- session.timeout.ms:超時(shí)時(shí)間。
- max.poll.records:一次最大拉取的數(shù)據(jù)條數(shù)。
- auto.offset.reset:消費(fèi)規(guī)則,默認(rèn)earliest 。
- earliest: 當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開(kāi)始消費(fèi);無(wú)提交的offset時(shí),從頭開(kāi)始消費(fèi) 。
- latest: 當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開(kāi)始消費(fèi);無(wú)提交的offset時(shí),消費(fèi)新產(chǎn)生的該分區(qū)下的數(shù)據(jù) 。
- none: topic各分區(qū)都存在已提交的offset時(shí),從offset后開(kāi)始消費(fèi);只要有一個(gè)分區(qū)不存在已提交的offset,則拋出異常。
- key.deserializer: 鍵反序列化器,默認(rèn)org.apache.kafka.common.serialization.StringDeserializer
- value.deserializer:值反序列化器,默認(rèn)org.apache.kafka.common.serialization.StringDeserializer
簡(jiǎn)易的多線(xiàn)程生產(chǎn)者
生產(chǎn)
實(shí)現(xiàn)Runnable接口可以實(shí)現(xiàn)簡(jiǎn)易的多線(xiàn)程生產(chǎn)者,模擬多個(gè)生產(chǎn)者生產(chǎn)
@Getter public class MyselfProducer implements Runnable{//主題(當(dāng)主題不存在,自動(dòng)創(chuàng)建主題)private final String topic;//配置private final Properties properties;//主題和配置的多線(xiàn)程共享public MyselfProducer(String topic,Properties properties){this.topic = topic;this.properties = properties;}@Overridepublic void run() {//每個(gè)線(xiàn)程單獨(dú)的生產(chǎn)者KafkaProducer<String,String> kafkaProducer = new KafkaProducer(properties);//生產(chǎn)信息for (int i = 0; i < 100; i++) {String msg = String.format("hello,線(xiàn)程%s發(fā)送第%d條信息",Thread.currentThread().getName() , i);//消息(key可以為null,key值影響消息發(fā)往哪個(gè)分區(qū))ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, String.valueOf(i), msg);//發(fā)送kafkaProducer.send(producerRecord);//控制臺(tái)顯示System.out.println(msg);}//關(guān)閉kafkaProducer.close();} }消費(fèi)
使用多線(xiàn)程進(jìn)行生產(chǎn),然后使用消費(fèi)者Demo進(jìn)行消費(fèi),獲得以下結(jié)果
使用線(xiàn)程池優(yōu)化生產(chǎn)者
ProducerThreadPool
public class ProducerThreadPool{//主題(當(dāng)主題不存在,自動(dòng)創(chuàng)建主題)private final String topic;//配置private final Properties properties;//要產(chǎn)生的生產(chǎn)者線(xiàn)程類(lèi)private final Class<? extends Runnable> producerClass;//線(xiàn)程池private final ThreadPoolExecutor executor;public ProducerThreadPool(String topic,Properties properties,Class<? extends Runnable> c){//初始化線(xiàn)程池this.executor = new ThreadPoolExecutor(5,10,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());//主題this.topic = topic;//配置this.properties = properties;//線(xiàn)程類(lèi)this.producerClass = c;}public Future<?> createAndsubmit(){try {//反射出構(gòu)造器Constructor<? extends Runnable> constructor = producerClass.getConstructor(String.class, Properties.class);//實(shí)例化生產(chǎn)者線(xiàn)程Runnable runnable = constructor.newInstance(topic, properties);System.out.println("提交線(xiàn)程池");//提交到線(xiàn)程池return executor.submit(runnable);} catch (NoSuchMethodException e) {e.printStackTrace();} catch (IllegalAccessException e) {e.printStackTrace();} catch (InstantiationException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}return null;}}測(cè)試使用
寫(xiě)一個(gè)Test使用自己寫(xiě)的ProducerThreadPool生產(chǎn)者線(xiàn)程池
@Testpublic void testProducerThreadPool() throws InterruptedException {//主題(當(dāng)主題不存在,自動(dòng)創(chuàng)建主題)String topic = "threadPool_topic";//配置Properties properties = new Properties();//kafka服務(wù)器地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);ProducerThreadPool producerThreadPool = new ProducerThreadPool(topic, properties, MyselfProducer.class);//生產(chǎn)并提交Future<?> futureA = producerThreadPool.createAndsubmit();Future<?> futureB = producerThreadPool.createAndsubmit();Future<?> futureC = producerThreadPool.createAndsubmit();Thread.sleep(5000);System.out.println(String.format("線(xiàn)程A狀態(tài)%s",futureA.isDone()));System.out.println(String.format("線(xiàn)程B狀態(tài)%s",futureB.isDone()));System.out.println(String.format("線(xiàn)程C狀態(tài)%s",futureC.isDone()));}測(cè)試結(jié)果
生產(chǎn)過(guò)程結(jié)果
消費(fèi)結(jié)果
spring官方template使用
配置
使用spring官方提供的kafka template就需要配置Bean,講bean注入到上下文中。
@Configuration @EnableKafka public class KafkaConfiguration {//ConcurrentKafkaListenerContainerFactory為創(chuàng)建Kafka監(jiān)聽(tīng)器的工廠(chǎng)類(lèi)@Beanpublic ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(@Qualifier("consumerFactory") ConsumerFactory<Integer, String> consumerFactory) {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);return factory;}//kafkaTemplate實(shí)現(xiàn)了Kafka 生產(chǎn)者等功能@Beanpublic KafkaTemplate<Integer, String> kafkaTemplate(@Qualifier("producerFactory") ProducerFactory<Integer, String> producerFactory) {KafkaTemplate template = new KafkaTemplate<Integer, String>(producerFactory);return template;}//根據(jù)consumerProps填寫(xiě)的參數(shù)創(chuàng)建消費(fèi)者工廠(chǎng)@Beanpublic ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerProps());}//根據(jù)senderProps填寫(xiě)的參數(shù)創(chuàng)建生產(chǎn)者工廠(chǎng)@Beanpublic ProducerFactory<Integer, String> producerFactory() {return new DefaultKafkaProducerFactory<>(ProducerProps());}//消費(fèi)者配置參數(shù)private Map<String, Object> consumerProps() {Map<String, Object> props = new HashMap<>();//連接地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//GroupIDprops.put(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-Kafka-1");//是否自動(dòng)提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);//自動(dòng)提交的頻率props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");//Session超時(shí)設(shè)置props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");//鍵的反序列化器props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);//值的反序列化器props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}//生產(chǎn)者配置private Map<String, Object> ProducerProps (){Map<String, Object> props = new HashMap<>();//Kafka服務(wù)器連接地址props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//重試機(jī)制,0為不啟用重試機(jī)制props.put(ProducerConfig.RETRIES_CONFIG, 1);//控制批處理大小,單位為字節(jié)props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);//批量發(fā)送,延遲為1毫秒,啟用該功能能有效減少生產(chǎn)者發(fā)送消息次數(shù),減少網(wǎng)絡(luò)IO次數(shù)props.put(ProducerConfig.LINGER_MS_CONFIG, 1);//生產(chǎn)者可以使用的總內(nèi)存字節(jié)來(lái)緩沖等待發(fā)送到服務(wù)器的記錄props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024000);//鍵的序列化器props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);//值的序列化器props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}}生產(chǎn)者Demo
可以通過(guò)kafkaTemplate發(fā)送消息,也可以通過(guò)spring提供的工廠(chǎng)生產(chǎn)produce并進(jìn)行消息的發(fā)送。
@Component public class MsgProducer {//主題static final String topic = "spring-kafka";//spring提供的模板類(lèi)(生產(chǎn))@Autowiredprivate KafkaTemplate kafkaTemplate;//spring提供的生產(chǎn)者工廠(chǎng)@Autowiredprivate ProducerFactory producerFactory;//使用template發(fā)送消息public void sendMsg(Integer key, String msg){kafkaTemplate.send(topic,key,msg);}public void sendMsg(String msg){kafkaTemplate.send(topic,msg);}//使用原生Producer client API發(fā)送消息public void sendMsgByProducer(Integer key, String msg){Producer producer = producerFactory.createProducer();producer.send(new ProducerRecord(topic,key,msg));producer.close();}public void sendMsgByProducer(String msg){Producer producer = producerFactory.createProducer();producer.send(new ProducerRecord(topic,msg));producer.close();} }消費(fèi)者Demo
更具上面的配置,這些Consumer在組Consumer-Kafka-1,組里面有兩個(gè)不同的Consumer,分別是Consumer-1,Consumer-2。
@Slf4j @Component public class MsgConsumer {static final String topicA = "spring-kafka";static final String topicB = "spring-kafka-B";//訂閱一個(gè)主題@KafkaListener(id = "Consumer-1",topics = {topicA})public String getMsg(String msg){return msg;} //訂閱多個(gè)主題@KafkaListener(id = "Consumer-2",topics = {topicA,topicB})public String getMsgBytwo(String msg){return msg;}//指定主題分區(qū),并指定讀取的分區(qū)offset位置@KafkaListener(id = "Consumer-3",topicPartitions = {@TopicPartition(topic = topicA,partitions = {"0","1"}),@TopicPartition(topic = topicB,partitionOffsets = @PartitionOffset(partition = "1",initialOffset = "100"))})public String getMsgByPartition(String msg){return msg;}//通過(guò)原生Consumer獲取消息public ConsumerRecords getMsgByConsumer(){Consumer consumer = consumerFactory.createConsumer();consumer.subscribe(Collections.singleton(topicA));ConsumerRecords poll = consumer.poll(Duration.ofMillis(500));consumer.close();return poll;}}spring官方springcloud stream starter使用
spring官方提供了一套統(tǒng)一的消息中間件的編程框架,對(duì)外提供統(tǒng)一的編程方式,隱藏底層消息中間件編程的差異。
關(guān)于springcloud stream 的概念可以查看:Spring Cloud Stream 體系及原理介紹-阿里云開(kāi)發(fā)者社區(qū) (aliyun.com)
配置
spring:cloud:stream:kafka:binder:brokers: localhost:9092bindings:input: #channelName,官方提供的默認(rèn)輸入通道名(消費(fèi)者)destination: topicA #消費(fèi)者訂閱的topicgroup: consumer-group-1 #消費(fèi)者分組content-type: text/plainoutput:destination: topicA #生產(chǎn)者將數(shù)據(jù)發(fā)送的topiccontentType: text/plain啟動(dòng)類(lèi)
因?yàn)闇y(cè)試需要,本人同時(shí)bind輸入和輸出channel(Source,Sink)。
@SpringBootApplication @EnableBinding({Source.class, Sink.class}) @ComponentScan("org.example.**") public class WebApplication {public static void main(String[] args) {SpringApplication.run(WebApplication.class,args);} }生產(chǎn)者Demo
@Component public class SourceProducer {@Autowiredprivate Source source;//默認(rèn)有一個(gè)叫output的MessageChannel@Autowiredprivate MessageChannel output;//通過(guò)source發(fā)送public void send(String msg){//source.output獲得是MessageChannelMessageChannel output = source.output();System.out.println("發(fā)送消息:"+msg);output.send(MessageBuilder.withPayload(msg).build());}//通過(guò)MessageChannel直接發(fā)送public void sendByChannel(String msg){System.out.println("發(fā)送消息:"+msg);output.send(MessageBuilder.withPayload(msg).build());} }消費(fèi)者Demo
@Component public class SinkConsumer {@StreamListener(Sink.INPUT)public void getMsg(Message<String> msg){System.out.println("收到消息:"+msg.getPayload());} }測(cè)試類(lèi)
因?yàn)镾pringRunner會(huì)啟動(dòng)spring容器,而容器里面有StreamListener監(jiān)聽(tīng)著Stream,
@SpringBootTest @RunWith(SpringRunner.class) public class ProductorPostTest {@Autowiredprivate SourceProducer sourceProducer;@Testpublic void testSource() throws InterruptedException {String msg = "消息A";while (true){sourceProducer.send(msg);Thread.sleep(1000);}} }結(jié)果
發(fā)送消息:消息A 收到消息:消息A 發(fā)送消息:消息A 收到消息:消息A 發(fā)送消息:消息A 收到消息:消息A總結(jié)
以上是生活随笔為你收集整理的kafaka生产者消费者demo(简易上手demo)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 刘强东回应采销喊话:水龙头已换新 还买了
- 下一篇: 170Hz+FastIPS:优派 27