php kafka storm,php的kafka踩坑(二)
接上一篇文章,上次沒有解決的一個問題就是在做一個隊列的時候,存在多消費者消費到同一個消息的情況,今天終于解決了這個問題,問題的本質是因為運維給我創建的topic是有問題的,他創建的分區數量是0,我今天上容器看了一下,終于發現了,然后刪了自己重新建了一個,具體容器操作kafka的topic教程可以看我另一個文檔基于kafka容器操作topic。
在這里,我們從頭開始介紹一下topic(主題),partition(分區),group(分組),consumer(消費者),producer(生產者)的關系
producer,生產者,生產數據
consumer,消費者,消費數據
topic,簡單點說就是一個隊列,生產者生產數據和消費者消費數據都必須指定一個Topic,就是生產的數據要放到哪個隊列去給消費者消費
partition和group,一個topic可以配置多個partition,consumer消費數據時是按照group來消費的,kafka確保每個partition只能由同一個group中的同一個consumer消費,如果想要重復消費,那么需要其他的組來消費,所以同一個group的消費者數量應當小于等于partition數量。Zookeerper中保存這每個topic下的每個partition在每個group中消費的offset。(此段介紹引用這篇文章)
consumer讀取時,會指定讀取的group,同一個消息在同一個group下只會讀取到一次,如果要重復消費數據,需要新建group
如果group只有一個,并且有多個partition,一個consumer時,所有partition里的消息都會發往該consumer,如果consumer不止一個時,可能會存在有的consumer里面數據消費的多,有的消費的少,做多消費者的隊列就是用這個特性,當partition數量=consuerm時,消息可以達到負載均衡。
下面上一下下代碼,修改之后topic的partition是3個,依舊是基于 enqueue/rdkafka 這個包
生產者,不指定partition,kafka會自動分配
$connFactory = new RdKafkaConnectionFactory([
'global' => [
'metadata.broker.list' => '127.0.0.1:9092',
'socket.timeout.ms' => '50'
]
]);
$context = $connFactory->createContext();
$topic = $context->createQueue('app');
for ($i = 0; $i <= 5; $i++) {
$message = $context->createMessage('hello world!' . $i);
$context->createProducer()->send($topic, $message);
}
復制代碼消費者
$config = [
'global' => [
'group.id' => date('Ymd'), // 指定一個分區,分區名自定義,做隊列分區名必須一樣
'metadata.broker.list' => '127.0.0.1:9092',
'enable.auto.commit' => 'false',
],
'topic' => [
'auto.offset.reset' => 'latest',
],
];
$connFactory = new RdKafkaConnectionFactory($config);
$context = $connFactory->createContext();
$queue = $context->createQueue('app');
$consumer = $context->createConsumer($queue);
while (true) {
$message = $consumer->receive(30 * 1000);
if (!is_object($message)) {
continue;
}
var_dump($message->getBody());
$consumer->acknowledge($message);
$consumer->reject($message);
}
復制代碼
啟動三個消費者,運行一次生產者,可以得到以下結果
消費者1
string(13) "hello world!1"
string(13) "hello world!3"
string(13) "hello world!4"
string(13) "hello world!5"
復制代碼
消費者2
string(13) "hello world!2"
復制代碼
消費者3
string(13) "hello world!0"
復制代碼
到此就實現了kafka做隊列的需求了,本文內容就到這里,相關kafka知識可以看這篇文章。
關于找一找教程網
本站文章僅代表作者觀點,不代表本站立場,所有文章非營利性免費分享。
本站提供了軟件編程、網站開發技術、服務器運維、人工智能等等IT技術文章,希望廣大程序員努力學習,讓我們用科技改變世界。
[php的kafka踩坑(二)]http://www.zyiz.net/tech/detail-125723.html
總結
以上是生活随笔為你收集整理的php kafka storm,php的kafka踩坑(二)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: vrp车辆路径问题 php,车辆路径问题
- 下一篇: 可以检测心跳!华为新耳机消息流出 和P6