javascript
SpringBoot整合kafka(实现producer和consumer)
轉(zhuǎn)載自?SpringBoot整合kafka(實(shí)現(xiàn)producer和consumer)
在Windows環(huán)境下安裝運(yùn)行Kafka:https://www.jianshu.com/p/d64798e81f3b
本文代碼使用的是Spring Boot 2.1.1.RELEASE 版本
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.1.RELEASE</version><relativePath/> <!-- lookup parent from repository --> </parent>一、 pom.xml文件,引入依賴
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency> </dependencies>采用Kafka提供的StringSerializer和StringDeserializer進(jìn)行序列化和反序列化
1、在application-dev.properties配置生產(chǎn)者
#============== kafka =================== # 指定kafka server的地址,集群配多個(gè),中間,逗號隔開 spring.kafka.bootstrap-servers=127.0.0.1:9092#=============== provider ======================= # 寫入失敗時(shí),重試次數(shù)。當(dāng)leader節(jié)點(diǎn)失效,一個(gè)repli節(jié)點(diǎn)會(huì)替代成為leader節(jié)點(diǎn),此時(shí)可能出現(xiàn)寫入失敗, # 當(dāng)retris為0時(shí),produce不會(huì)重復(fù)。retirs重發(fā),此時(shí)repli節(jié)點(diǎn)完全成為leader節(jié)點(diǎn),不會(huì)產(chǎn)生消息丟失。 spring.kafka.producer.retries=0 # 每次批量發(fā)送消息的數(shù)量,produce積累到一定數(shù)據(jù),一次發(fā)送 spring.kafka.producer.batch-size=16384 # produce積累數(shù)據(jù)一次發(fā)送,緩存大小達(dá)到buffer.memory就發(fā)送數(shù)據(jù) spring.kafka.producer.buffer-memory=33554432#procedure要求leader在考慮完成請求之前收到的確認(rèn)數(shù),用于控制發(fā)送記錄在服務(wù)端的持久化,其值可以為如下: #acks = 0 如果設(shè)置為零,則生產(chǎn)者將不會(huì)等待來自服務(wù)器的任何確認(rèn),該記錄將立即添加到套接字緩沖區(qū)并視為已發(fā)送。在這種情況下,無法保證服務(wù)器已收到記錄,并且重試配置將不會(huì)生效(因?yàn)榭蛻舳送ǔ2粫?huì)知道任何故障),為每條記錄返回的偏移量始終設(shè)置為-1。 #acks = 1 這意味著leader會(huì)將記錄寫入其本地日志,但無需等待所有副本服務(wù)器的完全確認(rèn)即可做出回應(yīng),在這種情況下,如果leader在確認(rèn)記錄后立即失敗,但在將數(shù)據(jù)復(fù)制到所有的副本服務(wù)器之前,則記錄將會(huì)丟失。 #acks = all 這意味著leader將等待完整的同步副本集以確認(rèn)記錄,這保證了只要至少一個(gè)同步副本服務(wù)器仍然存活,記錄就不會(huì)丟失,這是最強(qiáng)有力的保證,這相當(dāng)于acks = -1的設(shè)置。 #可以設(shè)置的值為:all, -1, 0, 1 spring.kafka.producer.acks=1# 指定消息key和消息體的編解碼方式 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer-
bootstrap.servers:kafka server的地址
-
acks:寫入kafka時(shí),leader負(fù)責(zé)一個(gè)該partion讀寫,當(dāng)寫入partition時(shí),需要將記錄同步到repli節(jié)點(diǎn),all是全部同步節(jié)點(diǎn)都返回成功,leader才返回ack。
-
retris:寫入失敗時(shí),重試次數(shù)。當(dāng)leader節(jié)點(diǎn)失效,一個(gè)repli節(jié)點(diǎn)會(huì)替代成為leader節(jié)點(diǎn),此時(shí)可能出現(xiàn)寫入失敗,當(dāng)retris為0時(shí),produce不會(huì)重復(fù)。retirs重發(fā),此時(shí)repli節(jié)點(diǎn)完全成為leader節(jié)點(diǎn),不會(huì)產(chǎn)生消息丟失。
-
batch.size:produce積累到一定數(shù)據(jù),一次發(fā)送。
buffer.memory: produce積累數(shù)據(jù)一次發(fā)送,緩存大小達(dá)到buffer.memory就發(fā)送數(shù)據(jù)。
-
linger.ms :當(dāng)設(shè)置了緩沖區(qū),消息就不會(huì)即時(shí)發(fā)送,如果消息總不夠條數(shù)、或者消息不夠buffer大小就不發(fā)送了嗎?當(dāng)消息超過linger時(shí)間,也會(huì)發(fā)送。
-
key/value serializer:序列化類。
2、生產(chǎn)者向kafka發(fā)送消息
@RestController public class KafkaController {@Autowiredprivate KafkaTemplate<String,Object> kafkaTemplate;@GetMapping("/message/send")public boolean send(@RequestParam String message){kafkaTemplate.send("testTopic",message);return true;}}3、在application-dev.properties配置消費(fèi)者
#=============== consumer ======================= # 指定默認(rèn)消費(fèi)者group id --> 由于在kafka中,同一組中的consumer不會(huì)讀取到同一個(gè)消息,依靠groud.id設(shè)置組名 spring.kafka.consumer.group-id=testGroup # smallest和largest才有效,如果smallest重新0開始讀取,如果是largest從logfile的offset讀取。一般情況下我們都是設(shè)置smallest spring.kafka.consumer.auto-offset-reset=earliest # enable.auto.commit:true --> 設(shè)置自動(dòng)提交offset spring.kafka.consumer.enable-auto-commit=true #如果'enable.auto.commit'為true,則消費(fèi)者偏移自動(dòng)提交給Kafka的頻率(以毫秒為單位),默認(rèn)值為5000。 spring.kafka.consumer.auto-commit-interval=100# 指定消息key和消息體的編解碼方式 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer-
Producer是一個(gè)接口,聲明了同步send和異步send兩個(gè)重要方法。
-
ProducerRecord 消息實(shí)體類,每條消息由(topic,key,value,timestamp)四元組封裝。一條消息key可以為空和timestamp可以設(shè)置當(dāng)前時(shí)間為默認(rèn)值。
4、消費(fèi)者監(jiān)聽topic=testTopic的消息
@Component public class ConsumerListener {@KafkaListener(topics = "testTopic")public void onMessage(String message){//insertIntoDb(buffer);//這里為插入數(shù)據(jù)庫代碼System.out.println(message);}}到此,采用Kafka提供的StringSerializer和StringDeserializer進(jìn)行序列化和反序列化,因?yàn)榇朔N序列化方式無法序列化實(shí)體類,顧,下面為自定義序列化和反序列化器進(jìn)行實(shí)體類的消息傳遞
采用自定義序列化和反序列化器進(jìn)行實(shí)體類的序列化和反序列化
和內(nèi)置的StringSerializer字符串序列化一樣,如果要自定義序列化方式,需要實(shí)現(xiàn)接口Serializer。假設(shè)每個(gè)字段按照下圖所示的方式自定義序列化:
?
1、創(chuàng)建User實(shí)體類
public class User implements Serializable {private Long id;private String name;private Integer age;/*** transient 關(guān)鍵字修飾的字段不會(huì)被序列化*/private transient String desc;public Long getId() {return id;}public void setId(Long id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public Integer getAge() {return age;}public void setAge(Integer age) {this.age = age;}public String getDesc() {return desc;}public void setDesc(String desc) {this.desc = desc;}@Overridepublic String toString() {return "User{" +"id=" + id +", name='" + name + '\'' +", age=" + age +", desc='" + desc + '\'' +'}';} }2、創(chuàng)建User序列化器
public class UserSerializable implements Serializer<User> {@Overridepublic void configure(Map<String, ?> map, boolean b) {}@Overridepublic byte[] serialize(String topic, User user) {System.out.println("topic : " + topic + ", user : " + user);byte[] dataArray = null;ByteArrayOutputStream outputStream = null;ObjectOutputStream objectOutputStream = null;try {outputStream = new ByteArrayOutputStream();objectOutputStream = new ObjectOutputStream(outputStream);objectOutputStream.writeObject(user);dataArray = outputStream.toByteArray();} catch (Exception e) {throw new RuntimeException(e);}finally {if(outputStream != null){try {outputStream.close();} catch (IOException e) {e.printStackTrace();}}if(objectOutputStream != null){try {objectOutputStream.close();} catch (IOException e) {e.printStackTrace();}}}return dataArray;}@Overridepublic void close() {} }3、創(chuàng)建User反序列化器
public class UserDeserializer implements Deserializer<User> {@Overridepublic void configure(Map<String, ?> map, boolean b) {}@Overridepublic User deserialize(String topic, byte[] bytes) {User user = null;ByteArrayInputStream inputStream = null;ObjectInputStream objectInputStream = null;try {inputStream = new ByteArrayInputStream(bytes);objectInputStream = new ObjectInputStream(inputStream);user = (User)objectInputStream.readObject();} catch (Exception e) {throw new RuntimeException(e);}finally {if(inputStream != null){try {inputStream.close();} catch (IOException e) {e.printStackTrace();}}if(objectInputStream != null){try {objectInputStream.close();} catch (IOException e) {e.printStackTrace();}}}return user;}@Overridepublic void close() {} }4、修改application-dev.properties配置
A、修改生產(chǎn)者配置的value-serializer
# 指定生產(chǎn)者消息key和消息體的編解碼方式 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=com.yibo.springbootkafkademo.Serializable.UserSerializableB、修改消費(fèi)者配置的value-deserializer
# 指定消費(fèi)者消息key和消息體的編解碼方式 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=com.yibo.springbootkafkademo.Serializable.UserDeserializer5、生產(chǎn)者向kafka發(fā)送消息
@RestController public class KafkaController {@Autowiredprivate KafkaTemplate<String,Object> kafkaTemplate;@PostMapping("/user/save")public boolean saveUser(@RequestBody User user){kafkaTemplate.send("userTopic",user);return true;} }6、消費(fèi)者監(jiān)聽topic=userTopic的消息
@Component public class ConsumerListener {@KafkaListener(topics = "userTopic")public void onMessage(User user){//insertIntoDb(buffer);//這里為插入數(shù)據(jù)庫代碼System.out.println(user);} }總結(jié)
可以看到,自定義Serializer和Deserializer非常痛苦,還有很多類型不支持,非常脆弱。復(fù)雜類型的支持更是一件痛苦的事情,不同版本之間的兼容性問題更是一個(gè)極大的挑戰(zhàn)。由于Serializer和Deserializer影響到上下游系統(tǒng),導(dǎo)致牽一發(fā)而動(dòng)全身。自定義序列化&反序列化實(shí)現(xiàn)不是能力的體現(xiàn),而是逗比的體現(xiàn)。所以強(qiáng)烈不建議自定義實(shí)現(xiàn)序列化&反序列化,推薦直接使用StringSerializer和StringDeserializer,然后使用json作為標(biāo)準(zhǔn)的數(shù)據(jù)傳輸格式。站在巨人的肩膀上,事半功倍。
?
總結(jié)
以上是生活随笔為你收集整理的SpringBoot整合kafka(实现producer和consumer)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: cf对电脑配置要求高吗(cf对电脑配置要
- 下一篇: 好的游戏电脑配置推荐(好的游戏电脑配置)