首页后端开发PHP基于 Hyperf + RabbitMQ + WebSocket 实现消息推送

基于 Hyperf + RabbitMQ + WebSocket 实现消息推送

时间2024-02-02 04:52:03发布访客分类PHP浏览988
导读:收集整理的这篇文章主要介绍了基于 Hyperf + RabbitMQ + WebSocket 实现消息推送,觉得挺不错的,现在分享给大家,也给大家做个参考。基于 Hyperf+ WebSocket +RabbITMQ 实现的一个简单大屏幕的...
收集整理的这篇文章主要介绍了基于 Hyperf + RabbitMQ + WebSocket 实现消息推送,觉得挺不错的,现在分享给大家,也给大家做个参考。

基于 Hyperf+ WebSocket +RabbITMQ 实现的一个简单大屏幕的消息推送。

思路

利用 WebSocket 协议让客户端和服务器端保持有状态的长链接,

保存链接上来的客户端 id。订阅发布者发布的消息针对已保存的客户端 id 进行广播消息。

WebSocket 服务

composer require hyPErf/websocket-server

配置文件 [config/autoload/server.php]

?phpreturn [    'mode' =>
     SWOOLE_PROCESS,    'servers' =>
     [        [            'name' =>
     'http',            'type' =>
     Server::SERVER_HTTP,            'host' =>
     '0.0.0.0',            'port' =>
     11111,            'sock_type' =>
     SWOOLE_SOCK_TCP,            'callbacks' =>
     [                SwooleEvent::ON_REQUEST =>
     [Hyperf\HttpServer\Server::class, 'onRequest'],            ],        ],        [            'name' =>
     'ws',            'type' =>
     Server::SERVER_WEBSOCKET,            'host' =>
     '0.0.0.0',            'port' =>
     12222,            'sock_type' =>
     SWOOLE_SOCK_TCP,            'callbacks' =>
     [                SwooleEvent::ON_HAND_SHAKE =>
     [Hyperf\WebSocketServer\Server::class, 'onHandShake'],                SwooleEvent::ON_MESSAGE =>
     [Hyperf\WebSocketServer\Server::class, 'onMessage'],                SwooleEvent::ON_CLOSE =>
     [Hyperf\WebSocketServer\Server::class, 'onClose'],            ],        ],    ],

WebSocket 服务器端代码示例

?phpdeclare(strict_types=1);
    /** * This file is part of Hyperf. * * @link     https://www.hyperf.io * @document https://doc.hyperf.io * @contact  group@hyperf.io * @license  https://github.COM/hyperf-cloud/hyperf/blob/master/LICENSE */namespace App\Controller;
    use Hyperf\Contract\OnCloseinterface;
    use Hyperf\Contract\OnMessageInterface;
    use Hyperf\Contract\OnOpenInterface;
    use Swoole\Http\Request;
    use Swoole\Server;
    use Swoole\Websocket\Frame;
    use Swoole\WebSocket\Server as WebSocketServer;
class WebSocketController extends Controller implements OnMessageInterface, OnOpenInterface, OnCloseInterface{
    /**     * 发送消息     * @param WebSocketServer $server     * @param Frame $frame     */    public function onMessage(WebSocketServer $server, Frame $frame): void    {
            //心跳刷新缓存        $redis = $this->
    container->
    get(\Redis::class);
            //获取所有的客户端id        $fdList = $redis->
    sMembers('websocket_sjd_1');
            //如果当前客户端在客户端集合中,就刷新        if (in_array($frame->
fd, $fdList)) {
                $redis->
    sAdd('websocket_sjd_1', $frame->
    fd);
                $redis->
    expire('websocket_sjd_1', 7200);
        }
            $server->
    push($frame->
    fd, '@R_777_1679@: ' . $frame->
    data);
    }
    /**     * 客户端失去链接     * @param Server $server     * @param int $fd     * @param int $reactorId     */    public function onClose(Server $server, int $fd, int $reactorId): void    {
            //删掉客户端id        $redis = $this->
    container->
    get(\Redis::class);
            //移除集合中指定的value        $redis->
    sRem('websocket_sjd_1', $fd);
            var_dump('closed');
    }
    /**     * 客户端链接     * @param WebSocketServer $server     * @param Request $request     */    public function onOpen(WebSocketServer $server, Request $request): void    {
            //保存客户端id        $redis = $this->
    container->
    get(\Redis::class);
            $res1 = $redis->
    sAdd('websocket_sjd_1', $request->
    fd);
            VAR_dump($res1);
            $res = $redis->
    expire('websocket_sjd_1', 7200);
            var_dump($res);
            $server->
    push($request->
    fd, 'Opened');
    }
}
    

WebSocket 前端代码

function WebSockettest() {
        if ("WebSocket" in window) {
                console.LOG("您的浏览器支持 WebSocket!");
                var num = 0            // 打开一个 web socket            var ws = new WebSocket("ws://127.0.0.1:12222");
            ws.onopen = function () {
                    // Web Socket 已连接上,使用 send() 方法发送数据                //alert("数据发送中...");
                    //ws.send("发送数据");
            }
    ;
            window.setInterval(function () {
 //每隔5秒钟发送一次心跳,避免websocket连接因超时而自动断开                var ping = {
"type": "ping"}
    ;
                    ws.send(JSON.stringify(ping));
            }
    , 5000);
            ws.onmessage = function (evt) {
                    var d = JSON.parse(evt.data);
                    console.log(d);
                if (d.code == 300) {
                    $(".address").text(d.address)                }
                if (d.code == 200) {
                        var v = d.data                    console.log(v);
                        num++                    var str = `div class="item">
                                        p>
${
v.recordOutTime}
    /p>
                                        p>
${
v.userOutName}
    /p>
                                        p>
${
v.userOutNum}
    /p>
                                        p>
${
v.doorOutName}
    /p>
                                    /div>
    `                    $(".tableHead").after(str)                    if (num >
 7) {
                        num--                        $(".table .item:nth-last-child(1)").remove()                    }
                }
            }
    ;
            ws.error = function (e) {
                console.log(e)                alert(e)            }
            ws.onclose = function () {
                    // 关闭 websocket                alert("连接已关闭...");
            }
    ;
        }
 else {
                alert("您的浏览器不支持 WebSocket!");
        }
    }
    

AMQP 组件

composer require hyperf/amqp

配置文件 [config/autoload/amqp.php]

?phpreturn [    'default' =>
     [        'host' =>
     'localhost',        'port' =>
     5672,        'user' =>
     'guest',        'password' =>
     'guest',        'vhost' =>
     '/',        'pool' =>
     [            'min_connections' =>
     1,            'max_connections' =>
     10,            'connect_timeout' =>
     10.0,            'wait_timeout' =>
     3.0,            'heartbeat' =>
     -1,        ],        'params' =>
     [            'insist' =>
     false,            'login_method' =>
     'AMQPLAIN',            'login_response' =>
     null,            'locale' =>
     'en_US',            'connection_timeout' =>
     3.0,            'read_write_timeout' =>
     6.0,            'context' =>
     null,            'keepalive' =>
     false,            'heartbeat' =>
     3,        ],    ],];
    

MQ 消费者代码

?phpdeclare(strict_types=1);
    namespace App\Amqp\Consumer;
    use Hyperf\Amqp\Annotation\Consumer;
    use Hyperf\Amqp\Message\ConsumerMessage;
    use Hyperf\Amqp\Result;
    use Hyperf\Server\Server;
    use Hyperf\Server\ServerFactory;
/** * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1) */class DemoConsumer extends ConsumerMessage{
    /**     * rabbmitMQ消费端代码     * @param $data     * @return string     */    public function consume($data): string    {
            print_r($data);
            //获取集合中所有的value        $redis = $this->
    container->
    get(\Redis::class);
            $fdList=$redis->
    sMembers('websocket_sjd_1');
            $server=$this->
    container->
    get(ServerFactory::class)->
    getServer()->
    getServer();
            foreach($fdList as $key=>
$v){
            if(!empty($v)){
                    $server->
    push((int)$v, $data);
            }
        }
            return Result::ACK;
    }
}
    

控制器代码

  /**     * test     * @return array     */    public function test()    {
            $data = array(            'code' =>
     200,            'data' =>
     [                'userOutName' =>
     'cCFlow',                'userOutNum' =>
     '9999',                'recordOutTime' =>
     date("Y-m-d H:i:s", time()),                'doorOutName' =>
     '教师公寓',            ]        );
            $data = \GuzzleHttp\json_encode($data);
            $message = new DemoProducer($data);
            $producer = ApplicationContext::getContainer()->
    get(Producer::class);
            $result = $producer->
    produce($message);
            var_dump($result);
            $user = $this->
    request->
    input('user', 'Hyperf');
            $method = $this->
    request->
    getMethod();
            return [            'method' =>
     $method,            'message' =>
 "{
$user}
    .",        ];
    }
    

最终效果

推荐教程:《PHP教程》

以上就是基于 Hyperf + RabbitMQ + WebSocket 实现消息推送的详细内容,更多请关注其它相关文章!

声明:本文内容由网友自发贡献,本站不承担相应法律责任。对本内容有异议或投诉,请联系2913721942#qq.com核实处理,我们将尽快回复您,谢谢合作!


若转载请注明出处: 基于 Hyperf + RabbitMQ + WebSocket 实现消息推送
本文地址: https://pptw.com/jishu/596384.html
PHP中如何使用PDO修改数据? ​PHP去掉字符串中的“#”

游客 回复需填写必要信息