kafka消息消费有延迟_简易实现kafka延迟消息
背景
當前業務存在以下場景:在一個事務內的最后一步是發送kafka消息,消費端收到通知后讀取數據并做處理。但是由于kafka幾乎是即時收到消息,導致偶爾出現“在發完kafka和提交事務的間隙,消費端收到了消息并讀取到了事務提交前的數據”。
這個問題可以通過延遲消息來解決。
發送端 vs 消費端
要做延遲,那么首先要考慮的是:延遲放在發送端,還是放在消費端?最終選擇放在消費端:讓數據先被kafka存儲起來,數據更安全。
想把延遲消息做成一個服務,不只是支持某一個場景/業務,在這種設計前提下,讓延遲邏輯放在消費端,可以統一調整邏輯,也方便排查問題。
思路
是在整體外面包一層代理:另外創建一個延遲Topic,延遲消息都發到延遲Topic里。
有專門的服務來消費延遲Topic的消息,取到消息之后存儲起來,定期檢查消息是否已經延遲時間。
已到延遲時間的消息,重新發送到原先Topic。
這樣做的好處是,不需要對kafka做任何改造。
存儲
延遲隊列消費者拉取到消息之后,要怎么存儲?第三方存儲,其需要滿足以下幾個條件:高性能:寫入延遲要低,MQ的一個重要作用是削峰填谷,在選擇臨時存儲時,寫入性能必須要高,關系型數據庫(如Mysql)通常不滿足需求。
高可靠:延遲消息寫入后,不能丟失,需要進行持久化,并進行備份
存儲成本低:可以支持大量消息存儲,(Redis存儲成本太高)。
支持排序: 支持按照某個字段對消息進行排序,對于延遲消息需要按照時間進行排序。普通消息通常先發送的會被先消費,延遲消息與普通消息不同,需要進行排序。例如先發一條延遲10s的消息,再發一條延遲5s的消息,那么后發送的消息需要被先消費。
支持長時間保存:一些業務的延遲消息,需要延遲幾個月,甚至更長,所以延遲消息必須能長時間保留。不過通常不建議延遲太長時間,存儲成本比較大,且業務邏輯可能已經發生變化,已經不需要消費這些消息。
基于以上條件,選擇了RocksDB來存儲數據:高性能嵌入式KV存儲引擎。
數據持久化到磁盤。
基于LMS存儲,key自然排序,迭代器(Iterator)根據key順序遍歷。
代碼
發送端
消息基類public class DelayMessage {
/**
* 事件唯一ID,用于去重檢查
*/
private String eventId = UUIDGenerator.generateString();
/**
* 事件時間
*/
@JSONField(format = KafkaConstants.DATETIME_FORMAT)
private Date eventTime = new Date();
/**
* 真實事件時間
*/
@JSONField(format = KafkaConstants.DATETIME_FORMAT)
private Date actualTime;
/**
* 真實Topic
*/
private String actualTopic;
public Date getActualTime() {
return actualTime;
}
public T setActualTime(Date actualTime) {
this.actualTime = actualTime;
return (T) this;
}
public String getActualTopic() {
return actualTopic;
}
public T setActualTopic(String actualTopic) {
this.actualTopic = actualTopic;
return (T) this;
}
public Date getEventTime() {
return eventTime;
}
public T setEventTime(Date eventTime) {
this.eventTime = eventTime;
return (T) this;
}
}
消息對象繼承DelayMessage,將消息發送到延遲Topic。
延遲服務消費端
接收延遲消息@KafkaListener(topics = {KafkaConstants.KAFKA_TOPIC_MESSAGE_DELAY}, containerFactory = "kafkaContainerFactory")
public boolean onMessage(String json) throws Throwable {
try {
DelayMessage delayMessage = deserialize(json, DelayMessage.class);
if (!isDelay(delayMessage)) {
// 如果接收到消息時,消息已經可以發送了,直接發送到實際的隊列
sendActualTopic(delayMessage, json);
} else {
// 存儲
localStorage(delayMessage, json);
}
} catch (Throwable e) {
log.error("consumer kafka delay message[{}] error!", json, e);
throw e;
}
return true;
}
private void sendActualTopic(DelayMessage delayMessage, String message) {
kafkaSender.send(message, delayMessage.getActualTopic());
}
@SneakyThrows
private void localStorage(DelayMessage delayMessage, String message) {
String key = generateRdbKey(delayMessage);
if (rocksDb.keyMayExist(RocksDbUtils.toByte(key), null)) {
return;
}
rocksDb.put(RocksDbUtils.toByte(key), RocksDbUtils.toByte(message));
}
private String generateRdbKey(DelayMessage delayMessage) {
return delayMessage.getActualTime().getTime() + RDB_KEY_SPLITTER + delayMessage.getEventId();
}
這里要注意生成key的方法:RocksDB是按key自然排序,迭代器遍歷時是按key順序遍歷。
按時間來生成key,遍歷時遇到第一個不符合的key,即可結束遍歷。
key里加上消息ID,用以去重。
處理存儲的延遲消息
啟動定時任務(ScheduledExecutorService)定時檢查消息。private void handleRdbMessage() {
try {
try (RocksIterator rocksIterator = rocksDb.newIterator()) {
for (rocksIterator.seekToFirst(); rocksIterator.isValid(); rocksIterator.next()) {
String key = "";
String value = "";
try {
byte[] keyByte = rocksIterator.key();
key = RocksDbUtils.toString(keyByte);
if (!isMessageExpired(key)) {
break;
}
value = RocksDbUtils.toString(rocksIterator.value());
DelayMessage delayMessage = JSON.parseObject(value, DelayMessage.class);
sendActualTopic(delayMessage, value);
rocksDb.delete(keyByte);
} catch (NumberFormatException e) {
// 異常key
log.error("handler kafka rocksdb delay message[{}:{}] NumberFormatException error!", key, value, e);
if (StringUtils.isNotBlank(key)) {
rocksDb.delete(RocksDbUtils.toByte(key));
}
} catch (Exception e) {
log.error("handler kafka rocksdb delay message[{}:{}] error!", key, value, e);
}
}
}
} catch (Exception e) {
// 捕獲異常,否則ScheduledExecutorService會停止定時任務
log.error("handler kafka rocksdb delay message error!", e);
}
}
private boolean isMessageExpired(String rdbKey) {
long actualTime = Long.valueOf(rdbKey.split(RDB_KEY_SPLITTER)[0]);
return actualTime <= System.currentTimeMillis();
}
這里sendActualTopic和rocksDb.delete兩個操作并不是原子性,但一般kafka消費端都會做防重復,所以也不會有問題。
其他
當前僅僅簡易實現了延遲隊列,還有很多需要完成完善的地方,比如:當前數據分散到不同的消費節點上,如果某一個節點服務器異常導致數據丟失,就只能人工介入,從kafka文件里獲取數據;可通過部署不同的kafka group來達到數據備份,通過選主方式來決定哪一個group執行業務。
一條消息被存儲三份:實際隊列,延遲隊列,RocksDB,可以通過操作kafka CommitLog的方式,讓RocksDB里僅存儲CommitLog offset 相關信息,減小RocksDB占用空間。
參考:
總結
以上是生活随笔為你收集整理的kafka消息消费有延迟_简易实现kafka延迟消息的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 利用JAVA流处理-统计男员工人数;找出
- 下一篇: Windows XP 任务计划每1分钟运