kafka处理流式数据_通过Apache Kafka集成流式传输大数据
kafka處理流式數據
從實時過濾和處理大量數據,到將日志數據和度量數據記錄到不同來源的集中處理程序中,Apache Kafka越來越多地集成到各種系統和解決方案中。 使用CData Sync ,可以輕松地將此類解決方案應用于任何CRM,ERP或Analytics軟件。
配置Apache Kafka目標以進行CData同步
在CData Sync中設置Kafka目標非常簡單。 只需提供“服務器”和“端口”,然后復制命令就可以接管—不需要其他配置。 要設置這種連接,請首先導航到“連接”頁面,然后單擊“目標”選項卡,然后選擇“ Kafka”。
指定服務器和端口屬性。 如果啟用了身份驗證,請同時指定“用戶”和“密碼”屬性。 單擊“保存更改”和“測試連接”以保存更改,并確保CData Sync可以連接到Kafka服務器。
其他一些屬性可用,并在“高級”選項卡中進行了分類:
- 啟用冪等:確保郵件僅傳遞一次。 在某些情況下,生產者可能會產生重復的消息。 為了進行驗證,客戶端可以在執行作業后將消耗的結果數與“受影響的記錄”狀態進行比較。
- 序列化格式:指定產生的消息的格式; 可用值為JSON,XML和CSV 。
- 主題:如果指定,則該屬性將替代表名作為復制的目標主題。
增量更新如何工作
CData Sync使無縫更新變得無縫。 服務器不需要任何配置,并且所有必需的屬性都已預先配置,具體取決于Source和Source表。
與SQL Server等其他數據庫工具不同,Kafka不支持可靠的狀態存儲方式。 CData Sync使用本地SQLite數據庫來解決此問題。 它將存儲上次復制表的時間,并使用該時間戳過濾最新記錄。 大多數企業系統都提供一個系統列來指定記錄的最后更新日期,這足以滿足此目的。
例如,QuickBooks Online中的“帳戶”表包含此列。 復制表:
并在修改三個記錄后運行另一個副本:
某些表沒有自動更新的列,該列保存記錄的最后更新日期。 在這種情況下,別無選擇,只能從一開始就完全復制結果。 Kafka提供了附加到消息的時間戳字段,可用于區分較新的結果。
QuickBooks Online中的department表沒有用于指定上次更新時間的列。 復制此表將導致:
在添加兩個新記錄的同時運行復制一次,將產生:
優化查詢
有多種方法可以管理CData Sync生成的消息大小。 可能需要進行優化,具體取決于Kafka服務器的配置,或者值得考慮的只是簡單地提高復制性能。
壓縮類型:指定如何壓縮生成的數據。 可用選項為gzip,lz4,snappy或無。 指定非“ none”以外的壓縮類型將減少消息有效負載。
最大批處理大小:指定在單個請求中發送的最大批處理大小(以字節為單位)。 批處理中充滿了整個消息。 如果批次已等待一段時間,則可以提前發送批次而無需填充。 降低此值可能會降低性能,但是如果生成的消息超過服務器的最大允許消息大小,則可能有必要。
排除列:如果單個記錄本身太大,則轉換功能提供了一種從輸出消息中省略某些列的方法。 這是最常用的聚合列。 要排除列,請導航至作業,然后單擊所需表旁邊的轉換按鈕:
接下來,取消選擇聚合列:
最后,單擊“確定”進行保存。
設置CData Sync來管理數據源
通過使用計劃作業,可以設置完全自動的記錄提取,Kafka消費者可以使用該記錄始終與新條目保持最新。 時間可以根據特定數據集的需要進行調整。
要安排作業,請選擇所需的作業。 在“計劃”部分下,選中“計劃此作業自動運行”框。 最后,選擇適當的間隔。
結論
CData Sync和Apache Kafka可以成為強大的組合。 可以從任何數據源復制到Kafka使用者,以支持從分析到日志記錄的一系列需求。 自動檢測新記錄并安排作業可確保新數據穩定地流向其訂戶。 壓縮,轉換和其他優化可以進一步控制數據格式,量和頻率。 下載CData Sync的30天免費試用版 ,今天就開始將大數據流式傳輸到Apache Kafka!
翻譯自: https://www.javacodegeeks.com/2019/11/stream-big-data-with-apache-kafka-integration.html
kafka處理流式數據
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的kafka处理流式数据_通过Apache Kafka集成流式传输大数据的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: psd 素材怎么用到js里面(用psd素
- 下一篇: ps渐变线条怎么做的(ps渐变线条怎么做