首页后端开发PHPPHP 简单实现延时操作

PHP 简单实现延时操作

时间2024-02-02 06:38:03发布访客分类PHP浏览100
导读:收集整理的这篇文章主要介绍了PHP 简单实现延时操作,觉得挺不错的,现在分享给大家,也给大家做个参考。 场景在业务中有时会碰到延迟...
收集整理的这篇文章主要介绍了PHP 简单实现延时操作,觉得挺不错的,现在分享给大家,也给大家做个参考。


场景

在业务中有时会碰到延迟操作,如下单后半小时未支付则取消订单、下单后十五分钟未支付则发短信提醒等等。那这样的需求如何去实现呢。

相关学习推荐:PHP编程从入门到精通

实现方式

  • 第一个简单的方式就是用一个后台进程死循环去查订单,根据下单时间去做不同的操作
  • 第二种就是使用消息队列的定时消息,下单之后发送定时消息,不同的定时队列去处理不同的逻辑
  • 第三种可以使用框架提供的一些既有功能去做

实现代码

我们以订单创建15分钟后未支付,给用户发送邮件为场景进行学习

准备工作:

  1. 简单的订单表:order
  2. 各种需要的composer包
  3. rabbITMq本地服务
  4. 开通阿里云RocketMq服务

第一种

  • 代码逻辑很简单就直接死循环就行了
  • 启动这个脚本进程,可以用suPErvisor配置
  • 部分代码
//创建订单的逻辑/** * 随机创建订单 */$order = [    'order_number' =>
     mt_rand(100,10000).date("YmdHis"),    'user_id' =>
     mt_rand(1, 100),    'order_amount' =>
     mt_rand(100, 1000),];
        /**@VAR $manager Illuminate\Database\Capsule\Manager **/    $conn = $manager;
    $insertResult = $conn::table("order")    ->
    insert($order);
    PRint_r($insertResult);
    

延迟处理逻辑

while(true) {
        // 未支付订单列表    $orderList = $conn::table("order")        ->
    where("created_time",  '=', date("Y-m-d H:i:s", strtotime("-15 minutes")))        ->
    where('sended_need_pay_notify', '=', 2)        ->
    where('status', '=', 1)        ->
    select(['user_id', 'id'])        ->
    orderBy("id", 'asc')        ->
    get();
        $orderList = json_decode(json_encode($orderList), true);
    foreach ($orderList as $orderInfo) {
            sendEmail($orderInfo['user_id']);
            $conn::table('order')            ->
    where('id', '=', $orderInfo['id'])            ->
    update(['sended_need_pay_notify' =>
     1]);
            LOGs("update-success-orderId-". $orderInfo['id']."-userId-".$orderInfo['user_id']);
    }
        sleep(10);
}
    

执行处理脚本

gaoz@nobodyMBP delay_mq_demo % php First_while_handler.phpsend email to 73 success ...2020-06-24 11:37:36:update-success-orderId-3-userId-73

这种方式吧实现简单,但是不优雅,同时大批量订单产生也会遇到问题。

第二种

  • 比如使用阿里云的MQ服务,目前rocketMq与rabbitMq版本支持延迟消息,但是rabbit的延时消息收费太高了
  • 这里先使用rocketMq的延迟消息去实现
  • 需要开通阿里云的服务
// 创建订单的逻辑try        {
                /**             * 随机创建订单             */            $order = [                'order_number' =>
     mt_rand(100,10000).date("YmdHis"),                'user_id' =>
     mt_rand(1, 100),                'order_amount' =>
     mt_rand(100, 1000),            ];
                /**@var $manager Illuminate\Database\Capsule\Manager **/            $conn = $manager;
                $insertId = $conn::table("order")                ->
    insertGetId($order);
                $body = json_encode(['order_id' =>
     $insertId, 'created_time' =>
     date("Y-m-d H:i:s")]);
                $publishMessage = new TopicMessage(                $body            );
                // 设置消息KEY            $publishMessage->
    setMessageKey("messageKey");
                // 定时消息, 定时时间为3分钟后            $publishMessage->
    setStartDeliverTime(time() * 1000 + 3 * 60 * 1000);
                $result = $this->
    producer->
    publishMessage($publishMessage);
                print "Send mq message success. msgId is:" . $result->
    getMessageid() . ", bodyMD5 is:" . $result            -            >
    getMessageBodyMD5() . "\n";
        }
 catch (\Exception $e) {
                print_r($e->
    getMessage() . "\n");
        }
    

消费逻辑 同样是在消费者中处理

foreach ($messages as $message) {
                    $receiptHandles[] = $message->
    getReceiptHandle();
                    $messageBody = $message->
    getMessageBody();
                    $orderInfo = json_decode($messageBody, true);
                if (!empty($orderInfo['order_id'])) {
                        $orderId = $orderInfo['order_id'];
                        /**@var $manager Illuminate\Database\Capsule\Manager * */                    $conn = $manager;
                        $orderInfo = $conn::table("order")                        ->
    select(['id', 'user_id'])                        ->
    where('id', '=', $orderId)                        ->
    where('status', '=', 1)                        ->
    first();
                    if (!empty($orderInfo)) {
                            $orderInfo = json_decode(json_encode($orderInfo), true);
                            sendEmail($orderInfo['user_id']);
                            $conn::table('order')                            ->
    where('id', '=', $orderInfo['id'])                            ->
    update(['sended_need_pay_notify' =>
     1]);
                            logs("update-success-orderId-" . $orderInfo['id'] .                         "-userId-" . $orderInfo['user_id']);
                    }
                }
            }
    

启动生产一条消息

gaoz@nobodyMBP delay_mq_demo % php rocket_mq_handler_producer.php Send mq message success. msgId is:76Cf2135696C3D4EAC698A9FA1E1879D, bodyMD5 is:63448B50AA7B8AF47B07AA7CE807E3D3gaoz@nobodyMBP delay_mq_demo %

启动消费者慢慢等待

gaoz@nobodyMBP delay_mq_demo % php rocket_mq_handler_consumer.php No message, contine long polling!RequestId:5EF752583441411C74869BA9No message, contine long polling!RequestId:5EF7525B3441411C74869FE2No message, contine long polling!RequestId:5EF7525E3441411C7486A42CNo message, contine long polling!RequestId:5EF752613441411C7486A7D9consume finish, messages:send email to 95 success ...2020-06-27 12:08:05:update-success-orderId-8-userId-95 Array(    [0] =>
     76CF2135696C3D4EAC698A9FA1E1879D-MCAxNTkzMjY2NzkxNDM5IDMwMDAwMCAzIDAgYmpzaGFyZTUtMDggNSAw)    ack

这种方式有现有的服务可以使用,减少开发时间

第三种 使用rabbitMq去实现

  • 查阅文档没有找到rabbitMq支持延迟队列的原生功能,但是可以通过消息的TTL+死信队列实现
  • 私信队列就是用来存放没有被消费或者消费失败等消息的队列
  • 当设置消息的有效期内没有被消费消息就会被转发到死信队列
  • 通过设置消息的有效期实现延时功能
// 生产者$exchange = 'order15min_notify_exchange';
    $queue = 'order15minx_notify_queue';
    $dlxExchange = "dlx_order15min_exchange";
    $dlxQueue = "dlx_order15min_queue";
    $connection = new AMQPStreamConnection(getenv('RABBIT_HOST'), getenv('RABBIT_PORT'), getenv("RABBIT_USER"), getenv("RABBIT_PASS"), getenv("RABBIT_VHOST"));
    $channel = $connection->
    channel();
    $channel->
    exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
    $channel->
    exchange_declare($dlxExchange, AMQPExchangeType::DIRECT, false, true, false);
    // 设置队列的过期时间// 正常队列$table = new \PhPAMqpLib\Wire\AMQPTable();
    // 消息有效期$table->
    set('x-message-ttl', 3*60*1000);
    $table->
    set("x-dead-letter-exchange", $dlxExchange);
    $channel->
    queue_declare($queue, false, true, false, false, false, $table);
    $channel->
    queue_bind($queue, $exchange);
    // 死信队列$channel->
    queue_declare($dlxQueue, false, true, false, false, false);
    $channel->
    queue_bind($dlxQueue, $dlxExchange);
    /** * 随机创建订单 */$order = [    'order_number' =>
     mt_rand(100,10000).date("YmdHis"),    'user_id' =>
     mt_rand(1, 100),    'order_amount' =>
     mt_rand(100, 1000),];
    /**@var $manager Illuminate\Database\Capsule\Manager **/$conn = $manager;
    $insertId = $conn::table("order")    ->
    insertGetId($order);
    $messageBody = json_encode(['order_id' =>
     $insertId, 'created_time' =>
     date("Y-m-d H:i:s")]);
        $message = new AMQPMessage($messageBody, array('content_type' =>
     'text/plain', 'delivery_mode' =>
     AMQPMessage::DELIVERY_MODE_PERSISTENT));
        $channel->
    basic_publish($message, $exchange);
    

消费者

$dlxExchange = "dlx_order15min_exchange";
    $dlxQueue = "dlx_order15min_queue";
    $connection = new AMQPStreamConnection(getenv('RABBIT_HOST'), getenv('RABBIT_PORT'), getenv("RABBIT_USER"), getenv("RABBIT_PASS"), getenv("RABBIT_VHOST"));
    $channel = $connection->
    channel();
    $channel->
    queue_declare($dlxQueue, false, true, false, false);
    $channel->
    exchange_declare($dlxExchange, AMQPExchangeType::DIRECT, false, true, false);
    $channel->
    queue_bind($dlxQueue, $dlxExchange);
/** * @param \PhpAmqpLib\Message\AMQPMessage $message */function process_message($message){
        echo "\n--------\n";
        echo $message->
    body;
        echo "\n--------\n";
        $orderInfo = json_decode($message->
    body, true);
    if (!empty($orderInfo['order_id'])) {
            $orderId = $orderInfo['order_id'];
            /**@var $conn Illuminate\Database\Capsule\Manager * */        $conn = getdb();
            $orderInfo = $conn::table("order")            ->
    select(['id', 'user_id'])            ->
    where('id', '=', $orderId)            ->
    where('status', '=', 1)            ->
    first();
        if (!empty($orderInfo)) {
                $orderInfo = json_decode(json_encode($orderInfo), true);
                sendEmail($orderInfo['user_id']);
                $conn::table('order')                ->
    where('id', '=', $orderInfo['id'])                ->
    update(['sended_need_pay_notify' =>
     1]);
                logs("update-success-orderId-" . $orderInfo['id'] . "-userId-" . $orderInfo['user_id']);
        }
    }
        $message->
    delivery_info['channel']->
    basic_ack(        $message->
    delivery_info['delivery_tag']);
}
    $channel->
    basic_consume($dlxQueue, $consumerTag, false, false, false, false, 'process_message');
    

启动消费者

gaoz@nobodyMBP delay_mq_demo % php rabbit_mq_handler_consumer.php--------{
"order_id":7,"created_time":"2020-06-27 11:50:08"}
    --------send email to 2 success ...2020-06-27 11:56:55:update-success-orderId-7-userId-2

分别启动消费者、生产者就可以了,这里面消息的流转可以看到

消息先进入到正常队列,过期后进入了死信队列而被消费

第四种

  • 使用laravel自带的Queue去实现
  • 这里没有整理详细代码,后面更新出来
  • 可以查看官方文档 队列《Laravel 5.7 中文文档》

代码示例:github.COM/nobody05/delay_mq_demo

以上就是PHP 简单实现延时操作的详细内容,更多请关注其它相关文章!

声明:本文内容由网友自发贡献,本站不承担相应法律责任。对本内容有异议或投诉,请联系2913721942#qq.com核实处理,我们将尽快回复您,谢谢合作!


若转载请注明出处: PHP 简单实现延时操作
本文地址: https://pptw.com/jishu/596490.html
学习PHP死循环写法和作用 全方位解读php8.0版本优化与改进

游客 回复需填写必要信息