PHP实现RabbitMQ消息列队的方法是怎样
业务场景
项目公司是主php做开发的,框架为thinkphp。众所周知,php本身的运行效率存在一定的缺陷,所以如果有一个很复杂很耗时的业务时,必须开发一个常驻内存的程序。首先我想到了php的workerman与swoole,但是这里应上面的标题哈,想将耗时任务交给另一个服务器,同时列队处理。所以这里我想独立部署一个rabbitmq服务器用于处理列队任务。
当rabbitmq服务器我们准备好了,建立了一个持久化命名为ceshi的列队,如下:
项目上生产者和消费者的开发我这里全部采用tinkphp6+workerman,为便于管理。这里这么做也是因为发现workerman中对rabbitmq的文档解释太少了!
所以开始踩坑!
1、首先部署好thinkphp6框架
过程去看
2、安装workerman扩展
过程去看
3、生产者
配置一个workerman类
创建的send类代码如下:
?php namespace app\workerman; use bunny\channel; use workerman\rabbitmq\client; use think\worker\server; class send extends server { //websocket地址,一会用于测试。 protected $socket = 'websocket://127.0.0.1:2345'; /** * 收到信息 * @param $connection * @param $data */ public function onmessage($connection, $data) { //websocket发送过来的消息 $connection-> send('我收到你的信息了:'.$data); //rabbitmq配置 $options = [ 'host'=> '127.0.0.1',//rabbitmq ip 'port'=> 5672,//rabbitmq 通讯端口 'user'=> 'admin',//rabbitmq 账号 'password'=> '123456'//rabbitmq 密码 ]; (new client($options))-> connect()-> then(function (client $client) { return $client-> channel(); } )-> then(function (channel $channel) { /** * 创建队列(queue) * name: ceshi // 队列名称 * passive: false // 如果设置true存在则返回ok,否则就报错。设置false存在返回ok,不存在则自动创建 * durable: true // 是否持久化,设置false是存放到内存中rabbitmq重启后会丢失, * 设置true则代表是一个持久的队列,服务重启之后也会存在,因为服务会把持久化的queue存放在硬盘上,当服务重启的时候,会重新加载之前被持久化的queue * exclusive: false // 是否排他,指定该选项为true则队列只对当前连接有效,连接断开后自动删除 * auto_delete: false // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除 */ return $channel-> queuedeclare('ceshi', false, true, false, false)-> then(function () use ($channel) { return $channel; } ); } )-> then(function (channel $channel) use($data){ echo "发送消息内容:".$data."\n"; /** * 发送消息 * body 发送的数据 * headers 数据头,建议 ['content_type' => 'text/plain'],这样消费端是springboot注解接收直接是字符串类型 * exchange 交换器名称 * routingkey 路由key * mandatory * immediate * @return bool|promiseinterface|int */ return $channel-> publish($data, ['content_type' => 'text/plain'], '', 'ceshi')-> then(function () use ($channel) { return $channel; } ); } )-> then(function (channel $channel) { //echo " [x] sent 'hello world!'\n"; $client = $channel-> getclient(); return $channel-> close()-> then(function () use ($client) { return $client; } ); } )-> then(function (client $client) { $client-> disconnect(); } ); } /** * 当连接建立时触发的回调函数 * @param $connection */ public function onconnect($connection) { } /** * 当连接断开时触发的回调函数 * @param $connection */ public function onclose($connection) { } /** * 当客户端的连接上发生错误时触发 * @param $connection * @param $code * @param $msg */ public function onerror($connection, $code, $msg) { echo "error $code $msg\n"; } /** * 每个进程启动 * @param $worker */ public function onworkerstart($worker) { } }
上述都ok以后咱们可以项目路径下通过命令启动这个生产者:
php think worker:server
测试发送数据:
通过这个网站
连接【ws://127.0.0.1:2345】后发送数据!
前往rabbitmq控制台
列队中有一条消息产生并且等待了!
这个时候你可能问,如果我发送数据不想通过ws发送而是接口发送怎么办?
笨思路呗:接口给内置服务器发消息-> 内置服务去发消息给rabbitmq
将协议改为tcp
然后重新启动服务
然后去tp6创建一个路由接口
接口代码
?php namespace app\controller; use app\basecontroller; class index extends basecontroller { public function index(string $msg) { //连接本地tcp服务 $client = stream_socket_client('tcp://127.0.0.1:2345', $errno, $errmsg, 1); //发送字符串 fwrite($client, $msg."\n"); //断开服务 fclose($client); return 'ok'; } }
执行结果:
说明接口成功的将数据发送给了本地内置的tcp服务。
同时,内置服务将收到的数据给了rabbitmq服务列队中。
生产者完成。
4、消费者
同生产者一样新创建一个thinkphp6及安装workerman扩展,注意端口别和生产者冲突!这里我设置的是2346端口
创建的receive类代码如下:
?php namespace app\workerman; use bunny\channel; use bunny\message; use workerman\rabbitmq\client; use think\worker\server; class receive extends server { protected $socket = 'tcp://127.0.0.1:2346'; /** * 收到信息 * @param $connection * @param $data */ public function onmessage($connection, $data) { } /** * 当连接建立时触发的回调函数 * @param $connection */ public function onconnect($connection) { } /** * 当连接断开时触发的回调函数 * @param $connection */ public function onclose($connection) { } /** * 当客户端的连接上发生错误时触发 * @param $connection * @param $code * @param $msg */ public function onerror($connection, $code, $msg) { echo "error $code $msg\n"; } /** * 每个进程启动 * @param $worker */ public function onworkerstart($worker) { //rabbitmq配置 $options = [ 'host'=> '127.0.0.1',//rabbitmq ip 'port'=> 5672,//rabbitmq 通讯端口 'user'=> 'admin',//rabbitmq 账号 'password'=> '123456'//rabbitmq 密码 ]; (new client($options))-> connect()-> then(function (client $client) { return $client-> channel(); } )-> then(function (channel $channel) { /** * 创建队列(queue) * name: ceshi // 队列名称 * passive: false // 如果设置true存在则返回ok,否则就报错。设置false存在返回ok,不存在则自动创建 * durable: true // 是否持久化,设置false是存放到内存中rabbitmq重启后会丢失, * 设置true则代表是一个持久的队列,服务重启之后也会存在,因为服务会把持久化的queue存放在硬盘上,当服务重启的时候,会重新加载之前被持久化的queue * exclusive: false // 是否排他,指定该选项为true则队列只对当前连接有效,连接断开后自动删除 * auto_delete: false // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除 */ return $channel-> queuedeclare('ceshi', false, true, false, false)-> then(function () use ($channel) { return $channel; } ); } )-> then(function (channel $channel) { echo ' [*] waiting for messages. to exit press ctrl+c', "\n"; $channel-> consume( function (message $message, channel $channel, client $client) { echo "接收消息内容:", $message-> content, "\n"; } , 'ceshi', '', false, true ); } ); } }
都ok以后咱们可以项目路径下通过命令启动这个消费者:
php think worker:server
此时应该会自动消费掉rabbitmq中等待的消息!
到这里消费者也就结束啦!
5、整体测试
接下来我用cmd来启动两个服务,然后用接口发送消息和消费测试!
至于具体怎么灵活应用自行开拓大脑哦~
比如php项目有些业务吃力,可以去做个java的消费端,让java来完成任务~
到此这篇关于“PHP实现RabbitMQ消息列队的方法是怎样”的文章就介绍到这了,感谢各位的阅读,更多相关PHP实现RabbitMQ消息列队的方法是怎样内容,欢迎关注网络资讯频道,小编将为大家输出更多高质量的实用文章!
声明:本文内容由网友自发贡献,本站不承担相应法律责任。对本内容有异议或投诉,请联系2913721942#qq.com核实处理,我们将尽快回复您,谢谢合作!
若转载请注明出处: PHP实现RabbitMQ消息列队的方法是怎样
本文地址: https://pptw.com/jishu/652026.html