PHP中使用kafka的操作是什么?
导读:Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。这篇文章主要给大家介绍PHP中使用kafka的操作,文中示例代码介绍的非常详细,对大家学习和理解kafka的使用有一定的帮助,感兴趣的朋友接下来一起...
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。这篇文章主要给大家介绍PHP中使用kafka的操作,文中示例代码介绍的非常详细,对大家学习和理解kafka的使用有一定的帮助,感兴趣的朋友接下来一起跟随小编看看吧。
本文并没有kafka的安装教程,本文是针对已经安装kafka及其配置好kafka的php拓展并且使用laravel框架进行开发项目,配置一个可供laravel框架使用的生产及消费者类.
以下代码修改自本站的YII框架关于kafka类的代码,经过测试使用在本人的项目中,可正常运行,larvael版本:5.6 代码放置larvael框架位置:app/Tools/Kafka.php
?php namespace App\Tools; use Illuminate\Config\Repository; use Illuminate\Support\Facades\DB; use Monolog\Logger; use Monolog\Handler\StreamHandler; use Illuminate\Http\Request; class Kafka { public $broker_list = '127.0.0.1'; //配置kafka,可以用逗号隔开多个kafka public $topic = 'test'; //管道名称 public $partition = 0; protected $producer = null; protected $consumer = null; public function __construct() { if (empty($this-> broker_list)) { throw new InvalidConfigException("broker not config"); } $rk = new \RdKafka\Producer(); if (empty($rk)) { throw new InvalidConfigException("producer error"); } $rk-> setLogLevel(LOG_DEBUG); if (!$rk-> addBrokers($this-> broker_list)) { throw new InvalidConfigException("producer error"); } $this-> producer = $rk; } /** * 生产者 * @param array $messages * @return mixed */ public function send($messages = [],$topic) { $topic = $this-> producer-> newTopic($topic); return $topic-> produce(RD_KAFKA_PARTITION_UA, $this-> partition, json_encode($messages)); } /** * 消费者 */ public function consumer($object, $callback){ $conf = new \RdKafka\Conf(); $conf-> set('group.id', 0); $conf-> set('metadata.broker.list', $this-> broker_list); $topicConf = new \RdKafka\TopicConf(); $topicConf-> set('auto.offset.reset', 'smallest'); $conf-> setDefaultTopicConf($topicConf); $consumer = new \RdKafka\KafkaConsumer($conf); $consumer-> subscribe([$this-> topic]); echo "waiting for messages.....\n"; while(true) { $message = $consumer-> consume(120*1000); switch ($message-> err) { case RD_KAFKA_RESP_ERR_NO_ERROR: echo "message payload...."; $object-> $callback($message-> payload); break; } sleep(1); } } } ?>
在控制器中如何使用:
首先再头部导入这个类:use App\Tools\Kafka;
下面是使用生产者实例:
public function test(){ $topic = 'tool'; //输入使用管道名称 $data['shop_id'] = 58; $data['bar_code']=586; $data['goods_num'] = 1; $data['goods_unit'] = '个'; $Kafka = new Kafka(); $Error_Msg = $Kafka-> send($data,$topic); //传入数组会自动转换json var_dump($Error_Msg); }
下面是消费者实例,消费者我这里使用了的是php脚本进行的操作:
?php $conf = new RdKafka\Conf(); $conf-> set('group.id', 'myConsumerGroup'); $rk = new RdKafka\Consumer($conf); $rk-> addBrokers("localhost:9092"); $topicConf = new RdKafka\TopicConf(); $topicConf-> set('auto.commit.interval.ms', 100); $topicConf-> set('offset.store.method', 'file'); $topicConf-> set('offset.store.path', sys_get_temp_dir()); $topicConf-> set('auto.offset.reset', 'smallest'); $topic = $rk-> newTopic("tool", $topicConf); //读取的管道 // Start consuming partition 0 $topic-> consumeStart(0, RD_KAFKA_OFFSET_STORED); while (true) { $message = $topic-> consume(0, 120*10000); switch ($message-> err) { case RD_KAFKA_RESP_ERR_NO_ERROR: //没有错误打印信息 $message = json_decode(json_encode($message),true); $data = json_decode($message['payload'],true); var_dump($data); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "等待接收信息\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "超时\n"; break; default: throw new \Exception($message-> errstr(), $message-> err); break; } sleep(1); } ?>
关于php中使用kafka的操作就介绍到这,上述示例具有一定的借鉴价值,感兴趣的朋友可以参考,希望能对大家有帮助,想要了解更多kafka的使用,大家可以关注其它的相关文章。
文本转载自脚本之家
声明:本文内容由网友自发贡献,本站不承担相应法律责任。对本内容有异议或投诉,请联系2913721942#qq.com核实处理,我们将尽快回复您,谢谢合作!
若转载请注明出处: PHP中使用kafka的操作是什么?
本文地址: https://pptw.com/jishu/653601.html