Kafka系列2-producer和consumer报错
1)啟動生產者進程:
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
輸入消息:
this is msg
生產者進程報錯:
[plain] view plain copy
[2016-06-03 11:33:47,934] WARN Bootstrap broker 127.0.0.1:9092 disconnected (org.apache.kafka.clients.NetworkClient) [2016-06-03 11:33:49,554] WARN Bootstrap broker 127.0.0.1:9092 disconnected (org.apache.kafka.clients.NetworkClient) [2016-06-03 11:33:51,177] WARN Bootstrap broker 127.0.0.1:9092 disconnected (org.apache.kafka.clients.NetworkClient) [2016-06-03 11:33:53,398] WARN Bootstrap broker 127.0.0.1:9092 disconnected (org.apache.kafka.clients.NetworkClient)2)啟動消費者進程:
bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test --from-beginning
消費者進程報錯:
plain] view plain copy
-------------------------------------------------------------------- 注:如果你對python感興趣,我這有個學習Python基地,里面有很多學習資料,感興趣的+Q群:895817687 --------------------------------------------------------------------[2016-06-03 11:34:53,574] WARN Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(0,218.30.64.194,9092)] failed (kafka.client.ClientUtils$) java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80) at kafka.producer.SyncProducer.kafkaproducerSyncProducer$$doSend(SyncProducer.scala:79) at kafka.producer.SyncProducer.send(SyncProducer.scala:124) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94) at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) [2016-06-03 11:34:53,651] WARN [console-consumer-72675_zzs-1464924871670-2192d80a-leader-finder-thread], Failed to find leader for Set([test,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread) kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(0,218.30.64.194,9092))] failed at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94) at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) Caused by: java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80) at kafka.producer.SyncProducer.kafkaproducerSyncProducer$$doSend(SyncProducer.scala:79) at kafka.producer.SyncProducer.send(SyncProducer.scala:124) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) ... 3 more [2016-06-03 11:35:14,916] WARN Fetching topic metadata with correlation id 1 for topics [Set(test)] from broker [BrokerEndPoint(0,218.30.64.194,9092)] failed (kafka.client.ClientUtils$) java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80) at kafka.producer.SyncProducer.kafkaproducerSyncProducer$$doSend(SyncProducer.scala:79) at kafka.producer.SyncProducer.send(SyncProducer.scala:124) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94) at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) [2016-06-03 11:35:14,918] WARN [console-consumer-72675_zzs-1464924871670-2192d80a-leader-finder-thread], Failed to find leader for Set([test,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread) kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(0,218.30.64.194,9092))] failed at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94) at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) Caused by: java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80) at kafka.producer.SyncProducer.kafkaproducerSyncProducer$$doSend(SyncProducer.scala:79) at kafka.producer.SyncProducer.send(SyncProducer.scala:124) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) ... 3 more2 使用localhost啟動生產和消費進程:
1)啟動生產者進程:
[plain] view plain copy
[root@zzs kafka_2.11-0.10.0.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test this is msg [2016-06-03 11:44:16,932] WARN Error while fetching metadata with correlation id 0 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) [2016-06-03 11:44:18,255] WARN Error while fetching metadata with correlation id 1 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) [2016-06-03 11:44:18,648] WARN Error while fetching metadata with correlation id 2 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) [2016-06-03 11:44:18,801] WARN Error while fetching metadata with correlation id 3 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) [2016-06-03 11:44:18,928] WARN Error while fetching metadata with correlation id 4 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) [2016-06-03 11:44:19,035] WARN Error while fetching metadata with correlation id 5 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) [2016-06-03 11:44:19,180] WARN Error while fetching metadata with correlation id 6 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) [2016-06-03 11:44:19,308] WARN Error while fetching metadata with correlation id 7 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)2)啟動消費者進程:
[plain] view plain copy
[root@zzs kafka_2.11-0.10.0.0]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning [2016-06-03 11:45:18,330] WARN Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(0,218.30.64.194,9092)] failed (kafka.client.ClientUtils$) java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80) at kafka.producer.SyncProducer.kafkaproducerSyncProducer$$doSend(SyncProducer.scala:79) at kafka.producer.SyncProducer.send(SyncProducer.scala:124) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94) at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) [2016-06-03 11:45:18,541] WARN [console-consumer-42554_zzs-1464925496436-ae2ee9c7-leader-finder-thread], Failed to find leader for Set([test,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread) kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(0,218.30.64.194,9092))] failed at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94) at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) Caused by: java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80) at kafka.producer.SyncProducer.kafkaproducerSyncProducer$$doSend(SyncProducer.scala:79) at kafka.producer.SyncProducer.send(SyncProducer.scala:124) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) ... 3 more3.解決問題
1)查看Kafka的配置文件,cat config/server.properties
[plain] view plain copy
zookeeper.connect=localhost:2181連接的zookeeper的為localhost,所以需要用localhost啟動生產和消費進程
2)查看kafka啟動的日志,發現
[plain] view plain copy
Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT -> EndPoint(218.30.64.194,9092,PLAINTEXT) (kafka.utils.ZkUtils)[plain] view plain copy
為什么啟動的broker的ip是 218.30.64.194
==> 沒有綁定Kafka啟動監聽的host信息
vi config/server.properties
[plain] view plain copy
listeners=PLAINTEXT://localhost:9092
3)重新啟動zookeeper、kafka、consumer、producer
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
./bin/kafka-server-start.sh ./config/server.properties
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
在producer中輸入消息,可以在producer中消費:
[plain] view plain copy
[root@zzs kafka_2.11-0.10.0.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
this is msg
this is msg2
[plain] view plain copy
[root@zzs kafka_2.11-0.10.0.0]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
this is msg
this is msg2
this is msg3
this is mgs4
問題解決!
與50位技術專家面對面20年技術見證,附贈技術全景圖總結
以上是生活随笔為你收集整理的Kafka系列2-producer和consumer报错的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: SSH批量部署服务
- 下一篇: 生产环境常见的HTTP状态码列表