首页 > 后端开发 > PHP > PHP 框架 Hyperf 实现处理超时未支付订单和延时队列

PHP 框架 Hyperf 实现处理超时未支付订单和延时队列

时间:2024-02-02 03:41:03 | 发布:访客 | 分类:PHP | 浏览:786
导读:收集整理的这篇文章主要介绍了 PHP 框架 Hyperf 实现处理超时未支付订单和延时队列,觉得挺不错的,现在分享给大家,也给大家做个参考。延时队列:DelayProducer.php、AmqpBuilder.php……
收集整理的这篇文章主要介绍了PHP 框架 Hyperf 实现处理超时未支付订单和延时队列,觉得挺不错的,现在分享给大家,也给大家做个参考。

延时队列

  • DelayProducer.php

  • AmqpBuilder.php

AmqpBuilder.php

<?php

declare(strict_types=1);

// Namespace casing matches the one DelayProducer.php uses for the same component.
namespace App\Components\Amqp;

use Hyperf\Amqp\Builder\Builder;
use Hyperf\Amqp\Builder\QueueBuilder;
/**
 * Queue builder that can describe a delay queue: a queue whose messages carry a
 * TTL and a dead-letter exchange / routing key in the queue arguments.
 */
class AmqpBuilder extends QueueBuilder
{
    /**
     * Merge extra queue arguments into the ones already held by this builder.
     *
     * Existing arguments are kept; for string keys, an entry in $arguments
     * replaces an existing entry with the same key (array_merge semantics).
     *
     * NOTE(review): array_merge() only accepts arrays, so an AMQPTable passed
     * here will fail even though the documented parameter type lists one.
     *
     * @param array|\PhpAmqpLib\Wire\AMQPTable $arguments
     *
     * @return \Hyperf\Amqp\Builder\Builder
     */
    public function setArguments($arguments) : Builder
    {
        $this->arguments = array_merge($this->arguments, $arguments);

        return $this;
    }

    /**
     * Configure this builder as a delay queue.
     *
     * @param string $queueName             Name of the queue to declare.
     * @param int    $xMessageTtl           Message time-to-live in seconds.
     * @param string $xDeadLetterExchange   Value for the x-dead-letter-exchange argument.
     * @param string $xDeadLetterRoutingKey Value for the x-dead-letter-routing-key argument.
     *
     * @return $this
     */
    public function setDelayedQueue(string $queueName, int $xMessageTtl, string $xDeadLetterExchange, string $xDeadLetterRoutingKey) : self
    {
        // Each argument is a [type symbol, value] pair: 'I' marks an integer, 'S' a string.
        $this->setArguments([
            // The TTL is passed in seconds and sent to the broker in milliseconds.
            'x-message-ttl'             => ['I', $xMessageTtl * 1000],
            'x-dead-letter-exchange'    => ['S', $xDeadLetterExchange],
            'x-dead-letter-routing-key' => ['S', $xDeadLetterRoutingKey],
        ]);
        $this->setQueue($queueName);

        return $this;
    }
}
    

DelayProducer.php

<?php

declare(strict_types=1);

namespace App\Components\Amqp;

use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Builder;
// Class name casing must match the file name for case-sensitive autoloading.
use Hyperf\Amqp\Message\ProducerMessageInterface;
use Hyperf\Di\Annotation\AnnotationCollector;
use PhpAmqpLib\Message\AMQPMessage;
use Throwable;
/**
 * Producer that declares a caller-supplied queue (typically a delay queue built
 * with AmqpBuilder), declares the message's exchange, binds the two, and then
 * publishes the message.
 */
class DelayProducer extends Builder
{
    /**
     * Publish a message through the queue described by $queueBuilder.
     *
     * The work is delegated to produceMessage() wrapped in retry(1, ...).
     *
     * @param ProducerMessageInterface $producerMessage Message to publish.
     * @param AmqpBuilder              $queueBuilder    Definition of the queue to declare and bind.
     * @param bool                     $confirm         Use a confirm channel and report whether an ack arrived.
     * @param int                      $timeout         Passed to wait_for_pending_acks_returns().
     *
     * @return bool
     * @throws \Throwable
     */
    public function produce(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
    {
        return retry(1, function () use ($producerMessage, $queueBuilder, $confirm, $timeout) {
            return $this->produceMessage($producerMessage, $queueBuilder, $confirm, $timeout);
        });
    }

    /**
     * Declare queue and exchange, bind them, and publish the message once.
     *
     * @param ProducerMessageInterface $producerMessage
     * @param AmqpBuilder              $queueBuilder
     * @param bool                     $confirm
     * @param int                      $timeout
     *
     * @return bool The ack flag when $confirm is true, otherwise always true.
     * @throws \Throwable Any failure is rethrown after the connection has been reconnected and released.
     */
    private function produceMessage(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
    {
        $result = false;

        $this->injectMessageProperty($producerMessage);

        $message = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties());
        $pool    = $this->getConnectionPool($producerMessage->getPoolName());

        /** @var \Hyperf\Amqp\Connection $connection */
        $connection = $pool->get();

        try {
            // The channel is acquired inside the try block so that a failure here
            // still reaches the finally clause and the connection goes back to the pool.
            $channel = $confirm ? $connection->getConfirmChannel() : $connection->getChannel();

            // Flipped to true by the channel when the broker acknowledges the publish.
            $channel->set_ack_handler(function () use (&$result) {
                $result = true;
            });

            $exchangeBuilder = $producerMessage->getExchangeBuilder();

            // Declare the queue.
            $channel->queue_declare(
                $queueBuilder->getQueue(),
                $queueBuilder->isPassive(),
                $queueBuilder->isDurable(),
                $queueBuilder->isExclusive(),
                $queueBuilder->isAutoDelete(),
                $queueBuilder->isNowait(),
                $queueBuilder->getArguments(),
                $queueBuilder->getTicket()
            );

            // Declare the exchange.
            $channel->exchange_declare(
                $exchangeBuilder->getExchange(),
                $exchangeBuilder->getType(),
                $exchangeBuilder->isPassive(),
                $exchangeBuilder->isDurable(),
                $exchangeBuilder->isAutoDelete(),
                $exchangeBuilder->isInternal(),
                $exchangeBuilder->isNowait(),
                $exchangeBuilder->getArguments(),
                $exchangeBuilder->getTicket()
            );

            // Bind the queue to the exchange with the message's routing key.
            $channel->queue_bind($queueBuilder->getQueue(), $producerMessage->getExchange(), $producerMessage->getRoutingKey());

            // Publish the message.
            $channel->basic_publish($message, $producerMessage->getExchange(), $producerMessage->getRoutingKey());
            $channel->wait_for_pending_acks_returns($timeout);
        } catch (Throwable $exception) {
            // Reconnect the connection before release.
            $connection->reconnect();

            throw $exception;
        } finally {
            $connection->release();
        }

        return $confirm ? $result : true;
    }

    /**
     * Copy routingKey / exchange from the message class's Producer annotation
     * onto the message, when the annotation exists and the values are non-empty.
     *
     * @param ProducerMessageInterface $producerMessage
     */
    private function injectMessageProperty(ProducerMessageInterface $producerMessage) : void
    {
        if (! class_exists(AnnotationCollector::class)) {
            return;
        }

        /** @var \Hyperf\Amqp\Annotation\Producer $annotation */
        $annotation = AnnotationCollector::getClassAnnotation(get_class($producerMessage), Producer::class);
        if (! $annotation) {
            return;
        }

        if ($annotation->routingKey) {
            $producerMessage->setRoutingKey($annotation->routingKey);
        }
        if ($annotation->exchange) {
            $producerMessage->setExchange($annotation->exchange);
        }
    }
}
    

处理超时订单

  • OrderQueueConsumer.php

  • OrderQueueProducer.php

OrderQueueProducer.php

<?php

declare(strict_types=1);

namespace App\Amqp\Producer;

use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Builder\ExchangeBuilder;
use Hyperf\Amqp\Message\ProducerMessage;
/**
 * Producer message for orders; exchange and routing key come from the annotation below.
 *
 * @Producer(exchange="order_exchange", routingKey="order_exchange")
 */
class OrderQueueProducer extends ProducerMessage
{
    /**
     * @param mixed $data Stored as the message payload.
     */
    public function __construct($data)
    {
        $this->payload = $data;
    }

    /**
     * Return the exchange definition for this message.
     *
     * Currently delegates to the parent unchanged.
     * TODO: Change the autogenerated stub
     */
    public function getExchangeBuilder() : ExchangeBuilder
    {
        return parent::getExchangeBuilder();
    }
}
    

OrderQueueConsumer.php

<?php

declare(strict_types=1);

namespace App\Amqp\Consumer;

use App\Service\CityTransport\OrderService;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
/**
 * Consumer bound to the queue, exchange and routing key named in the annotation below.
 *
 * @Consumer(exchange="delay_exchange", routingKey="delay_route", queue="delay_queue", name ="OrderQueueConsumer", nums=1)
 */
class OrderQueueConsumer extends ConsumerMessage
{
    /**
     * Handle one delivered message.
     *
     * @param mixed $data Decoded message payload.
     *
     * @return string One of the Result constants telling the framework how to settle the message.
     */
    public function consume($data) : string
    {
        // Business logic goes here (for example, closing the unpaid order carried in $data).

        // The declared string return type requires an explicit result; acknowledge by default.
        return Result::ACK;
    }

    /**
     * Whether this consumer is enabled; always true here.
     */
    public function isEnable() : bool
    {
        return true;
    }
}
    

Demo

// Describe a queue named 'order_exchange' whose messages expire after 1 second and are
// then dead-lettered to 'delay_exchange' with routing key 'delay_route'.
$builder = new AmqpBuilder();
$builder->setDelayedQueue('order_exchange', 1, 'delay_exchange', 'delay_route');

// Resolve the producer from the container and publish a message with a random order number.
$producer = ApplicationContext::getContainer()->get(DelayProducer::class);
$message  = new OrderQueueProducer(['order_sn' => (string)mt_rand(10000, 90000)]);

var_dump($producer->produce($message, $builder));

推荐教程:《PHP教程》

以上就是PHP 框架 Hyperf 实现处理超时未支付订单和延时队列的详细内容,更多请关注其它相关文章!

声明:本文内容由网友自发贡献,本站不承担相应法律责任。对本内容有异议或投诉,请联系2913721942#qq.com核实处理,我们将尽快回复您,谢谢合作!


若转载请注明出处: PHP 框架 Hyperf 实现处理超时未支付订单和延时队列
本文地址: https://pptw.com/jishu/596313.html
php session垃圾回收机制 PHP面向对象到底是啥?十分钟通俗易懂图文教程

游客 回复需填写必要信息