Completing a TopN Statistics Pipeline in Production
Background

Two tables, city_info and product_info, currently live in MySQL, and a user product-click log is stored as plain text on HDFS. The task is to compute, for each city's region (area), the top three products by click count. The details are shown below.
mysql> show tables;
+---------------------------------+
| Tables_in_d7                    |
+---------------------------------+
| city_info                       |
| product_info                    |
| result_product_area_clicks_top3 |
+---------------------------------+
3 rows in set (0.00 sec)

mysql> desc city_info;
+-----------+--------------+------+-----+---------+-------+
| Field     | Type         | Null | Key | Default | Extra |
+-----------+--------------+------+-----+---------+-------+
| city_id   | int(11)      | YES  |     | NULL    |       |
| city_name | varchar(255) | YES  |     | NULL    |       |
| area      | varchar(255) | YES  |     | NULL    |       |
+-----------+--------------+------+-----+---------+-------+
3 rows in set (0.00 sec)

mysql> select * from city_info;
+---------+-----------+------+
| city_id | city_name | area |
+---------+-----------+------+
|       1 | BEIJING   | NC   |
|       2 | SHANGHAI  | EC   |
|       3 | NANJING   | EC   |
|       4 | GUANGZHOU | SC   |
|       5 | SANYA     | SC   |
|       6 | WUHAN     | CC   |
|       7 | CHANGSHA  | CC   |
|       8 | XIAN      | NW   |
|       9 | CHENGDU   | SW   |
|      10 | HAERBIN   | NE   |
+---------+-----------+------+
10 rows in set (0.00 sec)

mysql> desc product_info;
+--------------+--------------+------+-----+---------+-------+
| Field        | Type         | Null | Key | Default | Extra |
+--------------+--------------+------+-----+---------+-------+
| product_id   | int(11)      | YES  |     | NULL    |       |
| product_name | varchar(255) | YES  |     | NULL    |       |
| extend_info  | varchar(255) | YES  |     | NULL    |       |
+--------------+--------------+------+-----+---------+-------+
3 rows in set (0.00 sec)

mysql> select * from product_info limit 10;    <-- product_info has 100 rows in total
+------------+--------------+----------------------+
| product_id | product_name | extend_info          |
+------------+--------------+----------------------+
|          1 | product1     | {"product_status":1} |
|          2 | product2     | {"product_status":1} |
|          3 | product3     | {"product_status":1} |
|          4 | product4     | {"product_status":1} |
|          5 | product5     | {"product_status":1} |
|          6 | product6     | {"product_status":1} |
|          7 | product7     | {"product_status":1} |
|          8 | product8     | {"product_status":1} |
|          9 | product9     | {"product_status":0} |
|         10 | product10    | {"product_status":1} |
+------------+--------------+----------------------+
10 rows in set (0.00 sec)

[hadoop@hadoop001 data]$ more user_click.txt
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:01:56,1(city_id),72(product_id)
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:52:26,1,68
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:17:03,1,40
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:32:07,1,21
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:26:06,1,63
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:03:11,1,60
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:43:43,1,30
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:09:58,1,96
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:18:45,1,71
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:42:39,1,8
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:24:30,1,6
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:29:49,1,26
95,5b8cdcb0b18645a19f4e3e34a241012e,2016-05-05 20:24:12,1,83
95,5b8cdcb0b18645a19f4e3e34a241012e,2016-05-05 20:07:50,1,62
95,5b8cdcb0b18645a19f4e3e34a241012e,2016-05-05 20:19:31,1,61
95,5b8cdcb0b18645a19f4e3e34a241012e,2016-05-05 20:40:51,1,46
....

[hadoop@hadoop001 data]$ wc -l user_click.txt
11448 user_click.txt

Solution Approach
1) Load the city_info and product_info tables into Hive with Sqoop.
2) Join user_click with city_info and product_info in Hive.
3) Use a window function to compute the TopN within each group, then export the result to MySQL with Sqoop.
4) Wrap the whole flow for this business line in a shell script. One point worth mentioning: because city_info/product_info rarely change, they are imported by separate scripts and are not part of this shell script, although the steps are still written out below.
5) Trigger the job with crontab, starting at 2:00 a.m. every day.
Notes:
a) Any temporary table the job creates must be dropped before it is recreated; use drop table if exists (and if not exists when creating tables or adding partitions) so re-runs do not fail.
b) The key steps must produce log output.
c) The shell script must be idempotent: re-running it for the same day must not change the result or create duplicates. A minimal sketch of these three points is shown right after this list.
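The following is only an illustrative sketch of points a)–c); the real script appears later in this post, and the log helper and hard-coded credentials here are assumptions, not part of the original.

#!/bin/bash
# b) timestamped log output for the key steps
log() { echo "$(date '+%Y-%m-%d %H:%M:%S') $1"; }

USE_DAY=$(date --date '1 day ago' +%Y-%m-%d)

# a) always drop the temporary table before recreating it
hive -e "drop table if exists tmp_product_area_clicks;"
log "dropped tmp_product_area_clicks (if it existed)"

# c) idempotency: remove the target day's rows from MySQL before re-exporting,
#    so re-running the job for the same day cannot produce duplicates
mysql -uroot -proot -e "delete from d7.result_product_area_clicks_top3 where day='${USE_DAY}';"
log "cleared ${USE_DAY} rows from the MySQL result table"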
Importing from MySQL into Hive
How to deploy and use Sqoop was covered in the Sqoop deployment post, so it is not repeated here; straight to the code.
# Table structure of city_info in Hive
hive (d7)> create table city_info(city_id int, city_name string, area string)
         > row format delimited fields terminated by '\t';

# Import city_info
[hadoop@hadoop001 ~]$ sqoop import \
--connect "jdbc:mysql://localhost:3306/d7" \
--username root \
--password root \
--table city_info \
--split-by 'city_id' \
--fields-terminated-by '\t' \
--hive-import \
--hive-database d7 \
--target-dir '/user/hive/warehouse/d7.db/city_info' \
--delete-target-dir \
-m 2

# Table structure of product_info in Hive
hive (d7)> create table product_info(product_id int, product_name string, extend_info string)
         > row format delimited fields terminated by '\t';

# Import product_info
[hadoop@hadoop001 ~]$ sqoop import \
--connect "jdbc:mysql://localhost:3306/d7" \
--username root \
--password root \
--table product_info \
--split-by 'product_id' \
--fields-terminated-by '\t' \
--hive-import \
--hive-database d7 \
--target-dir '/user/hive/warehouse/d7.db/product_info' \
--delete-target-dir \
-m 2

PS: if this is your first time using Sqoop, you will almost certainly hit two pitfalls here. They are not covered now; the next post explains them.
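After each import it is worth confirming that the Hive row count matches the MySQL side. A small check along these lines could be run (assuming the same connection settings as above); sqoop eval simply executes an ad-hoc SQL statement over the same JDBC connection:

# row count on the MySQL side
sqoop eval \
--connect "jdbc:mysql://localhost:3306/d7" \
--username root \
--password root \
--query "select count(*) from city_info"

# row count on the Hive side -- the two numbers should match
hive -e "select count(*) from d7.city_info;"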
Loading data into user_click

In production, the Hive user_click table keeps growing, so it has to be a partitioned table. Typically the previous day's cleaned data is written directly to the table's HDFS location; for example, if the partitioned table lives at hdfs://hadoop001:9000/user/hive/warehouse/d7.db/user_click, the cleaning job for 2016-05-05 creates a partition directory under that path. At that point a query against the partitioned table will not return the new partition's data, because the metastore does not yet know the partition exists. So how do you refresh the data into the partitioned table efficiently? See the code below.
# Table structure of user_click
hive (d7)> create table user_click(user_id int, session_id string, action_time string, city_id int, product_id int)
         > partitioned by (day string)
         > row format delimited fields terminated by ',';

# Refresh the partition metadata; the other refresh method (typically a full MSCK REPAIR TABLE)
# is not recommended -- it is far too heavy-handed.
hive (d7)> alter table user_click add if not exists partition(day='2016-05-05');
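For context, a rough sketch of how one day's cleaned log could land in the partition directory before the metadata refresh above. The upstream cleaning job itself is outside the scope of this post, so the exact commands are assumptions:

# the cleaning job drops the day's file under a day=... subdirectory of the table location
hdfs dfs -mkdir -p /user/hive/warehouse/d7.db/user_click/day=2016-05-05
hdfs dfs -put user_click.txt /user/hive/warehouse/d7.db/user_click/day=2016-05-05/

# the directory alone stays invisible to queries until the partition is registered in the
# metastore, which is exactly what the ALTER TABLE ... ADD IF NOT EXISTS PARTITION above does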
Joining the three tables into a temporary table

The temporary table holds the area, product ID, product name, and click count.
hive (d7)> drop table if exists tmp_product_area_clicks;
hive (d7)> create table tmp_product_area_clicks as
         > select b.area, c.product_id, c.product_name, count(1) as click_count
         > from user_click a
         > left join city_info b on a.city_id = b.city_id
         > left join product_info c on a.product_id = c.product_id
         > where a.day = '2016-05-05'
         > group by b.area, c.product_id, c.product_name;
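An optional sanity check on the temporary table before computing the TopN (not part of the original flow); rows with a NULL area are click records whose city_id has no match in city_info:

hive -e "select area, count(*) as products, sum(click_count) as clicks
from tmp_product_area_clicks
group by area;"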
Getting the TopN with a window function

Use the row_number() function. Within each area it numbers the rows 1, 2, 3, ... ordered by click count, so the rank <= 3 filter keeps exactly three products per area even when click counts tie (see the note after the code below).
hive (d7)> drop table if exists result_product_area_clicks_top3;
hive (d7)> create table result_product_area_clicks_top3
         > row format delimited fields terminated by '\t' as
         > select * from (
         >     select
         >     "2016-05-05" day, product_id, product_name, area, click_count,   <-- the date is parameterized in the script
         >     row_number() over(partition by area order by click_count desc) rank
         >     from tmp_product_area_clicks
         > ) t where t.rank <= 3;
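Note on ties: row_number() never lets two rows share a position, so the filter always returns at most three products per area. With rank() or dense_rank(), tied click counts share a position and the filter can return more rows. A hypothetical area with click counts 50, 50, 40, 30 illustrates the difference:

# click_count:   50  50  40  30
# row_number():   1   2   3   4   -> "<= 3" keeps exactly three products
# rank():         1   1   3   4   -> "<= 3" keeps three here, but ties decide which ones
# dense_rank():   1   1   2   3   -> "<= 3" keeps all four products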
Exporting from Hive to MySQL

# Create the result table in MySQL ahead of time; its structure is shown below
create table result_product_area_clicks_top3(
day varchar(15),
product_id int(11),
product_name varchar(50),
area varchar(10),
click_count int(11),
rank int(10)
);

# For idempotency, delete this date's rows from the MySQL result table first
# (the date is parameterized in the script)
mysql> delete from result_product_area_clicks_top3 where day='2016-05-05';

[hadoop@hadoop001 ~]$ sqoop export \
--connect jdbc:mysql://localhost:3306/d7 \
--password root \
--username root \
--table result_product_area_clicks_top3 \
--export-dir /user/hive/warehouse/d7.db/result_product_area_clicks_top3 \
--columns "day,product_id,product_name,area,click_count,rank" \
--fields-terminated-by '\t' \
-m 2
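One caveat: sqoop export runs several parallel INSERT batches, so a failure partway through can leave a partial day in MySQL even after the DELETE above. A hedged mitigation is Sqoop's staging-table mechanism, which loads into an intermediate table and only moves the rows into the target once every mapper has finished. The staging table name below is an assumption and would have to be created beforehand with the same schema as the result table:

sqoop export \
--connect jdbc:mysql://localhost:3306/d7 \
--username root \
--password root \
--table result_product_area_clicks_top3 \
--staging-table result_product_area_clicks_top3_stg \
--clear-staging-table \
--export-dir /user/hive/warehouse/d7.db/result_product_area_clicks_top3 \
--columns "day,product_id,product_name,area,click_count,rank" \
--fields-terminated-by '\t' \
-m 2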
Writing the shell script

The Hive offline job runs once a day: at some point today it processes yesterday's data, so the script has to work out yesterday's date, obtained with `date --date '1 day ago' +%Y-%m-%d`. The shell script is below.
[hadoop@hadoop001 ~]$ vim top3.sh

CURRENT=`date +%Y-%m-%d_%H:%M:%S`
USE_DAY=`date --date '1 day ago' +%Y-%m-%d`
echo 'Processing date: '$USE_DAY''

echo ''$CURRENT', refreshing the partition'
HIVE_PARTITION_SQL="alter table d7.user_click add if not exists partition(day='${USE_DAY}');"
hive -e "${HIVE_PARTITION_SQL}"

echo ''$CURRENT', creating the temporary table with per-area, per-product click counts'
HIVE_TMP_SQL="drop table if exists tmp_product_area_clicks;
create table tmp_product_area_clicks as
select b.area,c.product_id,c.product_name,count(1) as click_count from user_click a
left join city_info b on a.city_id=b.city_id
left join product_info c on a.product_id=c.product_id
where a.day='${USE_DAY}'
group by b.area,c.product_id,c.product_name;"
hive -e "${HIVE_TMP_SQL}"

echo ''$CURRENT', creating the result table with the top three products by clicks in each area'
HIVE_RESULT_SQL="drop table if exists result_product_area_clicks_top3;
create table result_product_area_clicks_top3
row format delimited fields terminated by '\t' as
select * from (
select '${USE_DAY}' day,product_id,product_name,area,click_count,
row_number() over(partition by area order by click_count desc) rank
from tmp_product_area_clicks
) t where t.rank<=3;"
hive -e "${HIVE_RESULT_SQL}"

echo ''$CURRENT', for idempotency, deleting any existing '$USE_DAY' rows from the MySQL result table'
MYSQL_DELETE_SQL="delete from d7.result_product_area_clicks_top3 where day='${USE_DAY}';"
sudo mysql -uroot -proot -e "${MYSQL_DELETE_SQL}"

echo ''$CURRENT', exporting the Hive result table to MySQL'
sqoop export \
--connect jdbc:mysql://localhost:3306/d7 \
--password root \
--username root \
--table result_product_area_clicks_top3 \
--export-dir /user/hive/warehouse/d7.db/result_product_area_clicks_top3 \
--columns "day,product_id,product_name,area,click_count,rank" \
--fields-terminated-by '\t' \
-m 2

echo ''$CURRENT', whole flow finished; check whether the data landed in MySQL'
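A few optional hardening ideas for the script above (none are in the original): abort as soon as any step fails, generate a fresh timestamp for every log line instead of reusing the value captured at startup, and accept an optional date argument so a past day can be backfilled without editing the script. A sketch:

#!/bin/bash
set -e                                   # stop the job as soon as any command fails
set -o pipefail

log() { echo "$(date '+%Y-%m-%d %H:%M:%S') $1"; }   # fresh timestamp per message

# ./top3.sh            -> processes yesterday
# ./top3.sh 2016-05-05 -> backfills a specific day
USE_DAY=${1:-$(date --date '1 day ago' +%Y-%m-%d)}
log "processing day ${USE_DAY}"

hive -e "alter table d7.user_click add if not exists partition(day='${USE_DAY}');"
log "partition day=${USE_DAY} refreshed"
# ... the remaining steps follow the same pattern as the script above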
Scheduled background execution

Use crontab for the scheduling; see the code below.
[hadoop@hadoop001 ~]$ crontab -e
# run once a day at 02:00
0 2 * * * nohup /home/hadoop/top3.sh >> /tmp/top3_logs.log 2>&1 &

Reposted from: https://juejin.im/post/5d37feb26fb9a07ece681119