Kafka是由LinkedIn公司采用Scala语言开发的一个多分区、多副本且基于ZooKeeper协调的分布式消息系统。目前Kafka已经定位为一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。
基本概念
如下图展示了一个典型的Kafka体系架构,包括若干Producer、若干Broker、若干Consumer,以及一个ZooKeeper集群。其中ZooKeeper是Kafka用来负责集群元数据的管理、控制0器的选举等操作的。Producer将消息发送到Broker,Broker负责将收到的消息存储到磁盘中,而Consumer负责从Broker订阅并消费消息。
下面简要介绍一下Kafka中的几个重要概念:
- Producer: 生产者,即发送消息的一方。生产者负责创建消息,然后将其投递到Kafka中。
- Consumer: 消费者,即接收消息的一方。消费者连接到Kafka上并接收消息,进而进行相应的业务逻辑处理。
- Broker: 服务代理节点。Broker可以简单地看作一个独立的Kafka服务节点或服务实例,大多数情况下也可以将Broker看作一台Kafka服务器。一个或多个Broker组成了一个Kafka集群。
- Topic: 主题。Kafka中的消息以主题为单位进行分类,生产者负责将消息发送到特定的主题(发送到Kafka集群中的每一条消息都需要指定一个主题),而消费者负责订阅主题并进行消费。
- Partition: 分区。主题只是一个逻辑上的概念,它还可以细分为多个分区,一个分区只属于单个主题,很多时候也会把分区称为主题分区(Topic-Partition)。同一个主题下的不同分区包含的消息是不同的。
如下图所示,主题中有4个分区,消息被顺序追加到每个分区日志文件的尾部。Kafka中的分区可以分布在不同的服务器(broker)上,所以一个主题可以跨越多个broker,通过增加分区的数量可以实现水平扩展。
多副本(Replica)机制
Kafka为分区引入了多副本(Replica)机制,通过增加副本数量可以提升容灾能力。同分区的不同副本中保存的是相同的消息(在同一时刻,副本之间并非完全一样),副本之间是"一主多从"的关系,其中,leader副本负责处理读写请求,follower副本只负责与leader副本的消息同步。副本处于不同的broker中,当leader副本出现故障时,从follower副本中重新选举新的leader副本对外提供服务。
如下图所示,Kafka集群中有4个broker,某个主题中有3个分区,且副本个数(副本因子)为3。生产者和消费者只与leader副本进行交互,而follower副本只负责消息同步。

AR、ISR与OSR
分区中的所有副本统称为AR(Assigned Replicas)。所有与leader副本保持一定程度同步的副本(包括leader副本在内)组成ISR(In-Sync Replicas),ISR集合时AR集合中的一个子集。"一定程度同步"是指可忍受的之后范围,这个范围可以通过参数进行配置。与leader副本同步之后过多的副本(不包括leader副本)组成OSR(Out-of-Sync Replicas),不能得出AR与ISR、OSR之间的关系:AR=ISR+OSR。正常情况下,所有的follower副本都应该与leader副本保持一定程度的同步,即AR=ISR,OSR为空。
leader副本负责维护和跟踪ISR集合中所有follower副本的滞后状态,当follower副本落后太多或失效时,leader副本会把它从ISR集合中剔除。如果OSR集合中有follower副本"追上"了leader副本时,那么leader副本会把它从OSR集合中转移至ISR集合。
HW于LEO
HW是High Watermark的缩写,俗称高水位,它标识了一个特定的消息偏移量,消费者只能拉取到这个offset之前的消息。
LEO是Log End Offset的缩写,它标识当前日志文件中下一条待写入消息的offset。
如下图说明了HW和LEO。此日志文件中共有9条消息,第一条消息的offset(LogStartOffset)为0,最后一条消息的offset为8,offset为9的位置表示下一条待写入消息的位置。日志文件的HW为6,表示消费者只能拉取offset在0至5之间的消息,而offset为6的消息对消费者而言是不可见的。
你也许会很奇怪,为什么要有HW和LEO呢?HW与LEO之间的不能读取吗?如果只有一个副本那当然没问题,问题是我们有多个副本,每个副本可读的范围是不同的(因为同步的滞后),所以HW是ISR集合中最小的LEO。
安装与配置
搭建Kafka运行环境还需要设计ZooKeeper,Kafka和ZooKeeper都是运行在JVM之上的服务,所以先需要安装JDK。
JDK的安装和配置
cd /opt && mkdir Kafka
tar -zxvf jdk-8u181-linux-x64.tar.gz
vim /etc/profile
# 追加如下内容
export JAVA_HOME=/opt/Kafka/jdk1.8.0_181
export JRE_HOME=$JAVA_HOME/jre
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=./://$JAVA_HOME/lib:$JRE_HOME/lib
source /etc/profile
# 查看JDK版本信息,验证是否安装成功
java -version
ZooKeeper的安装和配置
ZooKeeper是安装Kafka集群的必要组件,Kafka通过ZooKeeper来实施对元数据信息的管理,包括集群、broker、主题、分区等内容。
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz
vim /etc/profile
# 追加如下内容
export ZOOKEEPER_HOME=/opt/Kafka/apache-zookeeper-3.5.7-bin
export PATH=$PATH:$ZOOKEEPER_HOME/bin
source /etc/profile
修改ZooKeeper的配置文件,进入$ZOOKEEPER_HOME/conf目录,将zoo_sample.cfg文件修改为zoo.cfg。zoo.cfg文件的内容参考如下:
# ZooKeeper服务器心跳时间,单位为ms
tickTime=2000
# 投票选举新leader的初始化时间
initLimit=10
# leader与follower心跳检测最大容忍时间,响应超过syncLimit*tickTime,leader认为
# follower挂掉,从服务器列表中删除follower
syncLimit=5
# 数据目录
dataDir=/tmp/zookeeper/data
# 日志目录
dataLogDir=/tmp/zookeeper/log
# ZooKeeper对外服务端口
clientPore=2181
系统中没有/tmp/zookeeper/data
和/tmp/zookeeper/log
这两个目录,所以需要首先创建。在/tmp/zookeeper/data
下创建一个myid
文件,并写入一个数值,比如0。myid文件里存放的是服务器的编号。
通过zkServer.sh start
启动ZK,详情如下:
[root@kafka-01 bin]# zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/Kafka/apache-zookeeper-3.5.7-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
我们可以通过使用zkSetver.sh status
命令查看ZK服务的状态。
[root@kafka-01 bin]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/Kafka/apache-zookeeper-3.5.7-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: standalone

Kafka的安装和配置
完成JDK和ZK的安装后,终于迎来了Kafka的安装与配置。
tar -zxvf kafka_2.11-2.4.0.tgz
vim /etc/profile
# 追加如下内容
export KAFKA_HOME=/opt/Kafka/kafka_2.11-2.4.0
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
修改broker的配置文件$KAFKA_HOME/config/server.properties
。主要关注一下几个参数即可:
# broker的编号,如果集群中有多个broker,每个broker的编号都需要设置的不同
broker.id=0
# broker对外提供的服务入口地址
listeners=PLAINTEXT://192.168.233.134:9092
# 存放消息日志文件的地址
log.dirs=/tmp/kafka-logs
# Kafka所需的ZooKeeper集群地址
zookeeper.connect=192.168.233.134:2181/kafka
启动Kafka,kafka-server-start.sh -daemon /opt/Kafka/kafka_2.11-2.4.0/config/server.properties
。-daemon
表示后台运行。使用jps -l
查看Kafka进程是否启动。
[root@kafka-01 tmp]# jps -l
10437 sun.tools.jps.Jps
10408 kafka.Kafka # 这个就是Kafka进程
9820 org.apache.zookeeper.server.quorum.QuorumPeerMain
生产与消费
创建主题
由前文可知,生产者将消息发送到Kafka的主题分区中,消费者通过订阅主题从而消费消息,所以在演示生产与消费之前需要创建一个主题作为消息的载体。Kafka提供了许多使用的工具,存放在$KAFKA_HOME/bin
目录下,我们可以使用 kafka-topics.sh
创建一个分区数为 4 ,副本因子为 3 的主题——topic-demo
。
[root@kafka-01 opt]# kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic topic-demo --replication-factor 3 --partitions 4
Created topic topic-demo.
- --zookeeper:指定Kafka连接的ZooKeeper服务地址
- --topic:指定所创建主题的名称
- --replication-factor:副本因子,此数目不能大于broker数目
- --partitions:分区个数
- --create:创建主题的动作指令
还可以通过--describe
显示主题的更多具体信息。
[root@kafka-01 opt]# kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic topic-demo
Topic: topic-demo PartitionCount: 4 ReplicationFactor: 3 Configs:
Topic: topic-demo Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: topic-demo Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: topic-demo Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0
Topic: topic-demo Partition: 3 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
生产消息
使用kafka-console-producer.sh
脚本工具生产消息。如下发送了一条消息"Hello world"至主题"topic-demo"。
[root@kafka-01 opt]# kafka-console-producer.sh --broker-list localhost:9092 --topic topic-demo
>Hello World
消费消息
另起shell中端,通过kafka-console-consumer.sh
脚本来订阅主题topic-demo。
[root@kafka-01 opt]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-demo
JAVA示例代码
导入依赖。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
生产者
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class ProducerSample {
public static final String brokerList = "192.168.233.134:9092";
public static final String topic = "topic-demo";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("bootstrap.servers",brokerList);
//配置生产者客户端参数并创建KafkaProducer实例
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
//构建所需要发送的消息
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Hello World");
//发送消息
kafkaProducer.send(record);
//关闭生产者客户端实例
kafkaProducer.close();
}
}
消费者
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class ConsumerSample {
public static final String brokerList = "192.168.233.134:9092";
public static final String topic = "topic-demo";
public static final String groupId = "group.demo";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.put("bootstrap.servers",brokerList);
//设置消费组名称
properties.put("group.id",groupId);
//创建一个消费者客户端实例
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
//订阅主题
kafkaConsumer.subscribe(Collections.singletonList(topic));
//循环消费消息
while (true){
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
}
}
版权属于:带翅膀的猫
本文链接:https://www.chengpengper.cn/archives/154/
转载时须注明出处及本声明