Installing and Using Kafka
Version: kafka_2.11-0.10.1.0 (I previously installed 2.10-0.10.0.0 and kept running into problems)
- Installation
- Using Kafka with Spring Boot
Installation
Using Kafka with Spring Boot
1. Add the dependency to the pom file
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
2. Add the configuration to application.properties
# APACHE KAFKA (KafkaProperties)
spring.kafka.bootstrap-servers=192.168.0.155:9092,192.168.0.156:9092
spring.kafka.client-id=K1
spring.kafka.consumer.auto-offset-reset= earliest
spring.kafka.consumer.enable-auto-commit= true
spring.kafka.consumer.group-id= test-consumer-group

spring.kafka.producer.batch-size=2
spring.kafka.producer.bootstrap-servers= 192.168.0.155:9092,192.168.0.156:9092
spring.kafka.producer.client-id= P1
spring.kafka.producer.retries=3
spring.kafka.template.default-topic= test
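Several of the values above are written with a space after the `=`, and the broker list packs two addresses into one value. The following is a minimal sketch of how such lines are read, using the JDK's `java.util.Properties` as a stand-in (an assumption: Spring Boot has its own loader, which I expect to behave the same way for simple lines like these). The class name `KafkaConfigSketch` and its helper methods are made up for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper class, not part of Spring or Kafka.
public class KafkaConfigSketch {

    // Loads properties text the way java.util.Properties reads a .properties file.
    public static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    // Splits a comma-separated broker list ("host:port,host:port") into {host, port} pairs.
    public static List<String[]> parseBrokers(String bootstrapServers) {
        List<String[]> brokers = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                throw new IllegalArgumentException("expected host:port but got: " + trimmed);
            }
            brokers.add(new String[] {trimmed.substring(0, colon), trimmed.substring(colon + 1)});
        }
        return brokers;
    }

    public static void main(String[] args) {
        String text = String.join("\n",
                "spring.kafka.bootstrap-servers=192.168.0.155:9092,192.168.0.156:9092",
                "spring.kafka.consumer.auto-offset-reset= earliest",
                "spring.kafka.consumer.group-id= test-consumer-group");
        Properties props = load(text);

        // Whitespace between '=' and the value is skipped when the line is parsed.
        System.out.println("[" + props.getProperty("spring.kafka.consumer.auto-offset-reset") + "]");

        // Each comma-separated entry is one broker address.
        for (String[] broker : parseBrokers(props.getProperty("spring.kafka.bootstrap-servers"))) {
            System.out.println("host=" + broker[0] + " port=" + broker[1]);
        }
    }
}
```

Note that only the whitespace right after the `=` is skipped. Trailing spaces at the end of a line stay part of the value, so it is worth keeping the file free of them.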
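3. Create the consumer class (the object that subscribes to messages)
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class ListenerBean {

    // Listen on the same topic the producer sends to ("test" in this walkthrough);
    // a listener on a different topic would never receive those messages.
    @KafkaListener(topics = "test")
    public void processMessage(String content) {
        System.out.println("you have a new message:" + content);
        // ...
    }
}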
4. Create the producer class (the object that publishes messages)
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@Component
@RestController
@RequestMapping("/send")
@EnableAutoConfiguration
public class SendMsgBean {

    private final KafkaTemplate<String, String> kafkaTemplate;

    // Spring Boot builds the KafkaTemplate from the spring.kafka.* properties and injects it here.
    @Autowired
    public SendMsgBean(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // GET /send/{msg} publishes "test-" + msg to the "test" topic.
    @RequestMapping(path = "/{msg}", method = RequestMethod.GET)
    public String send(@PathVariable("msg") String msg) {
        System.out.println("==sending msg:" + msg);
        kafkaTemplate.send("test", "test-" + msg);
        return "message has been sent!";
    }
}
With just these 4 steps you can use Kafka in Spring Boot. Now visit http://localhost:8080/send/mymessage and you will see the message in the console.
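The message travels as part of the URL path, so anything other than a plain word (a space, for example) needs to be percent-encoded before the request is made. Here is a minimal sketch of building that URL with the JDK's `java.net.URI`. The class name `SendUrlSketch` and the method `buildSendUri` are made up for illustration, and `localhost:8080` is taken from the URL above.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper class, not part of Spring or Kafka.
public class SendUrlSketch {

    // Builds the URL for the /send/{msg} endpoint. The multi-argument URI
    // constructor percent-encodes characters that are illegal in a path.
    public static URI buildSendUri(String hostAndPort, String msg) {
        if (msg.contains("/")) {
            // A slash would split {msg} into two path segments.
            throw new IllegalArgumentException("message must not contain '/'");
        }
        try {
            return new URI("http", hostAndPort, "/send/" + msg, null, null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(buildSendUri("localhost:8080", "mymessage"));
        System.out.println(buildSendUri("localhost:8080", "hello world"));
    }
}
```

The resulting URI can be pasted into a browser or handed to any HTTP client.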
Source code download
References:
- Fix for a Kafka producer failing to send messages when run locally
Reposted from: https://www.cnblogs.com/TiestoRay/p/6394602.html