Kafka学习笔记
创始人
2025-05-29 14:27:39
0

kafka和消息队列的区别

常见的消息队列有 activemq ,rabbitmq,rocketmq;

消息队列常用于两个系统之间的数据传递,可以点对点(一对一聊天) 也可以订阅模式(群组)

kafka是订阅模式。

队列是有序的,先进先出

kafka是无序的,全局无序,分区内有序,也可以让他变成有序的

一、只用一个分区

二、多个分区,但是producer写入按照顺序写,然后consumer按照顺序读取

消息系统主要作用?

异步:同步会阻塞,如果某些逻辑没有先后顺序,不影响主逻辑,可以异步处理(开线程)

削峰:当访问数量激增,采用消息系统,可以先将用户的请求缓存起来,慢慢处理,否则会崩溃

解耦:如果以后想添加逻辑,可以单独的写逻辑代码,只需要读取kafka的数据就好,否则还得重构代码

kafka有什么优点

  • 高吞吐,低延迟,kafka每秒可以处理几十万的消息数据,延迟只有几毫秒。

  • topic可以分区,而且消费者可以组成消费者组,同步消费不同分区的数据

  • 支持热扩展

  • kafka把数据缓存到内存的同时,会持久化到磁盘。安全可靠

  • 高容错性,一个topic处于isr的分区副本只要有一个还能用,这个topic就能用

  • 高并发:支持数千个客户端同时读写

kafka的用途?

  • 主要用于数据处理系统中的缓冲!(尤其是实时流式数据处理)

  • 收集日志:flume对接到kafka,采集实时的日志信息

  • 当作消息系统来用

kafka的构成

broker

每个kafka节点就是一个broker,都有自己的brokerid 从0开时

controller

他也是一个broker他额外有其他功能,管理整个集群中所有分区和副本的状态。

当某个分区的 leader 副本出现故障时,由控制器负责 为该分区选举新的 leader 副本。当检测到某个分区的 ISR 集合发生变化时,由控制器负责通知所有 broker 更新其元数据信息。当使用 kafka-topics.sh 脚本为某个 topic 增加分区数量时,同样还是由控制 器负责分区的重新分配。

zookeeper

2.0版本,zookeeper作为协调器,监听broker和consumers的上下线情况

3.0版本,zookeeper被废弃了

因此当创建命令行consumer的时候要指定zookeeper的地址和端口,broker的配置文件需要指定zookeeper的地址

topic

主题,相当于一个表

partition

分区,1个topic可以有多个分区,一个分区就是一个kafka数据的最小单元,不能被拆分,而且他与consumer是对应关系,一个消费者组中,一个consumer可以消费多个分区,但是一个分区只能被的一个消费者消费。分区不能被拆分消费。

replacation

每个分区可以设置相应的副本数量,但是不能超过broker的数量,3个broker的话副本数最多设置3个否则报错,副本中有两个角色,leader,fllower

leader

副本中的leader,主副本,负责消息的读写,分区 leader 副本的选举由控制器 controller 负责具体实施。

选举基本思路:按照 AR 集合中副本的顺序查找第一个存活的副本,并且这个副本在 ISR 集合中; 一个分区的 AR 集合在 partition 分配的时候就被指定,并且只要不发生重分配的情况,集合内部副本的顺序是保持不变的,而分区的 ISR 集合中副本的顺序可能会改变

fllower

只负责与leader完成副本的数据比对,进行副本的数据同步

isr

In-Sync Replicas isr 处于同步状态的fllower副本列表,确定一个副本处于isr状态,有2个判断条件

#默认10s,follow没有向leader发送心跳包就会被移除

rerplica.lag.time.max.ms=10000

#默认消息差大于4000会被移除

rerplica.lag.max.messages=4000

producer

生产者

一个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程

  • 在主线程中由 kafkaProducer 创建消息,然后通过可能的拦截器(可以自定义,默认没有)、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator, 也称为消息收集器)中。

  • 分区器:其中如果 key不为 null,那么默认的分区器会对 key 进行哈希(采用MurmurHash2算法,具备高运算性能及低碰撞率),最终根据得到的哈希值来计算分区号,拥有相同key的消息会被写入同一个分区。如果key为null,那么消息将会以轮询的方式发往主题内的各个可用分区。

  • key为null,不指定分区,默认轮询

  • 指定key,默认按照key的hash分区算法分区

  • RecordAccumulator 主要用来缓存消息,将消息按照partitionid进行缓存,缓存到一定大小或一定时间之后,组成一个消息批次,共sender获取消息减少网络开销

  • RecordAccumulator 最大默认缓存32M数据,可以通过buffer.memory参数调整

  • 如果RecordAccumulator持续大于最大缓存消息量(buffer.memory),经过一定时间后会抛出异常停止工作,这个时间这个取决于参数max.block.ms 的配置,此参数的默认值为 60000,即 允许阻塞60s

  • Deque每个分区会创建一个双端队列,消息从尾部写入,头部读取,record进入accumulator时根据partitionid找到属于自己的Deque,如果没有就创建默认长度的队列,然后根据找到最后一个ProducerBatch根据batch.size判断是否能够继续写入当前记录,如果不能继续写入或没有ProducerBatch就创建新的ProducerBatch,这个ProducerBatch的大小取决于最后这条记录,如果这条记录小于batch.size的大小,那么直接用batch.size的大小,否则用这条记录的大小作为一个ProducerBatch的大小

  • RecordAccumulator中会根据分区创建多个双端队列,即使无法填充满也会占据默认的大小batch.size因此如果分区过多,那么双端队列也会有很多,此时会对默认缓存大小有一定的冲击,因此可以把buffer.memory调整大一点

  • Sender从RecordAccumulator 获取缓存的消息之后,会将<分区,Deque< ProducerBatch >>的形式转变成的形式,其中Node表示 Kafka 集群 broker节点,producer只需要指定分区号,sender需要根据分区号,找到对应的broker地址。因此需要做一步底层转换

  • 请求在从 sender 线程发往 Kafka服务端之前还会保存到 InFlightRequests 中,InFlightRequests 保存对象的具体形式为 Map>,它的主要作用是缓存了已经发出去但还没有收到服务端响应的请求(Nodeld 是一个 String 类型,表示节点的 id 编号)。与此同时,InFlightRequests 还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与 Node 之间的连接)最多缓存的请求数。这个配置参数为 max.in.flight.request.per.connection ,默认值为 5,即每个连接最多只能缓存5 个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应(Response )。通过比较 Deque 的 size 与这个参数的大小来判断对应的 Node 中是否己经堆积了很多未响应的消息,如果真是如此,那么说明这个 Node 节点负载较大或网络连接有问题,再继其发送请求会增大请求超时的可能。

  • sender收到kafka的ack返回码后,去InFlightRequests删除对应的request也会去RecordAccumulator删除对应的producerBatch。

consumer

消费者客户端

可以指定分区消费assign,可以不指定分区消费subcribe

consumer_group

消费者组,可以将多个消费者组成一个消费者组,同一个组内的消费者消费的数据是不同的,不同组的消费者消费的数据是重复的,而且分区和消费者组是绑定的。

比如一个topic有2个分区,一个消费者组有2个消费者,那么一个消费者消费一个分区

比如一个topic有1个分区,一个消费者组有2个消费者,那么一个消费者消费一个分区,另外那个消费者啥也不干

比如一个topic有3个分区,一个消费者组有2个消费者,那么其中一个消费者消费一个分区,另外一个消费者消费两个分区

结论,一个消费者组内的消费者消费的数据是瓜分消费,而且是对分区进行瓜分,而不是对分区内的数据进行瓜分,一个分区内的数据一定只能由一个消费者消费

partition是不能被切分的,只能有一个消费者去处理,partition是能被消费的最小粒度

consumer分配规则

在 kafka消费者组内,存在两种的分区分配策略:range(默认) 和 round robin。

range(默认)

按照topic的分区号顺序分区,默认的分配规则,

消费者设置参数partition.assignment.strategy =range 或 roundrobin

修改消费规则

缺点

round_robin

按照hashcode排序,然后轮询

group coordinator组协调器

1、选组协调器

当consumer加入消费者组时,此时groupcoordinator还没有,consumer需要根据自己的组id计算hashcode,然后模除以__consumer_offset的分区数(默认是50),得到的数作为该消费者组记录偏移量的__offset_consumer分区,然后这个分区的leader存在的broker节点作为group coordinator节点。

“g001”.hashcode%50

2、选消费者组内的leader

谁先进入这个消费者组谁就是消费者组的leader消费者,

如果leader挂掉,那么其余consumer随机产生一个消费者组的leader

3、选择合适的分配策略

费者组的leader消费者,会选择大家都能支持的分区规则,然后报告给group coordinator,然后group coordinator再下发给所有的消费者

4、心跳通信

消费者加入消费者组后,会定期与group coordinator定期心跳通信,这个时间由参数heartbeat.interval.ms指定,但是这个时间不能大于延迟确认时间,session.timeout.ms,消费者停止心跳通信后,group coordinator会延迟一段时间后,确认消费者宕机。session.timeout.ms,参数 的 配 置 值 必 须 在 broker 端 参 数group.min.session.timeout.ms (默认值为 6000 ,即 6 秒)和 group.max.session. timeout. ms (默认 值为300000 ,即 5 分钟)允许的范围内。

broker服务端常用参数

自动创建分区

kafka命令行

创建topic

kafka-topics.sh --create --topic order --zookeeperdream1:2181,dream2:2181,dream3:2181 --partitions 3 --replication-factor 2

删除topic

kafka-topics.sh --delete --topic test --zookeeper dream1:2181,dream2:2181,dream3:2181

增加分区数

删除 topic,需要一个参数处于启用状态: delete.topic.enable= true 使用 kafka-topics .sh 脚本删除主题的行为本质上只是在 ZooKeeper 中的 /admin/delete_topics 路径下 建一个与待删除主题同名的节点,以标记该主题为待删除的状态。与创建主题相同的是,真正删 除主题的动作也是由 Kafka 的控制器负责完成的。

修改分区数

bin/kafka-topics.sh --zookeeper dream1:2181,dream2:2181,dream3:2181-alter --partitions 6 --topic test01

Kafka 只支持增加分区,不支持减少分区 原因是:减少分区,代价太大(数据的转移,日志段拼接合并)如果真的需要实现此功能,则完全可以重新创建一个分区数较小的主题,然后将现有主题中的消息按 照既定的逻辑复制过去

查看所有的topic

kafka-topics.sh --zookeeper dream1:2181,dream2:2181 –list

描述topic

kafka-topics.sh --zookeeper dream1:2181 --describe --topic order

创建topic生产者-producer

kafka-console-producer.sh --broker-listdream1:9092,dream2:9092,dream3:9092 --topic order

创建topic消费者-consumer

kafka-console-consumer.sh --bootstrap-serverdream1:9092,dream2:9092,dream3:9092 --topic order

动态配置 topic 参数

通过管理命令,可以为已创建的 topic 增加、修改、删除 topic level 参数

添加、修改配置参数

添加压缩配置

bin/kafka-configs.sh --zookeeper dream1:2181 --entity-type topics--entity-name tpc_1 --alter --add-config compression.type=gzip

删除配置参数

删除压缩配置

bin/kafka-configs.sh --zookeeper dream1:2181 --entity-type topics--entity-name tpc_1 --alter --delete-config compression.typ

producer客户端常用参数

必要参数

  • bootstrap.servers

  • key.serializer

  • value.serializer

为了防止写错参数名,可以用ProducerConfig

可选参数

max.request.size

一条消息的最大字节限制,默认值为 1048576B ,即 1MB

这个参数与broker的message.max.bytes参数,最多能接收多大的消息要匹配

message.max.bytes>= max.request.size

batch.size

每个Batch 要存放batch.size大小的数据后,才可以发送出去。比如说 batch.size 默认值是 16KB,那么凑够 16KB 的数据才会发送

理论上来说,提升 batch.size 的大小,可以允许更多的数据缓冲在里面,那么一次 Request 发送出去的数据量就更多了,这样吞吐量可能会有所提升。

但是 batch.size 也不能过大,要是数据老是缓冲在 Batch 里迟迟不发送出去,那么发送消息的延迟就会很高。

需要用测试工具进行测试

linger.ms

除了满足batch.size,为了防止数据卡死,指定linger.ms后也会提交批次,默认是0,不按照时间进行提交。增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量。

compression.type

压缩格式,默认是none

该参数还可以配置为 "gzip","snappy" 和 "lz4"。对消息进行压缩可以极大地减少网络传输、降低网络 I/O,从而提高整体的性能 。 消息压缩是一种以时间换空间的优化方式,如果对时延有一定的要求,则不推荐对消息进行压缩;

retries和retry.backoff.ms

retries 参数用来配置生产者重试的次数,默认值为 0,即在发生异常的时候不进行任何重试动作,另一个参数 retry.backoff.ms,这个参数的默认值为 100,它用来设定两次重试之间的 时间间隔,避免无效的频繁重试。

如果将 acks 参数配置为非零值,并且 max .flight.requests.per.connection 参数配置为大于1 的值,那可能会出现错序的现象:如果第一批次消息写入失败,而第二批次消息写入成功,那么生产者会重试 发送第一批次的消息,此时如果第一次的消息写入成功,那么这两个批次的消息就出现了错序。一般而言,在需要保证消息顺序且的场合建议把参数 max.in.flight.requests.per.connection 配置为 1,而不是把 acks 配置为0,不过这样也会影响整体的吞吐。

acks

partitioner.classes

用来指定分区器,默认:org.apache.kafka.internals.DefaultPartitioner

自定义 partitioner 需要实现org.apache.kafka.clients.producer.Partitioner 接口

producer Java Api

org.apache.kafka

kafka-clients

2.1.1

-------------------------------------------------------------------------------------------------

package com.ws;

import cn.hutool.core.util.RandomUtil;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerConfig;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;

import java.util.Map;

import java.util.Random;

public class KafkaProducer1 {

public static void main(String[] args) {

// (1)配置生产者客户端参数

Map prop = new HashMap<>();

prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"dream1:9092,dream2:9092,dream3:9092");

prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

//(2)创建相应的生产者实例;

KafkaProducer producer = new KafkaProducer<>(prop);

for (int i = 0; i < 10; i++) {

// (3)创建消息

// (4)发送消息

int i1 = RandomUtil.randomInt(0, 3)+2;

// producer.send(new ProducerRecord<>("K1", i1, null, "所在分区:"+i1+" data,record"+i));

producer.send(new ProducerRecord<>("K1", "key"+i1, "key: key"+i1+" data:record"+i));

// producer.send(new ProducerRecord<>("K1", "key: key"+i+" data:record"+i));

}

// (5)关闭生产者实例。

producer.close();

}

}

三种方式

  • 指定key,不指定partition

  • 指定partition,不指定key

  • 不指定

consumer客户端常用参数

必要参数

  • bootstrap.servers

  • key.deserializer

  • value.deserializer

  • group.id

  • auto.offset.reset

可选参数

etch.min.bytes=1B

一次拉取的最小字节数

fetch.max.bytes=50M

一次拉取的最大数据量

fetch.max.wait.ms=500ms

拉取时的最大等待时长

max.partition.fetch.bytes = 1MB

每个分区一次拉取的最大数据量

max.poll.records=500

一次拉取的最大条数

connections.max.idle.ms=540000ms

网络连接的最大闲置时长

request.timeout.ms=30000ms

一次请求等待响应的最大超时时间

metadata.max.age.ms=300000

元数据在限定时间内没有进行更新,则会被强制更新

reconnect.backoff.ms=50ms

尝试重新连接指定主机之前的退避时间

retry.backoff.ms=100ms

尝试重新拉取数据的重试间隔

isolation.level=read_uncommitted

隔离级别! 决定消费者能读到什么样的数据

read_uncommitted:可以消费到 LSO(LastStableOffset)位置;

read_committed:可以消费到 HW(High Watermark)位置

max.poll.interval.ms

超过时限没有发起 poll 操作,则消费组认为该消费者已离开消费组

enable.auto.commit=true

开启消费位移的自动提交

auto.commit.interval.ms=5000

自动提交消费位移的时间间隔

consumer Java Api

subscribe消费

不可以指定消费分区

优点:在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移, assign是固定的消费分区所以无法调整。

KafkaConsumer consumer = new KafkaConsumer<>(prop);

  1. 1、 指定kafka的topic名,可以指定多个

consumer.subscribe(Arrays.asList("K1"));

2、指定一个正则表达式,可以动态匹配新增的topic

consumer.subscribe(Patten.compile(K*"));

assign消费

可以指定消费分区

KafkaConsumer consumer = new KafkaConsumer<>(prop);

// (2)订阅主题;只要1号分区、2号分区的数据

TopicPartition k1 = new TopicPartition("K1", 1);

TopicPartition k2 = new TopicPartition("K1", 2);

List topicPartitions = Arrays.asList(k1,k2);

consumer.assign(topicPartitions);

一个消费者组中,subscribe和assign不能同时使用。

取消订阅

以下三种方式都可以。

consumer.unsubscribe();

consumer.subscribe(new ArrayList()) ;

consumer.assign(new ArrayList());

指定偏移量消费

一般来说指定偏移量的话都是要指定分区的,否则意义不大,因为每个分区的offset都是从零开始的,所以offset全局是无意义的。一般需要指定消费哪个分区。

// (2)订阅主题;只要0号分区的数据

TopicPartition k1 = new TopicPartition("K1", 0);

List topicPartitions = Arrays.asList(k1);

consumer.assign(topicPartitions);

// 0号分区 从80开始消费包含80

consumer.seek(k1, 80);

偏移量提交

自动提交

Kafka 中默认的消费位移的提交方式是自动提交,这个由消费者客户端参数enable.auto.commit 配置, 默认值为 true 。当然这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端参数 auto.commit.interval.ms 配置,默认值为 5 秒,此参数生效的前提是 enable. auto.commit 参数为 true。

手动api提交

此参数生效的前提是 enable. auto.commit 参数为 false。

kafka提供了两种手动提交方式

  1. consumer.commitSync() 阻塞同步提交

  1. consumer.commitAsync() 阻塞异步提交,这个方法可以写一个回调函数

手动代码提交(exactly once)

想实现exactly once必须要手动控制偏移量提交,指定偏移量消费

在开始之前,指定偏移量消费,然后消费者拉取一次数据,计算一次结果,提交一次偏移量,如果把结果和偏移量同时写入mysql,那么就可以保证不重复消费,exactly once,除了mysql还有hbase(行级事务)、redis (支持事务的组件),就是让消费数据的计算结果和偏移量(提交offset时要offset+1) 提交再一个事务内,要成功都成功要失败都失败

kafka groupCodingruangde

AdminClient Java Api

创建topic

/**

* 创建主题

* @param admin

*/

private static void create(AdminClient admin,String topic) {

// 创建 admin client 对象

// 由服务端 controller 自行分配分区及副本所在 broker

NewTopic tpc_2 = new NewTopic(topic, 2, (short) 2);

CreateTopicsResult result = admin.createTopics(Arrays.asList(tpc_2));

// 从 future 中等待服务端返回

try {

result.all().get();

} catch (Exception e) {

e.printStackTrace();

}

System.out.println("------------------完成!---------------------");

}

指定分区创建topic

/**

* 创建主题,手动指定分区及副本的 broker 分配

* @param admin

*/

private static void create2(AdminClient admin,String topic) {

// 手动指定分区及副本的 broker 分配

HashMap> replicaAssignments = new HashMap<>();

// 分区 0,分配到 broker0,broker1

replicaAssignments.put(0, Arrays.asList(0,1));

// 分区 1,分配到 broker0,broker2

replicaAssignments.put(1,Arrays.asList(1,2));

NewTopic tpc_3 = new NewTopic("K2", replicaAssignments);

CreateTopicsResult result = admin.createTopics(Arrays.asList(tpc_3));

// 从 future 中等待服务端返回

try {

result.all().get();

} catch (Exception e) {

e.printStackTrace();

}

System.out.println("------------------完成!---------------------");

}

删除topic

/**

* 删除主题

* @param admin

*/

private static void delete(AdminClient admin,List tp) throws ExecutionException, InterruptedException {

DeleteTopicsResult deleteTopicsResult = admin.deleteTopics(tp);

System.out.println(deleteTopicsResult.all().get());

System.out.println("------------------完成!---------------------");

}

查看topic

/**

* 列出主题

* @param admin

* @throws InterruptedException

* @throws ExecutionException

*/

private static void list(AdminClient admin) throws InterruptedException, ExecutionException {

// 列出主题

ListTopicsResult listTopics = admin.listTopics();

// 列出主题对象

// KafkaFuture> listings = listTopics.listings();

// Collection topicListings = listings.get();

// for (TopicListing topicListing : topicListings) {

// System.out.println(topicListing);

// }

// 列出主题名称

KafkaFuture> names = listTopics.names();

Set strings = names.get();

for (String string : strings) {

System.out.println(string);

}

System.out.println("------------------完成!---------------------");

}

kafka-flume

  1. kafka作为source

  1. kafka作为channel

  1. kafka作为sink

如果想把数据写入kafka,除了把kafka当作source,还可以把kafka作为channel。只需要source+channel就行

原理加强

日志格式分段

  1. 日志大小log.segment.bytes 的默认值为 1073741824,即 1G

  1. 滚动时间如果同时配置了 log.roll.ms 和 log.roll.hours 参数,那么 log.roll.ms 的优先级高 默认情况下,只配置了 log.roll.hours参数,其值为 168,即 7 天

  1. 索引文件大小log.index.size .max.bytes 的默认值为10485760,即 10MB

kafka生产者测试

bin/kafka-producer-perf-test.sh --topic tpc_3 --num-records 100000--record-size 1024 --throughput -1 --producer-propsbootstrap.servers=doitedu01:9092 ack

topic 用来指定生产者发送消息的目标主题

num-records 用来指定发送消息的总条数

record-size 用来设置每条消息的字节数

producer-props 参数用来指定生产者的配置,可同时指定多组配置,各组配置之间以空格分隔

producer-props 参数对应的还有一个 producer-config 参数,它用来指定生产者的配置文件

throughput 用来进行限流控制,当设定的值小于 0 时不限流,当设定的值大于 0 时,当发送的吞 吐量大于该值时就会被阻塞一段时间

经验:如何把 kafka 服务器的性能利用到最高,一般是让一台机器承载( cpu 线程数*2~3 )个分区

测试环境: 节点 3 个,cpu 2 核 2 线程,内存 8G ,每条消息 1k

测试结果: topic 在 12 个分区时,写入、读取的效率都是达到最高

写入: 75MB/s ,7.5 万条/s

读出: 310MB/s ,31 万条/s

kafka消费者测试

bin/kafka-consumer-perf-test.sh --topic tpc_3 --messages 100000--broker-list doitedu01:9092 --consumer.config x.properties

结果数据个字段含义:

start.time, end.time, data.consumed.in.MB,MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms,fetch.MB.sec, fetch.nMsg.sec

2020-11-14 15:43:42:422, 2020-11-1415:43:43:347, 98.1377, 106.0948, 100493, 108641.0811, 13, 912, 107.6071,110189.6930

结果中包含了多项信息,分别对应起始运行时间(start. time)、结束运行时 end.time)、消息总 量(data.consumed.in.MB ,单位为 MB ),按字节大小计算的消费吞吐量(单位为 MB )、消费的 消息总数( data. consumed.in nMsg )、按消息个数计算的吞吐量(nMsg.sec)、再平衡的时间( rebalance time.ms 单位为 MB/s)、拉取消息的持续时间(fetch.time.ms,单位为 ms)、每秒 拉取消息的字节大小(fetch.MB.sec 单位 MB/s)、每秒拉取消息的个数( fetch.nM.sec)。其中 fetch.time.ms= end.time -start.time - rebalance.time.ms

经验

如果一定要给一个准则,则建议将分区数设定为集群中 broker 的倍数,即假定集群中有 3 个 broker 节 点,可以设定分区数为 3/6/9 等,至于倍数的选定可以参考预估的吞吐量。 不过,如果集群中的 broker节点数有很多,比如大几十或上百、上千,那么这种准则也不太适用。

kafka场景

flume-kafka-sparkStreaming/flink-mysql/hbase/ck/redis

如果spark处理完的数据是一个结果数据,不是累加的数据,那么可以考虑mysql/hbase/ck/redis之类的。

flume-kafka- sparkStreaming/flink-kafka-mysql/hbase/ck/redis

如果spark处理完的数据还是一些逐渐增加的数据,那么我们可以考虑当作中间topic再写入到kafka,然后再导入mysql/hbase/ck/redis

topic分区的分配规则

topic的0号分区 0号副本是随机选择的,比如在broker2。那么1号分区的0号副本,在0号分区的brokerid上追加 就放在broker3,2号分区0号副本放在brokerid4

每个分区的1号副本就根据0号分区偏移一个随机数,然后234副本追加

kafka数据可靠性、一致性

kafka分区有副本概念

kafka的一个topic有多个副本,其中一个是leader,另外的副本叫做fllower,fllower会定期想leader同步信息,这样保证了数据存在多个副本

kafka有ack机制

  • ack=0,不需要broker的确认,发完就不管了

很有可能会丢数据,吞吐量很高

  • ack=1,需要分区的leader副本确认

有一定概率丢数据,当leader同步接收完数据后,fllower副本没来的急同步数据leader就挂掉了,那么最近一次的数据就会丢失

  • ack=-1/all,需要isr副本确认

min.insync.replicas参数决定了需要几个副本确认,所以就算是ack=-1,如果min.insync.replicas参数等于1,那就相当于ack=1,没有意义,所以一般需要>=2

kafka有isr副本概念

处于同步状态的副本,判断是否处于同步状态,有两个参数确认“消息差(4000)”和“心跳通信(10s),正常情况下只有处于isr的副本才有作为leader的机会,就算发生leader宕机,数据丢失也少

kafka有HW概念(High Water Mark)

当leader因故障发送切换时,新的leader数据和旧的leader提供的数据是一致的,因为所有副本之间有个高水位线的概念,所有的副本的LEO(Log End Offset)的最小值作为高水位线,标志所有的副本最小同步数据的offset,此水位线之后的数据不会提供给consumer去消费,当宕机的副本重启后,同步数据时会先截断水位线之后的数据,然后再同步,就能保证所有的分区HW以前的数据都是一致的(提供给消费者的数据都是一致的)

为了提高可用性,有不清洁选举

如果 unclean.leader.election.enable 参数设置为 true,就有可能发生数据丢失和数据 不一致的情况,Kafka 的可靠性就会降低;而如果 unclean.leader.election.enable 参数设置为 false

结论

综上所述,为了保证数据的可靠性与一致性,我们最少需要配置一下几个参数: 设置

  • producer 的: acks=all(或者 request.required.acks= -1)

  • topic 的: replication.factor>=3,

  • min.insync.replicas>=2;

  • unclean.leader.election.enable=false

幂等性

概念

为了防止数据重试或其他故障导致数据多次写入。

开启幂等性功能的方式很简单,只需要显式地将生产者客户端参数enable.idempotence 设置为 true 即 可(这个参数的默认值为 false),

参考如下: [i d m p ten s]

  • properties.put("enable.idempotence",true);

  • retries > 0

  • max.in.flight.requests.per.connection<=5 (默认配置)

  • acks = -1

实现办法

每一个 producer 在初始化时会生成一个 producer_id,并为每个目标 partition 维护一个“序列号”;

producer 每发送一条消息,会将对应的“序列号”加 1

broker 端会为每一对维护一个序列号,对于每收到的一条消息,会判断服务端的 SN_old 和接收到的消息中的 SN_new 进行对比:

  • 如果 SN_OLD+1 = SN_NEW,正常情况

  • 如果 SN_old+1>SN_new,说明是重复写入的数据,直接丢弃

  • 如果 SN_old+1

producer.send(“aaa”) 消息 aaa 就拥有了一个唯一的序列号 如果这条消息发送失败,producer 内部自动重试(retry),此时序列号不变; producer.send(“aaa”) 消息 aaa 此时又拥有一个新的序列号

kafka自带事务机制

事务场景是有规定的,只有这个场景下才能使用事务

数据采集到kafka之后,简单的业务场景,又写回到kafka

  • 创建kafka生产者

  • 创建kafka消费者

  • properties.put("transactional.id","transactionid00001"); 指定事务id

  • properties.put ("enable.idempotence",true); 开启幂等性

  • initTransactions 初始化事务

  • beginTransactions 开启事务

  • poll

  • 业务

  • send

  • producer提交offset

  • commitTransactions 提交事务

  • abordTransactions 如果异常回滚事务

api

package com.ws;

import org.apache.kafka.clients.consumer.*;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerConfig;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.kafka.common.TopicPartition;

import org.apache.kafka.common.errors.ProducerFencedException;

import org.apache.kafka.common.serialization.StringDeserializer;

import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;

import java.util.*;

public class TransactionTest {

private static final String BOOTSTRAP_SERVER = "dream1:9092,dream2:9092,dream3:9092";

private static final String TRANSACTION_GROUP = "transactional_01";

public static void main(String[] args) {

// 创建消费者

Map propc = new HashMap<>();

propc.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

propc.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

propc.put(ConsumerConfig.GROUP_ID_CONFIG, "transaction_g01");

propc.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

propc.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);

// 创建生产者

Map propp = new HashMap<>();

propp.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);

propp.put(ProducerConfig.ACKS_CONFIG, "-1");

propp.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

propp.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 唯一的事务id

propp.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, TRANSACTION_GROUP);

KafkaProducer producer = new KafkaProducer<>(propp);

KafkaConsumer consumer = new KafkaConsumer<>(propc);

// 订阅topic

List transaction_topic = Arrays.asList("transaction_topic");

consumer.subscribe(transaction_topic);

// 初始化事务

producer.initTransactions();

while (true) {

// 开启事务

producer.beginTransaction();

try {

// 消费者拉去数据

ConsumerRecords poll = consumer.poll(Duration.ofMillis(1000));

// 获取topic的分区信息

Set assignment = consumer.assignment();

// 装载分区的偏移量信息

Map offset = new HashMap<>();

// 按照分区读取数据

for (TopicPartition topicPartition : assignment) {

// 获取分区数据

List> records = poll.records(topicPartition);

if (!records.isEmpty()) {

// 读取数据 做业务,然后 发送数据

for (ConsumerRecord record : records) {

/*

* 业务,我的业务是变成大写

*/

String s = record.value().toUpperCase();

producer.send(new ProducerRecord("transaction_topic_new", record.key(), s));

}

// 组装分区偏移量信息 offset一定要+1

offset.put(topicPartition, new OffsetAndMetadata(records.get(records.size() - 1).offset() + 1));

}

}

// 提交偏移量

producer.sendOffsetsToTransaction(offset, TRANSACTION_GROUP);

// 提交事务

producer.commitTransaction();

} catch (ProducerFencedException e) {

// 回滚事务

producer.abortTransaction();

e.printStackTrace();

}

}

}

}

相关内容

热门资讯

新城控股王晓松:2026年公司... 中经记者 杨让晨 石英婧 上海报道“2026年,是国家‘十五五’规划的开局之年,也是新城的关键一年。...
新势力车企2025年成绩单:大... 红星资本局1月1日消息,2026年1月1日,新势力车企陆续公布了2025年全年的销售成绩单。红星资本...
《燕梳师院》盘点2025年中国... 2025年,中国保险业迈入高质量发展的关键转型期,“十五五”规划的开局为行业注入新动能。在科技赋能、...
《燕梳师院》发布2025年世界... 2025年,全球保险业在政策调控、市场整合、科技转型与风险应对等多维度迎来深度变革。从主要经济体的监...
岁末年初,国家拿出真金白银! 文/王恩博岁序更替之际,市场高度关注中国经济政策如何衔接。抓住这一关键时间窗口,近期各方面靠前发力,...
中国医药两家子公司补税6521... 中经记者 晏国文 卢志坤 北京报道2026年1月1日,中国医药(600056.SH)披露,下属全资子...
中国同辐子公司补缴税款2.71... 中经记者 晏国文 卢志坤 北京报道2025年12月31日,港股上市公司中国同辐(1763.HK)披露...
中国军号发布军事演习主题海报《... 本文转自【中国军号微博】; 中国人民解放军打“独”促统决不手软,将持续组织反分裂反干涉行动,坚决维护...
利空突袭!暴跌50%! 美股及印度股市,均有突发消息传来!北京时间2026年1月1日凌晨,在美股市场上,生物制药公司Corc...
两度出手全资控矿!盛新锂能拟2... 微成都报道12月30日晚间,盛新锂能(002240.SZ)发布公告称,公司拟通过全资子公司四川盛屯锂...