kafka php 教程,php的kafka踩坑(一)
最近項目上有一個需要用到消息隊列的功能,從網上找了一些php相關的kafka使用的教程和博客,大抵都是安裝php的拓展librdkafka(這里就不講這個拓展的安裝方法了,搜一下還是有很多教程的),然后直接用這個拓展進行開發,但是我直接用這個拓展開發的時候,不知道為啥運行不起來,一直報錯(應該是我太菜了,哈哈哈哈哈哈)......我從github上找了一些相關的包想直接用一下,但是發現很多包都是幾年前的了,基本上都是kafka 0.x 版本的。
后面我用了enqueue/rdkafka 這個包,鏈接:github.com/php-enqueue…
下面展示一下我測試的代碼,只是能運行起來,但是還沒達到我想要的預期
生產者
$connFactory = new RdKafkaConnectionFactory([
'global' => [
'metadata.broker.list' => '127.0.0.1:9092',
'socket.timeout.ms' => '50'
]
]);
$context = $connFactory->createContext();
$message = $context->createMessage('hello world!');
$topic = $context->createTopic('app');
$context->createProducer()->send($topic, $message);
復制代碼
消費者
$config = [
'global' => [
'group.id' => uniqid('', true),
'metadata.broker.list' => '127.0.0.1:9092',
'enable.auto.commit' => 'false',
],
'topic' => [
// 設置從最后一個offset開始讀取消息,不會讀取到之前的消息
'auto.offset.reset' => 'latest',
],
];
$connFactory = new RdKafkaConnectionFactory($config);
$context = $connFactory->createContext();
$topic = $context->createTopic('app');
$consumer = $context->createConsumer($topic);
while (true) {
$message = $consumer->receive(30 * 1000);
if (!$message instanceof RdKafkaMessage && !$message instanceof Message) {
var_dump($message);
continue;
}
$consumer->acknowledge($message);
var_dump($message->getBody());
}
復制代碼
這樣是能運行起來的,能正常發送和接受數據,我實際情況是想做一個隊列,生產者有多個,往一個topic進行數據生產,然后有多個消費者在消費,但是這樣寫有個問題,我在啟動了多個消費者的時候,每個消費者都會接受到生產者發送過來的消息,更像是群體發布群體訂閱的形式,不是我想要的結果,我去網上找了其他的教程,有人說只要設置group_id不同就可以了,但我的group_id全部都是隨機的,不太可能一樣,按理來說是能實現的,但是就是不行,也試過用Queue去操作,但是還是會所有消費者都收到消息。
目前就只了解到了這個地方,還需要花點時間再看看,如果有大神看到的話,求指點一下!!
總結
以上是生活随笔為你收集整理的kafka php 教程,php的kafka踩坑(一)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 二元一次函数最值问题_初二上学期,一次函
- 下一篇: 什么是python自动化测试_pytho