好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

PHP 框架 Hyperf 实现处理超时未支付订单和延时队列

延时队列

Delayproducer.Php

Amqpbuilder.Php

AmqpBuilder.php

<?php
declare(strict_types = 1);
namespace App\Components\Amqp;
use Hyperf\Amqp\Builder\Builder;
use Hyperf\Amqp\Builder\QueueBuilder;
class AmqpBuilder extends QueueBuilder
{
    /**
* @param array|\PhpAmqpLib\Wire\AMQPTable $arguments
*
* @return \Hyperf\Amqp\Builder\Builder
*/
    public function setArguments($arguments) : Builder
    {
   $this->arguments = array_merge($this->arguments, $arguments);
   return $this;
    }
    /**
* 设置延时队列相关参数
*
* @param string $queueName
* @param int    $xMessageTtl
* @param string $xDeadLetterExchange
* @param string $xDeadLetterRoutingKey
*
* @return $this
*/
    public function setDelayedQueue(string $queueName, int $xMessageTtl, string $xDeadLetterExchange, string $xDeadLetterRoutingKey) : self
    {
   $this->setArguments([
  'x-message-ttl'   => ['I', $xMessageTtl * 1000], // 毫秒
  'x-dead-letter-exchange'    => ['S', $xDeadLetterExchange],
  'x-dead-letter-routing-key' => ['S', $xDeadLetterRoutingKey],
   ]);
   $this->setQueue($queueName);
   return $this;
    }
}

DelayProducer.php

<?php
declare(strict_types = 1);
namespace App\Components\Amqp;
use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Builder;
use Hyperf\Amqp\Message\ProducerMessageInterface;
use Hyperf\Di\Annotation\AnnotationCollector;
use PhpAmqpLib\Message\AMQPMessage;
use Throwable;
class DelayProducer extends Builder
{
    /**
* @param ProducerMessageInterface $producerMessage
* @param AmqpBuilder    $queueBuilder
* @param bool $confirm
* @param int  $timeout
*
* @return bool
* @throws \Throwable
*/
    public function produce(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
    {
   return retry(1, function () use ($producerMessage, $queueBuilder, $confirm, $timeout)
   {
  return $this->produceMessage($producerMessage, $queueBuilder, $confirm, $timeout);
   });
    }
    /**
* @param ProducerMessageInterface $producerMessage
* @param AmqpBuilder    $queueBuilder
* @param bool $confirm
* @param int  $timeout
*
* @return bool
* @throws \Throwable
*/
    private function produceMessage(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
    {
   $result = false;
   $this->injectMessageProperty($producerMessage);
   $message = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties());
   $pool    = $this->getConnectionPool($producerMessage->getPoolName());
   /** @var \Hyperf\Amqp\Connection $connection */
   $connection = $pool->get();
   if ($confirm) {
  $channel = $connection->getConfirmChannel();
   } else {
  $channel = $connection->getChannel();
   }
   $channel->set_ack_handler(function () use (&$result)
   {
  $result = true;
   });
   try {
  // 处理延时队列
  $exchangeBuilder = $producerMessage->getExchangeBuilder();
  // 队列定义
  $channel->queue_declare($queueBuilder->getQueue(), $queueBuilder->isPassive(), $queueBuilder->isDurable(), $queueBuilder->isExclusive(), $queueBuilder->isAutoDelete(), $queueBuilder->isNowait(), $queueBuilder->getArguments(), $queueBuilder->getTicket());
  // 路由定义
  $channel->exchange_declare($exchangeBuilder->getExchange(), $exchangeBuilder->getType(), $exchangeBuilder->isPassive(), $exchangeBuilder->isDurable(), $exchangeBuilder->isAutoDelete(), $exchangeBuilder->isInternal(), $exchangeBuilder->isNowait(), $exchangeBuilder->getArguments(), $exchangeBuilder->getTicket());
  // 队列绑定
  $channel->queue_bind($queueBuilder->getQueue(), $producerMessage->getExchange(), $producerMessage->getRoutingKey());
  // 消息发送
  $channel->basic_publish($message, $producerMessage->getExchange(), $producerMessage->getRoutingKey());
  $channel->wait_for_pending_acks_returns($timeout);
   } catch (Throwable $exception) {
  // Reconnect the connection before release.
  $connection->reconnect();
  throw $exception;
   }
   finally {
  $connection->release();
   }
   return $confirm ? $result : true;
    }
    /**
* @param ProducerMessageInterface $producerMessage
*/
    private function injectMessageProperty(ProducerMessageInterface $producerMessage) : void
    {
   if (class_exists(AnnotationCollector::class)) {
  /** @var \Hyperf\Amqp\Annotation\Producer $annotation */
  $annotation = AnnotationCollector::getClassAnnotation(get_class($producerMessage), Producer::class);
  if ($annotation) {
 $annotation->routingKey && $producerMessage->setRoutingKey($annotation->routingKey);
 $annotation->exchange && $producerMessage->setExchange($annotation->exchange);
  }
   }
    }
}

处理超时订单

Orderqueueconsumer.Php

Orderqueueproducer.Php

Orderqueueproducer.php

<?php
declare(strict_types = 1);
namespace App\Amqp\Producer;
use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Builder\ExchangeBuilder;
use Hyperf\Amqp\Message\ProducerMessage;
/**
 * @Producer(exchange="order_exchange", routingKey="order_exchange")
 */
class OrderQueueProducer extends ProducerMessage
{
    public function __construct($data)
    {
   $this->payload = $data;
    }
    public function getExchangeBuilder() : ExchangeBuilder
    {
   return parent::getExchangeBuilder(); // TODO: Change the autogenerated stub
    }
}

Orderqueueconsumer.php

<?php
declare(strict_types = 1);
namespace App\Amqp\Consumer;
use App\Service\CityTransport\OrderService;
use Hyperf\Amqp\Result;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
/**
 * @Consumer(exchange="delay_exchange", routingKey="delay_route", queue="delay_queue", name ="OrderQueueConsumer", nums=1)
 */
class OrderQueueConsumer extends ConsumerMessage
{
    public function consume($data) : string
    {
  ##业务处理
    }
    public function isEnable() : bool
    {
   return true;
    }
}

Demo

$builder = new AmqpBuilder();
   $builder->setDelayedQueue('order_exchange', 1, 'delay_exchange', 'delay_route');
   $que = ApplicationContext::getContainer()->get(DelayProducer::class);
   var_dump($que->produce(new OrderQueueProducer(['order_sn' => (string)mt_rand(10000, 90000)]), $builder))

推荐教程:《PHP教程》

以上就是PHP 框架 Hyperf 实现处理超时未支付订单和延时队列的详细内容,更多请关注Gxlcms其它相关文章!

查看更多关于PHP 框架 Hyperf 实现处理超时未支付订单和延时队列的详细内容...

  阅读:46次