使用datax同步cassandra数据
DataX 是阿里巴巴集團內被廣泛使用的離線數據同步工具/平臺,實現各種異構數據源之間高效的數據同步功能。最近,阿里云cassandra團隊為datax提供了cassandra讀寫插件,進一步豐富了datax支持的數據源,可以很方便實現cassandra之間以及cassandra與其他數據源之間的數據同步。本文簡單介紹如何使用datax同步cassandra的數據,針對幾種常見的場景給出配置文件示例,還提供了提升同步性能的建議和實測的性能數據。
datax快速入門
使用datax同步數據的方法很簡單,一共只需要三步:
1 部署datax。
2 編寫同步作業配置文件。
3 運行datax,等待同步作業完成。
datax的部署和運行都很簡單,可以通過datax官方提供的下載地址下載DataX工具包,下載后解壓至本地某個目錄,進入bin目錄,即可運行同步作業:
同步作業的配置格式可以參考datax文檔。
一個典型的配置文件如下:
一個同步作業的配置文件主要包括兩部分,setting包括任務調度的一些配置,content描述同步任務的內容,里面包含reader插件的配置和writer插件的配置。例如我們需要從mysql同步數據到cassandra,那么我們只需要把reader配置為mysqlreader,writer配置為cassandrawriter,并提供相應的插件配置信息即可。在datax項目頁面上面可以看到datax支持的插件列表,點擊對應的鏈接就可以查看相關插件的文檔了解插件需要的配置內容和格式要求。例如,cassandra插件的文檔可點擊如下鏈接:讀插件?寫插件。
以下列舉幾種常見的場景。
場景一 cassandra之間的數據同步
最常見的場景是把數據從一個集群同步到另一個集群,例如機房整體遷移、上云等。這時需要先手動在目標集群創建好keyspace和表的schema,然后使用datax進行同步。作為例子,下面的配置文件把數據從cassandra的一個表同步到另一個表:
{"job": {"setting": {"speed": {"channel": 3}},"content": [{"reader": {"name": "cassandrareader","parameter": {"host": "localhost","port": 9042,"useSSL": false,"keyspace": "test","table": "datax_src","column": ["id","name"]}},"writer": {"name": "cassandrawriter","parameter": {"host": "localhost","port": 9042,"useSSL": false,"keyspace": "test","table": "datax_dst","column": ["id","name"]}}}]} }場景二 從mysql同步到cassandra
datax支持多種數據源,可以很方便做到cassandra和其他數據源之間的數據同步。下面的配置把數據從mysql同步到cassandra:
{"job": {"setting": {"speed": {"channel": 3}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "root","column": ["id","name"],"splitPk": "db_id","connection": [{"table": ["table"],"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/database"]}]}},"writer": {"name": "cassandrawriter","parameter": {"host": "localhost","port": 9042,"useSSL": false,"keyspace": "test","table": "datax_dst","column": ["id","name"]}}}]} }場景三 只同步cassandra中的一部分數據
我們在讀插件的配置中提供了where關鍵字,可以用來只同步一部分數據。例如對于時序數據等場景定期同步的情況,就可以通過增加where的條件來實現只同步增量數據。where條件的格式和cql相同,例如"where":"textcol='a'"的作用類似于使用select * from table_name where textcol = 'a'進行查詢。另外還有allowFiltering關鍵字配合where使用,作用和cql中的ALLOW FILTERING關鍵字也是相同的。下面給出一個配置的例子:
{"job": {"setting": {"speed": {"channel": 1}},"content": [{"reader": {"name": "cassandrareader","parameter": {"host": "localhost","port": 9042,"useSSL": false,"keyspace": "test","table": "datax_src","column": ["deviceId","time","log"],"where":"time > '2019-09-25'","allowFiltering":true}},"writer": {"name": "cassandrawriter","parameter": {"host": "localhost","port": 9042,"useSSL": false,"keyspace": "test","table": "datax_dst","column": ["deviceId","time","log"]}}}]} }提高同步速度
以cassandra之間的數據同步為例。如下這些配置會對數據同步任務的性能產生影響:
(1)并行度
可以通過調大任務的并行度來提高同步速度。這主要通過job.setting.speed.channel從參數來實現。例如下面這個配置的效果是會有10個線程并行執行同步任務。
需要注意的是,cassandra讀插件里面,切分任務是通過在cql語句中增加token范圍條件來實現的,所以只有使用RandomPartitioner和Murmur3Partitioner的集群才能夠正確切分。如果您的集群使用了其他的Partitioner,cassandrareader插件會忽略channel配置,只用一個線程進行同步。
(2)batch
可以通過配置batchSize關鍵字在cassandra寫插件里面使用UNLOGGED batch來提高寫入速度。但是需要注意cassandra中對batch的使用有一些限制,使用這個關鍵字之前建議先閱讀[《簡析Cassandra的BATCH操作》(https://yq.aliyun.com/articles/719784?spm=a2c4e.11155435.0.0.65386b04OYOsvK)一文中關于batch使用限制的內容。
(3)連接池配置
寫插件還提供了連接池相關的配置connectionsPerHost和maxPendingPerConnection。這兩個參數的具體含義可以參考[java driver文檔](https://docs.datastax.com/en/developer/java-driver/3.7/manual/pooling/)。
(4)一致性配置
讀寫插件中都提供了consistancyLevel關鍵字,默認的讀寫一致性級別都是LOCAL_QUORUM。如果您的業務場景里面可以允許兩個集群的數據有少量不一致,也可以考慮不使用默認一致性級別來提高讀寫性能,例如使用ONE級別來讀數據。
性能數據
我們通過一個測試來觀察datax同步數據的性能。
服務端使用阿里云cassandra,源集群和目標集群均為3節點,規格為4CPU 8GB。客戶端使用一臺ECS,規則為4 CPU 16 GB。
首先使用cassandra-stress向源集群寫入500w行數據:
寫入過程的統計數據如下:
然后使用datax將這些數據從源集群同步到目標集群。配置文件如下:
{"job": {"setting": {"speed": {"channel": 10}},"content": [{"reader": {"name": "cassandrareader","parameter": {"host": "<源集群NODE>","port": 9042,"username":"<USER>","password":"<PWD>","useSSL": false,"keyspace": "test","table": "standard1","column": ["key","\"C0\"","\"C1\"","\"C2\"","\"C3\"","\"C4\"","\"C5\"","\"C6\"","\"C7\"","\"C8\"","\"C9\""]}},"writer": {"name": "cassandrawriter","parameter": {"host": "<目標集群NODE>","port": 9042,"username":"<USER>","password":"<PWD>","useSSL": false,"keyspace": "test","table": "standard1","batchSize":6,"column": ["key","\"C0\"","\"C1\"","\"C2\"","\"C3\"","\"C4\"","\"C5\"","\"C6\"","\"C7\"","\"C8\"","\"C9\""]}}}]} }同步過程的統計數據如下:
可見,datax同步數據的性能和cassandra-stress的性能相當,甚至要好一些。
原文鏈接
本文為云棲社區原創內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的使用datax同步cassandra数据的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 新基建来了!5G边缘计算如何展现勃勃生机
- 下一篇: Serverless 实战 —— 快速搭