springboot和kafka集成
生活随笔
收集整理的這篇文章主要介紹了
springboot和kafka集成
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
2019獨角獸企業重金招聘Python工程師標準>>>
1.pom.xml文件
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>1.2.2.RELEASE</version> </dependency>2.發送方的配置
package com.test.frame.kafka.configuration;import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.support.converter.StringJsonMessageConverter;import java.util.HashMap; import java.util.Map;/*** created by guanguan on 2017/9/6**/ @Configuration @EnableKafka public class KafkaProducerConfiguration {@Value("${kafka.bootstrap_servers}")private String bootstrapServers;@Beanpublic Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}@Beanpublic ProducerFactory<String,String> producerFactory(){return new DefaultKafkaProducerFactory<String,String>(producerConfigs());}@Beanpublic KafkaTemplate<String,?> kafkaTemplate(){KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory());//設置后可以傳送實體template.setMessageConverter(new StringJsonMessageConverter());return template; }@Beanpublic Producer producer(){return new Producer();}}3.消費者方的配置
package com.test.frame.kafka.configuration;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.support.converter.StringJsonMessageConverter;import java.util.HashMap; import java.util.Map;/*** created by guanguan on 2017/9/6**/ @Configuration @EnableKafka public class KafkaConsumerConfiguration {@Value("${kafka.bootstrap_servers}")private String bootstrapServers;@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props1 = new HashMap<>();props1.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props1.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props1.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props1.put(ConsumerConfig.GROUP_ID_CONFIG, "jd-group"); //統一在一個組內return props1;}@Beanpublic ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setMessageConverter(new StringJsonMessageConverter());return factory;}}4.發送
package com.test.frame.kafka.configuration;import com.test.frame.kafka.model.User; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.support.MessageBuilder;/*** created by guanguan on 2017/9/6**/public class Producer {private static final Logger logger = LoggerFactory.getLogger(Producer.class);@Value("${kafka.topic}")private String topic;@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(User payload) {kafkaTemplate.send(MessageBuilder.withPayload(payload).setHeader(KafkaHeaders.TOPIC, topic).build());logger.info("send message=> "+payload.toString());}}5.接收
package com.test.frame.kafka.configuration;import com.test.frame.kafka.model.User; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component;/*** created by guanguan on 2017/9/6**/ @Component public class Consumer {private static final Logger logger = LoggerFactory.getLogger(Consumer.class);@KafkaListener(topics = "${kafka.topic}")public void recvMessage(User user) {logger.info("recv msg:=> " + user.toString());} }6.測試:
package com.test.frame.kafka.controller;import com.test.frame.kafka.configuration.Producer; import com.test.frame.kafka.model.User; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController;@RestController @RequestMapping("/") public class ApiController {@AutowiredProducer producer;@RequestMapping("/")public String testkafka(@RequestParam String test) {User user = new User();user.setName("hh");producer.sendMessage(user);return "send kafak ok!";}}表明已經接收成功。
application.yml文件
kafka:bootstrap_servers: localhost:9092topic: test-topic?
轉載于:https://my.oschina.net/u/2263272/blog/1530299
總結
以上是生活随笔為你收集整理的springboot和kafka集成的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 这13个开源GIS软件,你了解几个?【转
- 下一篇: Sring boot学习笔记(三)-自带