Tranquility
- 與Kafka集群交互
- Druid使用Tranquility Kafka
本文以Kafka為例,介紹在E-MapReduce中如何使用Tranquility從Kafka集群采集數(shù)據(jù),并實(shí)時(shí)推送至Druid集群。
Tranquility是一個(gè)以push方式向Druid實(shí)時(shí)發(fā)送數(shù)據(jù)的應(yīng)用。它替用戶解決了分區(qū)、多副本、服務(wù)發(fā)現(xiàn)、防止數(shù)據(jù)丟失等多個(gè)問(wèn)題,簡(jiǎn)化了用戶使用Druid的難度。它支持多種數(shù)據(jù)來(lái)源,包括Samza、Spark、Storm、Kafka、Flink等等。
與Kafka集群交互
首先是Druid集群與Kafka集群的交互。兩個(gè)集群交互的配置方式大體和Hadoop集群類似,均需要設(shè)置連通性、hosts等。對(duì)于非安全Kafka集群,請(qǐng)按照以下步驟操作:? ? ? ? ? ? 之后將該配置文件同步到 Druid 集群的所有節(jié)點(diǎn)上,放置于某一個(gè)目錄下面(例如/tmp/kafka/kafka_client_jaas.conf)。
5、在 Druid 配置頁(yè)面的 overlord.jvm 里新增如下選項(xiàng):
Djava.security.auth.login.config=/tmp/kafka/kafka_client_jaas.conf6、在 Druid 配置頁(yè)面的 middleManager.runtime 里配置druid.indexer.runner.javaOpts=-Djava.security.auth.login.confi=/tmp/kafka/kafka_client_jaas.conf和其他JVM啟動(dòng)參數(shù)。
7、重啟Druid服務(wù)。Druid使用Tranquility Kafka
由于Tranquility是一個(gè)服務(wù),它對(duì)于Kafka來(lái)說(shuō)是消費(fèi)者,對(duì)于Druid來(lái)說(shuō)是客戶端。您可以使用中立的機(jī)器來(lái)運(yùn)行Tranquility,只要這臺(tái)機(jī)器能夠同時(shí)連通 Kafka 集群和 Druid 集群即可。
1、Kafka端創(chuàng)建一個(gè)名為 pageViews 的 topic。
-- 如果開啟了kafka 高安全:export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf"--./bin/kafka-topics.sh --create --zookeeper emr-header-1:2181,emr-header-2:2181,emr-header-3:2181/kafka-1.0.1 --partitions 1 --replication-factor 1 --topic pageViews2、下載 Tranquility 安裝包,并解壓至某一路徑下。
3、配置 datasource。
這里假設(shè)您的 topic name 為 pageViews,并且每條 topic 都是如下形式的 json 文件:
{"time": "2018-05-23T11:59:43Z", "url": "/foo/bar", "user": "alice", "latencyMs": 32}{"time": "2018-05-23T11:59:44Z", "url": "/", "user": "bob", "latencyMs": 11}{"time": "2018-05-23T11:59:45Z", "url": "/foo/bar", "user": "bob", "latencyMs": 45}
對(duì)應(yīng)的 dataSrouce 的配置如下:
{"dataSources" : {"pageViews-kafka" : {"spec" : {"dataSchema" : {"dataSource" : "pageViews-kafka","parser" : {"type" : "string","parseSpec" : {"timestampSpec" : {"column" : "time","format" : "auto"},"dimensionsSpec" : {"dimensions" : ["url", "user"],"dimensionExclusions" : ["timestamp","value"]},"format" : "json"}},"granularitySpec" : {"type" : "uniform","segmentGranularity" : "hour","queryGranularity" : "none"},"metricsSpec" : [{"name": "views", "type": "count"},{"name": "latencyMs", "type": "doubleSum", "fieldName": "latencyMs"}]},"ioConfig" : {"type" : "realtime"},"tuningConfig" : {"type" : "realtime","maxRowsInMemory" : "100000","intermediatePersistPeriod" : "PT10M","windowPeriod" : "PT10M"}},"properties" : {"task.partitions" : "1","task.replicants" : "1","topicPattern" : "pageViews"}}},"properties" : {"zookeeper.connect" : "localhost","druid.discovery.curator.path" : "/druid/discovery","druid.selectors.indexing.serviceName" : "druid/overlord","commit.periodMillis" : "15000","consumer.numThreads" : "2","kafka.zookeeper.connect" : "emr-header-1.cluster-500148518:2181,emr-header-2.cluster-500148518:2181, emr-header-3.cluster-500148518:2181/kafka-1.0.1","kafka.group.id" : "tranquility-kafka",}}4、運(yùn)行如下命令啟動(dòng) Tranquility。
./bin/tranquility kafka -configFile5、在 Kafka 端啟動(dòng) producer 并發(fā)送一些數(shù)據(jù)。
./bin/kafka-console-producer.sh --broker-list emr-worker-1:9092,emr-worker-2:9092,emr-worker-3:9092 --topic pageViews輸入:
{"time": "2018-05-24T09:26:12Z", "url": "/foo/bar", "user": "alice", "latencyMs": 32}{"time": "2018-05-24T09:26:13Z", "url": "/", "user": "bob", "latencyMs": 11}{"time": "2018-05-24T09:26:14Z", "url": "/foo/bar", "user": "bob", "latencyMs": 45}在Tranquility日志中查看相應(yīng)的消息,在Druid端則可以看到啟動(dòng)了相應(yīng)的實(shí)時(shí)索引 task。
—————————————————————————————— 原文:https://help.aliyun.com/document_detail/72704.html?
?
?
?
?
轉(zhuǎn)載于:https://www.cnblogs.com/wynjauu/articles/10369007.html
總結(jié)
以上是生活随笔為你收集整理的Tranquility的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
 
                            
                        - 上一篇: Struts2上传文件的大小设置
- 下一篇: 我的开发框架(WinForm)
