Flink数据清洗(Kafka事实表+Redis维度表)
調研
從網上的調研來看,其實整個百度有清洗流程的只有[1]其他都是抄的[1]中的內容。
?
實驗流程
這個流程的話,不要去研究redis的Flink SQL Client的操作方法,因為在mvn repository中
沒有看到flink-sql-connector-redis之類 的jar
所以該流程適可而止吧
####################################################################
Redis數據準備
127.0.0.1:6379> hset areas AREA_US US
127.0.0.1:6379> hset areas AREA_CT TW,HK
127.0.0.1:6379> hset areas AREA_IN IN
127.0.0.1:6379> hset areas AREA_AR PK,SA,KW
127.0.0.1:6379> hset areas AREA_IN IN
127.0.0.1:6379> hgetall areas
1) "AREA_US"
2) "US"
3) "AREA_CT"
4) "TW,HK"
5) "AREA_IN"
6) "IN"
7) "AREA_AR"
8) "PK,SA,KW"
?
本實驗的redis對象是沒有密碼的,如果事先設置了密碼,可以根據[14]去除
?
Redis代碼中的注意事項
代碼中有這么一句話:
this.jedis = new Jedis("127.0.0.1", 6379);
注意,這里的127.0.0.1如果改成redis所在節點的域名的話,必須是該redis支持外網訪問,否則此處不要修改,會導致數據讀取失敗
####################################################################
本實驗注意事項
①redis相關的jar依賴其實目前官方沒有在維護了.所以不要做太深入的鉆研
②需要導入flink-shaded-hadoop-3-uber-3.1.1.7.0.3.0-79-7.0.jar
Project Structure->Global Libraries中間一列導入上述的jar
否則會報錯找不到hdfs這個file system
####################################################################
數據清洗目標
kafka(存放事實表)中數據示范:
{"dt":"2021-01-11 12:30:32","countryCode":"PK","data":[{"type":"s3","score":0.8,"level":"C"},{"type":"s5","score":0.1,"level":"C"}]}
格式化后如下:
{"dt": "2021-01-11 12:30:32","countryCode": "PK","data": [{"type": "s3","score": 0.8,"level": "C"},{"type": "s5","score": 0.1,"level": "C"}] }這樣的一條數據,根據countryCode轉化為redis(存放維度表)中的具體地區AREA_AR
后面list中的數據打散,最終想要的效果如下:
{"area":"AREA_AR","dt":"2021-01-11 12:30:32","score":0.8,"level":"C","type":"s3"}
{"area":"AREA_AR","dt":"2021-01-11 12:30:32","score":0.1,"level":"C","type":"s5"}
也就是想要根據上述要求,把一條數據轉化為兩條數據
####################################################################
完整實驗操作與代碼
https://gitee.com/appleyuchi/Flink_Code/tree/master/flink清洗數據案例/FlinkProj
####################################################################
可能涉及到的Kafka操作
| 操作 | 命令 | 備注 |
| 查看topic | $KAFKA/bin/kafka-topics.sh --list --zookeeper Desktop:2181 | 無 ?
|
| 往allData這個 topic發送 json 消息 | $KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic allData | 這里可能碰到[2]中的報錯,注意檢查命令中端口與配置文件server.properties中的listeners的端口嚴格保持一致 [2]中的報錯還可能是某個節點的kafka掛掉導致的. ? 可能碰到[3] 注意關閉防火墻 ? ? |
| 使用kafka自帶消費端測試下消費 | $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic allData | 如果kafka自帶消費者測試有問題,那么就不用繼續往下面做了, 此時如果使用Flink SQL Client來消費也必然會出現問題 |
| 清除topic中所有數據[13] | $KAFKA/bin/kafka-topics.sh --zookeeper Desktop:2181 --delete --topic allData | 需要$KAFKA/config/server.properties設置 delete.topic.enable=true |
?
Reference:
[1]【19】Flink 實戰案例開發(一):數據清洗(完整代碼+數據,依賴有問題)
[2]Flink清洗Kafka數據存入MySQL測試(數據好像不太完整)
[3]Flink案例開發之數據清洗、數據報表展現(與[1]內容重復)
[4]Flink繼續實踐:從日志清洗到實時統計內容PV等多個指標(代碼不完整)
[5]Flink清洗日志服務SLS的數據并求ACU&PCU(工程文件不完整)
下面的最后考慮(博主說是完整的.下面的實驗的原始出處其實是[1])
[6]Flink入門及實戰(20)- 數據清洗實時ETL(1)
[7]Flink入門及實戰(21)- 數據清洗實時ETL(2)
[8]Flink入門及實戰(22)- 數據清洗實時ETL(3)[10]Flink 清理過期 Checkpoint 目錄的正確姿勢
[11]Flink學習(二):實驗一數據清洗(代碼不完整,涉及到了elasticsearch)
[12]網站日志實時分析之Flink處理實時熱門和PVUV統計(缺數據)
[13]Is there a way to delete all the data from a topic or delete the topic before every run?
[14]redis設置密碼
?
總結
以上是生活随笔為你收集整理的Flink数据清洗(Kafka事实表+Redis维度表)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 五氯酚酸钠的用途
- 下一篇: ElastieSearch安装以及与Mo