理解 Paimon changelog producer
介紹
目的
Chaneglog producer 的主要目的是為了在 Paimon 表上產生流讀的 changelog, 所以如果只是批讀的表是可以不用設置 Chaneglog producer 的.
一般對于數據庫如 MySQL 來說, 當執行的語句涉及數據的修改例如插入、更新、刪除時,MySQL 會將這些數據變動記錄在 binlog 中。相當于額外記錄一份操作日志, 類似于 Paimon 中的 input changelog producer 的模式
存儲形式
Chaneglog 一般是以單獨的 changelog 文件的形式存儲的, 也是在 snapshot commit 期間提交的. 在每次 Snapshot 的元數據中就會記錄 changelogManifestList. 因此在 Snapshot 過期時, 也會一起過期.
Changelog producer 有四種模式, 分別是 None, input, lookup, full comapction. 一般來說, 是要以盡可能低的代價生成 Changelog. 這四種的生成代價是由低到高的.
四種模式
None
不查找舊值, 不額外寫Chaneglog
默認就是 none, 這種模式下在 Paimon 側不會額外存儲數據. Source 讀取的時候, 就是將 snapshot 的 delta list 文件讀取出來, 就是本次 Snapshot 的增量 Changelog 了.
那么在這種模式下, 對于一個主鍵寫入兩條 INSERT 數據, 批式查詢讀出來是一個合并后的值, 流式查詢應該讀出來是兩個 INSERT 數據, 實際上這個 changelog 是不對的, 應該讀取第二條的時間應該是 -U +U 才對.
驗證
CREATE TABLE T (
a INT
,b INT
,c STRING
,PRIMARY KEY (a) NOT ENFORCED
)
WITH (
'merge-engine' = 'deduplicate'
,'changelog-producer' = 'none'
,'continuous.discovery-interval' = '1s' -- 調低discovery-interval
);
BlockingIterator<Row, Row> iterator = streamSqlBlockIter("SELECT * FROM T");
sql("INSERT INTO T VALUES(1, 1, '1')");
// 兩次插入之間間隔2s, 這樣source可以讀取到兩次snapshot的數據
Thread.sleep(2000);
sql("INSERT INTO T VALUES(1, 1, '2')");
assertThat(iterator.collect(3))
.containsExactlyInAnyOrder(
Row.ofKind(RowKind.INSERT, 1, 1, "1"),
Row.ofKind(RowKind.INSERT, 2, 2, "2"));
測試流轉
// 第一次commit
Successfully commit snapshot #1 (path /warehouse/default.db/T/snapshot/snapshot-1) by user 6434ee5c-ad2e-4564-a32c-568104392533 with identifier 9223372036854775807 and kind APPEND.
// 掃描到第一個snapshot
start snapshotId: 1
// 第二次commit
Successfully commit snapshot #2 (path /warehouse/default.db/T/snapshot/snapshot-2) by user ce0b10c0-e63f-4db0-ab90-1c542e832791 with identifier 9223372036854775807 and kind APPEND.
// 掃描到delta文件
scan with delta 2
// 輸出數據
[+I[1, 1, 1]]
[+I[1, 1, 1], -U[1, 1, 1]]
[+I[1, 1, 1], -U[1, 1, 1], +U[1, 1, 2]]
ChangelogNormalize
可以看到流讀的輸出產生了正確的 changelog, 但是實際上 none 模式讀取的時候是沒有這個 -U. 具體可以通過 debug ValueContentRowDataRecordIterator 來查看真實讀取的數據. 那這個 changelog 消息從哪里來呢 ? 實際上這個流讀任務會產生 ChangelogNormalize 算子.
if (
isUpsertSource(resolvedSchema, table.tableSource) ||
isSourceChangeEventsDuplicate(resolvedSchema, table.tableSource, config)
) {
// generate changelog normalize node
// primary key has been validated in CatalogSourceTable
val primaryKey = resolvedSchema.getPrimaryKey.get()
val keyFields = primaryKey.getColumns
val inputFieldNames = newScan.getRowType.getFieldNames
val primaryKeyIndices = ScanUtil.getPrimaryKeyIndices(inputFieldNames, keyFields)
val requiredDistribution = FlinkRelDistribution.hash(primaryKeyIndices, requireStrict = true)
// 給source添加pk shuffle
val requiredTraitSet = rel.getCluster.getPlanner
.emptyTraitSet()
.replace(requiredDistribution)
.replace(FlinkConventions.STREAM_PHYSICAL)
val newInput: RelNode = RelOptRule.convert(newScan, requiredTraitSet)
// 本質上就是按照 PK進行last row計算, 用于生成PK的changelog
new StreamPhysicalChangelogNormalize(
scan.getCluster,
traitSet,
newInput,
primaryKeyIndices,
table.contextResolvedTable
)
}
// 表示source是upsert的source
public static boolean isUpsertSource(
ResolvedSchema resolvedSchema, DynamicTableSource tableSource) {
if (!(tableSource instanceof ScanTableSource)) {
return false;
}
ChangelogMode mode = ((ScanTableSource) tableSource).getChangelogMode();
boolean isUpsertMode =
mode.contains(RowKind.UPDATE_AFTER) && !mode.contains(RowKind.UPDATE_BEFORE);
boolean hasPrimaryKey = resolvedSchema.getPrimaryKey().isPresent();
// 只發送update_after, 不發送update_before, 并且設置了pk
return isUpsertMode && hasPrimaryKey;
}
可以看到在這種模式下, 默認下游流讀的時候是會生成 ChangelogNormalize 算子的, 類似于一個 Last Row 的算子, 實際上就是每條 input 流入的時候, 因為插件告訴 Planner, 我這個 source 只能產生 Upsert 消息(Insert, Update_after, Delete) , 所以下游通過 Normalize 節點自己來生成 Changelog.
所以 none 模式其實本身發送的 changlog 確實是不全的, 但是通過下游 changelog normalize 補足了這個 Changelog. 所以類似于 MySQL 中 binlog 生成的行為, 他其實也是存在查找前鏡像的過程的, 只不過將查找的過程放到了下游的流任務中.
當下游不依賴完整的 Chaneglog, 比如下游也是個同步, 那么下游任務是可以通過參數 scan.remove-normalize 來移除 Normalize 的, 通過偽造 ChangelogMode 為 all 來繞過.
但是這里其實還有一個問題, 下游的 ChaneglogNormalize 節點是有 ttl 的, 假如我某個 key 更新是在 ttl 之后到來, 那么可能導致第二條 Insert/update_after 到來的時候又被當做一條 insert 消息下發, 其實會有數據不準確的問題存在的.
DeltaFollowUpScanner
流式讀取的時候會分為兩個部分, 歷史 + 增量. 有一些模式是不需要讀歷史數據的, 但是增量部分一般都是要讀的. 歷史部分是讀取的某個時刻的快照. 而增量的數據是讀取的 CommitKind 為 Append 的 snapshot 所對應的 delta list. 所以其實這種流讀模式下, delta scanner 只會讀取 L0 的文件.
input
不查找舊值, 額外寫Chaneglog
寫數據過程中雙寫一份文件, 作為 Changelog.
理論上來說這種模式應該是很輕量的一種了, 因為首先額外的一份存儲是都省不了的, 在 None 模式中,雖然在 Paimon 側沒有占用額外的存儲, 但是在下游的流任務的狀態中, 其實是有一份全量表的額外存儲的開銷的. 所以如果 input 模式不考慮存儲開銷, 計算開銷已經是最低了, 因為這種模式不查找舊值.
也因此, 這種模式解決不了的一個問題是, 如果我的輸入源就是沒有完整 Changelog 的, 比如我從一份有重復數據的離線表導入 Paimon, 那么即使雙寫一份數據作為 Changelog, 這份 Changelog 也是不對的, 里面可能存在同一個主鍵的重復數據.
這種模式對于 CDC 的數據源是適用的. 那 None 模式對于 cdc 的數據源是否適用呢 ? 其實是不適用的, 上面我們提到 None 模式的流讀其實就是讀取 L0的文件, 那么我們只要看 L0的文件是否包含 Key 的變更記錄. 因為 write buffer 會有合并的邏輯, 所以, 對于 CDC 的數據, L0中可能會是已經在內存合并后的數據. 比如同一個 key 的-U 和+U 消息, 同時寫入, 那么在 writer buffer 寫入的時候就已經只保留+U 消息了, 所以 None 模式中 L0文件中的數據, 可能已經是合并后的數據, 對于 CDC 的數據也不適用.
那么是不是可以在內存中不進行合并, L0寫入之后在后續 compact 的時候才進行合并, 這樣 None 模式就可以替換 input 的功能, 這樣不引入額外雙寫的代價, 也不用額外查找, 就可以保留上游 cdc 數據的完整 Change log.
lookup
查找舊值, 額外存儲Chaneglog
如果不是 CDC 的數據源, 或者此 Paimon 表本身在寫入的過程中還有計算邏輯(如 partial-update/aggregation), 那么以上兩種模式都不能生成正確的 Changelog.
lookup 的做法, 如其名字, 就是在 compaction 的過程中, 會去向高層查找本次新增 key 的舊值, 如果沒有查找到, 那么本次的就是新增 key, 如果有查找到, 那么就生成完整的 UB 和 UA 消息.
LookupCompaction
如何保證本次寫入的數據一定能夠產生的 Chaneglog. 首先按照 Universal Compaction策略挑選文件參與本次 compaction. 如果沒有挑選到, 那么會通過 LookupCompaction 策略來挑選, 這里其實隱含了, 如果 Universal Compaction 產生了 Compaction Unit, 一定包含所有的 L0文件.
通過 LookupCompaction 策略會將 L0 文件進行 Compaction.
LookupMergeFunction
在 Compaction rewrite 的過程中, 會將相同 key 的數據喂給 LookupMergeFunction
public KeyValue getResult() {
// 1. Find the latest high level record
Iterator<KeyValue> descending = candidates.descendingIterator();
while (descending.hasNext()) {
KeyValue kv = descending.next();
if (kv.level() > 0) {
if (highLevel != null) {
descending.remove();
} else {
highLevel = kv;
}
} else {
containLevel0 = true;
}
}
// 2. Do the merge for inputs
mergeFunction.reset();
candidates.forEach(mergeFunction::add);
return mergeFunction.getResult();
}
- candidates 存儲的相同 key 的多個 SortedRun 的數據
- 插入順序是 sequence number 的增序.
- 對于非 L0 的 kv, sequence 越大, level 越小. 因此 candidates 中的 level 是遞減的, 最后的一部分是 L0的. 可以參見一部分
LookupChangelogMergeFunctionWrapperTest
- 對于非 L0 的 kv, sequence 越大, level 越小. 因此 candidates 中的 level 是遞減的, 最后的一部分是 L0的. 可以參見一部分
- 按照
candidates倒序查找就是, 找到最近的 highlevel 的 value
LookupChangelogMergeFunctionWrapper
public ChangelogResult getResult() {
reusedResult.reset();
KeyValue result = mergeFunction.getResult();
if (result == null) {
return reusedResult;
}
KeyValue highLevel = mergeFunction.highLevel;
boolean containLevel0 = mergeFunction.containLevel0;
// 1. No level 0, just return
// 1. No level 0, just return
// 沒有level 0的數據, 意味著沒有新數據產生
// 那么沒有changelog文件產生, 只是高層文件的合并
if (!containLevel0) {
return reusedResult.setResult(result);
}
// 2. With level 0, with the latest high level, return changelog
// 出現了highlevel的value, 很幸運, 這樣直接就可以得出change log了.
if (highLevel != null) {
// For first row, we should just return old value. And produce no changelog.
setChangelog(highLevel, result);
return reusedResult.setResult(result);
}
// 3. Lookup to find the latest high level record
// 向更高level中查找這個key先前的數據, 為了產生變更流代價還是挺高的
// org.apache.paimon.mergetree.LookupLevels#lookup
highLevel = lookup.apply(result.key());
if (highLevel != null) {
// 找到了更高level的數據, 那么別浪費這個結果, 可以再次進行合并, 得到一個更新的值, 并生成UB和UA消息
mergeFunction2.reset();
mergeFunction2.add(highLevel);
mergeFunction2.add(result);
result = mergeFunction2.getResult();
setChangelog(highLevel, result);
} else {
// 沒有找到更高level的數據, 那么Changelog就是一條insert
setChangelog(null, result);
}
return reusedResult.setResult(result);
}
根據 LookupMergeFunction#getResult 得到的 containLevel0 和 highLevel 的信息, 以及高層 Lookup 完成 Change log 的生成. 在 Lookup 的過程中需要進行文件的二分查找, 以及 Lookup file 的索引文件構建, 整體代價還是比較高的.
full compaction
查找舊值, 額外存儲 Chaneglog
這種模式下一般通過設置 full-compaction.delta-commits 定期進行 full compact, 因為 full compact 其實代價是比較高的. 所以這種模式整體的開銷也是比較大的. 但是在 full compact 的過程中, 其實數據都會被寫到最高層, 所以所有 value 的變化都是可以推演出來的.
FullChangelogMergeFunctionWrapper
public ChangelogResult getResult() {
reusedResult.reset();
if (isInitialized) {
KeyValue merged = mergeFunction.getResult();
// 沒有topLevel
if (topLevelKv == null) {
// merged結果為ADD消息, 那么產生insert的消息. 如果merge完是一條DELETE消息, 相當于這條消息的Changelog還沒有下發就已經刪除了, 所以這個Changelog就不下發了.
if (merged != null && isAdd(merged)) {
reusedResult.addChangelog(replace(reusedAfter, RowKind.INSERT, merged));
}
} else {
// 有topLevel的數據, merged結果為空或者為DELETE消息, 那么產生UB和UA消息
if (merged == null || !isAdd(merged)) {
reusedResult.addChangelog(replace(reusedBefore, RowKind.DELETE, topLevelKv));
} else if (!changelogRowDeduplicate
|| !valueEqualiser.equals(topLevelKv.value(), merged.value())) {
reusedResult
.addChangelog(replace(reusedBefore, RowKind.UPDATE_BEFORE, topLevelKv))
.addChangelog(replace(reusedAfter, RowKind.UPDATE_AFTER, merged));
}
}
return reusedResult.setResultIfNotRetract(merged);
} else {
// 只有一個value, 并且這個value不在topLevel, 那么就是本次新的Changelog, 置為 insert 數據.
if (topLevelKv == null && isAdd(initialKv)) {
reusedResult.addChangelog(replace(reusedAfter, RowKind.INSERT, initialKv));
}
// either topLevelKv is not null, but there is only one kv,
// so topLevelKv must be the only kv, which means there is no change
//
// or initialKv is not an ADD kv, so no new key is added
return reusedResult.setResultIfNotRetract(initialKv);
}
}
參考
https://paimon.apache.org/docs/master/concepts/primary-key-table/#changelog-producers
總結
以上是生活随笔為你收集整理的理解 Paimon changelog producer的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: OS X技巧: 触控板手势对链接进行快速
- 下一篇: 打造一个极度舒适的Chrome扩展项目开