首页后端开发PHPPHP中使用kafka的操作是什么?

PHP中使用kafka的操作是什么?

时间2024-03-26 15:52:03发布访客分类PHP浏览1056
导读:Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。这篇文章主要给大家介绍PHP中使用kafka的操作,文中示例代码介绍的非常详细,对大家学习和理解kafka的使用有一定的帮助,感兴趣的朋友接下来一起...

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。这篇文章主要给大家介绍PHP中使用kafka的操作,文中示例代码介绍的非常详细,对大家学习和理解kafka的使用有一定的帮助,感兴趣的朋友接下来一起跟随小编看看吧。

本文并没有kafka的安装教程,本文是针对已经安装kafka及其配置好kafka的php拓展并且使用laravel框架进行开发项目,配置一个可供laravel框架使用的生产及消费者类.

以下代码修改自本站的YII框架关于kafka类的代码,经过测试使用在本人的项目中,可正常运行,larvael版本:5.6 代码放置larvael框架位置:app/Tools/Kafka.php

?php
namespace App\Tools;
    
 
use Illuminate\Config\Repository;
    
 
use Illuminate\Support\Facades\DB;
    
use Monolog\Logger;
    
use Monolog\Handler\StreamHandler;
    
 
use Illuminate\Http\Request;

 
class Kafka
{
    
  public $broker_list = '127.0.0.1';
    //配置kafka,可以用逗号隔开多个kafka
  public $topic = 'test';
    //管道名称
  public $partition = 0;
    
 
  protected $producer = null;
    
  protected $consumer = null;

 
  public function __construct()
  {
    
    if (empty($this->
broker_list)) {
    
      throw new InvalidConfigException("broker not config");

    }
    
    $rk = new \RdKafka\Producer();

    if (empty($rk)) {
    
      throw new InvalidConfigException("producer error");

    }
    
    $rk->
    setLogLevel(LOG_DEBUG);
    
    if (!$rk->
    addBrokers($this->
broker_list)) {
    
      throw new InvalidConfigException("producer error");

    }
    
    $this->
    producer = $rk;

  }

 
  /**
   * 生产者
   * @param array $messages
   * @return mixed
   */
  public function send($messages = [],$topic)
  {
    
    $topic = $this->
    producer->
    newTopic($topic);
    
    return $topic->
    produce(RD_KAFKA_PARTITION_UA, $this->
    partition, json_encode($messages));

  }

 
  /**
   * 消费者
   */
  public function consumer($object, $callback){
    
    $conf = new \RdKafka\Conf();
    
    $conf->
    set('group.id', 0);
    
    $conf->
    set('metadata.broker.list', $this->
    broker_list);
    
 
    $topicConf = new \RdKafka\TopicConf();
    
    $topicConf->
    set('auto.offset.reset', 'smallest');
    
 
    $conf->
    setDefaultTopicConf($topicConf);
    
 
    $consumer = new \RdKafka\KafkaConsumer($conf);
    
 
    $consumer->
    subscribe([$this->
    topic]);
    
 
    echo "waiting for messages.....\n";

    while(true) {
    
      $message = $consumer->
    consume(120*1000);
    
      switch ($message->
err) {
    
        case RD_KAFKA_RESP_ERR_NO_ERROR:
          echo "message payload....";
    
          $object->
    $callback($message->
    payload);
    
          break;

      }
    
      sleep(1);

    }

  }

}
    
?>
    

在控制器中如何使用:

首先再头部导入这个类:use App\Tools\Kafka;

下面是使用生产者实例:

public function test(){
    
 
   $topic = 'tool';
    //输入使用管道名称
   $data['shop_id'] = 58;
    
   $data['bar_code']=586;
    
   $data['goods_num'] = 1;
    
   $data['goods_unit'] = '个';
    
 
$Kafka = new Kafka();
    
$Error_Msg = $Kafka->
    send($data,$topic);
    //传入数组会自动转换json
var_dump($Error_Msg);

 
 
  }
    

下面是消费者实例,消费者我这里使用了的是php脚本进行的操作:

?php
 
$conf = new RdKafka\Conf();
    
 
$conf->
    set('group.id', 'myConsumerGroup');
    
 
$rk = new RdKafka\Consumer($conf);
    
$rk->
    addBrokers("localhost:9092");
    
 
$topicConf = new RdKafka\TopicConf();
    
$topicConf->
    set('auto.commit.interval.ms', 100);
    
$topicConf->
    set('offset.store.method', 'file');
    
$topicConf->
    set('offset.store.path', sys_get_temp_dir());
    
$topicConf->
    set('auto.offset.reset', 'smallest');
    
 
$topic = $rk->
    newTopic("tool", $topicConf);
    //读取的管道
 
// Start consuming partition 0
$topic->
    consumeStart(0, RD_KAFKA_OFFSET_STORED);

 
while (true) {
    
  $message = $topic->
    consume(0, 120*10000);
    
  switch ($message->
err) {
    
    case RD_KAFKA_RESP_ERR_NO_ERROR:
    //没有错误打印信息
      $message = json_decode(json_encode($message),true);
    
      $data = json_decode($message['payload'],true);
    
      var_dump($data);
    
      break;
    
    case RD_KAFKA_RESP_ERR__PARTITION_EOF:
      echo "等待接收信息\n";
    
      break;
    
    case RD_KAFKA_RESP_ERR__TIMED_OUT:
      echo "超时\n";
    
      break;
    
    default:
      throw new \Exception($message->
    errstr(), $message->
    err);
    
      break;

  }
    
 sleep(1);

}
    
 
?>
    

关于php中使用kafka的操作就介绍到这,上述示例具有一定的借鉴价值,感兴趣的朋友可以参考,希望能对大家有帮助,想要了解更多kafka的使用,大家可以关注其它的相关文章。

文本转载自脚本之家

声明:本文内容由网友自发贡献,本站不承担相应法律责任。对本内容有异议或投诉,请联系2913721942#qq.com核实处理,我们将尽快回复您,谢谢合作!


若转载请注明出处: PHP中使用kafka的操作是什么?
本文地址: https://pptw.com/jishu/653601.html
webpack压缩打包html资源的方法是什么? Golang语言中自定义类型有什么,怎么样定义类型

游客 回复需填写必要信息