mysql增量同步kafka_MySQL数据实时增量同步到Kafka - Flume
寫在前面的話
需求,將MySQL里的數(shù)據(jù)實時增量同步到Kafka。接到活兒的時候,第一個想法就是通過讀取MySQL的binlog日志,將數(shù)據(jù)寫到Kafka。不過對比了一些工具,例如:Canel,Databus,Puma等,這些都是需要部署server和client的。其中server端是由這些工具實現(xiàn),配置了就可以讀binlog,而client端是需要我們動手編寫程序的,遠沒有達到我即插即用的期望和懶人的標準。
同步的格式
原作者的插件flume-ng-sql-source只支持csv的格式,如果開始同步之后,數(shù)據(jù)庫表需要增減字段,則會給開發(fā)者造成很大的困擾。所以我添加了一個分支版本,用來將數(shù)據(jù)以JSON的格式,同步到kafka,字段語義更加清晰。
將此jar包下載之后,和相應的數(shù)據(jù)庫驅(qū)動包,一起放到flume的lib目錄之下即可。
處理機制
flume-ng-sql-source在【status.file.name】文件中記錄讀取數(shù)據(jù)庫表的偏移量,進程重啟后,可以接著上次的進度,繼續(xù)增量讀表。
啟動說明
說明:啟動命令里的【YYYYMM=201711】,會傳入到flume.properties里面,替換${YYYYMM}
[test@localhost?~]$?YYYYMM=201711?bin/flume-ng?agent?-c?conf?-f?conf/flume.properties?-n?sync?&
-c:表示配置文件的目錄,在此我們配置了flume-env.sh,也在conf目錄下;
-f:指定配置文件,這個配置文件必須在全局選項的--conf參數(shù)定義的目錄下,就是說這個配置文件要在前面配置的conf目錄下面;
-n:表示要啟動的agent的名稱,也就是我們flume.properties配置文件里面,配置項的前綴,這里我們配的前綴是【sync】;
flume的配置說明
flume-env.sh
#?配置JVM堆內(nèi)存和java運行參數(shù),配置-DpropertiesImplementation參數(shù)是為了在flume.properties配置文件中使用環(huán)境變量
export?JAVA_OPTS="-Xms512m?-Xmx512m?-Dcom.sun.management.jmxremote?-DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties"
flume.properties
#?數(shù)據(jù)來源
sync.sources?=?s-1
#?數(shù)據(jù)通道
sync.channels?=?c-1
#?數(shù)據(jù)去處,這里配置了failover,根據(jù)下面的優(yōu)先級配置,會先啟用k-1,k-1掛了后再啟用k-2
sync.sinks?=?k-1?k-2
#這個是配置failover的關鍵,需要有一個sink?group
sync.sinkgroups?=?g-1
sync.sinkgroups.g-1.sinks?=?k-1?k-2
#處理的類型是failover
sync.sinkgroups.g-1.processor.type?=?failover
#優(yōu)先級,數(shù)字越大優(yōu)先級越高,每個sink的優(yōu)先級必須不相同
sync.sinkgroups.g-1.processor.priority.k-1?=?5
sync.sinkgroups.g-1.processor.priority.k-2?=?10
#設置為10秒,當然可以根據(jù)你的實際狀況更改成更快或者很慢
sync.sinkgroups.g-1.processor.maxpenalty?=?10000
##########?數(shù)據(jù)通道的定義
#?數(shù)據(jù)量不大,直接放內(nèi)存。其實還可以放在JDBC,kafka或者磁盤文件等
sync.channels.c-1.type?=?memory
#?通道隊列的最大長度
sync.channels.c-1.capacity?=?100000
#?putList和takeList隊列的最大長度,sink從capacity中抓取batchsize個event,放到這個隊列。所以此參數(shù)最好比capacity小,比sink的batchsize大。
#?官方定義:The?maximum?number?of?events?the?channel?will?take?from?a?source?or?give?to?a?sink?per?transaction.
sync.channels.c-1.transactionCapacity?=?1000
sync.channels.c-1.byteCapacityBufferPercentage?=?20
###?默認值的默認值等于JVM可用的最大內(nèi)存的80%,可以不配置
#?sync.channels.c-1.byteCapacity?=?800000
#########sql?source#################
#?source?s-1用到的通道,和sink的通道要保持一致,否則就GG了
sync.sources.s-1.channels=c-1
#########?For?each?one?of?the?sources,?the?type?is?defined
sync.sources.s-1.type?=?org.keedio.flume.source.SQLSource
sync.sources.s-1.hibernate.connection.url?=?jdbc:mysql://192.168.1.10/testdb?useSSL=false
#########?Hibernate?Database?connection?properties
sync.sources.s-1.hibernate.connection.user?=?test
sync.sources.s-1.hibernate.connection.password?=?123456
sync.sources.s-1.hibernate.connection.autocommit?=?true
sync.sources.s-1.hibernate.dialect?=?org.hibernate.dialect.MySQL5Dialect
sync.sources.s-1.hibernate.connection.driver_class?=?com.mysql.jdbc.Driver
sync.sources.s-1.run.query.delay=10000
sync.sources.s-1.status.file.path?=?/home/test/apache-flume-1.8.0-bin/status
#?用上${YYYYMM}環(huán)境變量,是因為我用的測試表示一個月表,每個月的數(shù)據(jù)會放到相應的表里。使用方式見上面的啟動說明
sync.sources.s-1.status.file.name?=?test_${YYYYMM}.status
########?Custom?query
sync.sources.s-1.start.from?=?0
sync.sources.s-1.custom.query?=?select?*?from?t_test_${YYYYMM}?where?id?>?$@$?order?by?id?asc
sync.sources.s-1.batch.size?=?100
sync.sources.s-1.max.rows?=?100
sync.sources.s-1.hibernate.connection.provider_class?=?org.hibernate.connection.C3P0ConnectionProvider
sync.sources.s-1.hibernate.c3p0.min_size=5
sync.sources.s-1.hibernate.c3p0.max_size=20
#########?sinks?1
#?sink?k-1用到的通道,和source的通道要保持一致,否則取不到數(shù)據(jù)
sync.sinks.k-1.channel?=?c-1
sync.sinks.k-1.type?=?org.apache.flume.sink.kafka.KafkaSink
sync.sinks.k-1.kafka.topic?=?sync-test
sync.sinks.k-1.kafka.bootstrap.servers?=?localhost:9092
sync.sinks.k-1.kafka.producer.acks?=?1
#?每批次處理的event數(shù)量
sync.sinks.k-1.kafka.flumeBatchSize??=?100
#########?sinks?2
#?sink?k-2用到的通道,和source的通道要保持一致,否則取不到數(shù)據(jù)
sync.sinks.k-2.channel?=?c-1
sync.sinks.k-2.type?=?org.apache.flume.sink.kafka.KafkaSink
sync.sinks.k-2.kafka.topic?=?sync-test
sync.sinks.k-2.kafka.bootstrap.servers?=?localhost:9092
sync.sinks.k-2.kafka.producer.acks?=?1
sync.sinks.k-2.kafka.flumeBatchSize??=?100
flume各部分參數(shù)含義
batchData的大小見參數(shù):batchSize
PutList和TakeList的大小見參數(shù):transactionCapactiy
Channel總?cè)萘看笮∫妳?shù):capacity
問題記錄
異常:Exception in thread "PollableSourceRunner-SQLSource-src-1" java.lang.AbstractMethodError: org.keedio.flume.source.SQLSource.getMaxBackOffSleepInterval()J
分析:由于我用的是flume1.8,而flume-ng-sql-1.4.3插件對應的flume-ng-core版本是1.5.2,1.8版本里的PollableSource接口多了兩個方法 getBackOffSleepIncrement(); getMaxBackOffSleepInterval();在失敗補償暫停線程處理時,需要用到這個方法。
解決方法:更新flume-ng-sql-1.4.3里依賴的flume-ng-core版本為1.8.0,并在源代碼【SQLSource.java】里添加這兩個方法即可。
@Override
public?long?getBackOffSleepIncrement()?{
return?1000;
}
@Override
public?long?getMaxBackOffSleepInterval()?{
return?5000;
}
總結(jié)
以上是生活随笔為你收集整理的mysql增量同步kafka_MySQL数据实时增量同步到Kafka - Flume的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 借贷宝七天高炮上征信嘛 现已接入征信系统
- 下一篇: 钛白粉上市公司有哪些