Kafka的消息格式
Commit Log
Kafka儲存消息的文件被它叫做log,按照Kafka文檔的說法是:
Each partition is an ordered, immutable sequence of messages that is continually appended to—a commit log
這反應(yīng)出來的Kafka的行為是:消息被不斷地append到文件末尾,而且消息是不可變的。
這種行為源于Kafka想要實(shí)現(xiàn)的功能:高吞吐量,多副本,消息持久化。這種簡單的log形式的文件結(jié)構(gòu)能夠更好地實(shí)現(xiàn)這些功能,不過也會在其它方面有所欠缺,比如檢索消息的能力。
而Kafka的行為也決定了它的消息格式。對于Kafka來說,消息的主體部分的格式在網(wǎng)絡(luò)傳輸中和磁盤上是一致的,也就是說消息的主體部分可以直接從網(wǎng)絡(luò)讀取的字節(jié)buffer中寫入到文件(部分情況下),也可以直接從文件中copy到網(wǎng)絡(luò),而不需要在程序中再加工,這有利于降低服務(wù)器端的開銷,以及提高IO速度(比如使用zero-copy的傳輸)。
這也就決定了Kafka的消息格式必須是適于被直接append到文件中的。當(dāng)然啥都可以append到文件后面,問題在于怎么從文件中拆分出來一條條記錄。
記錄的劃分以及消息的格式
對于日志來說,一條記錄以"\n"結(jié)尾,或者通過其它特定的分隔符分隔,這樣就可以從文件中拆分出一條一條的記錄,不過這種格式更適用于文本,對于Kafka來說,需要的是二進(jìn)制的格式。所以,Kafka使用了另一種經(jīng)典的格式:在消息前面固定長度的幾個(gè)字節(jié)記錄下這條消息的大小(以byte記),所以Kafka的記錄格式變成了:
Offset MessageSize Message
消息被以這樣格式append到文件里,在讀的時(shí)候通過MessageSize可以確定一條消息的邊界。
需要注意的是,在Kafka的文檔以及源碼中,消息(Message)并不包括它的offset。Kafka的log是由一條一條的記錄構(gòu)成的,Kafka并沒有給這種記錄起個(gè)專門的名字,但是需要記住的是這個(gè)“記錄”并不等于"Message"。Offset MessageSize Message加在一起,構(gòu)成一條記錄。而在Kafka Protocol中,Message具體的格式為
Message => Crc MagicByte Attributes Key Value ??Crc => int32 ??MagicByte => int8 ??Attributes => int8 ??Key => bytes ??Value => bytes各個(gè)部分的含義是
| Attributes | This byte holds metadata attributes about the message. The lowest 2 bits contain the compression codec used for the message. The other bits should be set to 0. |
| Crc | The CRC is the CRC32 of the remainder of the message bytes. This is used to check the integrity of the message on the broker and consumer. |
| Key | The key is an optional message key that was used for partition assignment. The key can be null. |
| MagicByte | This is a version id used to allow backwards compatible evolution of the message binary format. The current value is 0. |
| Offset | This is the offset used in kafka as the log sequence number. When the producer is sending messages it doesn't actually know the offset and can fill in any value here it likes. |
| Value | The value is the actual message contents as an opaque byte array. Kafka supports recursive messages in which case this may itself contain a message set. The message can be null. ? |
MessageSet
之所以要強(qiáng)調(diào)記錄與Message的區(qū)別,是為了更好地理解MessageSet的概念。Kafka protocol里對于MessageSet的定義是這樣的
MessageSet => [Offset MessageSize Message] ??Offset => int64 ??MessageSize => int32也就是說MessageSet是由多條記錄組成的,而不是消息,這就決定了一個(gè)MessageSet實(shí)際上不需要借助其它信息就可以從它對應(yīng)的字節(jié)流中切分出消息,而這決定了更重要的性質(zhì):Kafka的壓縮是以MessageSet為單位的。而以MessageSet為單位壓縮,決定了對于壓縮后的MessageSet,不需要在它的外部記錄這個(gè)MessageSet的結(jié)構(gòu),也就決定了Kafka的消息是可以遞歸包含的,也就是前邊"value"字段的說明“Kafka supports recursive messages in which case this may itself contain a message set"。
具體地說,對于Kafka來說,可以對一個(gè)MessageSet做為整體壓縮,把壓縮后得到的字節(jié)數(shù)組作為一條Message的value。于是,Message既可以表示未壓縮的單條消息,也可以表示壓縮后的MessageSet。
壓縮后的消息的讀取
就看Message頭部的Attributes里的壓縮格式標(biāo)識。說到這個(gè),得說下遞歸包含的事情,理論上,一個(gè)壓縮的的MessageSet里的一個(gè)Message可能會是另一個(gè)壓縮后的MessageSet,或者包含更深層的MessageSet。但是實(shí)際上,Kafka中的一個(gè)Message最多只含有一個(gè)MessageSet。從Message中讀取MessageSet的邏輯,可以在ByteBufferMessageSet的internalIterator方法中找到:
if(isShallow) { //是否要進(jìn)行深層迭代new MessageAndOffset(newMessage, offset)} else { //如果要深層迭代的話newMessage.compressionCodec match {case NoCompressionCodec =>innerIter = nullnew MessageAndOffset(newMessage, offset) //如果這個(gè)Message沒有壓縮,就直接把它作為一個(gè)Message返回case _ =>innerIter = ByteBufferMessageSet.deepIterator(newMessage) //如果這個(gè)Message采用了壓縮,就對它進(jìn)行深層迭代if(!innerIter.hasNext)innerIter = nullmakeNext()}}而ByteBufferMessageSet的deepIterator方法就是對這個(gè)Message的value進(jìn)行解壓,然后從中按照Offset MessageSize Message的格式讀取一條條記錄,對于這次讀取的Message,就不再進(jìn)行深層迭代了。下面是deepIterator的makeNext方法,它被不斷調(diào)用以生成迭代器的元素
override def makeNext(): MessageAndOffset = {try {// read the offsetval offset = compressed.readLong()// read record sizeval size = compressed.readInt()if (size < Message.MinHeaderSize)throw new InvalidMessageException("Message found with corrupt size (" + size + ") in deep iterator")// read the record into an intermediate record buffer// and hence has to do extra copyval bufferArray = new Array[Byte](size)compressed.readFully(bufferArray, 0, size)val buffer = ByteBuffer.wrap(bufferArray)val newMessage = new Message(buffer)// the decompressed message should not be a wrapper message since we do not allow nested compressionnew MessageAndOffset(newMessage, offset)} catch {case eofe: EOFException =>compressed.close()allDone()case ioe: IOException =>throw new KafkaException(ioe)}}KAFKA-1718
至于一個(gè)MessageSet中不能包含多個(gè)壓縮后的Message(壓縮后的Message也就是以壓縮后的MessageSet作為value的Message),Kafka Protocol中是這么說的
The outer MessageSet should contain only one compressed "Message" (see?KAFKA-1718?for details).
KAFKA-1718就是在Protocol里添加這么一個(gè)特殊說明的原因。事情是這樣的:
報(bào)各這個(gè)問題的人是Go語言client的作者,他發(fā)現(xiàn)自己發(fā)的Message明顯沒有過大,但是發(fā)生了MessageSizeTooLargeException。后來跟其它人討論,發(fā)現(xiàn)是因?yàn)閎roker端在調(diào)用Log.append時(shí),會把傳送給這個(gè)方法的MessageSet解壓開,然后再組合成一個(gè)壓縮后的MessageSet(ByteBufferMessageSet)。而Go語言的客戶端發(fā)送的MessageSet中包含了多個(gè)壓縮后的Message,這樣即使發(fā)送時(shí)的Message不會超過message.max.bytes的限制,但是broker端再次生成的Message就超過了這個(gè)限制。所以,Kafka Protocol對這種情況做了特殊說明:The outer MessageSet should contain only one compressed "Message"。
Compressed Message的offset
即然可以把壓縮后的MessageSet作為Message的value,那么這個(gè)Message的offset該如何設(shè)置呢?
這個(gè)offset的值只有兩種可能:1, 被壓縮的MessageSet里Message的最大offset; 2, 被壓縮的MessageSet里Message的最小offset.
這兩種取值沒有功能的不同,只有效率的不同。
由于FetchRequest協(xié)議中的offset是要求broker提供大于等于這個(gè)offset的消息,因此broker會檢查log,找到符合條件的,然后傳輸出去。那么由于FetchRequest中的offset位置的消息可位于一個(gè)compressed message中,所以broker需要確定一個(gè)compressed Message是否需要被包含在respone中。
- 如果compressed Message的offset是它包含的MessageSet的最小offset。那么,我們對于這個(gè)Message是否應(yīng)包含在response中,無法給出"是”或"否“的回答。比如FetchRequest中指明的開始讀取的offset是14,而一個(gè)compressed Message的offset是13,那么這個(gè)Message中可能包含offset為14的消息,也可能不包含。
- 如果compressed Message的offset是它包含的MessageSet的最大offset,那么,可以根據(jù)這個(gè)offset確定這個(gè)Message“不應(yīng)該”包含在response中。比如FetchRequest中指明的開始讀取的offset是14,那么如果一個(gè)compressed Message的offset是13,那它就不該被包含在response中。而當(dāng)我們順序排除這種不符合條件的Message,就可以找到第一個(gè)應(yīng)該被包含在response中的Message(壓縮或者未壓縮), 從它開始讀取。
在第一種情況下(最小offset),我們盡管可以通過連續(xù)的兩個(gè)Message確定第一個(gè)Message的offset范圍,但是這樣在讀取時(shí)需要在讀取第二個(gè)Message的offset之后跳回到第一個(gè)Message, ?這通常會使得最近一次讀(也就讀第二個(gè)offset)的文件系統(tǒng)的緩存失效。而且邏輯比第二種情況更復(fù)雜。在第二種情況下,broker只需要找到第一個(gè)其offset大于或等于目標(biāo)offset的Message,從它可以讀取即可,而且也通常能利用到文件系統(tǒng)緩存,因?yàn)閛ffset和消息內(nèi)容有可能在同一個(gè)緩存塊中。
在處理FetchRequest時(shí),broker的邏輯也正是如此。對FetchRequest的處理會調(diào)用到Log#read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None)方法,然后調(diào)用到LogSegment的read方法,它的之后的調(diào)用有很多,所有不貼代碼了,它的注釋說明了讀取的邏輯
* Read a message set from this segment beginning with the first offset >= startOffset. The message set will include* no more than maxSize bytes and will end before maxOffset if a maxOffset is specified
即,返回的MessageSet的第一條Message的offset >= startOffset。
而在broker給compressed Message賦予offset時(shí),其邏輯也是賦予其包含的messages中的最大offset。這段邏輯在ByteBufferMessageSet的create方法中:
messageWriter.write(codec = compressionCodec) { outputStream =>val output = new DataOutputStream(CompressionFactory(compressionCodec, outputStream)) //創(chuàng)建壓縮流try {for (message <- messages) {offset = offsetCounter.getAndIncrement //offsetCounter是一個(gè)AtomicLong,使用它的當(dāng)前值作為這條Message的offset,然后+1作為下一條消息的offsetoutput.writeLong(offset)//寫入這條日志記錄的offsetoutput.writeInt(message.size)//寫入這條日志記錄的大小output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit) //寫入這條記錄的Message }} finally {output.close()}}val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead)writeMessage(buffer, messageWriter, offset)//以最后一個(gè)Message的offset作為這個(gè)compressed Message的offsetValidate Message
什么需要驗(yàn)證?
先看一下消息的哪些特征需要被驗(yàn)證。
首先,網(wǎng)絡(luò)傳輸過程中,數(shù)據(jù)可能會產(chǎn)生錯(cuò)誤,即使是寫在磁盤上的消息,也可能會由于磁盤的問題產(chǎn)生錯(cuò)誤。因此,broker對接收到的消息需要驗(yàn)證其完整性。這里的消息就是前邊協(xié)議里定義的Message。對于消息完整性的檢測,是使用CRC32校驗(yàn),但是并不是對消息的所有部分計(jì)算CRC,而是對Message的Crc部分以后的部分,不包括記錄的offset和MessageSize部分。把offset和MessageSize加到CRC計(jì)算中,可以對完整性有更強(qiáng)的估證,但是壞處在于這兩個(gè)部分在消息由producer到達(dá)broker以后,會被broker重寫,因此如果把它們計(jì)算在crc里邊,就需要在broker端重新計(jì)算crc32,這樣會帶來額外的開銷。
CRC32沒有檢測出錯(cuò)誤的概率在0.0047%以下,加上TCP本身也有校驗(yàn)機(jī)制,不能檢測出錯(cuò)誤的概率就很小了(這個(gè)還需要再仔細(xì)算一下)。
除了消息的完整性,還需要對消息的合規(guī)性進(jìn)行檢驗(yàn),主要是檢驗(yàn)offset是否是單調(diào)增長的,以及MessageSize是超過了最大值。
這里檢驗(yàn)時(shí)使用的MessageSize就不是Message本身的大小了,而是一個(gè)記錄的大小,包括offset和MessageSize,這個(gè)也挺奇怪的,有必要非拉上這倆嗎?
而且在broker端檢驗(yàn)producer發(fā)來的MessageSet時(shí),也沒必要檢驗(yàn)它的offset是否是單調(diào)增長的呀,畢竟leader還要對Message的offset重新賦值。而follower是從leader處拉取的,如果網(wǎng)絡(luò)或者磁盤出錯(cuò),通過對offset的單調(diào)性檢查也可能會漏掉出錯(cuò)了的記錄,對于consumer來說也是同理。所以這里有點(diǎn)奇怪。
何時(shí)需要驗(yàn)證?
在broker把收到的producer request里的MessageSet append到Log之前,以及consumer和follower獲取消息之后,都需要進(jìn)行校驗(yàn)。
這種情況分成兩種:
1. broker和consumer把收到的消息append到log之前
2. consumser收到消息后
第一種情況都是在調(diào)用Log#append時(shí)進(jìn)行檢驗(yàn)的。
如何驗(yàn)證?
先看下Log#append的方法聲明
def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo在replica的fetcher線程調(diào)用append方法時(shí),會把a(bǔ)ssignOffsets設(shè)成false,而leader處理produce request時(shí),會把a(bǔ)ssignOffsets設(shè)成true。
下面append方法的一部分代碼
val appendInfo = analyzeAndValidateMessageSet(messages) //驗(yàn)證消息// if we have any valid messages, append them to the logif(appendInfo.shallowCount == 0)return appendInfo// trim any invalid bytes or partial messages before appending it to the on-disk logvar validMessages = trimInvalidBytes(messages, appendInfo)//trim掉不可用的部分或者殘缺的消息try {// they are valid, insert them in the loglock synchronized {appendInfo.firstOffset = nextOffsetMetadata.messageOffset if(assignOffsets) { //如果需要重新賦予offset// assign offsets to the message setval offset = new AtomicLong(nextOffsetMetadata.messageOffset)try {validMessages = validMessages.validateMessagesAndAssignOffsets(offset, appendInfo.sourceCodec, appendInfo.targetCodec, config.compact) //驗(yàn)證消息并且賦予offset} catch {case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)}appendInfo.lastOffset = offset.get - 1} else {// we are taking the offsets we are givenif(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)throw new IllegalArgumentException("Out of order offsets found in " + messages)}// re-validate message sizes since after re-compression some may exceed the limit 對壓縮后消息重新驗(yàn)證MessageSize是否超過了允許的最大值for(messageAndOffset <- validMessages.shallowIterator) {if(MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {// we record the original message set size instead of trimmed size// to be consistent with pre-compression bytesRejectedRate recording BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d.".format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))}}注意到對MessageSize驗(yàn)證了兩次,第二次是對重新壓縮后的消息。KAFKA-1718里提到MessageSizeToLargeException,就是在這時(shí)候檢測出來的。
初步檢驗(yàn):analyzeAndValidateMessageSet
具體的檢驗(yàn)消息完整性和offset單調(diào)增長的邏輯在analyzeAndValidateMessageSet方法里。這個(gè)方法的實(shí)現(xiàn)里,需要注意幾點(diǎn):
?
?
config.compressionType就是broker配置里的compression.type的值,如果它是“producer", 就會使用producer request使用壓縮方式,否則就使用config.compressionType指明的壓縮方式。注意如果一個(gè)MessageSet里的Message采用了不同的壓縮方式,最后被當(dāng)成sourceCodec的是最后一個(gè)壓縮了的消息的壓縮方式。
再次檢驗(yàn)并且賦予offset :validateMessagesAndAssignOffsets
只有l(wèi)eader處理produce request時(shí),會調(diào)用ByteBufferMessageSet的這個(gè)方法。 它不會檢測analyzeAndValidateMessageSet已經(jīng)檢測的內(nèi)容,但是會把這個(gè)MessageSet進(jìn)行深度遍歷(即如果它里邊的消息是壓縮后,就把這個(gè)消息解壓開再遍歷),這樣它就能做analyzeAndValidateMessageSet不能進(jìn)行的檢測:對于compacted topic檢測其key是否為空,如果為空就拋出InvalidMessageException。
另外,它會把深度遍歷后獲得的Message放在一起重新壓縮。
如果MessageSet的尾部不是完整的Message呢?
這是在獲取ByteBufferMessageSet的iternalIterator時(shí)候處理的。
def makeNextOuter: MessageAndOffset = {// if there isn't at least an offset and size, we are doneif (topIter.remaining < 12)return allDone()val offset = topIter.getLong()val size = topIter.getInt()if(size < Message.MinHeaderSize)throw new InvalidMessageException("Message found with corrupt size (" + size + ") in shallow iterator")// we have an incomplete messageif(topIter.remaining < size)return allDone(). ...}注意返回allDone()和拋出InvalidMessageException的時(shí)機(jī)。
- 如果這個(gè)MessageSet剩下部分不到12bytes,那剩下的部分就是下一個(gè)MessageSet頭部的一部分,是沒法處理的,也是沒辦法檢驗(yàn)的,因此就返回allDone。
- 如果夠12bytes,就可以讀出offset和MessageSize。MessageSize至少會大于Message頭里邊的那些crc、Attributes, MagicBytes等加起來的大小,因此如果MessageSize比這個(gè)還小,就肯定是個(gè)entry有問題,所以就拋出異常。這里的問題在于,即使MessageSet最后的那個(gè)Message是不完整的,只要MessageSize有問題,也會拋異常,而不是忽略這個(gè)不完整的Message。(這個(gè)可能是沒考慮到,也可能是有別的考慮,不過無論怎么處理最后的這個(gè)不完整的Message,都有一定的道理)。
?consumer端的驗(yàn)證
consumer(0.9)會檢查checksum,不過是可以配置的,原因正如config里說的一樣。
public static final String CHECK_CRCS_CONFIG = "check.crcs";private static final String CHECK_CRCS_DOC = "Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance.";config的文檔說,檢查checksum是為了"ensures no on-the-wire or on-disk corruption to the message occurred."即,為了保證沒有在網(wǎng)絡(luò)傳輸出或者磁盤存儲時(shí)出現(xiàn)了消息的損壞。但是checksum計(jì)算時(shí)會帶來開銷,所以追求最佳性能,可以關(guān)掉checksum的檢查。
?
下面來看一下幾個(gè)與消息格式相關(guān)的KIP。為什么需要這些改變呢?為什么之前沒有實(shí)現(xiàn)這些改變呢?都是因?yàn)楦鞣N折衷吧,需求與性能折衷,需求與實(shí)現(xiàn)所需的工作量的折衷……
下面的幾個(gè)KIP可能會一起加上去,畢竟都是對消息格式的修改,不能搞沖突了。
KIP-31 - Move to relative offsets in compressed message sets
前邊提到了,在leader收到ProduceRequet之后,它會解壓開compressed message(也就是是這個(gè)KIP里的compressed messageset,這兩說說法的確有些亂),然后給里邊包含的message set的每條消息重新賦予offset。這個(gè)做法也是應(yīng)該的,乍一看也沒什么不好。但是問題在于,不僅是直接改個(gè)offset這么簡單,在改完之后,需要重新壓縮這些消息,還要計(jì)算。這么一搞,開銷就大了。KIP-31就是想把這部分的性能損失降下來。(這個(gè)KIP已經(jīng)是accepted狀態(tài))
做法是把在一個(gè)compressed message set里邊的每個(gè)message的offset里記下當(dāng)前message相對于外層的wrapper message的偏移。用漢語說這個(gè)意思比較費(fèi)勁,KIP里這么說
When the producer compresses a message, write the relative offset value in the raw message's offset field. Leave the wrapped message's offset blank.
When broker receives a compressed message, it only needs to?
注意,這個(gè)wrapper message里記的base offset, 是它所含的message set里的最后一個(gè)message的offset。這個(gè)和當(dāng)前的compressed message的offset是一致的。
然后當(dāng)broker收到一個(gè)壓縮后的消息時(shí),它只需要
- 驗(yàn)證CRC與realtive offset的正確性
- 重新設(shè)定外層消息的offset,也就是base offset。
KIP-32 - Add timestamps to Kafka message
在消息里加時(shí)間戳。需要注意的是,這個(gè)KIP還在討論中(以下的內(nèi)容是基于2016年1月7日的版本)。不像上一個(gè)已經(jīng)確定了。
(俺是覺得這個(gè)事情早該做了……)
首先,來看一下動機(jī),這個(gè)提有意思
Motivation
This KIP tries to address the following issues in Kafka.
說的是這幾個(gè)原因
1. Log retention會不靠譜。當(dāng)前l(fā)og retention是在log segment層面做的,是按照log segment的最后修改時(shí)間確定是否要刪除一個(gè)log segment. 但是,當(dāng)replica重分配發(fā)生時(shí),新被分配的這個(gè)replica的log segment的修改時(shí)間會被設(shè)成當(dāng)前時(shí)間。這么一來,它就不能被按照log retention想要做的那樣(實(shí)際上是想把一段時(shí)間之前的消息刪除)被刪除。
2. 由于和1同樣的原因,對于一個(gè)新創(chuàng)建的replica(意思應(yīng)該是移動位置的replica, 并不是增加分區(qū)后新加的replica)log rolling有時(shí)候也會不靠譜。
3. 有些場景中需要消息含有時(shí)間戳,比如流處理。
感覺,貌似第三個(gè)原因才是決定性的,擁抱流處理。
接口的變化
準(zhǔn)備在Message里加入timestamp字段
準(zhǔn)備增加兩個(gè)配置
- message.timestamp.type 可以選CreateTime或者LogAppendTime,CreateTime就是這條消息生成的時(shí)間,是在producer端指定的。LogAppendTime就是append到log的時(shí)間(實(shí)現(xiàn)細(xì)節(jié)沒有說明)。
- max.message.time.difference.ms 如果選擇了CreateTime, 那么只有當(dāng)createTime和broker的本地時(shí)間相差在這個(gè)配置指定的差距之內(nèi),broker才會接受這條消息。
糾結(jié)之處
之前關(guān)于這個(gè)KIP的討論主要是關(guān)于使用哪個(gè)時(shí)間, 是使用LogAppendTime(broker time),還是CreateTime(application time)。
兩種都有利有弊:
The good things about LogAppendTime are: 使用LogAppendTime的好處在于
The good things about CreateTime are: 使用CreateTime的好處是
在俺看來,這兩個(gè)選擇的確挺糾結(jié)的。用戶肯定是想用自己產(chǎn)生消息的時(shí)間,不然很難準(zhǔn)確地找到一條消息。但是,如果使用用戶指定的時(shí)間,broker端的行為就變得復(fù)雜了,比如,如果用戶指定的時(shí)間不是單調(diào)遞增的,該怎么建時(shí)間索引。但是用戶產(chǎn)生畸形的時(shí)間,倒可以通過配置里max.message.time.difference.ms來控制。或許可以加另一個(gè)配置,允許broker在一定范圍內(nèi)修改CreateTime,比如最多可以更改1000ms。這樣就能即使消息的timestamp單調(diào)增長,也能使用戶對消息的時(shí)間的估計(jì)比較準(zhǔn)確。不過,這樣可能就需要讓broker time的含義變成broker收到消息時(shí)間,而不是append到log的時(shí)間。否則就難以確定何時(shí)該拒絕無法在指定范圍內(nèi)修改timestamp的消息。
?
KIP-33 - Add a time based log index
動機(jī):
當(dāng)前按照時(shí)間戳查找offset得到的結(jié)果是非常粗粒度的,只能在log segment的級別。(對于reassigned replica就差得沒譜了。)所以這個(gè)KIP提議建一個(gè)基于時(shí)間的對日志的索引,來允許按timestamp搜索消息的結(jié)果更準(zhǔn)確。
這個(gè)KIP和KIP-32是緊密相關(guān)的。這倆KIP都在討論過程中。
?
轉(zhuǎn)載于:https://www.cnblogs.com/devos/p/5100611.html
總結(jié)
以上是生活随笔為你收集整理的Kafka的消息格式的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【Spring】Spring学习笔记-0
- 下一篇: GO 输出字符数同时输出这个字符串的字节