Kafka是由LinkedIn公司采用Scala语言开发的一个多分区多副本且基于ZooKeeper协调的分布式消息系统。目前Kafka已经定位为一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。

基本概念

      如下图展示了一个典型的Kafka体系架构,包括若干Producer、若干Broker、若干Consumer,以及一个ZooKeeper集群。其中ZooKeeper是Kafka用来负责集群元数据的管理、控制0器的选举等操作的。Producer将消息发送到Broker,Broker负责将收到的消息存储到磁盘中,而Consumer负责从Broker订阅并消费消息。
请输入图片描述

      下面简要介绍一下Kafka中的几个重要概念:

  • Producer: 生产者,即发送消息的一方。生产者负责创建消息,然后将其投递到Kafka中。
  • Consumer: 消费者,即接收消息的一方。消费者连接到Kafka上并接收消息,进而进行相应的业务逻辑处理。
  • Broker: 服务代理节点。Broker可以简单地看作一个独立的Kafka服务节点或服务实例,大多数情况下也可以将Broker看作一台Kafka服务器。一个或多个Broker组成了一个Kafka集群。
  • Topic: 主题。Kafka中的消息以主题为单位进行分类,生产者负责将消息发送到特定的主题(发送到Kafka集群中的每一条消息都需要指定一个主题),而消费者负责订阅主题并进行消费。
  • Partition: 分区。主题只是一个逻辑上的概念,它还可以细分为多个分区,一个分区只属于单个主题,很多时候也会把分区称为主题分区(Topic-Partition)。同一个主题下的不同分区包含的消息是不同的。


offset是消息在分区中的唯一标识。

Kafka通过offset来保证消息在分区内的顺序性。但是,由于offset不跨越分区,所以Kafka保证的是分区有序而不是主题有序。

      如下图所示,主题中有4个分区,消息被顺序追加到每个分区日志文件的尾部。Kafka中的分区可以分布在不同的服务器(broker)上,所以一个主题可以跨越多个broker,通过增加分区的数量可以实现水平扩展。
请输入图片描述

多副本(Replica)机制

      Kafka为分区引入了多副本(Replica)机制,通过增加副本数量可以提升容灾能力。同分区的不同副本中保存的是相同的消息(在同一时刻,副本之间并非完全一样),副本之间是"一主多从"的关系,其中,leader副本负责处理读写请求,follower副本只负责与leader副本的消息同步。副本处于不同的broker中,当leader副本出现故障时,从follower副本中重新选举新的leader副本对外提供服务。

Kafka通过多副本机制实现了故障的自动转移,当Kafka集群中某个broker失效时仍然能保证服务可用。

      如下图所示,Kafka集群中有4个broker,某个主题中有3个分区,且副本个数(副本因子)为3。生产者和消费者只与leader副本进行交互,而follower副本只负责消息同步。
注意:很多时候follower副本中的消息相对于leader副本而言会有一定的滞后性

请输入图片描述

AR、ISR与OSR

      分区中的所有副本统称为AR(Assigned Replicas)。所有与leader副本保持一定程度同步的副本(包括leader副本在内)组成ISR(In-Sync Replicas),ISR集合时AR集合中的一个子集。"一定程度同步"是指可忍受的之后范围,这个范围可以通过参数进行配置。与leader副本同步之后过多的副本(不包括leader副本)组成OSR(Out-of-Sync Replicas),不能得出AR与ISR、OSR之间的关系:AR=ISR+OSR。正常情况下,所有的follower副本都应该与leader副本保持一定程度的同步,即AR=ISR,OSR为空。
请输入图片描述

      leader副本负责维护和跟踪ISR集合中所有follower副本的滞后状态,当follower副本落后太多或失效时,leader副本会把它从ISR集合中剔除。如果OSR集合中有follower副本"追上"了leader副本时,那么leader副本会把它从OSR集合中转移至ISR集合。

默认情况下,当leader副本发生故障时,只有在ISR集合中的副本才有资格被选举为新的leader,而在OSR集合中的副本则没有任何机会。(此规则可以修改哦~)

HW于LEO

      HW是High Watermark的缩写,俗称高水位,它标识了一个特定的消息偏移量,消费者只能拉取到这个offset之前的消息。
      LEO是Log End Offset的缩写,它标识当前日志文件中下一条待写入消息的offset。
      如下图说明了HW和LEO。此日志文件中共有9条消息,第一条消息的offset(LogStartOffset)为0,最后一条消息的offset为8,offset为9的位置表示下一条待写入消息的位置。日志文件的HW为6,表示消费者只能拉取offset在0至5之间的消息,而offset为6的消息对消费者而言是不可见的。
      你也许会很奇怪,为什么要有HW和LEO呢?HW与LEO之间的不能读取吗?如果只有一个副本那当然没问题,问题是我们有多个副本,每个副本可读的范围是不同的(因为同步的滞后),所以HW是ISR集合中最小的LEO。
请输入图片描述

相关软件下载链接:https://pan.baidu.com/s/1rW-qrxq-TxJHA6ot7UvICg 提取码:9i1m

安装与配置

      搭建Kafka运行环境还需要设计ZooKeeper,Kafka和ZooKeeper都是运行在JVM之上的服务,所以先需要安装JDK。

JDK的安装和配置

cd /opt && mkdir Kafka
tar -zxvf jdk-8u181-linux-x64.tar.gz 
vim /etc/profile
# 追加如下内容
export JAVA_HOME=/opt/Kafka/jdk1.8.0_181
export JRE_HOME=$JAVA_HOME/jre
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=./://$JAVA_HOME/lib:$JRE_HOME/lib

source /etc/profile
# 查看JDK版本信息,验证是否安装成功
java -version

ZooKeeper的安装和配置

      ZooKeeper是安装Kafka集群的必要组件,Kafka通过ZooKeeper来实施对元数据信息的管理,包括集群、broker、主题、分区等内容。

tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz
vim /etc/profile
# 追加如下内容
export ZOOKEEPER_HOME=/opt/Kafka/apache-zookeeper-3.5.7-bin
export PATH=$PATH:$ZOOKEEPER_HOME/bin

source /etc/profile

      修改ZooKeeper的配置文件,进入$ZOOKEEPER_HOME/conf目录,将zoo_sample.cfg文件修改为zoo.cfg。zoo.cfg文件的内容参考如下:

# ZooKeeper服务器心跳时间,单位为ms
tickTime=2000

# 投票选举新leader的初始化时间
initLimit=10

# leader与follower心跳检测最大容忍时间,响应超过syncLimit*tickTime,leader认为
# follower挂掉,从服务器列表中删除follower
syncLimit=5

# 数据目录
dataDir=/tmp/zookeeper/data

# 日志目录
dataLogDir=/tmp/zookeeper/log

# ZooKeeper对外服务端口
clientPore=2181

      系统中没有/tmp/zookeeper/data/tmp/zookeeper/log这两个目录,所以需要首先创建。在/tmp/zookeeper/data下创建一个myid文件,并写入一个数值,比如0。myid文件里存放的是服务器的编号。
      通过zkServer.sh start启动ZK,详情如下:

[root@kafka-01 bin]# zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/Kafka/apache-zookeeper-3.5.7-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

      我们可以通过使用zkSetver.sh status命令查看ZK服务的状态。

[root@kafka-01 bin]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/Kafka/apache-zookeeper-3.5.7-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: standalone

这里我们搭建的不是ZooKeeper集群,如果需要搭建ZK集群请看下面这篇文章哦

Kafka的安装和配置

      完成JDK和ZK的安装后,终于迎来了Kafka的安装与配置。

tar -zxvf kafka_2.11-2.4.0.tgz
vim /etc/profile
# 追加如下内容
export KAFKA_HOME=/opt/Kafka/kafka_2.11-2.4.0
export PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile

      修改broker的配置文件$KAFKA_HOME/config/server.properties。主要关注一下几个参数即可:

# broker的编号,如果集群中有多个broker,每个broker的编号都需要设置的不同
broker.id=0

# broker对外提供的服务入口地址
listeners=PLAINTEXT://192.168.233.134:9092

# 存放消息日志文件的地址
log.dirs=/tmp/kafka-logs

# Kafka所需的ZooKeeper集群地址
zookeeper.connect=192.168.233.134:2181/kafka

      启动Kafka,kafka-server-start.sh -daemon /opt/Kafka/kafka_2.11-2.4.0/config/server.properties-daemon表示后台运行。使用jps -l查看Kafka进程是否启动。

[root@kafka-01 tmp]# jps -l
10437 sun.tools.jps.Jps
10408 kafka.Kafka # 这个就是Kafka进程
9820 org.apache.zookeeper.server.quorum.QuorumPeerMain

生产与消费

创建主题

      由前文可知,生产者将消息发送到Kafka的主题分区中,消费者通过订阅主题从而消费消息,所以在演示生产与消费之前需要创建一个主题作为消息的载体。Kafka提供了许多使用的工具,存放在$KAFKA_HOME/bin目录下,我们可以使用 kafka-topics.sh 创建一个分区数为 4 ,副本因子为 3 的主题——topic-demo

[root@kafka-01 opt]# kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic topic-demo --replication-factor 3 --partitions 4
Created topic topic-demo.
  • --zookeeper:指定Kafka连接的ZooKeeper服务地址
  • --topic:指定所创建主题的名称
  • --replication-factor:副本因子,此数目不能大于broker数目
  • --partitions:分区个数
  • --create:创建主题的动作指令

      还可以通过--describe显示主题的更多具体信息。

[root@kafka-01 opt]# kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic topic-demo
Topic: topic-demo    PartitionCount: 4    ReplicationFactor: 3    Configs: 
    Topic: topic-demo    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: topic-demo    Partition: 1    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: topic-demo    Partition: 2    Leader: 0    Replicas: 0,1,2    Isr: 0
    Topic: topic-demo    Partition: 3    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2

生产消息

      使用kafka-console-producer.sh脚本工具生产消息。如下发送了一条消息"Hello world"至主题"topic-demo"。

[root@kafka-01 opt]# kafka-console-producer.sh --broker-list localhost:9092 --topic topic-demo
>Hello World

消费消息

      另起shell中端,通过kafka-console-consumer.sh脚本来订阅主题topic-demo。

[root@kafka-01 opt]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-demo

你会发现刚刚生产者生产的消息怎么无法消费到呢!难道错了吗?当然没有,这部分知识在后面介绍,你只需要再生产一条即可。

JAVA示例代码

      导入依赖。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>

生产者

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerSample {
    public static final String brokerList = "192.168.233.134:9092";
    public static final String topic = "topic-demo";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("bootstrap.servers",brokerList);

        //配置生产者客户端参数并创建KafkaProducer实例
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        //构建所需要发送的消息
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Hello World");
        //发送消息
        kafkaProducer.send(record);

        //关闭生产者客户端实例
        kafkaProducer.close();
    }
}

消费者

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerSample {
    public static final String brokerList = "192.168.233.134:9092";
    public static final String topic = "topic-demo";
    public static final String groupId = "group.demo";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("bootstrap.servers",brokerList);

        //设置消费组名称
        properties.put("group.id",groupId);

        //创建一个消费者客户端实例
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        //订阅主题
        kafkaConsumer.subscribe(Collections.singletonList(topic));

        //循环消费消息
        while (true){
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}
Last modification:May 4th, 2021 at 10:48 pm
如果觉得我的文章对你有用,请随意赞赏