ClickHouse表引擎之Integration系列
? Integration系統(tǒng)表引擎主要用于將外部數(shù)據(jù)導(dǎo)入到ClickHouse中,或者在ClickHouse中直接操作外部數(shù)據(jù)源。
1 Kafka
1.1 Kafka引擎
? 將Kafka Topic中的數(shù)據(jù)直接導(dǎo)入到ClickHouse。
? 語(yǔ)法如下:
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] (name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],... ) ENGINE = Kafka() SETTINGSkafka_broker_list = 'host:port',kafka_topic_list = 'topic1,topic2,...',kafka_group_name = 'group_name',kafka_format = 'data_format'[,][kafka_row_delimiter = 'delimiter_symbol',][kafka_schema = '',][kafka_num_consumers = N,][kafka_max_block_size = 0,][kafka_skip_broken_messages = N,][kafka_commit_every_batch = 0]? 參數(shù)說(shuō)明:
? ①必需的參數(shù)
| kafka_broker_list | Kafka broker列表,以逗號(hào)分隔 |
| kafka_topic_list | Kafka topic列表 |
| kafka_group_name | Kafka消費(fèi)者組,如果不希望消息在集群中重復(fù),使用相同的組名 |
| kafka_format | 消息格式。使用與SQL格式函數(shù)相同的符號(hào),例如JSONEachRow |
? ②可選參數(shù)
| kafka_row_delimiter | 分隔符字符,用于一行的結(jié)束標(biāo)識(shí)符號(hào) |
| kafka_schema | 如果kafka_format參數(shù)需要schema定義,則通過(guò)該參數(shù)來(lái)支持 |
| kafka_num_consumers | 每張表的消費(fèi)者個(gè)數(shù)。默認(rèn)值:1。如果一個(gè)使用者的吞吐量不足,則指定更多使用者。使用者的總數(shù)不應(yīng)該超過(guò)主題中的分區(qū)數(shù),因?yàn)槊總€(gè)分區(qū)只能分配一個(gè)使用者。 |
| kafka_max_block_size | 輪詢(xún)的最大批處理大小 |
| kafka_skip_broken_messages | 忽略無(wú)效記錄的條數(shù)。默認(rèn)值:0 |
| kafka_commit_every_batch | 在編寫(xiě)整個(gè)塊之后提交每個(gè)使用和處理的批而不是單個(gè)提交(默認(rèn)值:0) |
? 測(cè)試:(1)建表
CREATE TABLE test_kafka (\timestamp UInt64,\level String,\message String\) ENGINE = Kafka() SETTINGS kafka_broker_list = 'ambari01:6667,ambari02:6667,ambari03:6667',\kafka_topic_list = 'test',\kafka_group_name = 'group1',\kafka_format = 'JSONEachRow',\kafka_row_delimiter = '\n'? 注意:如果后面在查詢(xún)過(guò)程中報(bào)如下錯(cuò)誤。是因?yàn)橛行┮姘姹敬嬖诘?#xff0c;消息中數(shù)據(jù)之間的分割符號(hào)未指定,導(dǎo)致無(wú)法處理。解決辦法: 添加 kafka_row_delimiter = ‘\n’。
Cannot parse input: expected { before: \0: (at row 2)? (2)在kafka建立一個(gè)新的topic
sh kafka-topics.sh --create --zookeeper ambari01:2181,ambari02:2181,ambari03:2181 --replication-factor 1 --partitions 3 --topic test? (3)在kafka建立發(fā)布者console-producer
sh kafka-console-producer.sh --broker-list ambari01:6667,ambari02:6667,ambari03:6667 --topic test? (4)發(fā)送數(shù)據(jù)
{"timestamp":1515897460,"level":"one","message":"aa"}? 注意:由于一個(gè)kafka的partition 只能由一個(gè) group consumer 消費(fèi),所以clickhouse 節(jié)點(diǎn)數(shù)需要大于 topic 的 partition 數(shù)。
? (5)第一次查詢(xún)
SELECT * FROM test_kafka ┌──timestamp─┬─level─┬─message─┐ │ 1515897460 │ one │ aa │ └────────────┴───────┴─────────┘? (6)第二次查詢(xún)
SELECT * FROM test_kafka Ok.? 發(fā)現(xiàn)第二次查詢(xún)的時(shí)候沒(méi)有數(shù)據(jù)了,因?yàn)?Kafka引擎 表只是 kafka 流的一個(gè)視圖而已,當(dāng)數(shù)據(jù)被 select 了一次之后,這個(gè)數(shù)據(jù)就會(huì)被認(rèn)為已經(jīng)消費(fèi)了,下次 select 就不會(huì)再出現(xiàn)。所以Kafka表單獨(dú)使用是沒(méi)什么用的,一般是用來(lái)和 MaterialView 配合,將Kafka表里面的數(shù)據(jù)自動(dòng)導(dǎo)入到 MaterialView 里面。
? (7)與 MaterialView 集成
? 我們現(xiàn)在每一節(jié)點(diǎn)建一個(gè) MaterialView 保存 Kafka 里面的數(shù)據(jù), 再建一個(gè)全局的Distributed表。
CREATE MATERIALIZED VIEW test_kafka_view ENGINE = SummingMergeTree() PARTITION BY day ORDER BY (day, level) AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as count FROM test_kafka GROUP BY day, level;? (6)再次發(fā)送數(shù)據(jù)
{"timestamp":1515897461,"level":"2","message":'bb'} {"timestamp":1515897462,"level":"3","message":'cc'} {"timestamp":1515897462,"level":"3","message":'ee'} {"timestamp":1515897463,"level":"4","message":'dd'}? (7)查詢(xún)數(shù)據(jù)
SELECT * FROM test_kafka Ok.0 rows in set. Elapsed: 2.686 sec. --------------------------------------- SELECT * FROM test_kafka_view Ok.0 rows in set. Elapsed: 0.002 sec.? 發(fā)現(xiàn)沒(méi)有數(shù)據(jù),原因:kafka 引擎默認(rèn)消費(fèi)根據(jù)條數(shù)與時(shí)間進(jìn)行入庫(kù),不然肯定是沒(méi)效率的。其中對(duì)應(yīng)的參數(shù)有兩個(gè)。 max_insert_block_size(默認(rèn)值為: 1048576),stream_flush_interval_ms(默認(rèn)值為: 7500)這兩個(gè)參數(shù)都是全局性的。
? 業(yè)務(wù)系統(tǒng)需要從kafka讀取數(shù)據(jù),按照官方文檔建好表后,也能看到數(shù)據(jù),但是延時(shí)很高。基本要延時(shí)15分鐘左右。kafka的數(shù)據(jù)大約每秒50條左右。基本規(guī)律是累計(jì)到65535行以后(最小的塊大小)才會(huì)在表中顯示數(shù)據(jù)。嘗試更改stream_flush_interval_ms 沒(méi)有作用,但是有不想改max_block_size,因?yàn)樾薷囊院笥绊懙饺炙斜?#xff0c;并且影響搜索效率。希望能每N秒保證不管block有沒(méi)有寫(xiě)滿都flush一次。
? 雖然ClickHouse和 Kafka的配合可以說(shuō)是十分的便利,只有配置好,但是相當(dāng)?shù)木窒扌詫?duì) kafka 數(shù)據(jù)格式的支持也有限。下面介紹WaterDrop這個(gè)中間件將Kafka的數(shù)據(jù)接入ClickHouse。
?
1.2 WaterDrop
? WaterDrop: 是一個(gè)非常易用,高性能、支持實(shí)時(shí)流式和離線批處理的海量數(shù)據(jù)處理產(chǎn)品,架構(gòu)于Apache Spark和 Apache Flink之上。github地址:https://github.com/InterestingLab/waterdrop
? ①下載并解壓
wget https://github.com/InterestingLab/waterdrop/releases/download/v1.4.3/waterdrop-1.4.3.zip unzip waterdrop-1.4.3.zip? ②修改配置文件waterdrop-env.sh
vim /opt/module/waterdrop-1.4.3/config/waterdrop-env.sh SPARK_HOME=/usr/jdp/3.2.0.0-108/spark2 #配置為spark的路徑? ③增加配置文件test.conf
spark {spark.streaming.batchDuration = 5spark.app.name = "test_waterdrop"spark.ui.port = 14020spark.executor.instances = 3spark.executor.cores = 1spark.executor.memory = "1g" }input {kafkaStream {topics = "test_wd"consumer.bootstrap.servers = "10.0.0.50:6667,10.0.0.52:6667,10.0.0.53:6667"consumer.zookeeper.connect = "10.0.0.50:2181,10.0.0.52:2181,10.0.0.53:2181"consumer.group.id = "group1"consumer.failOnDataLoss = falseconsumer.auto.offset.reset = latestconsumer.rebalance.max.retries = 100} } filter {json{source_field = "raw_message"} }output {clickhouse {host = "10.0.0.50:8123"database = "test"table = "test_wd"fields = ["act","b_t","s_t"]username = "admin"password = "admin"retry_codes = [209, 210 ,1002]retry = 10bulk_size = 1000} }? ④創(chuàng)建Clickhouse表
create table test.test_wd( act String, b_t String, s_t Date) ENGINE = MergeTree() partition by s_t order by s_t;? ⑤啟動(dòng)寫(xiě)入程序
cd /data/work/waterdrop-1.4.1 sh /opt/module/waterdrop-1.4.3/bin/start-waterdrop.sh --master yarn --deploy-mode client --config /opt/module/waterdrop-1.4.3/config/test.conf? ⑥插入數(shù)據(jù)
{"act":"aaaa","b_t":"100","s_t":"2019-12-22"} {"act":"bxc","b_t":"200","s_t":"2020-01-01"} {"act":"dd","b_t":"50","s_t":"2020-02-01"}? ⑦查看表數(shù)據(jù)
SELECT * FROM test_wd ┌─act─┬─b_t─┬────────s_t─┐ │ dd │ 50 │ 2020-02-01 │ └─────┴─────┴────────────┘ ┌─act──┬─b_t─┬────────s_t─┐ │ aaaa │ 100 │ 2019-12-22 │ └──────┴─────┴────────────┘ ┌─act─┬─b_t─┬────────s_t─┐ │ bxc │ 200 │ 2020-01-01 │ └─────┴─────┴────────────┘2 MySQL
? 將Mysql作為存儲(chǔ)引擎,可以對(duì)存儲(chǔ)在遠(yuǎn)程 MySQL 服務(wù)器上的數(shù)據(jù)執(zhí)行 select查詢(xún)
? 語(yǔ)法:
MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']);? 參數(shù)說(shuō)明
| host:port | MySQL 服務(wù)器地址 |
| database | 數(shù)據(jù)庫(kù)的名稱(chēng) |
| table | 表名稱(chēng) |
| user | 數(shù)據(jù)庫(kù)用戶(hù) |
| password | 用戶(hù)密碼 |
| replace_query | 將 INSERT INTO 查詢(xún)是否替換為 REPLACE INTO 的標(biāo)志。如果 replace_query=1,則替換查詢(xún) |
| on_duplicate_clause | 將 ON DUPLICATE KEY UPDATE on_duplicate_clause 表達(dá)式添加到 INSERT 查詢(xún)語(yǔ)句中。 |
? 測(cè)試:
? 在Mysql中建表,并插入數(shù)據(jù)
CREATE TABLE `user` (`id` int(11) NOT NULL,`username` varchar(50) DEFAULT NULL,`sex` varchar(5) DEFAULT NULL )INSERT INTO user values(11,"zs","0"); INSERT INTO user values(12,"ls","0"); INSERT INTO user values(13,"ww","0"); INSERT INTO user values(14,"ll","1");? 創(chuàng)建ClickHouse表,insert_time字段為默認(rèn)字段
CREATE TABLE test.from_mysql(\id UInt64,\username String,\sex String,\insert_time Date DEFAULT toDate(now())\ ) ENGINE = MergeTree()\ PARTITION BY insert_time \ ORDER BY (id,username)? 插入數(shù)據(jù)
INSERT INTO test.from_mysql (id,username,sex) SELECT id, username,sex FROM mysql('10.0.0.42:3306','test', 'user', 'root', 'admin');? 查詢(xún)數(shù)據(jù)
SELECT * FROM from_mysql ┌─id─┬─username─┬─sex─┬─insert_time─┐ │ 11 │ zs │ 0 │ 2020-05-24 │ │ 12 │ ls │ 0 │ 2020-05-24 │ │ 13 │ ww │ 0 │ 2020-05-24 │ │ 14 │ ll │ 1 │ 2020-05-24 │ └────┴──────────┴─────┴─────────────┘4 rows in set. Elapsed: 0.003 sec.3 HDFS
? 用戶(hù)通過(guò)執(zhí)行SQL語(yǔ)句,可以在ClickHouse中直接讀取HDFS的文件,也可以將讀取的數(shù)據(jù)導(dǎo)入到ClickHouse本地表。
? HDFS引擎:ENGINE = HDFS(URI, format)。URI:HDFS的URI,format:存儲(chǔ)格式,格式鏈接https://clickhouse.tech/docs/en/interfaces/formats/#formats
3.1 查詢(xún)文件
? 這種使用場(chǎng)景相當(dāng)于把HDFS做為ClickHouse的外部存儲(chǔ),當(dāng)查詢(xún)數(shù)據(jù)時(shí),直接訪問(wèn)HDFS的文件,而不是把HDFS文件導(dǎo)入到ClickHouse再進(jìn)行查詢(xún)。相對(duì)于ClickHouse的本地存儲(chǔ)查詢(xún),速度較慢。
? 在HDFS上新建一個(gè)數(shù)據(jù)文件:user.csv,上傳hadoop fs -cat /user/test/user.csv,內(nèi)容如下:
1,zs,18 2,ls,19 4,wu,25 3,zl,22? 在ClickHouse上創(chuàng)建一個(gè)訪問(wèn)user.csv文件的表:
CREATE TABLE test_hdfs_csv(\id UInt64,\name String,\age UInt8\ )ENGINE = HDFS('hdfs://ambari01:8020/user/test/user.csv', 'CSV')? 查詢(xún)hdfs_books_csv表
SELECT * FROM test_hdfs_csv ┌─id─┬─name─┬─age─┐ │ 1 │ zs │ 18 │ │ 2 │ ls │ 19 │ │ 4 │ wu │ 25 │ │ 3 │ zl │ 22 │ └────┴──────┴─────┘3.2 從HDFS導(dǎo)入數(shù)據(jù)
? 從HDFS導(dǎo)入數(shù)據(jù),數(shù)據(jù)在ClickHouse本地表,建本地表
CREATE TABLE test_hdfs_local(\id UInt64,\name String,\age UInt8\ )ENGINE = Log? 在數(shù)據(jù)存儲(chǔ)目錄下可以找到這個(gè)表的文件夾
/data/clickhouse/data/test/test_hdfs_local? 從HDFS導(dǎo)入數(shù)據(jù)
INSERT INTO test_hdfs_local SELECT * FROM test_hdfs_csv? 查詢(xún)
SELECT * FROM test_hdfs_local ┌─id─┬─name─┬─age─┐ │ 1 │ zs │ 18 │ │ 2 │ ls │ 19 │ │ 4 │ wu │ 25 │ │ 3 │ zl │ 22 │ └────┴──────┴─────┘總結(jié)
以上是生活随笔為你收集整理的ClickHouse表引擎之Integration系列的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: linux 跨服务器备份,用Backup
- 下一篇: 清华镜像源安装 NGboost XGbo