PHP 框架 Hyperf 实现处理超时未支付订单和延时队列
导读:收集整理的这篇文章主要介绍了PHP 框架 Hyperf 实现处理超时未支付订单和延时队列,觉得挺不错的,现在分享给大家,也给大家做个参考。延时队列@H_360_11@DelayPRoducer.Phpamqpbuilder.PhpAmqpB...
收集整理的这篇文章主要介绍了PHP 框架 Hyperf 实现处理超时未支付订单和延时队列,觉得挺不错的,现在分享给大家,也给大家做个参考。延时队列
@H_360_11@DelayPRoducer.Php
amqpbuilder.Php
AmqpBuilder.php
?phpdeclare(strict_tyPEs = 1);
namespace App\components\Amqp;
use Hyperf\Amqp\Builder\Builder;
use Hyperf\Amqp\Builder\QueueBuilder;
class AmqpBuilder extends QueueBuilder{
/** * @param array|\PhPAMqpLib\Wire\AMQPTable $arguments * * @return \Hyperf\Amqp\Builder\Builder */ public function setarguments($arguments) : Builder {
$this->
arguments = array_merge($this->
arguments, $arguments);
return $this;
}
/** * 设置延时队列相关参数 * * @param string $queueName * @param int $XMessageTTL * @param string $xDeadLetterExchange * @param string $xDeadLetterRoutingKey * * @return $this */ public function setDelayedQueue(string $queueName, int $xMessageTtl, string $xDeadLetterExchange, string $xDeadLetterRoutingKey) : self {
$this->
setArguments([ 'x-message-ttl' =>
['I', $xMessageTtl * 1000], // 毫秒 'x-dead-letter-exchange' =>
['S', $xDeadLetterExchange], 'x-dead-letter-routing-key' =>
['S', $xDeadLetterRoutingKey], ]);
$this->
setQueue($queueName);
return $this;
}
}
DelayProducer.php
?phpdeclare(strict_types = 1);
namespace App\Components\Amqp;
use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Builder;
use Hyperf\Amqp\Message\ProducerMessageinterface;
use Hyperf\Di\Annotation\AnnotationCollector;
use PhpAmqpLib\Message\AMQPMessage;
use Throwable;
class DelayProducer extends Builder{
/** * @param ProducerMessageInterface $producerMessage * @param AmqpBuilder $queueBuilder * @param bool $confirm * @param int $timeout * * @return bool * @throws \Throwable */ public function produce(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool {
return retry(1, function () use ($producerMessage, $queueBuilder, $confirm, $timeout) {
return $this->
produceMessage($producerMessage, $queueBuilder, $confirm, $timeout);
}
);
}
/** * @param ProducerMessageInterface $producerMessage * @param AmqpBuilder $queueBuilder * @param bool $confirm * @param int $timeout * * @return bool * @throws \Throwable */ private function produceMessage(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool {
$result = false;
$this->
injectMessageProperty($producerMessage);
$message = new AMQPMessage($producerMessage->
payload(), $producerMessage->
getProperties());
$pool = $this->
getConnectionPool($producerMessage->
getPoolName());
/** @VAR \Hyperf\Amqp\Connection $connection */ $connection = $pool->
get();
if ($confirm) {
$channel = $connection->
getConfirmChannel();
}
else {
$channel = $connection->
getChannel();
}
$channel->
set_ack_handler(function () use (&
$result) {
$result = true;
}
);
try {
// 处理延时队列 $exchangeBuilder = $producerMessage->
getExchangeBuilder();
// 队列定义 $channel->
queue_declare($queueBuilder->
getQueue(), $queueBuilder->
isPassive(), $queueBuilder->
isDurable(), $queueBuilder->
isExclusive(), $queueBuilder->
isAutoDelete(), $queueBuilder->
isNowait(), $queueBuilder->
getArguments(), $queueBuilder->
getTicket());
// 路由定义 $channel->
exchange_declare($exchangeBuilder->
getExchange(), $exchangeBuilder->
getType(), $exchangeBuilder->
isPassive(), $exchangeBuilder->
isDurable(), $exchangeBuilder->
isAutoDelete(), $exchangeBuilder->
isInternal(), $exchangeBuilder->
isNowaIT(), $exchangeBuilder->
getArguments(), $exchangeBuilder->
getTicket());
// 队列绑定 $channel->
queue_bind($queueBuilder->
getQueue(), $producerMessage->
getExchange(), $producerMessage->
getRoutingKey());
// 消息发送 $channel->
basic_publish($message, $producerMessage->
getExchange(), $producerMessage->
getRoutingKey());
$channel->
wait_for_pending_acks_returns($timeout);
}
catch (Throwable $exception) {
// Reconnect the connection before release. $connection->
reconnect();
throw $exception;
}
finally {
$connection->
release();
}
return $confirm ? $result : true;
}
/** * @param ProducerMessageInterface $producerMessage */ private function injectMessageProperty(ProducerMessageInterface $producerMessage) : void {
if (class_exists(AnnotationCollector::class)) {
/** @var \Hyperf\Amqp\Annotation\Producer $annotation */ $annotation = AnnotationCollector::getClassAnnotation(get_class($producerMessage), Producer::class);
if ($annotation) {
$annotation->
routingKey &
&
$producerMessage->
setRoutingKey($annotation->
routingKey);
$annotation->
exchange &
&
$producerMessage->
setExchange($annotation->
exchange);
}
}
}
}
处理超时订单
Orderqueueconsumer.Php
Orderqueueproducer.Php
Orderqueueproducer.php
?phpdeclare(strict_types = 1);
namespace App\Amqp\Producer;
use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Builder\ExchangeBuilder;
use Hyperf\Amqp\Message\ProducerMessage;
/** * @Producer(exchange="order_exchange", routingKey="order_exchange") */class OrderQueueProducer extends ProducerMessage{
public function __construct($data) {
$this->
payload = $data;
}
public function getExchangeBuilder() : ExchangeBuilder {
return parent::getExchangeBuilder();
// TODO: Change the autogenerated stub }
}
Orderqueueconsumer.php
?phpdeclare(strict_types = 1);
namespace App\Amqp\Consumer;
use App\Service\CityTransport\OrderService;
use Hyperf\Amqp\Result;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
/** * @Consumer(exchange="delay_exchange", routingKey="delay_route", queue="delay_queue", name ="OrderQueueConsumer", nums=1) */class OrderQueueConsumer extends ConsumerMessage{
public function consume($data) : string {
##业务处理 }
public function isEnable() : bool {
return true;
}
}
Demo
$builder = new AmqpBuilder();
$builder->
setDelayedQueue('order_exchange', 1, 'delay_exchange', 'delay_route');
$que = ApplicationContext::getContainer()->
get(DelayProducer::class);
var_dump($que->
produce(new OrderQueueProducer(['order_sn' =>
(string)mt_rand(10000, 90000)]), $builder))推荐教程:《PHP教程》
以上就是PHP 框架 Hyperf 实现处理超时未支付订单和延时队列的详细内容,更多请关注其它相关文章!
声明:本文内容由网友自发贡献,本站不承担相应法律责任。对本内容有异议或投诉,请联系2913721942#qq.com核实处理,我们将尽快回复您,谢谢合作!
若转载请注明出处: PHP 框架 Hyperf 实现处理超时未支付订单和延时队列
本文地址: https://pptw.com/jishu/596313.html
