oracle表增量同步到hive分区表
本文以shell腳本,通過傳參的的形式,將服務器ip,oracle的庫名表名以及作為分區(qū)字段的字段名稱,hive的庫名,表名作為參數(shù)傳入,這樣可以做到靈活變更,提高通用性與方便性,通過定時器調度此腳本即可。
腳本包含三步:
? ?一:通過sqoop將oracle數(shù)據(jù)導入到hive臨時表,臨時表需創(chuàng)建,無分區(qū)表
? ?二:將hive臨時表數(shù)據(jù)insert到hive正式表,以傳入的分區(qū)字段作為分區(qū),此腳本中分區(qū)有 年xxxx,月xxxx-xx,日xxxx-xx-xx
? ?三:因為我有用到impala,所以在第三步加上了impala刷新操作,如不刷新,impala將識別不到新增hive數(shù)據(jù)? ?
#!/bin/bash
 #
# import table from oracle into hive
nargs=$#
 echo "argument num: $nargs "
today=`date +%Y-%m-%d`
 one_day=`date +%Y-%m-%d -d'-1 day'`
 coll_db=''
 coll_tab=''
 coll_host_ip=''
 coll_host_port=1521
 coll_tab_username=''
 coll_tab_passwd=''
hive_db=''
 hive_tab=''
 #hive_tab_cols=''
 hive_map_cols=''
start_dt=''
 end_dt=''
pt_col=''?
/usr/bin/kinit -kt /opt/yarn.keytab yarn
# argument parse
 argParse()
 {
 ?? ?echo "argument num: $nargs "
?? ?for ag in $@
 ?? ?do
 ?? ?#?? ?echo $ag
 ?? ??? ?arg_key=${ag%=*}
 ?? ??? ?arg_val=${ag#*=}
 #?? ??? ?echo "${ag%=*}--- ${ag#*=}"?? ??? ?
 #?? ??? ?echo "$arg_key---- $arg_val"
 ?? ??? ?case ${arg_key} in
 ?? ??? ??? ?"coll_host_ip") ?? ?coll_host_ip=$arg_val?? ?;;
 ?? ??? ??? ?"coll_db") ?? ??? ?coll_db=$arg_val?? ?;;
 ?? ??? ??? ?"coll_tab") ?? ??? ?coll_tab=$arg_val?? ?;;
 ?? ??? ??? ?"hive_db") ?? ??? ?hive_db=$arg_val?? ?;;
 ?? ??? ??? ?"hive_tab") ?? ??? ?hive_tab=$arg_val?? ?;;
 ?? ??? ??? ?#"hive_tab_cols") ?? ?hive_tab_cols=$arg_val?? ?;;
 ?? ??? ??? ?"hive_map_cols") ?? ?hive_map_cols=$arg_val?? ?;;
 ?? ??? ??? ?"start_dt") ?? ??? ?start_dt=$arg_val?? ?;;
 ?? ??? ??? ?"end_dt") ?? ??? ?end_dt=$arg_val?? ?;;
 ?? ??? ??? ?"pt_col") ?? ??? ?pt_col=$arg_val?? ?;;
 ?? ??? ?esac
 ?? ?done
 }
# parse the arguments key:value paire
 argParse $@
 # pring argument
 printArgs()
 {
 ?? ?echo "coll_host_ip:$coll_host_ip"
 ?? ?echo "coll_db:$coll_db"
 ?? ?echo "coll_tab:$coll_tab"
 ?? ?echo "hive_db:$hive_db"
 ?? ?echo "hive_tab:$hive_tab"
 ?? ?#echo "hive_tab_cols:$hive_tab_cols"
 ?? ?echo "hive_map_cols:$hive_map_cols"
 ?? ?echo "start_dt:$start_dt"
 ?? ?echo "end_dt:$end_dt"
 ?? ?echo "pt_col:$pt_col"
 ? ? ? ? echo "one_day:$one_day"
 ? ? ? ? echo "today:$today"
 }
# print the value of argument
 printArgs
# sqoop import table of mysql to hive tmp table
 sqoopImpTempTab()
 {
 ?? ?echo ?"@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ [`date +\"%F %T\"`] sqoop import start @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ "
 ? ? ? ? if [ -n "${hive_map_cols}" ]
 ? ? ? ? then
 ?? ??? ?sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true -Dmapreduce.job.queuename=bf_yarn_pool.production \
 ? ? ? ? ? ? ? ? ? ? ? ? --connect jdbc:oracle:thin:@$coll_host_ip:$coll_host_port/$coll_db \
 ? ? ? ? ? ? ? ? ? ? ? ? --table $coll_tab --username $coll_tab_username --password $coll_tab_passwd \
 ? ? ? ? ? ? ? ? ? ? ? ? --delete-target-dir \
 ? ? ? ? ? ? ?? ??? ?--hive-import --hive-overwrite --hive-database tmp_${hive_db} --hive-table tmp_${hive_tab} --hive-drop-import-delims -m 1 \
 ? ? ? ? ? ? ?? ??? ?--where "${pt_col} >=to_date('${one_day}','yyyy-mm-dd') and ${pt_col}<to_date('${today}','yyyy-mm-dd')" --fields-terminated-by '\001' \
 ?? ??? ??? ?--split-by ${hive_map_cols} \
 ? ? ? ? ? ? ?? ??? ?--null-string '\\N' --null-non-string '\\N'?
 ? ? ? ? else
 ?? ??? ?sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true -Dmapreduce.job.queuename=bf_yarn_pool.production \
 ? ? ? ? ? ? ? ? ? ? ? ? --connect jdbc:oracle:thin:@$coll_host_ip:$coll_host_port/$coll_db \
 ? ? ? ? ?? ? ? ? ? ?--table $coll_tab --username $coll_tab_username --password $coll_tab_passwd \
 ?? ??? ??? ?--delete-target-dir \
 ? ? ? ? ? ? ? ? ? ? ? ? --hive-import --hive-overwrite --hive-database tmp_${hive_db} --hive-table tmp_${hive_tab} --hive-drop-import-delims -m 1 \
 ? ? ? ? ? ? ? ? ? ? ? ? --where "${pt_col} >=to_date('${one_day}','yyyy-mm-dd') and ${pt_col}<to_date('${today}','yyyy-mm-dd')" --fields-terminated-by '\001' \
 ? ? ? ? ? ? ? ? ? ? ? ? --null-string '\\N' --null-non-string '\\N' #&> /dev/null?
? ? ? ? fi
 ? ? ? ? RET=$?
 ? ? ? ? if [ $RET -eq 0 ]; then
 ? ? ? ? ? ? ? ? echo "`date +\"%F %T\"` [INFO] sqoop import database:$hive_db table:temp_$hive_tab successfully."
 ? ? ? ? else
 ? ? ? ? ? ? ? ? echo "`date +\"%F %T\"` [ERROR] sqoop import database:$hive_db table:temp_$hive_tab error."
 ? ? ? ? ? ? ? ? exit 5
 ? ? ? ? fi
 ? ? ? ? echo "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ [`date +\"%F %T\"`] sqoop import end @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ "
}
 sqoopImpTempTab
# store tmp table to hive rcfile table
 hiveStoreAsRc()
 {
?? ?#pt_col=create_time
 ? ? ? ? echo '---------------------------------------------------------- hive store as rcfile start ?-------------------------------------------------------------------------------------'
?? ?hsql="use ${hive_db};set hive.exec.dynamic.partition=true;set hive.exec.dynamic.partition.mode=nonstrict;
 ? ? ? ? ? ? ? ? set mapreduce.map.memory.mb=15000;set mapreduce.reduce.memory.mb=15000; set hive.merge.mapredfiles=true;set hive.exec.max.created.files=100000;
 ? ? ? ? ? ? ? ? SET hive.exec.max.dynamic.partitions=100000;SET hive.exec.max.dynamic.partitions.pernode=100000;from tmp_${hive_db}.tmp_${hive_tab} \
 ?? ??? ?INSERT OVERWRITE TABLE ${hive_db}.${hive_tab} PARTITION(pk_year,pk_month,pk_day) select *,substr(${pt_col}, 1, 4),substr(${pt_col}, 1, 7),substr(${pt_col}, 1, 10) \
 ?? ??? ?where ${pt_col} >='${one_day}' and ${pt_col}<'${today}' "
?? ?#phsql=$hsql" INSERT INTO TABLE ${hive_db}.${hive_tab} PARTITION(partition_key='${sdt:0:7}') select $hive_tab_cols where $pt_col > '${sdt:0:7}';"?? ?
 ?? ?echo "#################### hsql: $hsql"
 ?? ?#hive -S -e "${hsql}"
 ?? ?beeline --hiveconf mapreduce.job.queuename=bf_yarn_pool.production --silent=true --showHeader=false --showWarnings=false -u 'jdbc:hive2://localhost:10000/default;' -n yarn -p *******? -e "${hsql}"?? ?
?? ?RET=$?
 ?? ?if [ $RET -eq 0 ]; then
 ? ? ? ? ?? ?echo "`date +\"%F %T\"` [INFO] ${hive_db}.${hive_tab} ?store successfully."?
 ? ? ? ??? ? ?? ?#exit 0
 ?? ?else
 ? ? ? ??? ??? ?echo "`date +\"%F %T\"` [ERROR] ${hive_db}.${hive_tab} ?store failure."?
 ? ? ? ??? ??? ?exit 5
 ?? ?fi
 ? ? ? ? echo '---------------------------------------------------------- hive store as rcfile end ?-------------------------------------------------------------------------------------'
 }
hiveStoreAsRc
# impala table refresh
 impTabRefrsh()
 {
 ?? ?echo '********************************************************** impala refresh table start *****************************************************************************************'?? ?
 ? ? ? ? beeline --silent=true --showHeader=false --showWarnings=false -u 'jdbc:hive2://localhost:21050/default;' -n yarn -p ******* -e "refresh ${hive_db}.${hive_tab}"
?? ?RET=$?
?? ?if [ ${RET} -eq 0 ]; then
 ?? ??? ?echo "`date +\"%F %T\"` [INFO] impala:refresh ${hive_db}.${hive_tab} success!"
 ?? ??? ?exit 0
 ?? ?else ? ?
 ?? ??? ?echo "`date +\"%F %T\"` [ERROR] impala:refresh ${hive_db}.${hive_tab} failure!"
 ?? ??? ?exit 5
 ?? ?fi?? ?
 ?? ?echo '********************************************************** impala refresh table end *****************************************************************************************'?? ?
 }
impTabRefrsh?
 ?
總結
以上是生活随笔為你收集整理的oracle表增量同步到hive分区表的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: *基于类平衡自我训练的无监督域自适应用于
- 下一篇: 关于web出现此问题:index:25
