数仓开发之DIM层
目錄
一:DIM層設計要點
?二:DIM層大概實操流程
? ? ?2.1 讀取數據
? ?2.2 過濾數據
? ?2.3 寫出數據
?三:配置表
3.1 配置表設計
?四:實操流程
4.1?接收Kafka數據,過濾空值數據
4.2?動態拆分維度表功能
4.3??把流中的數據保存到對應的維度表
五:具體代碼實現
?5.1?接收Kafka數據,過濾空值數據
5.2?根據MySQL的配置表,動態進行分流
5.3?保存維度到HBase(Phoenix)
一:DIM層設計要點
(1)DIM層的設計依據是維度建模理論,該層存儲維度模型的維度表。
(2)DIM層的數據存儲在?HBase?表中。
DIM?層表是用于維度關聯的,要通過主鍵去獲取相關維度信息,這種場景下 K-V?類型數據庫的效率較高。常見的 K-V?類型數據庫有 Redis、HBase,而 Redis?的數據常駐內存,會給內存造成較大壓力,因而選用 HBase?存儲維度數據。
(3)DIM層表名的命名規范為dim_表名
?二:DIM層大概實操流程
? ? ?2.1 讀取數據
Kafka---topic_db(包含所有的46張業務表)
? ?2.2 過濾數據
過濾出所需要的維表數據
?? ??? ?過濾條件:在代碼中給定十幾張維表的表名
?? ??? ?問題:如果增加維表,需要修改代碼-重新編譯-打包-上傳、重啟任務
?? ??? ?優化1:不修改代碼、只重啟任務
?? ??? ??? ?配置信息中保存需要的維表信息,配置信息只在程序啟動的時候加載一次
?? ??? ?優化2:不修改代碼、不重啟任務
?? ??? ??? ?方向:讓程序在啟動以后還可以獲取配置信息中增加的內容
?? ??? ??? ?具體實施:
?? ??? ??? ??? ?1) 定時任務:每隔一段時間加載一次配置信息
?? ??? ??? ??? ??? ?將定時任務寫在Open方法
?? ??? ??? ??? ?2) 監控配置信息:一旦配置信息增加了數據,可以立馬獲取到
?? ??? ??? ??? ??? ?(1) MySQLBinlog:FlinkCDC監控直接創建流
?? ??? ??? ??? ??? ??? ?a.將配置信息處理成廣播流:缺點 -> 如果配置信息過大,冗余太多
?? ??? ??? ??? ??? ??? ?b.按照表名進行KeyBy處理:缺點 -> 有可能產生數據傾斜
?? ??? ??? ??? ??? ?(2) 文件:Flume->Kafka->Flink消費創建流
? ?2.3 寫出數據
將數據寫出到Phoenix、JdbcSink、自定義Sink
?三:配置表
本層的任務是將業務數據直接寫入到不同的 HBase?表中。那么如何讓程序知道流中的哪些數據是維度數據?維度數據又應該寫到 HBase?的哪些表中?為了解決這個問題,我們選擇在 MySQL?中構建一張配置表,通過 Flink?CDC 將配置表信息讀取到程序中。
3.1 配置表設計
1)字段解析
我們將為配置表設計五個字段
- source_table:作為數據源的業務數據表名?
- sink_table:作為數據目的地的 Phoenix?表名
- sink_columns:Phoenix?表字段
- sink_pk:Phoenix?表主鍵
- sink_extend:Phoenix?建表擴展,即建表時一些額外的配置語句
將 source_table 作為配置表的主鍵,可以通過它獲取唯一的目標表名、字段、主鍵和建表擴展,從而得到完整的 Phoenix?建表語句。
?數據格式:
{"before":null,"after": {"source_table":"aa","sink_table":"bb","sink_columns":"cc","sink_pk":"id","sink_extend":"xxx"},"source": {"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":165251303 9549,"snapshot":"false","db":"gmall-211126- config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0 ,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1652513039551,"transaction":null}2)在Mysql中創建數據庫建表并開啟Binlog
(1)創建數據庫 gmall_config ,注意:和 gmall?業務庫區分開
[atguigu@hadoop102 db_log]$ mysql -uroot -p000000 -e"create database gmall_config charset utf8 default collate utf8_general_ci"(2)在 gmall_config 庫中創建配置表 table_process
CREATE TABLE `table_process` (`source_table` varchar(200) NOT NULL COMMENT '來源表',`sink_table` varchar(200) DEFAULT NULL COMMENT '輸出表',`sink_columns` varchar(2000) DEFAULT NULL COMMENT '輸出字段',`sink_pk` varchar(200) DEFAULT NULL COMMENT '主鍵字段',`sink_extend` varchar(200) DEFAULT NULL COMMENT '建表擴展',PRIMARY KEY (`source_table`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;(3)在MySQL配置文件中增加 gmall_config 開啟Binlog
[axing@hadoop107 ~]$ sudo vim /etc/my.cnf?(4)為了方便測試,目前就插入兩張表名數據,作為維度表
?四:實操流程
4.1?接收Kafka數據,過濾空值數據
對Maxwell抓取的數據進行ETL,有用的部分保留,沒用的過濾掉。
4.2?動態拆分維度表功能
由于Maxwell是把全部數據統一寫入一個Topic中, 這樣顯然不利于日后的數據處理。所以需要把各個維度表拆開處理。
在實時計算中一般把維度數據寫入存儲容器,一般是方便通過主鍵查詢的數據庫比如HBase,Redis,MySQL等。
這樣的配置不適合寫在配置文件中,因為這樣的話,業務端隨著需求變化每增加一張維度表表,就要修改配置重啟計算程序。
所以這里需要一種動態配置方案,把這種配置長期保存起來,一旦配置有變化,實時計算可以自動感知。這種可以有三個方案實現:
一種是用Zookeeper存儲,通過Watch感知數據變化;
另一種是用mysql數據庫存儲,周期性的同步;
再一種是用mysql數據庫存儲,使用廣播流。
這里選擇第三種方案,主要是MySQL對于配置數據初始化和維護管理,使用FlinkCDC讀取配置信息表,將配置流作為廣播流與主流進行連接。
4.3??把流中的數據保存到對應的維度表
?維度數據保存到HBase的表中。
五:具體代碼實現
?5.1?接收Kafka數據,過濾空值數據
1)創建 KafkaUtil?工具類
和 Kafka?交互要用到 Flink?提供的 FlinkKafkaConsumer、FlinkKafkaProducer 類,為了提高模板代碼的復用性,將其封裝到 KafkaUtil?工具類中。
此處從?Kafka?讀取數據,創建 getKafkaConsumer(String topic, String groupId) 方法
public class KafkaUtil {static String BOOTSTRAP_SERVERS = "hadoop102:9092, hadoop103:9092, hadoop104:9092";static String DEFAULT_TOPIC = "default_topic";public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {Properties prop = new Properties();prop.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topic,new KafkaDeserializationSchema<String>() {@Overridepublic boolean isEndOfStream(String nextElement) {return false;}@Overridepublic String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {if(record != null && record.value() != null) {return new String(record.value());}return null;}@Overridepublic TypeInformation<String> getProducedType() {return TypeInformation.of(String.class);}}, prop);return consumer; } }2)主程序
package com.atguigu.app.dim;import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.atguigu.app.func.TableProcessFunction; import com.atguigu.bean.TableProcess; import com.atguigu.utils.MyKafkaUtil; import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream; import org.apache.flink.streaming.api.datastream.BroadcastStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;public class DimApp {public static void main(String[] args) throws Exception {//1.獲取執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//生產環境,并行度應設置為kafka主題的分區數/*//生產環境下使用://1.1 開啟checkpointenv.enableCheckpointing(5*6000L, CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setCheckpointTimeout(10*6000L);env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000L));//1.2 設置狀態后端env.setStateBackend(new HashMapStateBackend());env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop107:8020/211126/ck");System.setProperty("HADOOP_USER_NAME","atguigu");*///2.讀取kafka topic_db主題數據創建主流String topic ="topic_db";String groupId = "dim_app_211126";DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));//3.過濾掉非JSON數據以及保留新增、變化以及初始化數據并將數據轉換為JSON格式SingleOutputStreamOperator<JSONObject> filterJsonDS = kafkaDS.flatMap(new FlatMapFunction<String, JSONObject>() {@Overridepublic void flatMap(String value, Collector<JSONObject> collector) throws Exception {try {//將數據裝換為JSON格式JSONObject jsonObject = JSON.parseObject(value);//獲取數據中的操作類型字段String type = jsonObject.getString("type");//保留新增、變化、以及初始化數據if ("insert".equals(type) || "update".equals(type) || "bootstrap-insert".equals(type)) {collector.collect(jsonObject);}} catch (Exception e) {System.out.println("臟數據:" + value);//或者寫入側輸出流}}});//4.使用FlinkCDC讀取mysql配置信息表創建配置流MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("hadoop107").port(3306).username("root").password("000000").databaseList("gmall-config").tableList("gmall-config.table_process").startupOptions(StartupOptions.initial()).deserializer(new JsonDebeziumDeserializationSchema()).build();DataStreamSource<String> mysqlSourceDS = env.fromSource(mySqlSource,WatermarkStrategy.noWatermarks(),"MysqlSource");//5.將配置流處理為廣播流MapStateDescriptor<String, TableProcess> mapStateDescriptor = new MapStateDescriptor<>("map-state", String.class, TableProcess.class);BroadcastStream<String> broadcastStream = mysqlSourceDS.broadcast(mapStateDescriptor);//6.連接主流和廣播流BroadcastConnectedStream<JSONObject, String> connectedStream = filterJsonDS.connect(broadcastStream);//7.處理連接流,根據配置信息處理主流數據SingleOutputStreamOperator<JSONObject> dimDS = connectedStream.process(new TableProcessFunction(mapStateDescriptor));//8.將數據寫出到PhoenixdimDS.print(">>>>>");//9.啟動任務env.execute();} }5.2?根據MySQL的配置表,動態進行分流
1)導入依賴
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.1.0</version></dependency><dependency><groupId>org.apache.phoenix</groupId><artifactId>phoenix-spark</artifactId><version>5.0.0-HBase-2.0</version><exclusions><exclusion><groupId>org.glassfish</groupId><artifactId>javax.el</artifactId></exclusion></exclusions></dependency><!-- 如果不引入 flink-table 相關依賴,則會報錯: Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.base.source.reader.RecordEmitter 引入以下依賴可以解決這個問題(引入某些其它的 flink-table相關依賴也可) --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>1.13.0</version></dependency>2)創建配置表實體類
package com.atguigu.gmall.realtime.bean; import lombok.Data;@Data public class TableProcess {//來源表String sourceTable;//輸出表String sinkTable;//輸出字段String sinkColumns;//主鍵字段String sinkPk;//建表擴展String sinkExtend; }3)編寫操作讀取配置表形成廣播流
// TODO 6. FlinkCDC 讀取配置流并廣播流// 6.1 FlinkCDC 讀取配置表信息MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("hadoop102").port(3306).databaseList("gmall_config") // set captured database.tableList("gmall_config.table_process") // set captured table.username("root").password("000000").deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String.startupOptions(StartupOptions.initial()).build();// 6.2 封裝為流DataStreamSource<String> mysqlDSSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MysqlSource");// 6.3 廣播配置流MapStateDescriptor<String, TableProcess> tableConfigDescriptor = new MapStateDescriptor<String, TableProcess>("table-process-state", String.class, TableProcess.class);BroadcastStream<String> broadcastDS = mysqlDSSource.broadcast(tableConfigDescriptor);// TODO 7. 連接流BroadcastConnectedStream<JSONObject, String> connectedStream = filterDS.connect(broadcastDS);4)定義一個項目中常用的配置常量類GmallConfig
package com.atguigu.gmall.realtime.common;public class GmallConfig {// Phoenix庫名public static final String HBASE_SCHEMA = "GMALL2022_REALTIME";// Phoenix驅動public static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";// Phoenix連接參數public static final String PHOENIX_SERVER = "jdbc:phoenix:hadoop102,hadoop103,hadoop104:2181"; }5)自定義函數MyBroadcastFunction
(1)定義類MyBroadcastFunction
package com.atguigu.gmall.realtime.app.func;import com.alibaba.fastjson.JSONObject; import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag;public class MyBroadcastFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {private MapStateDescriptor<String, TableProcess> tableConfigDescriptor;public MyBroadcastFunction(MapStateDescriptor<String, TableProcess> tableConfigDescriptor) {this.tableConfigDescriptor = tableConfigDescriptor; }@Overridepublic void processElement(JSONObject jsonObj, ReadOnlyContext readOnlyContext, Collector<JSONObject> out) throws Exception {}@Overridepublic void processBroadcastElement(String jsonStr, Context context, Collector<JSONObject> out) throws Exception {} }?(2)自定義函數MyBroadcastFunction-open
// 定義Phoenix的連接 private Connection conn;@Overridepublic void open(Configuration parameter) throws Exception {super.open(parameter);Class.forName(GmallConfig.PHOENIX_DRIVER);conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);}(3)自定義函數MyBroadcastFunction-processBroadcastElement
@Overridepublic void processBroadcastElement(String jsonStr, Context context, Collector<JSONObject> out) throws Exception {JSONObject jsonObj = JSON.parseObject(jsonStr);BroadcastState<String, TableProcess> tableConfigState = context.getBroadcastState(tableConfigDescriptor);String op = jsonObj.getString("op");if ("d".equals(op)) {TableProcess before = jsonObj.getObject("before", TableProcess.class);String sourceTable = before.getSourceTable();tableConfigState.remove(sourceTable);} else {TableProcess config = jsonObj.getObject("after", TableProcess.class);String sourceTable = config.getSourceTable();String sinkTable = config.getSinkTable();String sinkColumns = config.getSinkColumns();String sinkPk = config.getSinkPk();String sinkExtend = config.getSinkExtend();tableConfigState.put(sourceTable, config);checkTable(sinkTable, sinkColumns, sinkPk, sinkExtend);}}(4)自定義函數MyBroadcastFunction-checkTable
在 Phoenix?建表之前要先創建命名空間 GMALL2022_REALTIM。
0: jdbc:phoenix:> create schema GMALL2022_REALTIME;checkTable() 方法如下
/*** Phoenix 建表函數** @param sinkTable 目標表名 eg. test* @param sinkColumns 目標表字段 eg. id,name,sex* @param sinkPk 目標表主鍵 eg. id* @param sinkExtend 目標表建表擴展字段 eg. ""* eg. create table if not exists mydb.test(id varchar primary key, name varchar, sex varchar)...*/private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {// 封裝建表 SQL StringBuilder sql = new StringBuilder();sql.append("create table if not exists " + GmallConfig.HBASE_SCHEMA+ "." + sinkTable + "(\n");String[] columnArr = sinkColumns.split(",");// 為主鍵及擴展字段賦默認值if (sinkPk == null) {sinkPk = "id";}if (sinkExtend == null) {sinkExtend = "";}// 遍歷添加字段信息for (int i = 0; i < columnArr.length; i++) {sql.append(columnArr[i] + " varchar");// 判斷當前字段是否為主鍵if (sinkPk.equals(columnArr[i])) {sql.append(" primary key");}// 如果當前字段不是最后一個字段,則追加","if (i < columnArr.length - 1) {sql.append(",\n");}}sql.append(")");sql.append(sinkExtend);String createStatement = sql.toString();// 為數據庫操作對象賦默認值,執行建表 SQL PreparedStatement preparedSt = null;try {preparedSt = conn.prepareStatement(createStatement);preparedSt.execute();} catch (SQLException sqlException) {sqlException.printStackTrace();System.out.println("建表語句\n" + createStatement + "\n執行異常");} finally {if (preparedSt != null) {try {preparedSt.close();} catch (SQLException sqlException) {sqlException.printStackTrace();throw new RuntimeException("數據庫操作對象釋放異常");}}}}(5)自定義函數MyBroadcastFunction-processElement()
@Overridepublic void processElement(JSONObject jsonObj, ReadOnlyContext readOnlyContext, Collector<JSONObject> out) throws Exception {ReadOnlyBroadcastState<String, TableProcess> tableConfigState = readOnlyContext.getBroadcastState(tableConfigDescriptor);// 獲取配置信息String sourceTable = jsonObj.getString("table");TableProcess tableConfig = tableConfigState.get(sourceTable);if (tableConfig != null) {JSONObject data = jsonObj.getJSONObject("data");String sinkTable = tableConfig.getSinkTable();// 根據 sinkColumns 過濾數據String sinkColumns = tableConfig.getSinkColumns();filterColumns(data, sinkColumns);// 將目標表名加入到主流數據中data.put("sinkTable", sinkTable);out.collect(data);}}(6)自定義函數MyBroadcastFunction-filterColumns(),校驗字段,過濾掉多余的字段
private void filterColumns(JSONObject data, String sinkColumns) {Set<Map.Entry<String, Object>> dataEntries = data.entrySet();dataEntries.removeIf(r -> !sinkColumns.contains(r.getKey()));}(7)主程序DimSinkApp中調用MyBroadcastFunction提取維度數據
// TODO 8. 處理維度表數據SingleOutputStreamOperator<JSONObject> dimDS = connectedStream.process(new MyBroadcastFunction(tableConfigDescriptor));5.3?保存維度到HBase(Phoenix)
1)程序流程分析
?
DimSink 繼承了RickSinkFunction,這個function得分兩條時間線:
一條是任務啟動時執行open操作(圖中紫線),我們可以把連接的初始化工作放在此處一次性執行;
另一條是隨著每條數據的到達反復執行invoke()(圖中黑線),在這里面我們要實現數據的保存,主要策略就是根據數據組合成sql提交給hbase。
2)創建 PhoenixUtil 工具類,在其中創建insertValues()方法
package com.atguigu.gmall.realtime.util;import com.alibaba.fastjson.JSONObject; import com.atguigu.gmall.realtime.common.GmallConfig; import org.apache.commons.beanutils.BeanUtils; import org.apache.commons.lang3.StringUtils;import java.sql.*; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Set;public class PhoenixUtil {/*** Phoenix 表數據導入方法** @param conn 連接對象* @param sinkTable 寫入數據的 Phoenix 目標表名* @param data 待寫入的數據*/public static void insertValues(Connection conn, String sinkTable, JSONObject data) {// 獲取字段名Set<String> columns = data.keySet();// 獲取字段對應的值Collection<Object> values = data.values();// 拼接字段名String columnStr = StringUtils.join(columns, ",");// 拼接字段值String valueStr = StringUtils.join(values, "','");// 拼接插入語句String sql = "upsert into " + GmallConfig.HBASE_SCHEMA+ "." + sinkTable + "(" +columnStr + ") values ('" + valueStr + "')";// 為數據庫操作對象賦默認值PreparedStatement preparedSt = null;// 執行 SQLtry {preparedSt = conn.prepareStatement(sql);preparedSt.execute();// 提交事務conn.commit();} catch (SQLException sqlException) {sqlException.printStackTrace();throw new RuntimeException("數據庫操作對象獲取或執行異常");} finally {if (preparedSt != null) {try {preparedSt.close();} catch (SQLException sqlException) {sqlException.printStackTrace();throw new RuntimeException("數據庫操作對象釋放異常");}}} } }3)MyPhoenixSink
自定義 SinkFunction?子類 MyPhoenixSink,在其中調用 Phoenix?工具類的 insertValues(String sinkTable, JSONObject data) 方法,將維度數據寫出到 Phoenix?的維度表中。為了提升效率,減少頻繁創建銷毀連接帶來的性能損耗,創建連接池。
(1)添加德魯伊連接池依賴
<dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.1.16</version></dependency>(2)連接池創建工具類
package com.atguigu.gmall.realtime.util;import com.alibaba.druid.pool.DruidDataSource;public class DruidDSUtil {private static DruidDataSource druidDataSource;public static DruidDataSource createDataSource() {// 創建連接池druidDataSource = new DruidDataSource();// 設置驅動全類名druidDataSource.setDriverClassName(GmallConfig.PHOENIX_DRIVER);// 設置連接 urldruidDataSource.setUrl(GmallConfig.PHOENIX_SERVER);// 設置初始化連接池時池中連接的數量druidDataSource.setInitialSize(5);// 設置同時活躍的最大連接數druidDataSource.setMaxActive(20);// 設置空閑時的最小連接數,必須介于 0 和最大連接數之間,默認為 0druidDataSource.setMinIdle(1);// 設置沒有空余連接時的等待時間,超時拋出異常,-1 表示一直等待druidDataSource.setMaxWait(-1);// 驗證連接是否可用使用的 SQL 語句druidDataSource.setValidationQuery("select 1");// 指明連接是否被空閑連接回收器(如果有)進行檢驗,如果檢測失敗,則連接將被從池中去除// 注意,默認值為 true,如果沒有設置 validationQuery,則報錯// testWhileIdle is true, validationQuery not setdruidDataSource.setTestWhileIdle(true);// 借出連接時,是否測試,設置為 false,不測試,否則很影響性能druidDataSource.setTestOnBorrow(false);// 歸還連接時,是否測試druidDataSource.setTestOnReturn(false);// 設置空閑連接回收器每隔 30s 運行一次druidDataSource.setTimeBetweenEvictionRunsMillis(30 * 1000L);// 設置池中連接空閑 30min 被回收,默認值即為 30 mindruidDataSource.setMinEvictableIdleTimeMillis(30 * 60 * 1000L);return druidDataSource;} }(3)MyPhoenixSink 函數
package com.atguigu.gmall.realtime.app.func;import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.druid.pool.DruidPooledConnection; import com.alibaba.fastjson.JSONObject; import com.atguigu.gmall.realtime.util.DruidDSUtil; import com.atguigu.gmall.realtime.util.PhoenixUtil; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import java.sql.SQLException;public class MyPhoenixSink extends RichSinkFunction<JSONObject> {private DruidDataSource druidDataSource;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// 創建連接池druidDataSource = DruidDSUtil.createDataSource();}@Overridepublic void invoke(JSONObject jsonObj, Context context) throws Exception {// 獲取目標表表名String sinkTable = jsonObj.getString("sinkTable");// 獲取 id 字段的值String id = jsonObj.getString("id");// 清除 JSON 對象中的 sinkTable 字段// 以便可將該對象直接用于 HBase 表的數據寫入jsonObj.remove("sinkTable");// 獲取連接對象DruidPooledConnection conn = druidDataSource.getConnection();try {PhoenixUtil.insertValues(conn, sinkTable, jsonObj);} catch (Exception e) {System.out.println("維度數據寫入異常");e.printStackTrace();} finally {try {// 歸還數據庫連接對象conn.close();} catch (SQLException sqlException) {System.out.println("數據庫連接對象歸還異常");sqlException.printStackTrace();}}} }4)主程序 DimSinkApp?中調用 MyPhoenixSink
// TODO 9. 將數據寫入 Phoenix 表dimDS.addSink(new MyPhoenixSink());6)測試
(1)啟動HDFS、ZK、Kafka、Maxwell、HBase
(2)運行?IDEA 中的?DimSinkApp
(3)執行 mysql_to_kafka_init.sh 腳本
mysql_to_kafka_init.sh all(4)通過phoenix查看hbase的schema以及表情況
?附:整個流程的步驟以及所需要的進程
數據流:web/app -> nginx -> 業務服務器 -> Mysql(binlog) -> Maxwell -> Kafka(ODS) -> FlinkApp -> Phoenix程序:Mock -> Mysql(binlog) -> Maxwell -> Kafka(Zk) -> DimApp(FlinkCDC/Mysql) -> Phoenix(HBase/ZK/HDFS)/*** 需要啟動的進程:* dfs -> zookeeper -> kafka -> maxwell -> hbase -> phoenix(客戶端):bin/sqlline.py*/總結
- 上一篇: ubuntu 连接双显示器
- 下一篇: Unity 3D 接入 移动MM (3.