好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

PHP和RabbitMQ实现消息队列的完整代码

本篇文章给大家带来的内容是关于PHP和RabbitMQ实现消息队列的完整代码,有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。

先安装PHP对应的RabbitMQ,这里用的是 php_amqp 不同的扩展实现方式会有细微的差异.
php扩展地址: http://pecl.php测试数据/package/amqp
具体以官网为准 http://HdhCmsTestrabbitmq测试数据/getstarted.html

介绍

config.php 配置信息
BaseMQ.php MQ基类
ProductMQ.php 生产者类
ConsumerMQ.php 消费者类
Consumer2MQ.php 消费者2(可有多个)

config.php
    <?php
    return [
   //配置
   'host' => [
  'host' => '127.0.0.1',
  'port' => '5672',
  'login' => 'guest',
  'password' => 'guest',
  'vhost'=>'/',
   ],
   //交换机
   'exchange'=>'word',
   //路由
   'routes' => [],
    ];
BaseMQ.php
    <?php
    /**
* Created by PhpStorm.
* User: pc
* Date: 2018/12/13
* Time: 14:11
*/
    
    namespace MyObjSummary\rabbitMQ;
    
    /** Member
* AMQPChannel
* AMQPConnection
* AMQPEnvelope
* AMQPExchange
* AMQPQueue
* Class BaseMQ
* @package MyObjSummary\rabbitMQ
*/
    class BaseMQ
    {
   /** MQ Channel
    * @var \AMQPChannel
    */
   public $AMQPChannel ;
    
   /** MQ Link
    * @var \AMQPConnection
    */
   public $AMQPConnection ;
    
   /** MQ Envelope
    * @var \AMQPEnvelope
    */
   public $AMQPEnvelope ;
    
   /** MQ Exchange
    * @var \AMQPExchange
    */
   public $AMQPExchange ;
    
   /** MQ Queue
    * @var \AMQPQueue
    */
   public $AMQPQueue ;
    
   /** conf
    * @var
    */
   public $conf ;
    
   /** exchange
    * @var
    */
   public $exchange ;
    
   /** link
    * BaseMQ constructor.
    * @throws \AMQPConnectionException
    */
   public function __construct()
   {
  $conf =  require 'config.php' ;
  if(!$conf)
 throw new \AMQPConnectionException('config error!');
  $this->conf= $conf['host'] ;
  $this->exchange = $conf['exchange'] ;
  $this->AMQPConnection = new \AMQPConnection($this->conf);
  if (!$this->AMQPConnection->connect())
 throw new \AMQPConnectionException("Cannot connect to the broker!\n");
   }
    
   /**
    * close link
    */
   public function close()
   {
  $this->AMQPConnection->disconnect();
   }
    
   /** Channel
    * @return \AMQPChannel
    * @throws \AMQPConnectionException
    */
   public function channel()
   {
  if(!$this->AMQPChannel) {
 $this->AMQPChannel =  new \AMQPChannel($this->AMQPConnection);
  }
  return $this->AMQPChannel;
   }
    
   /** Exchange
    * @return \AMQPExchange
    * @throws \AMQPConnectionException
    * @throws \AMQPExchangeException
    */
   public function exchange()
   {
  if(!$this->AMQPExchange) {
 $this->AMQPExchange = new \AMQPExchange($this->channel());
 $this->AMQPExchange->setName($this->exchange);
  }
  return $this->AMQPExchange ;
   }
    
   /** queue
    * @return \AMQPQueue
    * @throws \AMQPConnectionException
    * @throws \AMQPQueueException
    */
   public function queue()
   {
  if(!$this->AMQPQueue) {
 $this->AMQPQueue = new \AMQPQueue($this->channel());
  }
  return $this->AMQPQueue ;
   }
    
   /** Envelope
    * @return \AMQPEnvelope
    */
   public function envelope()
   {
  if(!$this->AMQPEnvelope) {
 $this->AMQPEnvelope = new \AMQPEnvelope();
  }
  return $this->AMQPEnvelope;
   }
    }

ProductMQ.php

    <?php
    //生产者 P
    namespace MyObjSummary\rabbitMQ;
    require 'BaseMQ.php';
    class ProductMQ extends BaseMQ
    {
   private $routes = ['hello','word']; //路由key
    
   /**
    * ProductMQ constructor.
    * @throws \AMQPConnectionException
    */
   public function __construct()
   {
 parent::__construct();
   }
    
   /** 只控制发送成功 不接受消费者是否收到
    * @throws \AMQPChannelException
    * @throws \AMQPConnectionException
    * @throws \AMQPExchangeException
    */
   public function run()
   {
  //频道
  $channel = $this->channel();
  //创建交换机对象
  $ex = $this->exchange();
  //消息内容
  $message = 'product message '.rand(1,99999);
  //开始事务
  $channel->startTransaction();
  $sendEd = true ;
  foreach ($this->routes as $route) {
 $sendEd = $ex->publish($message, $route) ;
 echo "Send Message:".$sendEd."\n";
  }
  if(!$sendEd) {
 $channel->rollbackTransaction();
  }
  $channel->commitTransaction(); //提交事务
  $this->close();
  die ;
   }
    }
    try{
   (new ProductMQ())->run();
    }catch (\Exception $exception){
   var_dump($exception->getMessage()) ;
    }
ConsumerMQ.php
    <?php
    //消费者 C
    namespace MyObjSummary\rabbitMQ;
    require 'BaseMQ.php';
    class ConsumerMQ extends BaseMQ
    {
   private  $q_name = 'hello'; //队列名
   private  $route  = 'hello'; //路由key
    
   /**
    * ConsumerMQ constructor.
    * @throws \AMQPConnectionException
    */
   public function __construct()
   {
  parent::__construct();
   }
    
   /** 接受消息 如果终止 重连时会有消息
    * @throws \AMQPChannelException
    * @throws \AMQPConnectionException
    * @throws \AMQPExchangeException
    * @throws \AMQPQueueException
    */
   public function run()
   {
    
  //创建交换机
  $ex = $this->exchange();
  $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型
  $ex->setFlags(AMQP_DURABLE); //持久化
  //echo "Exchange Status:".$ex->declare()."\n";
    
  //创建队列
  $q = $this->queue();
  //var_dump($q->declare());exit();
  $q->setName($this->q_name);
  $q->setFlags(AMQP_DURABLE); //持久化
  //echo "Message Total:".$q->declareQueue()."\n";
    
  //绑定交换机与队列,并指定路由键
  echo 'Queue Bind: '.$q->bind($this->exchange, $this->route)."\n";
    
  //阻塞模式接收消息
  echo "Message:\n";
  while(True){
 $q->consume(function ($envelope,$queue){
$msg = $envelope->getBody();
echo $msg."\n"; //处理消息
$queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
 });
 //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答
  }
  $this->close();
   }
    }
    try{
   (new ConsumerMQ)->run();
    }catch (\Exception $exception){
   var_dump($exception->getMessage()) ;
    }

以上就是PHP和RabbitMQ实现消息队列的完整代码的详细内容,更多请关注Gxl网其它相关文章!

查看更多关于PHP和RabbitMQ实现消息队列的完整代码的详细内容...

  阅读:42次