基于OGG Datahub插件将Oracle数据同步上云
摘要:隨著數據規模的不斷擴大,傳統的RDBMS難以滿足OLAP的需求,本文將介紹如何將Oracle的數據實時同步到阿里云的大數據處理平臺當中,并利用大數據工具對數據進行分析。
一、背景介紹
隨著數據規模的不斷擴大,傳統的RDBMS難以滿足OLAP的需求,本文將介紹如何將Oracle的數據實時同步到阿里云的大數據處理平臺當中,并利用大數據工具對數據進行分析。
OGG(Oracle GoldenGate)是一個基于日志的結構化數據備份工具,一般用于Oracle數據庫之間的主從備份以及Oracle數據庫到其他數據庫(DB2, MySQL等)的同步。下面是Oracle官方提供的一個OGG的整體架構圖,從圖中可以看出OGG的部署分為源端和目標端兩部分組成,主要有Manager,Extract,Pump,Collector,Replicat這么一些組件。
- Manager:在源端和目標端都會有且只有一個Manager進程存在,負責管理其他進程的啟停和監控等;
- Extract:負責從源端數據庫表或者事務日志中捕獲數據,有初始加載和增量同步兩種模式可以配置,初始加載模式是直接將源表數據同步到目標端,而增量同步就是分析源端數據庫的日志,將變動的記錄傳到目標端,本文介紹的是增量同步的模式;
- Pump:Extract從源端抽取的數據會先寫到本地磁盤的Trail文件,Pump進程會負責將Trail文件的數據投遞到目標端;
- Collector:目標端負責接收來自源端的數據,生成Trail文件
- Replicat:負責讀取目標端的Trail文件,轉化為相應的DDL和DML語句作用到目標數據庫,實現數據同步。
本文介紹的Oracle數據同步是通過OGG的Datahub插件實現的,該Datahub插件在架構圖中處于Replicat的位置,會分析Trail文件,將數據的變化記錄寫入Datahub中,可以使用流計算對datahub中的數據進行實時分析,也可以將數據歸檔到MaxCompute中進行離線處理。
二、安裝步驟
0. 環境要求
- 源端已安裝好Oracle
- 源端已安裝好OGG(建議版本Oracle GoldenGate V12.1.2.1)
- 目標端已安裝好OGG Adapters(建議版本Oracle GoldenGate Application Adapters 12.1.2.1)
- java 7
(下面將介紹Oracle/OGG相關安裝和配置過程,Oracle的安裝將不做介紹,另外需要注意的是:Oracle/OGG相關參數配置以熟悉Oracle/OGG的運維人員配置為準,本示例只是提供一個可運行的樣本,Oracle所使用的版本為ORA11g)
1. 源端OGG安裝
下載OGG安裝包解壓后有如下目錄:
drwxr-xr-x install drwxrwxr-x response -rwxr-xr-x runInstaller drwxr-xr-x stage目前oracle一般采取response安裝的方式,在response/oggcore.rsp中配置安裝依賴,具體如下:
oracle.install.responseFileVersion=/oracle/install/rspfmt_ogginstall_response_schema_v12_1_2 # 需要目前與oracle版本對應 INSTALL_OPTION=ORA11g # goldegate主目錄 SOFTWARE_LOCATION=/home/oracle/u01/ggate # 初始不啟動manager START_MANAGER=false # manger端口 MANAGER_PORT=7839 # 對應oracle的主目錄 DATABASE_LOCATION=/home/oracle/u01/app/oracle/product/11.2.0/dbhome_1 # 暫可不配置 INVENTORY_LOCATION= # 分組(目前暫時將oracle和ogg用同一個賬號ogg_test,實際可以給ogg單獨賬號) UNIX_GROUP_NAME=oinstall執行命令:
runInstaller -silent -responseFile {YOUR_OGG_INSTALL_FILE_PATH}/response/oggcore.rsp本示例中,安裝后OGG的目錄在/home/oracle/u01/ggate,安裝日志在/home/oracle/u01/ggate/cfgtoollogs/oui目錄下,當silentInstall{時間}.log文件里出現如下提示,表明安裝成功:
The installation of Oracle GoldenGate Core was successful.執行/home/oracle/u01/ggate/ggsci命令,并在提示符下鍵入命令:CREATE SUBDIRS,從而生成ogg需要的各種目錄(dir打頭的那些)。
至此,源端OGG安裝完成。
2. 源端Oracle配置
以dba分身進入sqlplus:sqlplus / as sysdba
# 創建獨立的表空間 create tablespace ATMV datafile '/home/oracle/u01/app/oracle/oradata/uprr/ATMV.dbf' size 100m autoextend on next 50m maxsize unlimited;# 創建ogg_test用戶,密碼也為ogg_test create user ogg_test identified by ogg_test default tablespace ATMV;# 給ogg_test賦予充分的權限 grant connect,resource,dba to ogg_test;# 檢查附加日志情況 Select SUPPLEMENTAL_LOG_DATA_MIN, SUPPLEMENTAL_LOG_DATA_PK, SUPPLEMENTAL_LOG_DATA_UI, SUPPLEMENTAL_LOG_DATA_FK, SUPPLEMENTAL_LOG_DATA_ALL from v$database;# 增加數據庫附加日志及回退 alter database add supplemental log data; alter database add supplemental log data (primary key, unique,foreign key) columns; # rollback alter database drop supplemental log data (primary key, unique,foreign key) columns; alter database drop supplemental log data;# 全字段模式,注意:在該模式下的delete操作也只有主鍵值 ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; # 開啟數據庫強制日志模式 alter database force logging; # 執行marker_setup.sql 腳本 @marker_setup.sql # 執行@ddl_setup.sql @ddl_setup.sql # 執行role_setup.sql @role_setup.sql # 給ogg用戶賦權 grant GGS_GGSUSER_ROLE to ogg_test; # 執行@ddl_enable.sql,開啟DDL trigger @ddl_enable.sql # 執行優化腳本 @ddl_pin ogg_test # 安裝sequence support @sequence.sql # alter table sys.seq$ add supplemental log data (primary key) columns;3. OGG源端mgr配置
以下是通過ggsci對ogg進行配置
配置mgr
edit params mgr
啟動mgr(運行日志在ggate/dirrpt中)
start mgr
查看mgr狀態
info mgr
查看mgr配置
view params mgr
4. OGG源端extract配置
以下是通過ggsci對ogg進行配置
配置extract(名字可任取,extract是組名)
edit params extract
增加extract進程(ext后的名字要跟上面extract對應,本例中extract是組名)
add ext extract,tranlog, begin now
刪除某廢棄進程DP_TEST
delete ext DP_TEST
添加抽取進程,每個隊列文件大小為200m
add exttrail ./dirdat/st,ext extract, megabytes 200
啟動抽取進程(運行日志在ggate/dirrpt中)
start extract extract
至此,extract配置完成,數據庫的一條變更可以在ggate/dirdat目錄下的文件中看到
5. 生成def文件
源端ggsci起來后執行如下命令,生成defgen文件,并且拷貝到目標端dirdef下
edit params defgen
在shell中執行如下命令,生成ogg_test.def
./defgen paramfile ./dirprm/defgen.prm
6. 目標端OGG安裝和配置
解壓adapter包
將源端中dirdef/ogg_test.def文件拷貝到adapter的dirdef下
執行ggsci起來后執行如下命令,創建必須目錄
create subdirs
編輯mgr配置
edit params mgr
啟動mgr
start mgr
7. 源端ogg pump配置
啟動ggsci后執行如下操作:
編輯pump配置
edit params pump
添加投遞進程,從某一個隊列開始投
add ext pump,exttrailsource ./dirdat/st
備注:投遞進程,每個隊文件大小為200m
add rmttrail ./dirdat/st,ext pump,megabytes 200
啟動pump
start pump
啟動后,結合上面adapter的配置,可以在目標端的dirdat目錄下看到過來的trailfile
8. Datahub插件安裝和配置
依賴環境:jdk1.7。
配置好JAVA_HOME, LD_LIBRARY_PATH,可以將環境變量配置到~/.bash_profile中,例如
修改環境變量后,請重啟adapter的mgr進程
下載datahub-ogg-plugin.tar.gz并解壓:
修改conf路徑下的javaue.properties文件,將{YOUR_HOME}替換為解壓后的路徑
gg.handlerlist=ggdatahubgg.handler.ggdatahub.type=com.aliyun.odps.ogg.handler.datahub.DatahubHandler gg.handler.ggdatahub.configureFileName={YOUR_HOME}/datahub-ogg-plugin/conf/configure.xmlgoldengate.userexit.nochkpt=false goldengate.userexit.timestamp=utcgg.classpath={YOUR_HOME}/datahub-ogg-plugin/lib/* gg.log.level=debugjvm.bootoptions=-Xmx512m -Dlog4j.configuration=file:{YOUR_HOME}/datahub-ogg-plugin/conf/log4j.properties -Djava.class.path=ggjava/ggjava.jar修改conf路徑下的log4j.properties文件,將{YOUR_HOME}替換為解壓后的路徑
修改conf路徑下的configure.xml文件,修改方式見文件中的注釋
xml version="1.0" encoding="UTF-8" <configue><defaultOracleConfigure><!-- oracle sid, 必選--><sid>100</sid><!-- oracle schema, 可以被mapping中的oracleSchema覆蓋, 兩者必須有一個非空--><schema>ogg_test</schema></defaultOracleConfigure><defalutDatahubConfigure><!-- datahub endpoint, 必填--><endPoint>YOUR_DATAHUB_ENDPOINT</endPoint><!-- datahub project, 可以被mapping中的datahubProject, 兩者必須有一個非空--><project>YOUR_DATAHUB_PROJECT</project><!-- datahub accessId, 可以被mapping中的datahubAccessId覆蓋, 兩者必須有一個非空--><accessId>YOUR_DATAHUB_ACCESS_ID</accessId><!-- datahub accessKey, 可以被mapping中的datahubAccessKey覆蓋, 兩者必須有一個非空--><accessKey>YOUR_DATAHUB_ACCESS_KEY</accessKey><!-- 數據變更類型同步到datahub對應的字段,可以被columnMapping中的ctypeColumn覆蓋 --><ctypeColumn>optype</ctypeColumn><!-- 數據變更時間同步到datahub對應的字段,可以被columnMapping中的ctimeColumn覆蓋 --><ctimeColumn>readtime</ctimeColumn><!-- 數據變更序號同步到datahub對應的字段, 按數據變更先后遞增, 不保證連續, 可以被columnMapping中的cidColumn覆蓋 --><cidColumn>record_id</cidColumn> <!-- 額外增加的常量列,每條record該列值為指定值,格式為c1=xxx,c2=xxx,可以被columnMapping中的constColumnMap覆蓋--><constColumnMap></constColumnMap></defalutDatahubConfigure><!-- 默認最嚴格,不落文件 直接退出 無限重試--><!-- 運行每批上次的最多紀錄數, 可選, 默認1000--><batchSize>1000</batchSize><!-- 默認時間字段轉換格式, 可選, 默認yyyy-MM-dd HH:mm:ss--><defaultDateFormat>yyyy-MM-dd HH:mm:ss</defaultDateFormat><!-- 臟數據是否繼續, 可選, 默認false--><dirtyDataContinue>true</dirtyDataContinue><!-- 臟數據文件, 可選, 默認datahub_ogg_plugin.dirty--><dirtyDataFile>datahub_ogg_plugin.dirty</dirtyDataFile><!-- 臟數據文件最大size, 單位M, 可選, 默認500--><dirtyDataFileMaxSize>200</dirtyDataFileMaxSize><!-- 重試次數, -1:無限重試 0:不重試 n:重試次數, 可選, 默認-1--><retryTimes>0</retryTimes><!-- 重試間隔, 單位毫秒, 可選, 默認3000--><retryInterval>4000</retryInterval><!-- 點位文件, 可選, 默認datahub_ogg_plugin.chk--><checkPointFileName>datahub_ogg_plugin.chk</checkPointFileName><mappings><mapping><!-- oracle schema, 見上描述--><oracleSchema></oracleSchema><!-- oracle table, 必選--><oracleTable>t_person</oracleTable><!-- datahub project, 見上描述--><datahubProject></datahubProject><!-- datahub AccessId, 見上描述--><datahubAccessId></datahubAccessId><!-- datahub AccessKey, 見上描述--><datahubAccessKey></datahubAccessKey><!-- datahub topic, 必選--><datahubTopic>t_person</datahubTopic><ctypeColumn></ctypeColumn><ctimeColumn></ctimeColumn><cidColumn></cidColumn><constColumnMap></constColumnMap><columnMapping><!--src:oracle字段名稱, 必須;dest:datahub field, 必須;destOld:變更前數據落到datahub的field, 可選;isShardColumn: 是否作為shard的hashkey, 可選, 默認為false, 可以被shardId覆蓋isDateFormat: timestamp字段是否采用DateFormat格式轉換, 默認true. 如果是false, 源端數據必須是longdateFormat: timestamp字段的轉換格式, 不填就用默認值--><column src="id" dest="id" isShardColumn="true" isDateFormat="false" dateFormat="yyyy-MM-dd HH:mm:ss"/><column src="name" dest="name" isShardColumn="true"/><column src="age" dest="age"/><column src="address" dest="address"/><column src="comments" dest="comments"/><column src="sex" dest="sex"/><column src="temp" dest="temp" destOld="temp1"/></columnMapping><!--指定shard id, 優先生效, 可選--><shardId>1</shardId></mapping></mappings> </configue>在ggsci下啟動datahub writer
edit params dhwriter
extract dhwriter getEnv (JAVA_HOME) getEnv (LD_LIBRARY_PATH) getEnv (PATH) CUSEREXIT ./libggjava_ue.so CUSEREXIT PASSTHRU INCLUDEUPDATEBEFORES, PARAMS "{YOUR_HOME}/datahub-ogg-plugin/conf/javaue.properties" sourcedefs ./dirdef/ogg_test.def table OGG_TEST.*;添加dhwriter
add extract dhwriter, exttrailsource ./dirdat/st
啟動dhwriter
start dhwriter
三、使用場景
這里會用一個簡單的示例來說明數據的使用方法,例如我們在Oracle數據庫有一張商品訂單表orders(oid int, pid int, num int),該表有三列,分別為訂單ID, 商品ID和商品數量。
將這個表通過OGG Datahub進行增量數據同步之前,我們需要先將源表已有的數據通過DataX同步到MaxCompute中。增量同步的關鍵步驟如下:
(1)在Datahub上創建相應的Topic,Topic的schema為(string record_id, string optype, string readtime, bigint oid_before, bigint oid_after, bigint pid_before, bigint pid_after, bigint num_before, bigint num_after);
(2)OGG Datahub的插件按照上述的安裝流程部署配置,其中列的Mapping配置如下:
其中optype和readtime字段是記錄數據庫的數據變更類型和時間,optype有"I", "D", "U"三種取值,分別對應為“增”,“刪”,“改”三種數據變更操作。
(3)OGG Datahub插件部署好成功運行后,插件會源源不斷的將源表的數據變更記錄輸送至datahub中,例如我們在源訂單表中新增一條記錄(1,2,1),datahub里收到的記錄如下:
修改這條數據,比如把num改為20,datahub則會收到的一條變更數據記錄,如下:
+-------+------------+------------+------------+------------+------------+------------+------------+------------+ | record_id | optype | readtime | oid_before | oid_after | pid_before | pid_after | num_before | num_after | +--------+------------+------------+------------+------------+------------+------------+------------+------------+ | 14810373343080000 | U | 2016-12-06 15:15:58.000253 | 1 | 1 | 2 | 2 | 1 | 20 |實時計算
在前一天的離線計算的基礎數據上,我們可以寫一個StreamCompute流計算的分析程序,很容易地對數據進行實時匯總,例如實時統計當前總的訂單數,每種商品的銷售量等。處理思路就是對于每一條到來的變更數據,可以拿到變化的數值,實時更新統計變量即可。
離線處理
為了便于后續的離線分析,我們也可以將Datahub里的數據歸檔到MaxCompute中,在MaxCompute中創建相應Schema的表:
create table orders_log(record_id string, optype string, readtime string, oid_before bigint, oid_after bigint, pid_before bigint, pid_after bigint, num_before bigint, num_after bigint);在Datahub上創建MaxCompute的數據歸檔,上述流入Datahub里的數據將自動同步到MaxCompute當中。建議將同步到MaxCompute中的數據按照時間段進行劃分,比如每一天的增量數據都對應一個獨立分區。這樣當天的數據同步完成后,我們可以處理對應的分區,拿到當天所有的數據變更,而與和前一天的全量數據進行合并后,即可得到當天的全量數據。為了簡單起見,先不考慮分區表的情況,以2016-12-06這天的增量數據為例,假設前一天的全量數據在表orders_base里面,datahub同步過來的增量數據在orders_log表中,將orders_base與orders_log做合并操作,可以得到2016-12-06這天的最終全量數據寫入表orders_result中。這個過程可以在MaxCompute上用如下這樣一條SQL完成。
INSERT OVERWRITE TABLE orders_result SELECT t.oid, t.pid, t.num FROM (SELECT oid, pid, num, '0' x_record_id, 1 AS x_optypeFROMorders_base UNION ALLSELECT decode(optype,'D',oid_before,oid_after) AS oid, decode(optype,'D', pid_before,pid_after) AS pid, num_after AS num, record_id x_record_id, decode(optype, 'D', 0, 1) AS x_optypeFROMorders_log) t JOIN(SELECToid, pid, max(record_id) x_max_modifiedFROM(SELECToid, pid, '0' record_idFROMorders_base UNION ALL SELECTdecode(optype,'D',oid_before,oid_after) AS oid, decode(optype,'D', pid_before,pid_after) AS pid, record_idFROMorders_log ) gGROUP BY oid , pid) s ON t.oid = s.oid AND t.pid = s.pid AND t.x_record_id = s.x_max_modified AND t.x_optype <> 0;四、常見問題
Q:目標端報錯 OGG-06551 Oracle GoldenGate Collector: Could not translate host name localhost into an Internet address.
A:目標端機器hostname在/etc/hosts里面重新設置localhost對應的ip
Q:找不到jvm相關的so包
A:將jvm的so路徑添加到LD_LIBRARY_PATH后,重啟mgr
Q:有了DDL語句,比如增加一列,源端ogg沒有問題,但是adapter端的ffwriter和jmswriter進程退出,且報錯: 2015-06-11 14:01:10 ERROR OGG-01161 Bad column index (5) specified for table OGG_TEST.T_PERSON, max columns = 5.
A:由于表結構改變,需要重做def文件,將重做的def文件放入dirdef后重啟即可
作者:冶善
原文鏈接
本文為云棲社區原創內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的基于OGG Datahub插件将Oracle数据同步上云的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 机器学习算法概述:随机森林逻辑回归
- 下一篇: PostgreSQL PostGIS 的