大数据统计分析平台之一、Kafka单机搭建
1、zookeeper搭建
Kafka集群依賴zookeeper,需要提前搭建好zookeeper
單機模式(7步)(集群模式進階請移步:http://blog.51cto.com/nileader/795230)?Step1:
cd /usr/local/softwarejdk-8u161-linux-x64.rpm
鏈接:https://pan.baidu.com/s/1i6iHIDJ 密碼:bgcc
rpm -ivh jdk-8u161-linux-x64.rpm
vi /etc/profile
JAVA_HOME=/usr/java/jdk1.8.0_161
JRE_HOME=/usr/java/jdk1.8.0_161/jre
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export JAVA_HOME JRE_HOME PATH CLASSPATH
source /etc/profile
echo $PATH
?
Step2:
# 下載zookeeper
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gz
# 如果下載不到,可以使用迅雷,或者使用百度云盤
鏈接:https://pan.baidu.com/s/1MXYd4UlKWvqB6EcVLyF8cg 密碼:an6t
?
# 解壓
tar -zxvf zookeeper-3.4.11.tar.gz
# 移動一下
mv zookeeper-3.4.11 /usr/local/zookeeper-3.4.11
?
Step3:重命名 zoo_sample.cfg文件
?mv /usr/local/zookeeper-3.4.11/conf/zoo_sample.cfg??/usr/local/zookeeper-3.4.11/conf/zoo.cfg?Step4:vi /usr/local/zookeeper-3.4.11/conf/zoo.cfg,修改
dataDir=/usr/local/zookeeper-3.4.11/dataStep5:創建數據目錄
mkdir? /usr/local/zookeeper-3.4.11/data
Step6:啟動zookeeper:執行
/usr/local/zookeeper-3.4.11/bin/zkServer.sh?startStep7:檢測是否成功啟動:執行
/usr/local/zookeeper-3.4.11/bin/zkCli.sh或者
yum install nc -y
echo?stat| nc?localhost?2181
================================================================================================================
2、下載Kafka
下載地址:http://kafka.apache.org/downloads.html # mkdir -p /usr/local/software# cd /usr/local/software
# wget http://mirror.bit.edu.cn/apache/kafka/1.0.0/kafka_2.12-1.0.0.tgz
# 百度云下載地址:
鏈接:https://pan.baidu.com/s/1Kp0uD_5YjGKOLkbW_igm2g 密碼:v1q7
kafka_2.12-1.0.0.tgz ?? //其中2.12-1.0.0為Scala的版本,kafka-1.0.0-src.tgz為kafka版本 3、解壓 # tar zxf kafka_2.12-1.0.0.tgz -C /usr/local/ # cd /usr/local/ # mv kafka_2.12-1.0.0/ kafka/ 4、配置 mkdir -p /usr/local/kafka/kafkaLogs # vi /usr/local/kafka/config/server.properties
# broker的ID,集群中每個broker ID不可相同
broker.id=0
# 監聽器,端口號和port一致即可
listeners=PLAINTEXT:/10.10.6.225/:9092
# Broker的監聽端口
port=9092
# 必須填寫當前服務器IP地址
host.name=10.10.6.225
# 必須填寫當前服務器IP地址
advertised.host.name=10.10.6.225
# 暫未配置集群
zookeeper.connect=10.10.6.225:2181
# 消息持久化目錄
log.dirs=/usr/local/kafka/kafkaLogs
# 可以刪除主題
delete.topic.enable=true
# 關閉自動創建topic
auto.create.topics.enable=false
?
5、配置Kafka的環境變量 # vi /etc/profileexport KAFKA_HOME=/usr/local/kafkaexport PATH=$PATH:$KAFKA_HOME/bin # source /etc/profile# vi /etc/hosts
# es為主機名 ,這里一定要注意,是主機名!!!!重要的話說三次!!!!!!!!
127.0.0.1 es
10.10.6.225 es
6、啟動與停止Kafka # kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties 官方推薦啟動方式: # /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
但這種方式退出shell后會自動斷開
停止:
kafka-server-stop.sh 7、驗證 # jps2608 Kafka2236 QuorumPeerMain
2687 Jps 看到Kafka的進程,說明Kafka已經啟動 8、創建topic 創建名為test,partitions為3,replication為3的topic # kafka-topics.sh --create --zookeeper 10.10.6.225:2181 --partitions 1 --replication-factor 1 --topic test 查看topic狀態 # kafka-topics.sh --describe --zookeeper 10.10.6.225:2181 --topic test
Topic:test????? PartitionCount:1??????? ReplicationFactor:1???? Configs:
?? Topic: test???? Partition: 0??? Leader: 0?????? Replicas: 0???? Isr: 0 刪除topic 執行如下命令 # kafka-topics.sh --delete --zookeeper 10.10.6.225:2181 --topic test 9、測試使用Kafka 發送消息 # kafka-console-producer.sh --broker-list 10.10.6.225:9092 --topic test 輸入以下信息: This is a message This is another message 接收消息 # kafka-console-consumer.sh --bootstrap-server 10.10.6.225:9092 --topic test --from-beginning
若看到上輸入的信息說明已經搭建成功。 更復雜配置參考: https://www.cnblogs.com/wangxiaoqiangs/p/7831990.html 黃海添加于2018-02-11?夜 鏈接:https://pan.baidu.com/s/1i6HnIzr 密碼:1soq KafkaProducer.py # http://kafka-python.readthedocs.io/en/master/ # 安裝辦法: # C:\Users\Administrator>pip install kafka-python # Collecting kafka-python # Downloading kafka_python-1.4.1-py2.py3-none-any.whl (235kB) # 100% |████████████████████████████████| 235kB 150kB/s # Installing collected packages: kafka-python # Successfully installed kafka-python-1.4.1 # http://blog.csdn.net/evankaka/article/details/52421314 from kafka import KafkaProducer
from Util.MySQLHelper import *
import json
producer = KafkaProducer(bootstrap_servers='10.10.6.225:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))
db = MySQLHelper()
sql = "select ID,RESOURCE_ID_INT,RESOURCE_ID_CHAR,RESOURCE_TITLE,RESOURCE_TYPE_NAME,RESOURCE_FORMAT,RESOURCE_PAGE,CAST(CREATE_TIME AS CHAR) AS CREATE_TIME,DOWN_COUNT,FILE_ID,RESOURCE_TYPE,STRUCTURE_ID,PERSON_ID,PERSON_NAME,IDENTITY_ID from t_resource_info limit 100"
dt = db.query(sql)
print(len(dt))
for row in dt:
producer.send('t_resource_info', row)
producer.flush()
print('恭喜,完成!')
?
不依賴于MYSQL的數據提交:
import json from kafka import KafkaProducer import datetime# kafka的服務器位置 kafka_servers = '10.10.6.194:9092'# 日期的轉換器 class DateEncoder(json.JSONEncoder):def default(self, obj):if isinstance(obj, datetime.datetime):return obj.strftime('%Y-%m-%d %H:%M:%S')elif isinstance(obj, datetime.date):return obj.strftime("%Y-%m-%d")else:return json.JSONEncoder.default(self, obj)# 黃海定義的輸出信息的辦法,帶當前時間 def logInfo(msg):i = datetime.datetime.now()print(" %s %s" % (i, msg))# 統一的topic名稱 topicName = 'test'dt=[{"id":1,"name":"劉備"},{"id":2,"name":"關羽"},{"id":3,"name":"張飛"}]# kafka的生產者 producer = KafkaProducer(bootstrap_servers=kafka_servers)# # 將字段大寫轉為小寫 for row in dt:new_dics = {}for k, v in row.items():new_dics[k.lower()] = vjstr = json.dumps(new_dics, cls=DateEncoder)producer.send(topic=topicName, partition=0, value=jstr.encode('utf-8')) # 提交一下 producer.flush() print('恭喜,完成!')?
?
KafkaConsumer.py
from kafka import KafkaConsumer import timedef log(str):t = time.strftime(r"%Y-%m-%d_%H-%M-%S", time.localtime())print("[%s]%s" % (t, str))log('start consumer') # 消費192.168.120.11:9092上的world 這個Topic,指定consumer group是consumer-20171017 consumer = KafkaConsumer('foobar', bootstrap_servers=['localhost:9092']) for msg in consumer:recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)log(recv)?如果是想讀取kafka記得的所有消費記錄:
from kafka import KafkaConsumer import time# kafka的服務器位置 kafka_servers = '10.10.6.194:9092' # 統一的topic名稱 topicName = 'test'def log(str):t = time.strftime(r"%Y-%m-%d_%H-%M-%S", time.localtime())print("[%s]%s" % (t, str))log('啟動消費者...') # auto_offset_reset='earliest' 這個參數很重要,如果加上了,就是kafka記錄的最后一條位置,如果不加,就是以后要插入的數據了。 #consumer = KafkaConsumer(topicName, auto_offset_reset='earliest', bootstrap_servers=kafka_servers) consumer = KafkaConsumer(topicName, bootstrap_servers=kafka_servers) for msg in consumer:recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)log(recv)?
轉載于:https://www.cnblogs.com/littlehb/p/8438401.html
與50位技術專家面對面20年技術見證,附贈技術全景圖總結
以上是生活随笔為你收集整理的大数据统计分析平台之一、Kafka单机搭建的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 九阴真经 第二层 第9天
- 下一篇: 01进制之间的转换