好得很程序员自学网
  • 首页
  • 后端语言
    • C#
    • PHP
    • Python
    • java
    • Golang
    • ASP.NET
  • 前端开发
    • Angular
    • react框架
    • LayUi开发
    • javascript
    • HTML与HTML5
    • CSS与CSS3
    • jQuery
    • Bootstrap
    • NodeJS
    • Vue与小程序技术
    • Photoshop
  • 数据库技术
    • MSSQL
    • MYSQL
    • Redis
    • MongoDB
    • Oracle
    • PostgreSQL
    • Sqlite
    • 数据库基础
    • 数据库排错
  • CMS系统
    • HDHCMS
    • WordPress
    • Dedecms
    • PhpCms
    • 帝国CMS
    • ThinkPHP
    • Discuz
    • ZBlog
    • ECSHOP
  • 高手进阶
    • Android技术
    • 正则表达式
    • 数据结构与算法
  • 系统运维
    • Windows
    • apache
    • 服务器排错
    • 网站安全
    • nginx
    • linux系统
    • MacOS
  • 学习教程
    • 前端脚本教程
    • HTML与CSS 教程
    • 脚本语言教程
    • 数据库教程
    • 应用系统教程
  • 新技术
  • 编程导航
    • 区块链
    • IT资讯
    • 设计灵感
    • 建站资源
    • 开发团队
    • 程序社区
    • 图标图库
    • 图形动效
    • IDE环境
    • 在线工具
    • 调试测试
    • Node开发
    • 游戏框架
    • CSS库
    • Jquery插件
    • Js插件
    • Web框架
    • 移动端框架
    • 模块管理
    • 开发社区
    • 在线课堂
    • 框架类库
    • 项目托管
    • 云服务

当前位置:首页>后端语言>PHP
<tfoot draggable='sEl'></tfoot>

php-kafka教程 php kafka

很多站长朋友们都不太清楚php-kafka教程,今天小编就来给大家整理php-kafka教程,希望对各位有所帮助,具体内容如下:

本文目录一览: 1、 消息中间件Kafka - PHP操作使用Kafka 2、 Kafka相关内容总结(Kafka集群搭建手记) 3、 Kafaka入门(1)- Kafka简介和安装与启动(mac) 4、 服务端技术实战系列——Kafka篇 消息中间件Kafka - PHP操作使用Kafka

cd librdkafka/

./configure make make install

安装成功界面 没有报错就是安装成功

Kafka相关内容总结(Kafka集群搭建手记)

Kafka is a distributed,partitioned,replicated commit logservice。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。

入门请参照:

在此不再赘述。

这部分不是本文的重点,但是kafka需要用到kafka集群,所以先搭建kafka集群。

从kafka官方文档看到,kafka似乎在未来的版本希望抛弃zookeep集群,自己维护集群的一致性,拭目以待吧。

我们搭建集群使用的是三台同机房的机器,因为zookeeper不怎么占资源也不怎么占空间(我们的业务目前比较简单),所以三台机器上都搭建了zookeeper集群。

搭建zookeeper集群没什么难度,参考文档:

下面列一下我的配置并解析:

一共用三台物理机器,搭建一个Kafka集群。

每台服务器的硬盘划分都是一样的,每个独立的物理磁盘挂在一个单独的分区里面,这样很方便用于Kafka多个partition的数据读写与冗余。

/data1比较小,为了不成为集群的瓶颈,所以/data1用于存放kafka以及Zookeeper

每台机器的磁盘分布如下:

下面是kafka的简单配置,三台服务器都一样,如有不一致的在下文有说明。

kafka安装在目录/usr/local/kafka/下,下面的说明以10.1.xxx.57为例。

最重要的配置文件server.properties,需要配置的信息如下:

从上面的配置看到,kafka集群不需要像hadoop集群那样,配置ssh通讯,而且一个kafka服务器(官方文档称之为broker,下面统一使用这个称呼)并不知道其他的kafka服务器的存在,因此你需要逐个broker去启动kafka。各个broker根据自己的配置,会自动去配置文件上的zk服务器报到,这就是一个有zk服务器粘合起来的kafka集群。

我写了一个启动脚本,放在 /usr/local/kafka/bin 下面。启动脚本每个broker都一样:

如同kafka集群里面每一个broker都需要单独启动一样,kafka集群里面每一个broker都需要单独关闭。

官方给出的关闭脚本是单独运行 bin/kafka-server-stop.sh

但是我运行的结果是无法关闭。打开脚本一看,才发现是最简单的办法,发一个TERM信号到kafka的java进程,官方脚本给出的grep有点问题。

发信号之后,一直tail着kafka日志,看到正常关闭。

指定zookeeper服务器,topic名称是LvsKafka(注意topic名称不能有英文句号(.)和下划线(_),否则会通不过,理由是名称会冲突,下文对此略有解析)

replication-factor指出重复因子是2,也就是每条数据有两个拷贝,可靠性考虑。

partitions 指出需要多少个partition,数据量大的多一点,无论生产和消费,这是负载均衡和高并发的需要。

可以看到刚才新建的24个partition,比如partition 5, 他的leader是broker 59,也就是10.1.xxx.59这台机器。

建立topic时我们指出需要2个拷贝,从上面的输出的Replicas字段看到,这两个拷贝放在59,58两个机器,也就是10.1.xxx.59和10.1.xxx.58.

Isr表示当前partition的所有拷贝所在的机器中,哪些是还活着(可以提供服务)的。现在是59和58都还存活。

这个命令另外还会看到一些类似于下面的内容:

__consumer_offsets到底是什么呢?其实就是客户端的消费进度,客户端会定时上报到kafka集群,而kafka集群会把每个客户端的消费进度放入一个自己内部的topic中,这个topic就是__consumer_offsets。我查看过__consumer_offsets的内容,其实就是每个客户端的消费进度作为一条消息,放入__consumer_offsets这个topic中。

这里给了我们两个提示:

1、kafka自己管理客户端的消费进度,而不是依靠zk,这就是kafka官方文档说的kafka未来会抛弃zk的底气之一;

2、留意到这个kafka自己的topic是带下划线的,也就是,kafka担心我们自己建的topic如果带下划线的话会跟这些内部自用的topic冲突;

Kafaka入门(1)- Kafka简介和安装与启动(mac)

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。kafka 是一个高性能的消息队列,也是一个分布式流处理平台。

kafka中文网

kafka官网

Producer :Producer即生产者,消息的产生者,是消息的入口。

kafka cluster :

Broker :Broker是kafka实例,每个服务器上有一个或多个kafka的实例,姑且认为每个broker对应一台服务器。一个集群由多个broker组成,集群内的broker都有一个不重复的编号,如图中的broker-0、broker-1等……

Topic :消息的主题,可以理解为消息的分类,kafka的数据就保存在topic。在每个broker上都可以创建多个topic。

Partition :Topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高kafka的吞吐量。 同一个topic在不同的分区的数据是不重复的 ,partition的表现形式就是一个一个的文件夹!

Replication : 每一个分区都有多个副本 ,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为Leader。在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。

Message :每一条发送的消息主体。

Consumer :消费者,即消息的消费方,是消息的出口。

Consumer Group :将多个消费组成一个消费者组。在kafka的设计中 同一个分区的数据只能被同一消费者组中的某一个消费者消费 。Partition 的分配问题,即确定哪个 Partition 由哪个 Consumer 来消费。Kafka 有两种分配策略,一个是 RoundRobin,一个是 Range,默认为Range。

一个消费者组内也可以订阅多个topic

多个消费组可以订阅同一个topic 。

Zookeeper :kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性。

使用brew进行安装,非常方便。

ZooKeeper是一个分布式的,开放源码的 分布式应用程序协调服务 ,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

kafka是基于zookeeper的,启动kafka之前,需要先启动zookeeper

查看启动是否成功

启动kafka

查看启动是否成功

查看topic列表

新起一个终端,作为生产者,用于发送消息,每一行算一条消息,将消息发送到kafka服务器

新起一个终端作为消费者,接收消息

服务关闭的顺序是先kafka,然后zookeeper

再过半小时,你就能明白kafka的工作原理了

Kafka架构原理,也就这么回事!

服务端技术实战系列——Kafka篇

一.概念原理

[if !supportLists]1.?[endif]主题(topic):主题是对消息的分类。

[if !supportLists]2.?[endif]消息(message):消息是kafka通信的基本单位。

[if !supportLists]3.?[endif]分区(partition): 一组 消息对应 一个 主题, 一个 主题对应 一个或多个 分区。每个分区为一系列有序消息组成的 有序队列 ;每个分区在物理上对应一个文件夹。

[if !supportLists]4.?[endif]副本(replica):每个分区有 一个或多个 副本,分区的副本分布在集群的 不同 代理(机器)上,以提高可用性;分区的副本与日志对象是一一对应的。

[if !supportLists]5.?[endif]Kafka只保证一个 分区内 的消息 有序性 ,不保证跨分区消息的有序性。消息被追加到相应分区中, 顺序写入磁盘 ,效率非常高。

[if !supportLists]6.?[endif]Kafka选取某个某个分区的 一个 副本作为leader副本,该分区的 其他 副本为follower副本。 只有leader副本负责处理客户端读/写请求 ,follower副本从leader副本同步数据。

[if !supportLists]7.?[endif]任何发布到分区的消息都会追加到日志文件的尾部, 每条消息 在日志文件中的 位置 都对应一个 按序递增的偏移量 ;偏移量在一个分区下严格有序。

[if !supportLists]8.?[endif]Kafka不允许对消息进行随机读写。

[if !supportLists]9.?[endif]新版消费者将 消费偏移量 保存到kafka内部的一个主题中。

[if !supportLists]10.?[endif]Kafka集群由 一个或多个代理 (Broker,也称为kafka实例)构成。可以在 一台 服务器上配置 一个或多个代理 ,每个代理具有唯一标识broker.id。

[if !supportLists]11.?[endif]生产者将消息 发送给代理 (Broker)。

[if !supportLists]12.?[endif]消费者以 拉取 (pull)方式拉取数据,每个消费者都属于一个消费组。

[if !supportLists]13.?[endif]同一个主题的一条消息只能被 同一个消费组 下的某一个消费者消费,但 不同消费组 的消费者可以 同时 消费该消息。

[if !supportLists]14.?[endif]消息 广播 :指定各消费者属于不同消费组;消息 单播 :指定各消费者属于同一个消费组。

[if !supportLists]15.?[endif]Kafka启动时在Zookeeper上创建相应节点来保存 元数据 ,元数据包括:代理节点信息、集群信息、主题信息、分区状态信息、分区副本分配方案、动态配置等;

[if !supportLists]16.?[endif]Kafka通过 监听 机制在节点注册监听器来监听节点元数据变化;

[if !supportLists]17.?[endif]Kafka将数据写入 磁盘 ,以文件系统来存数据;

[if !supportLists]18.?[endif]生产环境一般将zookeeper集群和kafka集群 分机架 部署;

[if !supportLists]二.[endif] Kafka Producer

配置:

/**

?* xTestProxy——KafkaConfigConstant

?*

?* @author ?ZhangChi

?* @date ?2018年6月20日---下午5:50:44

?* @version ?1.0

?*/

public ? class ?KafkaConfigConstant {

public ? static ? final ?String KAFKA_CLUSTER ?= "fa-common1.hangzhou-1.kafka.internal.lede测试数据:9200,fa-common2.hangzhou-1.kafka.internal.lede测试数据:9200,fa-common3.hangzhou-1.kafka.internal.lede测试数据:9200";

}

生产者配置:

/**

?* xTestProxy——HttpKafkaProducerFactory

?*

?* @author ?ZhangChi

?* @date ?2018年6月11日---下午2:37:51

?* @version ?1.0

?*/

public ? class ?HttpKafkaProducerFactory {

// 真正的KafkaProducer仅有一份

private ? static ?KafkaProducer kafkaProducer ?= null ;

private ? static ?Properties property ;

public ? static ?KafkaProducer getKafkaProducer() {

if ?( kafkaProducer ?== null ) {

synchronized ?(HttpKafkaProducerFactory. class ) {

if ?( kafkaProducer ?== null ) {

property ?= buildKafkaProperty ();

kafkaProducer ?= new ?KafkaProducer( property );

}

}

}

return ? kafkaProducer ;

}

public ? static ?Properties buildKafkaProperty() {

Properties props?= new ?Properties();

props.put(ProducerConfig. BOOTSTRAP_SERVERS_CONFIG , KafkaConfigConstant. KAFKA_CLUSTER );

props.put(ProducerConfig. ACKS_CONFIG , "all");

props.put(ProducerConfig. RETRIES_CONFIG , 0);

props.put(ProducerConfig. BATCH_SIZE_CONFIG , 16384);

props.put(ProducerConfig. BUFFER_MEMORY_CONFIG , 33554432);

props.put(ProducerConfig. LINGER_MS_CONFIG , 1);

props.put(ProducerConfig. KEY_SERIALIZER_CLASS_CONFIG , "org.apache.kafka测试数据mon.serialization.StringSerializer");

props.put(ProducerConfig. VALUE_SERIALIZER_CLASS_CONFIG ,

"org.apache.kafka测试数据mon.serialization.StringSerializer");

return ?props;

}

}

生产者线程组:

/**

?* xTestProxy——HttpKafkaProducerThread

?* 多线程每次new一个实例

?*

?* @author ?ZhangChi

?* @date ?2018年6月25日---下午2:09:39

?* @version ?1.0

?*/

public ? class ?HttpKafkaProducerThread implements ?Runnable {

private ? static ?Logger logger ?= LoggerFactory. getLogger ("HttpKafkaProducerThread");

private ? final ?String KAFKA_TOPIC?= KafkaConstant. HTTP_REQ_RESP_TOPIC ;

private ?String kafkaMessageJson;

private ?KafkaProducer producer;

public ?String messageType;

public ?String originalMessage;

private ? static ?KafkaMessage kafkaMessage ?= new ?KafkaMessage();

public ?HttpKafkaProducerThread(KafkaProducer producer, String messageType, String originalMessage) {

this .producer?= producer;

this .messageType?= messageType;

this .originalMessage?= originalMessage;

}

@Override

public ? void ?run() {

// TODO ?Auto-generated method stub

/* 1.构建kafka消息*/

kafkaMessageJson?= generateKafkaMessage( this .messageType, this .originalMessage);

/* 2.发送kafka消息*/

if ?(kafkaMessageJson?!= null ? !StringUtils. isEmpty (kafkaMessageJson)) {

logger .info("create message start:"?+ kafkaMessageJson);

producer.send( new ?ProducerRecord( this .KAFKA_TOPIC, kafkaMessageJson));

} else ?{

logger .info("kafkaMessageJson is null!");

}

}

private ?String generateKafkaMessage(String messageType, String originalMessage) {

if ?(StringUtils. isBlank (messageType) || StringUtils. isBlank (originalMessage)) {

return ? null ;

}

kafkaMessage .setMessageId(KafkaMessageUtils. generateId ());

kafkaMessage .setMessageTime(KafkaMessageUtils. generateTime ());

kafkaMessage .setMessageType(messageType);

kafkaMessage .setMessage(originalMessage);

String kafkaMessageToJson?= null ;

try ?{

kafkaMessageToJson?= KafkaMessageUtils. objectToJson ( kafkaMessage );

} catch ?(JsonProcessingException e) {

// TODO ?Auto-generated catch block

e.printStackTrace();

}

kafkaMessageJson?= kafkaMessageToJson;

return ?kafkaMessageToJson;

}

}

[if !supportLists]三.[endif] Kafka Consumer

消费者配置:

private ? static ?Properties buildKafkaProperty() {

Properties properties?= new ?Properties();

// 测试环境kafka的端口号是9200

properties.put(ConsumerConfig. BOOTSTRAP_SERVERS_CONFIG , KafkaConfigConstant. KAFKA_CLUSTER );

// 消费组名称

properties.put(ConsumerConfig. GROUP_ID_CONFIG , KafkaConfigConstant. GROUP_ID );

properties.put(ConsumerConfig. CLIENT_ID_CONFIG , "test");

// 从头消费

properties.put(ConsumerConfig. AUTO_OFFSET_RESET_CONFIG , "earliest");

// 自动提交偏移量

properties.put(ConsumerConfig. ENABLE_AUTO_COMMIT_CONFIG , "true");

// 时间间隔1s

properties.put(ConsumerConfig. AUTO_COMMIT_INTERVAL_MS_CONFIG , "1000");

properties.put(ConsumerConfig. KEY_DESERIALIZER_CLASS_CONFIG ,

"org.apache.kafka测试数据mon.serialization.StringDeserializer");

properties.put(ConsumerConfig. VALUE_DESERIALIZER_CLASS_CONFIG ,

"org.apache.kafka测试数据mon.serialization.StringDeserializer");

return ?properties;

}

消费者线程组:

/**

?* AnalysisEngine——HttpKafkaConsumerGroup

?*

?* @author ?ZhangChi

?* @date ?2018年6月11日---下午6:20:47

?* @version ?1.0

?*/

@Service("httpKafkaConsumerGroup")

public ? class ?HttpKafkaConsumerGroup {

@Autowired

private ?RequestAnalyzer requestAnalyzer;

@Autowired

private ?EsDocumentServiceImpl esDocumentServiceImpl;

@Autowired

private ?AnalysisEngineClient analysisEngineClient;

@Autowired

private ?MongoTemplate mongoTemplate;

private ?List httpKafkaConsumerList?= new ?ArrayList();

public ? void ?initHttpKafkaConsumerGroup( int ?consumerNumber, RunModeEnum mode) {

for ?( int ?i?= 0; i?< consumerNumber; i++) {

/**

?* 将注入的服务当做构造参数,这样保证每个子线程都能拿到服务实例而不是空指针!

?*/

HttpKafkaConsumer consumerThread?= new ?HttpKafkaConsumer(requestAnalyzer, esDocumentServiceImpl, mode, analysisEngineClient, mongoTemplate);

httpKafkaConsumerList.add(consumerThread);

}

}

public ? void ?consumeGroupStart() {

for ?(HttpKafkaConsumer item?: httpKafkaConsumerList) {

LogConstant. runLog .info("httpKafkaConsumerList size : "?+ httpKafkaConsumerList.size());

Thread consumerThread?= new ?Thread(item);

consumerThread.start();

}

}

}

先逐个初始化消费者实例,然后将这些消费者加入到消费组列表中。消费组启动后,会循环产生消费者线程。

?

关于php-kafka教程的介绍到此就结束了,不知道本篇文章是否对您有帮助呢?如果你还想了解更多此类信息,记得收藏关注本站,我们会不定期更新哦。

查看更多关于php-kafka教程 php kafka的详细内容...

声明:本文来自网络,不代表【好得很程序员自学网】立场,转载请注明出处:http://haodehen.cn/did196019
更新时间:2023-04-26   阅读:26次

上一篇: php手机访问判断 php判断安卓还是ios

下一篇:php怎么设置abr php怎么设置背景

最新资料更新

  • 1.名称占位符php 占位符html
  • 2.php下载控件 php下载器
  • 3.php防止ajax接口 php防止接口多次请求
  • 4.搭建分站源码php 建立分站怎么建
  • 5.基于PHP日记网站 phpstudy网站日志
  • 6.如何阅读php源码 php在线源码获取
  • 7.php数组键名排序 php数组值排序
  • 8.包含asp和php互通的词条
  • 9.包含php-vcmd的词条
  • 10.phpapp页面 php app
  • 11.免费的php解密 php des解密
  • 12.php项目任务分配 php任务调度框架
  • 13.php使用嵌套for php解析嵌套json
  • 14.php河内塔问题 河内塔算法
  • 15.php文件工具类 php文件处理
  • 16.php新浪微博开发 微博开发工具
  • 17.包含php-fpm-t的词条
  • 18.php变下载文件 php 下载文件
  • 19.手机打开php乱码 php乱码怎么办
  • 20.php系统源代码下载 php源码免费下载

CopyRight:2016-2025好得很程序员自学网 备案ICP:湘ICP备09009000号-16 http://haodehen.cn
本站资讯不构成任何建议,仅限于个人分享,参考须谨慎!
本网站对有关资料所引致的错误、不确或遗漏,概不负任何法律责任。
本网站刊载的所有内容(包括但不仅限文字、图片、LOGO、音频、视频、软件、程序等)版权归原作者所有。任何单位或个人认为本网站中的内容可能涉嫌侵犯其知识产权或存在不实内容时,请及时通知本站,予以删除。

网站内容来源于网络分享,如有侵权发邮箱到:kenbest@126.com,收到邮件我们会即时下线处理。
网站框架支持:HDHCMS   51LA统计 百度统计
Copyright © 2018-2025 「好得很程序员自学网」
[ SiteMap ]