好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

详解PHP实现生产者与消费者(Kafka应用)

本篇文章给大家介绍PHP实现生产者与消费者,希望对需要的朋友有所帮助!

前言

PHP中使用Kafka需要RdKafka扩展,而RdKafka依赖于librdkafka,所以这两个我们都需要安装,具体安装方法自行百度,本篇不做说明了。

生产者(测试)

创建消费者需要步骤:

生产者配置参数 创建生产者实例 创建主题实例(依赖生产者) 生产主题消息 推送消息

具体代码如下:

   $conf = new \RdKafka\Conf();
   // 绑定服务节点
   $conf->set('metadata.broker.list', '127.0.0.1:32772');

   // 创建生产者
   $kafka = new \RdKafka\Producer($conf);

   // 创建主题实例
   $topic = $kafka->newTopic('p1r1');
   // 生产主题数据,此时消息在缓冲区中,并没有真正被推送
   $topic->produce(RD_KAFKA_PARTITION_UA, 0, 'Message');
   // 阻塞时间(毫秒), 0为非阻塞
   $kafka->poll(0); 

   // 推送消息,如果不调用此函数,消息不会被发送且会丢失
   $result = $kafka->flush(5000);

   if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
  throw new \RuntimeException('Was unable to flush, messages might be lost!');
   }

消费者

创建一个消费者需要几个步骤:

消费者配置参数 应用配置参数创建消费者实例 订阅对应主题 拉取数据 提交位移

具体代码如下:

   $conf = new \RdKafka\Conf();
   // 绑定消费者组
   $conf->set('group.id', 'ceshi');
   // 绑定服务节点,多个用,分隔
   $conf->set('metadata.broker.list', '127.0.0.1:32787');
   // 设置自动提交为false
   $conf->set('enable.auto测试数据mit', 'false');
   // 设置当前消费者拉取数据时的偏移量, 可选参数:
   // earliest: 如果消费者组是新创建的,从头开始消费,否则从消费者组当前消费位移开始。
   // latest:如果消费者组是新创建的,从最新偏移量开始,否则从消费者组当前消费位移开始。
   $conf->set('auto.offset.reset', 'earliest');

   // 创建消费者实例
   $consumer = new \RdKafka\KafkaConsumer($conf);
   // 消费者订阅主题,数组形式
   $consumer->subscribe(['topic1','topic2']);
   while (true) {
  // 消费数据,阻塞5秒(5秒内有数据就消费,没有数据等待5秒进入下一轮循环)
  $message = $consumer->consume(5000);
  switch ($message->err) {
 case RD_KAFKA_RESP_ERR_NO_ERROR:
// 业务逻辑
var_dump($message);

// 提交位移
$consumer->commit($message);
break;
 case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
 case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
 default:
throw new \Exception($message->errstr(), $message->err);
break;
  }
   }
   // 关闭消费者(一般用在脚本中,不需要关闭)
   $conumser->close();

只消费指定分区中的数据:

    // 对消费者指定分区,注意此方式不能与subscribe一同使用
    $consumer->assign([
   new RdKafka\TopicPartition("topic", 0),
   new RdKafka\TopicPartition("topic", 1),
    ]);

以上就是详解PHP实现生产者与消费者(Kafka应用)的详细内容!

查看更多关于详解PHP实现生产者与消费者(Kafka应用)的详细内容...

  阅读:36次