好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

如何基于Hyperf实现RabbitMQ+WebSocket消息推送

介绍

基于 Hyperf+ WebSocket +RabbitMQ 实现的一个简单大屏幕的消息推送。

思路

利用 WebSocket 协议让客户端和服务器端保持有状态的长链接,

保存链接上来的客户端 id。订阅发布者发布的消息针对已保存的客户端 id 进行广播消息。

WebSocket 服务

composer require hyperf/websocket-server

配置文件 [config/autoload/server.php]

<?php
return [
    'mode' => SWOOLE_PROCESS,
    'servers' => [
   [
  'name' => 'http',
  'type' => Server::SERVER_HTTP,
  'host' => '0.0.0.0',
  'port' => 11111,
  'sock_type' => SWOOLE_SOCK_TCP,
  'callbacks' => [
 SwooleEvent::ON_REQUEST => [Hyperf\HttpServer\Server::class, 'onRequest'],
  ],
   ],
   [
  'name' => 'ws',
  'type' => Server::SERVER_WEBSOCKET,
  'host' => '0.0.0.0',
  'port' => 12222,
  'sock_type' => SWOOLE_SOCK_TCP,
  'callbacks' => [
 SwooleEvent::ON_HAND_SHAKE => [Hyperf\WebSocketServer\Server::class, 'onHandShake'],
 SwooleEvent::ON_MESSAGE => [Hyperf\WebSocketServer\Server::class, 'onMessage'],
 SwooleEvent::ON_CLOSE => [Hyperf\WebSocketServer\Server::class, 'onClose'],
  ],
   ],
    ],

WebSocket 服务器端代码示例

<?php
declare(strict_types=1);
/**
 * This file is part of Hyperf.
 *
 * @linkhttps://www.hyperf.io
 * @document https://doc.hyperf.io
 * @contact  group@hyperf.io
 * @license  https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
 */
namespace App\Controller;
use Hyperf\Contract\OnCloseInterface;
use Hyperf\Contract\OnMessageInterface;
use Hyperf\Contract\OnOpenInterface;
use Swoole\Http\Request;
use Swoole\Server;
use Swoole\Websocket\Frame;
use Swoole\WebSocket\Server as WebSocketServer;
class WebSocketController extends Controller implements OnMessageInterface, OnOpenInterface, OnCloseInterface
{
    /**
* 发送消息
* @param WebSocketServer $server
* @param Frame $frame
*/
    public function onMessage(WebSocketServer $server, Frame $frame): void
    {
   //心跳刷新缓存
   $redis = $this->container->get(\Redis::class);
   //获取所有的客户端id
   $fdList = $redis->sMembers('websocket_sjd_1');
   //如果当前客户端在客户端集合中,就刷新
   if (in_array($frame->fd, $fdList)) {
  $redis->sAdd('websocket_sjd_1', $frame->fd);
  $redis->expire('websocket_sjd_1', 7200);
   }
   $server->push($frame->fd, 'Recv: ' . $frame->data);
    }
    /**
* 客户端失去链接
* @param Server $server
* @param int $fd
* @param int $reactorId
*/
    public function onClose(Server $server, int $fd, int $reactorId): void
    {
   //删掉客户端id
   $redis = $this->container->get(\Redis::class);
   //移除集合中指定的value
   $redis->sRem('websocket_sjd_1', $fd);
   var_dump('closed');
    }
    /**
* 客户端链接
* @param WebSocketServer $server
* @param Request $request
*/
    public function onOpen(WebSocketServer $server, Request $request): void
    {
   //保存客户端id
   $redis = $this->container->get(\Redis::class);
   $res1 = $redis->sAdd('websocket_sjd_1', $request->fd);
   var_dump($res1);
   $res = $redis->expire('websocket_sjd_1', 7200);
   var_dump($res);
   $server->push($request->fd, 'Opened');
    }
}

WebSocket 前端代码

    function WebSocketTest() {
   if ("WebSocket" in window) {
  console.log("您的浏览器支持 WebSocket!");
  var num = 0
  // 打开一个 web socket
  var ws = new WebSocket("ws://127.0.0.1:12222");
  ws.onopen = function () {
 // Web Socket 已连接上,使用 send() 方法发送数据
 //alert("数据发送中...");
 //ws.send("发送数据");
  };
  window.setInterval(function () { //每隔5秒钟发送一次心跳,避免websocket连接因超时而自动断开
 var ping = {"type": "ping"};
 ws.send(JSON.stringify(ping));
  }, 5000);
  ws.onmessage = function (evt) {
 var d = JSON.parse(evt.data);
 console.log(d);
 if (d.code == 300) {
$(".address").text(d.address)
 }
 if (d.code == 200) {
var v = d.data
console.log(v);
num++
var str = `<div class="item">
 <p>${v.recordOutTime}</p>
 <p>${v.userOutName}</p>
 <p>${v.userOutNum}</p>
 <p>${v.doorOutName}</p>
  </div>`
$(".tableHead").after(str)
if (num > 7) {
    num--
    $(".table .item:nth-last-child(1)").remove()
}
 }
  };
  ws.error = function (e) {
 console.log(e)
 alert(e)
  }
  ws.onclose = function () {
 // 关闭 websocket
 alert("连接已关闭...");
  };
   } else {
  alert("您的浏览器不支持 WebSocket!");
   }
    }

AMQP 组件

composer require hyperf/amqp

配置文件 [config/autoload/amqp.php]

<?php
return [
    'default' => [
   'host' => 'localhost',
   'port' => 5672,
   'user' => 'guest',
   'password' => 'guest',
   'vhost' => '/',
   'pool' => [
  'min_connections' => 1,
  'max_connections' => 10,
  'connect_timeout' => 10.0,
  'wait_timeout' => 3.0,
  'heartbeat' => -1,
   ],
   'params' => [
  'insist' => false,
  'login_method' => 'AMQPLAIN',
  'login_response' => null,
  'locale' => 'en_US',
  'connection_timeout' => 3.0,
  'read_write_timeout' => 6.0,
  'context' => null,
  'keepalive' => false,
  'heartbeat' => 3,
   ],
    ],
];

MQ 消费者代码

<?php
declare(strict_types=1);
namespace App\Amqp\Consumer;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
use Hyperf\Server\Server;
use Hyperf\Server\ServerFactory;
/**
 * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1)
 */
class DemoConsumer extends ConsumerMessage
{
    /**
* rabbmitMQ消费端代码
* @param $data
* @return string
*/
    public function consume($data): string
    {
   print_r($data);
   //获取集合中所有的value
   $redis = $this->container->get(\Redis::class);
   $fdList=$redis->sMembers('websocket_sjd_1');
   $server=$this->container->get(ServerFactory::class)->getServer()->getServer();
   foreach($fdList as $key=>$v){
  if(!empty($v)){
 $server->push((int)$v, $data);
  }
   }
   return Result::ACK;
    }
}

控制器代码

    /**
* test
* @return array
*/
    public function test()
    {
   $data = array(
  'code' => 200,
  'data' => [
 'userOutName' => 'ccflow',
 'userOutNum' => '9999',
 'recordOutTime' => date("Y-m-d H:i:s", time()),
 'doorOutName' => '教师公寓',
  ]
   );
   $data = \GuzzleHttp\json_encode($data);
   $message = new DemoProducer($data);
   $producer = ApplicationContext::getContainer()->get(Producer::class);
   $result = $producer->produce($message);
   var_dump($result);
   $user = $this->request->input('user', 'Hyperf');
   $method = $this->request->getMethod();
   return [
  'method' => $method,
  'message' => "{$user}.",
   ];
    }

最终效果

推荐:《PHP教程》

以上就是如何基于Hyperf实现RabbitMQ+WebSocket消息推送的详细内容,更多请关注Gxlcms其它相关文章!

查看更多关于如何基于Hyperf实现RabbitMQ+WebSocket消息推送的详细内容...

  阅读:40次