Using Kafka with .NET Core
Installation
Installing Kafka on CentOS
Kafka: http://kafka.apache.org/downloads
ZooKeeper: https://zookeeper.apache.org/releases.html
Download and extract
```shell
# Download and extract Kafka
$ wget https://archive.apache.org/dist/kafka/2.1.1/kafka_2.12-2.1.1.tgz
$ tar -zxvf kafka_2.12-2.1.1.tgz
$ mv kafka_2.12-2.1.1 /data/kafka

# Download and extract ZooKeeper
$ wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz
$ tar -zxvf apache-zookeeper-3.5.8-bin.tar.gz
$ mv apache-zookeeper-3.5.8-bin /data/zookeeper
```

Start ZooKeeper
```shell
# Copy the sample configuration
$ cd /data/zookeeper/conf
$ cp zoo_sample.cfg zoo.cfg

# Review the configuration and adjust it if needed
$ vim zoo.cfg

# Commands (run from /data/zookeeper)
$ ./bin/zkServer.sh start    # start
$ ./bin/zkServer.sh status   # status
$ ./bin/zkServer.sh stop     # stop
$ ./bin/zkServer.sh restart  # restart

# Test with the client
$ ./bin/zkCli.sh -server localhost:2181
$ quit
```

Start Kafka
```shell
# Back up the configuration
$ cd /data/kafka
$ cp config/server.properties config/server.properties_copy

# Edit the configuration
$ vim /data/kafka/config/server.properties

# In a cluster, each broker must have a unique id
# broker.id=0

# Listener address (internal network)
# listeners=PLAINTEXT://ip:9092

# Externally advertised IP and port
# advertised.listeners=PLAINTEXT://106.75.84.97:9092

# Default number of partitions per topic (num.partitions); the default is 1.
# Pick a value that suits your server configuration (UCloud ukafka uses 3).
# num.partitions=3

# ZooKeeper connection
# zookeeper.connect=localhost:2181

# Start Kafka with this configuration
$ ./bin/kafka-server-start.sh config/server.properties &

# Check status
$ ps -ef | grep kafka
$ jps
```

Installing Kafka with Docker
```shell
docker pull wurstmeister/zookeeper
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper

docker pull wurstmeister/kafka
docker run -d --name kafka --publish 9092:9092 --link zookeeper \
  --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
  --env KAFKA_ADVERTISED_HOST_NAME=192.168.1.111 \
  --env KAFKA_ADVERTISED_PORT=9092 \
  wurstmeister/kafka
```

Concepts
Broker: a message-processing node. One Kafka node is one broker, and multiple brokers form a Kafka cluster.
Topic: a category of messages. Page-view logs and click logs, for example, can each be a topic, and a Kafka cluster can distribute many topics at once.
Partition: a physical grouping of a topic. A topic can be split into multiple partitions, each of which is an ordered queue.
Segment: physically, a partition is made up of multiple segment files.
Offset: each partition is an ordered, immutable sequence of messages that are continually appended to it. Every message in a partition carries a sequential id, the offset, which uniquely identifies that message within the partition.
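To make the partition and offset terminology concrete, the sketch below uses the same Confluent.Kafka client introduced later in this article to produce one message to an explicit partition and print the offset the broker assigns. It is a minimal illustration, assuming a local broker at 127.0.0.1:9092 and an existing topic named `demo-topic` with at least one partition; both names are placeholders.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

class PartitionOffsetDemo
{
    static async Task Main()
    {
        var config = new ProducerConfig { BootstrapServers = "127.0.0.1:9092" };
        using var producer = new ProducerBuilder<string, string>(config).Build();

        // Address partition 0 of the topic explicitly; each partition is its own ordered queue.
        var partition = new TopicPartition("demo-topic", new Partition(0));

        var result = await producer.ProduceAsync(partition, new Message<string, string>
        {
            Key = "demo-key",
            Value = "hello"
        });

        // The broker appends the message to the partition and returns its offset:
        // the sequential id that uniquely identifies the message within that partition.
        Console.WriteLine($"partition: {result.Partition.Value}, offset: {result.Offset.Value}");
    }
}
```

Producing to the same partition again would print the next offset in the sequence, since offsets only ever grow within a partition.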
Relationship between Kafka partition and consumer counts
If there are more consumers than partitions, the extra consumers are wasted: Kafka does not allow concurrent consumption within a single partition, so the consumer count should not exceed the partition count.
If there are fewer consumers than partitions, one consumer serves multiple partitions. Allocate consumers and partitions sensibly, or the data in some partitions will be consumed unevenly. Ideally the partition count is an integer multiple of the consumer count, so the partition count matters: with 24 partitions, for example, it is easy to pick a suitable consumer count.
When a consumer reads from multiple partitions, ordering across them is not guaranteed. Kafka only guarantees order within a single partition; across partitions, the order depends on how you read.
Adding or removing consumers, brokers, or partitions triggers a rebalance, after which the partitions assigned to each consumer may change.
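A practical consequence of the per-partition ordering guarantee: Kafka's default partitioner routes messages by key hash, so giving related messages the same key keeps them in one partition and therefore in send order, no matter how many partitions the topic has. A minimal sketch, where the broker address and topic name are placeholders:

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

class KeyOrderingDemo
{
    static async Task Main()
    {
        var config = new ProducerConfig { BootstrapServers = "127.0.0.1:9092" };
        using var producer = new ProducerBuilder<string, string>(config).Build();

        // All three messages share the key "order-1001", so the default partitioner
        // hashes them to the same partition and a consumer sees them in send order.
        foreach (var step in new[] { "created", "paid", "shipped" })
        {
            var result = await producer.ProduceAsync("order-events", new Message<string, string>
            {
                Key = "order-1001",
                Value = step
            });
            Console.WriteLine($"{step} -> partition {result.Partition.Value}, offset {result.Offset.Value}");
        }
    }
}
```

Messages with different keys may land in different partitions, and their relative order is then up to the reader.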
Quick start
Install the package in a .NET Core project
```shell
Install-Package Confluent.Kafka
```

Source repository: https://github.com/confluentinc/confluent-kafka-dotnet
Add the IKafkaService interface
```csharp
public interface IKafkaService
{
    /// <summary>
    /// Publish a message to the specified topic
    /// </summary>
    Task PublishAsync<TMessage>(string topicName, TMessage message) where TMessage : class;

    /// <summary>
    /// Subscribe to messages from the specified topics
    /// </summary>
    Task SubscribeAsync<TMessage>(IEnumerable<string> topics, Action<TMessage> messageFunc, CancellationToken cancellationToken) where TMessage : class;
}
```

Implement IKafkaService
```csharp
public class KafkaService : IKafkaService
{
    public async Task PublishAsync<TMessage>(string topicName, TMessage message) where TMessage : class
    {
        var config = new ProducerConfig
        {
            BootstrapServers = "127.0.0.1:9092"
        };
        using var producer = new ProducerBuilder<string, string>(config).Build();
        await producer.ProduceAsync(topicName, new Message<string, string>
        {
            Key = Guid.NewGuid().ToString(),
            // SerializeToJson() is a JSON-serialization extension method from the author's project
            Value = message.SerializeToJson()
        });
    }

    public async Task SubscribeAsync<TMessage>(IEnumerable<string> topics, Action<TMessage> messageFunc, CancellationToken cancellationToken) where TMessage : class
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "127.0.0.1:9092",
            GroupId = "consumer",
            EnableAutoCommit = false,
            StatisticsIntervalMs = 5000,
            SessionTimeoutMs = 6000,
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnablePartitionEof = true
        };
        //const int commitPeriod = 5;
        using var consumer = new ConsumerBuilder<Ignore, string>(config)
            .SetErrorHandler((_, e) =>
            {
                Console.WriteLine($"Error: {e.Reason}");
            })
            .SetStatisticsHandler((_, json) =>
            {
                Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > listening for messages..");
            })
            .SetPartitionsAssignedHandler((c, partitions) =>
            {
                string partitionsStr = string.Join(", ", partitions);
                Console.WriteLine($" - Assigned Kafka partitions: {partitionsStr}");
            })
            .SetPartitionsRevokedHandler((c, partitions) =>
            {
                string partitionsStr = string.Join(", ", partitions);
                Console.WriteLine($" - Revoked Kafka partitions: {partitionsStr}");
            })
            .Build();
        consumer.Subscribe(topics);
        try
        {
            while (true)
            {
                try
                {
                    var consumeResult = consumer.Consume(cancellationToken);
                    Console.WriteLine($"Consumed message '{consumeResult.Message?.Value}' at: '{consumeResult?.TopicPartitionOffset}'.");
                    if (consumeResult.IsPartitionEOF)
                    {
                        Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
                        continue;
                    }
                    TMessage messageResult = null;
                    try
                    {
                        messageResult = JsonConvert.DeserializeObject<TMessage>(consumeResult.Message.Value);
                    }
                    catch (Exception ex)
                    {
                        var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} [Exception: failed to deserialize message, Value: {consumeResult.Message.Value}]: {ex.StackTrace}";
                        Console.WriteLine(errorMessage);
                        messageResult = null;
                    }
                    if (messageResult != null /* && consumeResult.Offset % commitPeriod == 0 */)
                    {
                        messageFunc(messageResult);
                        try
                        {
                            consumer.Commit(consumeResult);
                        }
                        catch (KafkaException e)
                        {
                            Console.WriteLine(e.Message);
                        }
                    }
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Consume error: {e.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Closing consumer.");
            consumer.Close();
        }
        await Task.CompletedTask;
    }
}
```

Inject IKafkaService and call it wherever it is needed.
```csharp
public class MessageService : IMessageService, ITransientDependency
{
    private readonly IKafkaService _kafkaService;

    public MessageService(IKafkaService kafkaService)
    {
        _kafkaService = kafkaService;
    }

    public async Task RequestTraceAdded(XxxEventData eventData)
    {
        await _kafkaService.PublishAsync(eventData.TopicName, eventData);
    }
}
```

The code above plays the producer role. Once a message has been published, a consumer is needed to process it, so a console project can be used to receive messages and handle the business logic.
```csharp
var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true;
    cts.Cancel();
};
await kafkaService.SubscribeAsync<XxxEventData>(topics, async (eventData) =>
{
    // Your logic
    Console.WriteLine($" - {eventData.EventTime:yyyy-MM-dd HH:mm:ss} [{eventData.TopicName}] -> handled");
}, cts.Token);
```

The subscription method is already defined on IKafkaService, so inject it and call it directly.
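The samples rely on IKafkaService being injectable. The ITransientDependency marker interface suggests a framework with convention-based registration (such as ABP), where nothing more is needed; in a plain .NET Core project you would register the service yourself. A minimal sketch using Microsoft.Extensions.DependencyInjection:

```csharp
using Microsoft.Extensions.DependencyInjection;

// Register the implementation so constructor injection of IKafkaService works.
var services = new ServiceCollection();
services.AddSingleton<IKafkaService, KafkaService>();

var provider = services.BuildServiceProvider();
var kafkaService = provider.GetRequiredService<IKafkaService>();
```

In an ASP.NET Core application the same `AddSingleton` call would go in `Startup.ConfigureServices` (or the host builder) instead of a hand-built `ServiceCollection`.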
Producer and consumer example
Producer
```csharp
static async Task Main(string[] args)
{
    if (args.Length != 2)
    {
        Console.WriteLine("Usage: .. brokerList topicName");
        // e.g. 127.0.0.1:9092 helloTopic
        return;
    }

    var brokerList = args.First();
    var topicName = args.Last();

    var config = new ProducerConfig { BootstrapServers = brokerList };
    using var producer = new ProducerBuilder<string, string>(config).Build();

    Console.WriteLine("\n-----------------------------------------------------------------------");
    Console.WriteLine($"Producer {producer.Name} producing on topic {topicName}.");
    Console.WriteLine("-----------------------------------------------------------------------");
    Console.WriteLine("To create a kafka message with UTF-8 encoded key and value:");
    Console.WriteLine("> key value<Enter>");
    Console.WriteLine("To create a kafka message with a null key and UTF-8 encoded value:");
    Console.WriteLine("> value<enter>");
    Console.WriteLine("Ctrl-C to quit.\n");

    var cancelled = false;
    Console.CancelKeyPress += (_, e) =>
    {
        e.Cancel = true;
        cancelled = true;
    };

    while (!cancelled)
    {
        Console.Write("> ");
        var text = string.Empty;
        try
        {
            text = Console.ReadLine();
        }
        catch (IOException)
        {
            break;
        }
        if (string.IsNullOrWhiteSpace(text))
        {
            break;
        }

        // Split the input into "key value"; with no space, the whole line is the value.
        var key = string.Empty;
        var val = text;
        var index = text.IndexOf(" ");
        if (index != -1)
        {
            key = text.Substring(0, index);
            val = text.Substring(index + 1);
        }

        try
        {
            var deliveryResult = await producer.ProduceAsync(topicName, new Message<string, string>
            {
                Key = key,
                Value = val
            });
            Console.WriteLine($"delivered to: {deliveryResult.TopicPartitionOffset}");
        }
        catch (ProduceException<string, string> e)
        {
            Console.WriteLine($"failed to deliver message: {e.Message} [{e.Error.Code}]");
        }
    }
}
```

Consumer
```csharp
static void Main(string[] args)
{
    if (args.Length != 2)
    {
        Console.WriteLine("Usage: .. brokerList topicName");
        // e.g. 127.0.0.1:9092 helloTopic
        return;
    }

    var brokerList = args.First();
    var topicName = args.Last();

    Console.WriteLine("Started consumer, Ctrl-C to stop consuming");

    var cts = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) =>
    {
        e.Cancel = true;
        cts.Cancel();
    };

    var config = new ConsumerConfig
    {
        BootstrapServers = brokerList,
        GroupId = "consumer",
        EnableAutoCommit = false,
        StatisticsIntervalMs = 5000,
        SessionTimeoutMs = 6000,
        AutoOffsetReset = AutoOffsetReset.Earliest,
        EnablePartitionEof = true
    };

    const int commitPeriod = 5;

    using var consumer = new ConsumerBuilder<Ignore, string>(config)
        .SetErrorHandler((_, e) =>
        {
            Console.WriteLine($"Error: {e.Reason}");
        })
        .SetStatisticsHandler((_, json) =>
        {
            Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > monitoring..");
            //Console.WriteLine($"Statistics: {json}");
        })
        .SetPartitionsAssignedHandler((c, partitions) =>
        {
            Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]");
        })
        .SetPartitionsRevokedHandler((c, partitions) =>
        {
            Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]");
        })
        .Build();
    consumer.Subscribe(topicName);

    try
    {
        while (true)
        {
            try
            {
                var consumeResult = consumer.Consume(cts.Token);
                if (consumeResult.IsPartitionEOF)
                {
                    Console.WriteLine($"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
                    continue;
                }
                Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}");
                if (consumeResult.Offset % commitPeriod == 0)
                {
                    try
                    {
                        consumer.Commit(consumeResult);
                    }
                    catch (KafkaException e)
                    {
                        Console.WriteLine($"Commit error: {e.Error.Reason}");
                    }
                }
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Consume error: {e.Error.Reason}");
            }
        }
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Closing consumer.");
        consumer.Close();
    }
}
```