如何基于Hyperf实现RabbitMQ+WebSocket消息推送
导读:收集整理的这篇文章主要介绍了如何基于Hyperf实现RabbitMQ+WebSocket消息推送,觉得挺不错的,现在分享给大家,也给大家做个参考。介绍基于 Hyperf+ WebSocket +RabbITMQ 实现的一个简单大屏幕的消息推...
收集整理的这篇文章主要介绍了如何基于Hyperf实现RabbitMQ+WebSocket消息推送,觉得挺不错的,现在分享给大家,也给大家做个参考。介绍基于 Hyperf+ WebSocket +RabbITMQ 实现的一个简单大屏幕的消息推送。
思路
利用 WebSocket 协议让客户端和服务器端保持有状态的长链接,
保存链接上来的客户端 id。订阅发布者发布的消息针对已保存的客户端 id 进行广播消息。
WebSocket 服务
composer require hyPErf/websocket-server
配置文件 [config/autoload/server.php]
?phpreturn [ 'mode' =>
SWOOLE_PROCESS, 'servers' =>
[ [ 'name' =>
'http', 'type' =>
Server::SERVER_HTTP, 'host' =>
'0.0.0.0', 'port' =>
11111, 'sock_type' =>
SWOOLE_SOCK_TCP, 'callbacks' =>
[ SwooleEvent::ON_REQUEST =>
[Hyperf\HttpServer\Server::class, 'onRequest'], ], ], [ 'name' =>
'ws', 'type' =>
Server::SERVER_WEBSOCKET, 'host' =>
'0.0.0.0', 'port' =>
12222, 'sock_type' =>
SWOOLE_SOCK_TCP, 'callbacks' =>
[ SwooleEvent::ON_HAND_SHAKE =>
[Hyperf\WebSocketServer\Server::class, 'onHandShake'], SwooleEvent::ON_MESSAGE =>
[Hyperf\WebSocketServer\Server::class, 'onMessage'], SwooleEvent::ON_CLOSE =>
[Hyperf\WebSocketServer\Server::class, 'onClose'], ], ], ],WebSocket 服务器端代码示例
?phpdeclare(strict_types=1);
/** * This file is part of Hyperf. * * @link https://www.hyperf.io * @document https://doc.hyperf.io * @contact group@hyperf.io * @license https://github.COM/hyperf-cloud/hyperf/blob/master/LICENSE */namespace App\Controller;
use Hyperf\Contract\OnCloseinterface;
use Hyperf\Contract\OnMessageInterface;
use Hyperf\Contract\OnOpenInterface;
use Swoole\Http\Request;
use Swoole\Server;
use Swoole\Websocket\Frame;
use Swoole\WebSocket\Server as WebSocketServer;
class WebSocketController extends Controller implements OnMessageInterface, OnOpenInterface, OnCloseInterface{
/** * 发送消息 * @param WebSocketServer $server * @param Frame $frame */ public function onMessage(WebSocketServer $server, Frame $frame): void {
//心跳刷新缓存 $redis = $this->
container->
get(\Redis::class);
//获取所有的客户端id $fdList = $redis->
sMembers('websocket_sjd_1');
//如果当前客户端在客户端集合中,就刷新 if (in_array($frame->
fd, $fdList)) {
$redis->
sAdd('websocket_sjd_1', $frame->
fd);
$redis->
expire('websocket_sjd_1', 7200);
}
$server->
push($frame->
fd, 'recv: ' . $frame->
data);
}
/** * 客户端失去链接 * @param Server $server * @param int $fd * @param int $reactorId */ public function onClose(Server $server, int $fd, int $reactorId): void {
//删掉客户端id $redis = $this->
container->
get(\Redis::class);
//移除集合中指定的value $redis->
sRem('websocket_sjd_1', $fd);
var_dump('closed');
}
/** * 客户端链接 * @param WebSocketServer $server * @param Request $request */ public function onOpen(WebSocketServer $server, Request $request): void {
//保存客户端id $redis = $this->
container->
get(\Redis::class);
$res1 = $redis->
sAdd('websocket_sjd_1', $request->
fd);
VAR_dump($res1);
$res = $redis->
expire('websocket_sjd_1', 7200);
var_dump($res);
$server->
push($request->
fd, 'Opened');
}
}
WebSocket 前端代码
function WebSockettest() {
if ("WebSocket" in window) {
console.LOG("您的浏览器支持 WebSocket!");
var num = 0 // 打开一个 web socket var ws = new WebSocket("ws://127.0.0.1:12222");
ws.onopen = function () {
// Web Socket 已连接上,使用 send() 方法发送数据 //alert("数据发送中...");
//ws.send("发送数据");
}
;
window.setInterval(function () {
//每隔5秒钟发送一次心跳,避免websocket连接因超时而自动断开 var ping = {
"type": "ping"}
;
ws.send(JSON.stringify(ping));
}
, 5000);
ws.onmessage = function (evt) {
var d = JSON.parse(evt.data);
console.log(d);
if (d.code == 300) {
$(".address").text(d.address) }
if (d.code == 200) {
var v = d.data console.log(v);
num++ var str = `div class="item">
p>
${
v.recordOutTime}
/p>
p>
${
v.userOutName}
/p>
p>
${
v.userOutNum}
/p>
p>
${
v.doorOutName}
/p>
/div>
` $(".tableHead").after(str) if (num >
7) {
num-- $(".table .item:nth-last-child(1)").remove() }
}
}
;
ws.error = function (e) {
console.log(e) alert(e) }
ws.onclose = function () {
// 关闭 websocket alert("连接已关闭...");
}
;
}
else {
alert("您的浏览器不支持 WebSocket!");
}
}
AMQP 组件
composer require hyperf/amqp
配置文件 [config/autoload/amqp.php]
?phpreturn [ 'default' =>
[ 'host' =>
'localhost', 'port' =>
5672, 'user' =>
'guest', 'password' =>
'guest', 'vhost' =>
'/', 'pool' =>
[ 'min_connections' =>
1, 'max_connections' =>
10, 'connect_timeout' =>
10.0, 'wait_timeout' =>
3.0, 'heartbeat' =>
-1, ], 'params' =>
[ 'insist' =>
false, 'login_method' =>
'AMQPLAIN', 'login_response' =>
null, 'locale' =>
'en_US', 'connection_timeout' =>
3.0, 'read_write_timeout' =>
6.0, 'context' =>
null, 'keepalive' =>
false, 'heartbeat' =>
3, ], ],];
MQ 消费者代码
?phpdeclare(strict_types=1);
namespace App\Amqp\Consumer;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
use Hyperf\Server\Server;
use Hyperf\Server\ServerFactory;
/** * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1) */class DemoConsumer extends ConsumerMessage{
/** * rabbmitMQ消费端代码 * @param $data * @return string */ public function consume($data): string {
print_r($data);
//获取集合中所有的value $redis = $this->
container->
get(\Redis::class);
$fdList=$redis->
sMembers('websocket_sjd_1');
$server=$this->
container->
get(ServerFactory::class)->
getServer()->
getServer();
foreach($fdList as $key=>
$v){
if(!empty($v)){
$server->
push((int)$v, $data);
}
}
return Result::ACK;
}
}
控制器代码
/** * test * @return array */ public function test() {
$data = array( 'code' =>
200, 'data' =>
[ 'userOutName' =>
'cCFlow', 'userOutNum' =>
'9999', 'recordOutTime' =>
date("Y-m-d H:i:s", time()), 'doorOutName' =>
'教师公寓', ] );
$data = \GuzzleHttp\json_encode($data);
$message = new DemoProducer($data);
$producer = ApplicationContext::getContainer()->
get(Producer::class);
$result = $producer->
produce($message);
var_dump($result);
$user = $this->
request->
input('user', 'Hyperf');
$method = $this->
request->
getMethod();
return [ 'method' =>
$method, 'message' =>
"{
$user}
.", ];
}
最终效果
推荐:《PHP教程》
以上就是如何基于Hyperf实现RabbitMQ+WebSocket消息推送的详细内容,更多请关注其它相关文章!
声明:本文内容由网友自发贡献,本站不承担相应法律责任。对本内容有异议或投诉,请联系2913721942#qq.com核实处理,我们将尽快回复您,谢谢合作!
若转载请注明出处: 如何基于Hyperf实现RabbitMQ+WebSocket消息推送
本文地址: https://pptw.com/jishu/596280.html
