很多站长朋友们都不太清楚php-kafka教程,今天小编就来给大家整理php-kafka教程,希望对各位有所帮助,具体内容如下:
本文目录一览: 1、 消息中间件Kafka - PHP操作使用Kafka 2、 Kafka相关内容总结(Kafka集群搭建手记) 3、 Kafaka入门(1)- Kafka简介和安装与启动(mac) 4、 服务端技术实战系列——Kafka篇 消息中间件Kafka - PHP操作使用Kafkacd librdkafka/
./configure make make install
安装成功界面 没有报错就是安装成功
Kafka相关内容总结(Kafka集群搭建手记)Kafka is a distributed,partitioned,replicated commit logservice。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。
入门请参照:
在此不再赘述。
这部分不是本文的重点,但是kafka需要用到kafka集群,所以先搭建kafka集群。
从kafka官方文档看到,kafka似乎在未来的版本希望抛弃zookeep集群,自己维护集群的一致性,拭目以待吧。
我们搭建集群使用的是三台同机房的机器,因为zookeeper不怎么占资源也不怎么占空间(我们的业务目前比较简单),所以三台机器上都搭建了zookeeper集群。
搭建zookeeper集群没什么难度,参考文档:
下面列一下我的配置并解析:
一共用三台物理机器,搭建一个Kafka集群。
每台服务器的硬盘划分都是一样的,每个独立的物理磁盘挂在一个单独的分区里面,这样很方便用于Kafka多个partition的数据读写与冗余。
/data1比较小,为了不成为集群的瓶颈,所以/data1用于存放kafka以及Zookeeper
每台机器的磁盘分布如下:
下面是kafka的简单配置,三台服务器都一样,如有不一致的在下文有说明。
kafka安装在目录/usr/local/kafka/下,下面的说明以10.1.xxx.57为例。
最重要的配置文件server.properties,需要配置的信息如下:
从上面的配置看到,kafka集群不需要像hadoop集群那样,配置ssh通讯,而且一个kafka服务器(官方文档称之为broker,下面统一使用这个称呼)并不知道其他的kafka服务器的存在,因此你需要逐个broker去启动kafka。各个broker根据自己的配置,会自动去配置文件上的zk服务器报到,这就是一个有zk服务器粘合起来的kafka集群。
我写了一个启动脚本,放在 /usr/local/kafka/bin 下面。启动脚本每个broker都一样:
如同kafka集群里面每一个broker都需要单独启动一样,kafka集群里面每一个broker都需要单独关闭。
官方给出的关闭脚本是单独运行 bin/kafka-server-stop.sh
但是我运行的结果是无法关闭。打开脚本一看,才发现是最简单的办法,发一个TERM信号到kafka的java进程,官方脚本给出的grep有点问题。
发信号之后,一直tail着kafka日志,看到正常关闭。
指定zookeeper服务器,topic名称是LvsKafka(注意topic名称不能有英文句号(.)和下划线(_),否则会通不过,理由是名称会冲突,下文对此略有解析)
replication-factor指出重复因子是2,也就是每条数据有两个拷贝,可靠性考虑。
partitions 指出需要多少个partition,数据量大的多一点,无论生产和消费,这是负载均衡和高并发的需要。
可以看到刚才新建的24个partition,比如partition 5, 他的leader是broker 59,也就是10.1.xxx.59这台机器。
建立topic时我们指出需要2个拷贝,从上面的输出的Replicas字段看到,这两个拷贝放在59,58两个机器,也就是10.1.xxx.59和10.1.xxx.58.
Isr表示当前partition的所有拷贝所在的机器中,哪些是还活着(可以提供服务)的。现在是59和58都还存活。
这个命令另外还会看到一些类似于下面的内容:
__consumer_offsets到底是什么呢?其实就是客户端的消费进度,客户端会定时上报到kafka集群,而kafka集群会把每个客户端的消费进度放入一个自己内部的topic中,这个topic就是__consumer_offsets。我查看过__consumer_offsets的内容,其实就是每个客户端的消费进度作为一条消息,放入__consumer_offsets这个topic中。
这里给了我们两个提示:
1、kafka自己管理客户端的消费进度,而不是依靠zk,这就是kafka官方文档说的kafka未来会抛弃zk的底气之一;
2、留意到这个kafka自己的topic是带下划线的,也就是,kafka担心我们自己建的topic如果带下划线的话会跟这些内部自用的topic冲突;
Kafaka入门(1)- Kafka简介和安装与启动(mac)Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。kafka 是一个高性能的消息队列,也是一个分布式流处理平台。
kafka中文网
kafka官网
Producer :Producer即生产者,消息的产生者,是消息的入口。
kafka cluster :
Broker :Broker是kafka实例,每个服务器上有一个或多个kafka的实例,姑且认为每个broker对应一台服务器。一个集群由多个broker组成,集群内的broker都有一个不重复的编号,如图中的broker-0、broker-1等……
Topic :消息的主题,可以理解为消息的分类,kafka的数据就保存在topic。在每个broker上都可以创建多个topic。
Partition :Topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高kafka的吞吐量。 同一个topic在不同的分区的数据是不重复的 ,partition的表现形式就是一个一个的文件夹!
Replication : 每一个分区都有多个副本 ,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为Leader。在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。
Message :每一条发送的消息主体。
Consumer :消费者,即消息的消费方,是消息的出口。
Consumer Group :将多个消费组成一个消费者组。在kafka的设计中 同一个分区的数据只能被同一消费者组中的某一个消费者消费 。Partition 的分配问题,即确定哪个 Partition 由哪个 Consumer 来消费。Kafka 有两种分配策略,一个是 RoundRobin,一个是 Range,默认为Range。
一个消费者组内也可以订阅多个topic
多个消费组可以订阅同一个topic 。
Zookeeper :kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性。
使用brew进行安装,非常方便。
ZooKeeper是一个分布式的,开放源码的 分布式应用程序协调服务 ,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。
kafka是基于zookeeper的,启动kafka之前,需要先启动zookeeper
查看启动是否成功
启动kafka
查看启动是否成功
查看topic列表
新起一个终端,作为生产者,用于发送消息,每一行算一条消息,将消息发送到kafka服务器
新起一个终端作为消费者,接收消息
服务关闭的顺序是先kafka,然后zookeeper
再过半小时,你就能明白kafka的工作原理了
Kafka架构原理,也就这么回事!
服务端技术实战系列——Kafka篇一.概念原理
[if !supportLists]1.?[endif]主题(topic):主题是对消息的分类。
[if !supportLists]2.?[endif]消息(message):消息是kafka通信的基本单位。
[if !supportLists]3.?[endif]分区(partition): 一组 消息对应 一个 主题, 一个 主题对应 一个或多个 分区。每个分区为一系列有序消息组成的 有序队列 ;每个分区在物理上对应一个文件夹。
[if !supportLists]4.?[endif]副本(replica):每个分区有 一个或多个 副本,分区的副本分布在集群的 不同 代理(机器)上,以提高可用性;分区的副本与日志对象是一一对应的。
[if !supportLists]5.?[endif]Kafka只保证一个 分区内 的消息 有序性 ,不保证跨分区消息的有序性。消息被追加到相应分区中, 顺序写入磁盘 ,效率非常高。
[if !supportLists]6.?[endif]Kafka选取某个某个分区的 一个 副本作为leader副本,该分区的 其他 副本为follower副本。 只有leader副本负责处理客户端读/写请求 ,follower副本从leader副本同步数据。
[if !supportLists]7.?[endif]任何发布到分区的消息都会追加到日志文件的尾部, 每条消息 在日志文件中的 位置 都对应一个 按序递增的偏移量 ;偏移量在一个分区下严格有序。
[if !supportLists]8.?[endif]Kafka不允许对消息进行随机读写。
[if !supportLists]9.?[endif]新版消费者将 消费偏移量 保存到kafka内部的一个主题中。
[if !supportLists]10.?[endif]Kafka集群由 一个或多个代理 (Broker,也称为kafka实例)构成。可以在 一台 服务器上配置 一个或多个代理 ,每个代理具有唯一标识broker.id。
[if !supportLists]11.?[endif]生产者将消息 发送给代理 (Broker)。
[if !supportLists]12.?[endif]消费者以 拉取 (pull)方式拉取数据,每个消费者都属于一个消费组。
[if !supportLists]13.?[endif]同一个主题的一条消息只能被 同一个消费组 下的某一个消费者消费,但 不同消费组 的消费者可以 同时 消费该消息。
[if !supportLists]14.?[endif]消息 广播 :指定各消费者属于不同消费组;消息 单播 :指定各消费者属于同一个消费组。
[if !supportLists]15.?[endif]Kafka启动时在Zookeeper上创建相应节点来保存 元数据 ,元数据包括:代理节点信息、集群信息、主题信息、分区状态信息、分区副本分配方案、动态配置等;
[if !supportLists]16.?[endif]Kafka通过 监听 机制在节点注册监听器来监听节点元数据变化;
[if !supportLists]17.?[endif]Kafka将数据写入 磁盘 ,以文件系统来存数据;
[if !supportLists]18.?[endif]生产环境一般将zookeeper集群和kafka集群 分机架 部署;
[if !supportLists]二.[endif] Kafka Producer
配置:
/**
?* xTestProxy——KafkaConfigConstant
?*
?* @author ?ZhangChi
?* @date ?2018年6月20日---下午5:50:44
?* @version ?1.0
?*/
public ? class ?KafkaConfigConstant {
public ? static ? final ?String KAFKA_CLUSTER ?= "fa-common1.hangzhou-1.kafka.internal.lede测试数据:9200,fa-common2.hangzhou-1.kafka.internal.lede测试数据:9200,fa-common3.hangzhou-1.kafka.internal.lede测试数据:9200";
}
生产者配置:
/**
?* xTestProxy——HttpKafkaProducerFactory
?*
?* @author ?ZhangChi
?* @date ?2018年6月11日---下午2:37:51
?* @version ?1.0
?*/
public ? class ?HttpKafkaProducerFactory {
// 真正的KafkaProducer仅有一份
private ? static ?KafkaProducer kafkaProducer ?= null ;
private ? static ?Properties property ;
public ? static ?KafkaProducer getKafkaProducer() {
if ?( kafkaProducer ?== null ) {
synchronized ?(HttpKafkaProducerFactory. class ) {
if ?( kafkaProducer ?== null ) {
property ?= buildKafkaProperty ();
kafkaProducer ?= new ?KafkaProducer( property );
}
}
}
return ? kafkaProducer ;
}
public ? static ?Properties buildKafkaProperty() {
Properties props?= new ?Properties();
props.put(ProducerConfig. BOOTSTRAP_SERVERS_CONFIG , KafkaConfigConstant. KAFKA_CLUSTER );
props.put(ProducerConfig. ACKS_CONFIG , "all");
props.put(ProducerConfig. RETRIES_CONFIG , 0);
props.put(ProducerConfig. BATCH_SIZE_CONFIG , 16384);
props.put(ProducerConfig. BUFFER_MEMORY_CONFIG , 33554432);
props.put(ProducerConfig. LINGER_MS_CONFIG , 1);
props.put(ProducerConfig. KEY_SERIALIZER_CLASS_CONFIG , "org.apache.kafka测试数据mon.serialization.StringSerializer");
props.put(ProducerConfig. VALUE_SERIALIZER_CLASS_CONFIG ,
"org.apache.kafka测试数据mon.serialization.StringSerializer");
return ?props;
}
}
生产者线程组:
/**
?* xTestProxy——HttpKafkaProducerThread
?* 多线程每次new一个实例
?*
?* @author ?ZhangChi
?* @date ?2018年6月25日---下午2:09:39
?* @version ?1.0
?*/
public ? class ?HttpKafkaProducerThread implements ?Runnable {
private ? static ?Logger logger ?= LoggerFactory. getLogger ("HttpKafkaProducerThread");
private ? final ?String KAFKA_TOPIC?= KafkaConstant. HTTP_REQ_RESP_TOPIC ;
private ?String kafkaMessageJson;
private ?KafkaProducer producer;
public ?String messageType;
public ?String originalMessage;
private ? static ?KafkaMessage kafkaMessage ?= new ?KafkaMessage();
public ?HttpKafkaProducerThread(KafkaProducer producer, String messageType, String originalMessage) {
this .producer?= producer;
this .messageType?= messageType;
this .originalMessage?= originalMessage;
}
@Override
public ? void ?run() {
// TODO ?Auto-generated method stub
/* 1.构建kafka消息*/
kafkaMessageJson?= generateKafkaMessage( this .messageType, this .originalMessage);
/* 2.发送kafka消息*/
if ?(kafkaMessageJson?!= null ? !StringUtils. isEmpty (kafkaMessageJson)) {
logger .info("create message start:"?+ kafkaMessageJson);
producer.send( new ?ProducerRecord( this .KAFKA_TOPIC, kafkaMessageJson));
} else ?{
logger .info("kafkaMessageJson is null!");
}
}
private ?String generateKafkaMessage(String messageType, String originalMessage) {
if ?(StringUtils. isBlank (messageType) || StringUtils. isBlank (originalMessage)) {
return ? null ;
}
kafkaMessage .setMessageId(KafkaMessageUtils. generateId ());
kafkaMessage .setMessageTime(KafkaMessageUtils. generateTime ());
kafkaMessage .setMessageType(messageType);
kafkaMessage .setMessage(originalMessage);
String kafkaMessageToJson?= null ;
try ?{
kafkaMessageToJson?= KafkaMessageUtils. objectToJson ( kafkaMessage );
} catch ?(JsonProcessingException e) {
// TODO ?Auto-generated catch block
e.printStackTrace();
}
kafkaMessageJson?= kafkaMessageToJson;
return ?kafkaMessageToJson;
}
}
[if !supportLists]三.[endif] Kafka Consumer
消费者配置:
private ? static ?Properties buildKafkaProperty() {
Properties properties?= new ?Properties();
// 测试环境kafka的端口号是9200
properties.put(ConsumerConfig. BOOTSTRAP_SERVERS_CONFIG , KafkaConfigConstant. KAFKA_CLUSTER );
// 消费组名称
properties.put(ConsumerConfig. GROUP_ID_CONFIG , KafkaConfigConstant. GROUP_ID );
properties.put(ConsumerConfig. CLIENT_ID_CONFIG , "test");
// 从头消费
properties.put(ConsumerConfig. AUTO_OFFSET_RESET_CONFIG , "earliest");
// 自动提交偏移量
properties.put(ConsumerConfig. ENABLE_AUTO_COMMIT_CONFIG , "true");
// 时间间隔1s
properties.put(ConsumerConfig. AUTO_COMMIT_INTERVAL_MS_CONFIG , "1000");
properties.put(ConsumerConfig. KEY_DESERIALIZER_CLASS_CONFIG ,
"org.apache.kafka测试数据mon.serialization.StringDeserializer");
properties.put(ConsumerConfig. VALUE_DESERIALIZER_CLASS_CONFIG ,
"org.apache.kafka测试数据mon.serialization.StringDeserializer");
return ?properties;
}
消费者线程组:
/**
?* AnalysisEngine——HttpKafkaConsumerGroup
?*
?* @author ?ZhangChi
?* @date ?2018年6月11日---下午6:20:47
?* @version ?1.0
?*/
@Service("httpKafkaConsumerGroup")
public ? class ?HttpKafkaConsumerGroup {
@Autowired
private ?RequestAnalyzer requestAnalyzer;
@Autowired
private ?EsDocumentServiceImpl esDocumentServiceImpl;
@Autowired
private ?AnalysisEngineClient analysisEngineClient;
@Autowired
private ?MongoTemplate mongoTemplate;
private ?List httpKafkaConsumerList?= new ?ArrayList();
public ? void ?initHttpKafkaConsumerGroup( int ?consumerNumber, RunModeEnum mode) {
for ?( int ?i?= 0; i?< consumerNumber; i++) {
/**
?* 将注入的服务当做构造参数,这样保证每个子线程都能拿到服务实例而不是空指针!
?*/
HttpKafkaConsumer consumerThread?= new ?HttpKafkaConsumer(requestAnalyzer, esDocumentServiceImpl, mode, analysisEngineClient, mongoTemplate);
httpKafkaConsumerList.add(consumerThread);
}
}
public ? void ?consumeGroupStart() {
for ?(HttpKafkaConsumer item?: httpKafkaConsumerList) {
LogConstant. runLog .info("httpKafkaConsumerList size : "?+ httpKafkaConsumerList.size());
Thread consumerThread?= new ?Thread(item);
consumerThread.start();
}
}
}
先逐个初始化消费者实例,然后将这些消费者加入到消费组列表中。消费组启动后,会循环产生消费者线程。
?
关于php-kafka教程的介绍到此就结束了,不知道本篇文章是否对您有帮助呢?如果你还想了解更多此类信息,记得收藏关注本站,我们会不定期更新哦。
查看更多关于php-kafka教程 php kafka的详细内容...