kettle增加字段报错_【实战】使用 Kettle 工具将 mysql 数据增量导入到 MongoDB 中
每一個成功人士的背后,必定曾經(jīng)做出過勇敢而又孤獨的決定。
放棄不難,但堅持很酷~
最近有一個將 mysql 數(shù)據(jù)導(dǎo)入到 MongoDB 中的需求,打算使用 Kettle 工具實現(xiàn)。本文章記錄了數(shù)據(jù)導(dǎo)入從 0 到 1 的過程,最終實現(xiàn)了每秒鐘快速導(dǎo)入約 1200 條數(shù)據(jù)。一起來看吧~
一、Kettle 連接圖
簡單說下該轉(zhuǎn)換流程,增量導(dǎo)入數(shù)據(jù):
1)根據(jù) source 和 db 字段來獲取 MongoDB 集合內(nèi) business_time 最大值。
2)設(shè)置 mysql 語句
3)對查詢的字段進行改名
4)過濾數(shù)據(jù):只往 MongoDB 里面導(dǎo)入 person_id,address,business_time 字段均不為空的數(shù)據(jù)。
符合過濾條件的數(shù)據(jù),增加常量,并將其導(dǎo)入到 mongoDB 中。
不符合過濾條件的數(shù)據(jù),增加常量,將其導(dǎo)入到 Excel 表中記錄。
二、流程組件解析
1、MongoDB input
1)Configure connection
Host name(s) or IP address(es):網(wǎng)絡(luò)名稱或者地址。可以輸入多個主機名或IP地址,用逗號分隔。還可以通過將主機名和端口號與冒號分隔開,為每個主機名指定不同的端口號,并將主機名和端口號的組合與逗號分隔開。例如,要為兩個不同的MongoDB實例包含主機名和端口號,您將輸入localhost 1:27017,localhost 2:27018,并使 Port 字段為空。
Port:端口號
Username:用戶名
Password:密碼
Authenticate using Kerberos:指示是否使用Kerberos服務(wù)來管理身份驗證過程。
Connection timeout:連接超時時間(毫秒)
Socket timeout:等待寫操作(以毫秒為單位)的時間
2)Input options
Database:檢索數(shù)據(jù)的數(shù)據(jù)庫的名稱。點擊 “Get DBs” 按鈕以獲取數(shù)據(jù)庫列表。
Collection:集合名稱。點擊 “Get collections” 按鈕獲取集合列表。
Read preference:表示要先讀取哪個節(jié)點。
Tag set specification/#/Tag Set:標簽允許您自定義寫關(guān)注和讀取副本的首選項。
3)query
根據(jù) source 和 db 字段來獲取 bussiness_time 的最大值,Kettle 的 MongoDB 查詢語句如下圖所示:
對應(yīng)的 MongDB 的寫法為:
記得勾選 Query is aggregation pipeline 選項:
4)Fields
取消選中 Output single JSON field ,表示下一組件接收到的結(jié)果是一個 Number 類型的單值,否則就是一個 json 對象。
2、表輸入
設(shè)置 mysql 數(shù)據(jù)庫 jdbc 連接后,填好 SQL 語句之后,在下方的“從步驟插入數(shù)據(jù)”下拉列表中,選中“MongoDB input”。“MongoDB input” 中的變量,在 SQL 語句中用 ? 表示,如下圖所示:
如果導(dǎo)數(shù)的時候發(fā)生中文亂碼,可以點擊 編輯 ,選擇 數(shù)據(jù)庫連接 的 選項,添加配置項:characterEncoding utf8,即可解決。如下圖所示:
3、字段選擇
如果查詢出來的列名需要更改,則可以使用“字段選擇”組件,該組件還可以移除某字段,本次應(yīng)用中,主要使用該組件將字段名進行修改。如下圖所示:
4、過濾選擇
只保留 person_id,address,business_time 字段都不為空的數(shù)據(jù):
5、增加常量
很簡單,在“增加常量”組件內(nèi)設(shè)置好要增加常量的類型和值即可。
6、Excel 輸出
添加“Excel 輸出”,設(shè)置好文件名,如果有必要的話還可以設(shè)置 Excel 字段格式,如下圖所示:
7、MongoDB output
1)Configure connection
如下圖所示,由于一開始就介紹了 MongoDB 的連接方式,所以在這里不在贅述。
2)Output options
Batch insert size:每次批量插入的條數(shù)。
Truncate collection:執(zhí)行操作前先清空集合
Update:更新數(shù)據(jù)
Upsert:選擇 Upsert 選項將寫入模式從 insert 更改為 upsert(即:如果找到匹配項則更新,否則插入新記錄)。使用前提是 勾選 Update 選項。
Muli-update:多次更新,可以更新所有匹配的文檔,而不僅僅是第一個。
3)Mongo document fields
根據(jù) id、source、db 字段插入更新數(shù)據(jù),如下圖所示:
更多 MongoDB output 可參考:
https://wiki.pentaho.com/display/EAI/MongoDB+Output
三、索引優(yōu)化
1、mysql
為 mysql 查詢字段添加索引。(略)
2、MongoDB
對 MongoDB 查詢做優(yōu)化,創(chuàng)建復(fù)合索引:
對于 MongoDB input 組件來說,會關(guān)聯(lián)查詢出 business_time 最大值,所以要創(chuàng)建復(fù)合索引,創(chuàng)建復(fù)合索引時要注意字段順序,按照查詢順序創(chuàng)建:
db.trajectory_data.createIndex({source:?1,?db:?1,?business_time:?1})對于 MongoDB output 組件來說,因為已經(jīng)設(shè)置了 插入或更新 數(shù)據(jù)的規(guī)則,也會涉及到查詢,所以再設(shè)置一個復(fù)合索引:
db.trajectory_data.createIndex({id:?1,?source:?1,?db:?1})四、運行
運行前,需要在集合內(nèi)插入一條含 business_time 字段的 demo 數(shù)據(jù),否則 MongoDB input 會因為查不到數(shù)據(jù)而報錯:
db.trajectory_data.insert({????id:?0,
????source:?'xx數(shù)據(jù)',
????db:?"17-db2",
????business_time:?0
})
成功插入數(shù)據(jù)后,執(zhí)行該轉(zhuǎn)換:
可視化操作
命令行操作:${KETTLE_HOME}/pan.sh -file=xxx.ktr
可通過點擊 “執(zhí)行結(jié)果” --> “步驟度量” 來查看各組件運行狀態(tài),如下圖所示:
24 分鐘共導(dǎo)了 172 萬的數(shù)據(jù),每秒鐘約導(dǎo)入 1200 條數(shù)據(jù)。
這樣子,這個轉(zhuǎn)換基本就算完成了。可以在 linux 上寫一個定時任務(wù)去執(zhí)行這個轉(zhuǎn)換,每次轉(zhuǎn)換 mysql 都會將大于 mongoDB 集合中 business_time 字段最大值的數(shù)據(jù)增量導(dǎo)入到 MongoDB 中。
五、不足
像上述的 Kettle 流程也是有不足的。假如一次性拉取的數(shù)據(jù)量過大,很有可能導(dǎo)致 Mysql 或 Kettle 內(nèi)存溢出而報錯。所以上述流程只適合小數(shù)據(jù)量導(dǎo)入。大數(shù)據(jù)量導(dǎo)入的話還是建議分批次導(dǎo)入或者分頁導(dǎo)入,大家可以關(guān)注我,我會持續(xù)更新技術(shù)干貨哦?~
?熱 文?推 薦???【實戰(zhàn)】Kettle自定義jar包供JavaScript使用??HBase原理(一):架構(gòu)理解??Kafka消費者 之 指定位移消費??都快2020年了,ambari自定義服務(wù)集成,你還沒掌握嗎?文末有福利?Ambari2.6.1集成Apache Kylin服務(wù)?Elasticsearch 6.x 配置詳解?看完您如果還不明白 Kerberos 原理,算我輸!??用心整理 | Spring AOP 干貨文章,圖文并茂,附帶 AOP 示例 ~?Spring IOC,看完這篇文章,我才算是懂了!
歡迎大家留言討論
? ? ?你點的每個“在看”,我都認真當成了喜歡總結(jié)
以上是生活随笔為你收集整理的kettle增加字段报错_【实战】使用 Kettle 工具将 mysql 数据增量导入到 MongoDB 中的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: B站宣布与杰威尔达成版权合作:周杰伦新专
- 下一篇: ati2plab.exe是什么进程 at