常见的消息队列有 activemq ,rabbitmq,rocketmq;
消息队列常用于两个系统之间的数据传递,可以点对点(一对一聊天) 也可以订阅模式(群组)
kafka是订阅模式。
队列是有序的,先进先出
kafka是无序的,全局无序,分区内有序,也可以让他变成有序的
一、只用一个分区
二、多个分区,但是producer写入按照顺序写,然后consumer按照顺序读取
异步:同步会阻塞,如果某些逻辑没有先后顺序,不影响主逻辑,可以异步处理(开线程)
削峰:当访问数量激增,采用消息系统,可以先将用户的请求缓存起来,慢慢处理,否则会崩溃
解耦:如果以后想添加逻辑,可以单独的写逻辑代码,只需要读取kafka的数据就好,否则还得重构代码
高吞吐,低延迟,kafka每秒可以处理几十万的消息数据,延迟只有几毫秒。
topic可以分区,而且消费者可以组成消费者组,同步消费不同分区的数据
支持热扩展
kafka把数据缓存到内存的同时,会持久化到磁盘。安全可靠
高容错性,一个topic处于isr的分区副本只要有一个还能用,这个topic就能用
高并发:支持数千个客户端同时读写
主要用于数据处理系统中的缓冲!(尤其是实时流式数据处理)
收集日志:flume对接到kafka,采集实时的日志信息
当作消息系统来用
每个kafka节点就是一个broker,都有自己的brokerid 从0开时
他也是一个broker他额外有其他功能,管理整个集群中所有分区和副本的状态。
当某个分区的 leader 副本出现故障时,由控制器负责 为该分区选举新的 leader 副本。当检测到某个分区的 ISR 集合发生变化时,由控制器负责通知所有 broker 更新其元数据信息。当使用 kafka-topics.sh 脚本为某个 topic 增加分区数量时,同样还是由控制 器负责分区的重新分配。
2.0版本,zookeeper作为协调器,监听broker和consumers的上下线情况
3.0版本,zookeeper被废弃了
因此当创建命令行consumer的时候要指定zookeeper的地址和端口,broker的配置文件需要指定zookeeper的地址
主题,相当于一个表
分区,1个topic可以有多个分区,一个分区就是一个kafka数据的最小单元,不能被拆分,而且他与consumer是对应关系,一个消费者组中,一个consumer可以消费多个分区,但是一个分区只能被的一个消费者消费。分区不能被拆分消费。
每个分区可以设置相应的副本数量,但是不能超过broker的数量,3个broker的话副本数最多设置3个否则报错,副本中有两个角色,leader,fllower
副本中的leader,主副本,负责消息的读写,分区 leader 副本的选举由控制器 controller 负责具体实施。
选举基本思路:按照 AR 集合中副本的顺序查找第一个存活的副本,并且这个副本在 ISR 集合中; 一个分区的 AR 集合在 partition 分配的时候就被指定,并且只要不发生重分配的情况,集合内部副本的顺序是保持不变的,而分区的 ISR 集合中副本的顺序可能会改变
只负责与leader完成副本的数据比对,进行副本的数据同步
In-Sync Replicas isr 处于同步状态的fllower副本列表,确定一个副本处于isr状态,有2个判断条件
#默认10s,follow没有向leader发送心跳包就会被移除
rerplica.lag.time.max.ms=10000
#默认消息差大于4000会被移除
rerplica.lag.max.messages=4000
生产者
一个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程
在主线程中由 kafkaProducer 创建消息,然后通过可能的拦截器(可以自定义,默认没有)、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator, 也称为消息收集器)中。
分区器:其中如果 key不为 null,那么默认的分区器会对 key 进行哈希(采用MurmurHash2算法,具备高运算性能及低碰撞率),最终根据得到的哈希值来计算分区号,拥有相同key的消息会被写入同一个分区。如果key为null,那么消息将会以轮询的方式发往主题内的各个可用分区。
key为null,不指定分区,默认轮询
指定key,默认按照key的hash分区算法分区
RecordAccumulator 主要用来缓存消息,将消息按照partitionid进行缓存,缓存到一定大小或一定时间之后,组成一个消息批次,共sender获取消息减少网络开销
RecordAccumulator 最大默认缓存32M数据,可以通过buffer.memory参数调整
如果RecordAccumulator持续大于最大缓存消息量(buffer.memory),经过一定时间后会抛出异常停止工作,这个时间这个取决于参数max.block.ms 的配置,此参数的默认值为 60000,即 允许阻塞60s
Deque
RecordAccumulator中会根据分区创建多个双端队列,即使无法填充满也会占据默认的大小batch.size因此如果分区过多,那么双端队列也会有很多,此时会对默认缓存大小有一定的冲击,因此可以把buffer.memory调整大一点
Sender从RecordAccumulator 获取缓存的消息之后,会将<分区,Deque< ProducerBatch >>的形式转变成
请求在从 sender 线程发往 Kafka服务端之前还会保存到 InFlightRequests 中,InFlightRequests 保存对象的具体形式为 Map
sender收到kafka的ack返回码后,去InFlightRequests删除对应的request也会去RecordAccumulator删除对应的producerBatch。
消费者客户端
可以指定分区消费assign,可以不指定分区消费subcribe
消费者组,可以将多个消费者组成一个消费者组,同一个组内的消费者消费的数据是不同的,不同组的消费者消费的数据是重复的,而且分区和消费者组是绑定的。
比如一个topic有2个分区,一个消费者组有2个消费者,那么一个消费者消费一个分区
比如一个topic有1个分区,一个消费者组有2个消费者,那么一个消费者消费一个分区,另外那个消费者啥也不干
比如一个topic有3个分区,一个消费者组有2个消费者,那么其中一个消费者消费一个分区,另外一个消费者消费两个分区
结论,一个消费者组内的消费者消费的数据是瓜分消费,而且是对分区进行瓜分,而不是对分区内的数据进行瓜分,一个分区内的数据一定只能由一个消费者消费
partition是不能被切分的,只能有一个消费者去处理,partition是能被消费的最小粒度
在 kafka消费者组内,存在两种的分区分配策略:range(默认) 和 round robin。
按照topic的分区号顺序分区,默认的分配规则,
消费者设置参数partition.assignment.strategy =range 或 roundrobin
修改消费规则
缺点
按照hashcode排序,然后轮询
当consumer加入消费者组时,此时groupcoordinator还没有,consumer需要根据自己的组id计算hashcode,然后模除以__consumer_offset的分区数(默认是50),得到的数作为该消费者组记录偏移量的__offset_consumer分区,然后这个分区的leader存在的broker节点作为group coordinator节点。
“g001”.hashcode%50
谁先进入这个消费者组谁就是消费者组的leader消费者,
如果leader挂掉,那么其余consumer随机产生一个消费者组的leader
费者组的leader消费者,会选择大家都能支持的分区规则,然后报告给group coordinator,然后group coordinator再下发给所有的消费者
消费者加入消费者组后,会定期与group coordinator定期心跳通信,这个时间由参数heartbeat.interval.ms指定,但是这个时间不能大于延迟确认时间,session.timeout.ms,消费者停止心跳通信后,group coordinator会延迟一段时间后,确认消费者宕机。session.timeout.ms,参数 的 配 置 值 必 须 在 broker 端 参 数group.min.session.timeout.ms (默认值为 6000 ,即 6 秒)和 group.max.session. timeout. ms (默认 值为300000 ,即 5 分钟)允许的范围内。
自动创建分区
kafka-topics.sh --create --topic order --zookeeperdream1:2181,dream2:2181,dream3:2181 --partitions 3 --replication-factor 2
kafka-topics.sh --delete --topic test --zookeeper dream1:2181,dream2:2181,dream3:2181
增加分区数
删除 topic,需要一个参数处于启用状态: delete.topic.enable= true 使用 kafka-topics .sh 脚本删除主题的行为本质上只是在 ZooKeeper 中的 /admin/delete_topics 路径下 建一个与待删除主题同名的节点,以标记该主题为待删除的状态。与创建主题相同的是,真正删 除主题的动作也是由 Kafka 的控制器负责完成的。
bin/kafka-topics.sh --zookeeper dream1:2181,dream2:2181,dream3:2181-alter --partitions 6 --topic test01
Kafka 只支持增加分区,不支持减少分区 原因是:减少分区,代价太大(数据的转移,日志段拼接合并)如果真的需要实现此功能,则完全可以重新创建一个分区数较小的主题,然后将现有主题中的消息按 照既定的逻辑复制过去
kafka-topics.sh --zookeeper dream1:2181,dream2:2181 –list
kafka-topics.sh --zookeeper dream1:2181 --describe --topic order
kafka-console-producer.sh --broker-listdream1:9092,dream2:9092,dream3:9092 --topic order
kafka-console-consumer.sh --bootstrap-serverdream1:9092,dream2:9092,dream3:9092 --topic order
通过管理命令,可以为已创建的 topic 增加、修改、删除 topic level 参数
添加压缩配置
bin/kafka-configs.sh --zookeeper dream1:2181 --entity-type topics--entity-name tpc_1 --alter --add-config compression.type=gzip
删除压缩配置
bin/kafka-configs.sh --zookeeper dream1:2181 --entity-type topics--entity-name tpc_1 --alter --delete-config compression.typ
bootstrap.servers
key.serializer
value.serializer
为了防止写错参数名,可以用ProducerConfig
一条消息的最大字节限制,默认值为 1048576B ,即 1MB
这个参数与broker的message.max.bytes参数,最多能接收多大的消息要匹配
message.max.bytes>= max.request.size
每个Batch 要存放batch.size大小的数据后,才可以发送出去。比如说 batch.size 默认值是 16KB,那么凑够 16KB 的数据才会发送
理论上来说,提升 batch.size 的大小,可以允许更多的数据缓冲在里面,那么一次 Request 发送出去的数据量就更多了,这样吞吐量可能会有所提升。
但是 batch.size 也不能过大,要是数据老是缓冲在 Batch 里迟迟不发送出去,那么发送消息的延迟就会很高。
需要用测试工具进行测试
除了满足batch.size,为了防止数据卡死,指定linger.ms后也会提交批次,默认是0,不按照时间进行提交。增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量。
压缩格式,默认是none
该参数还可以配置为 "gzip","snappy" 和 "lz4"。对消息进行压缩可以极大地减少网络传输、降低网络 I/O,从而提高整体的性能 。 消息压缩是一种以时间换空间的优化方式,如果对时延有一定的要求,则不推荐对消息进行压缩;
retries 参数用来配置生产者重试的次数,默认值为 0,即在发生异常的时候不进行任何重试动作,另一个参数 retry.backoff.ms,这个参数的默认值为 100,它用来设定两次重试之间的 时间间隔,避免无效的频繁重试。
如果将 acks 参数配置为非零值,并且 max .flight.requests.per.connection 参数配置为大于1 的值,那可能会出现错序的现象:如果第一批次消息写入失败,而第二批次消息写入成功,那么生产者会重试 发送第一批次的消息,此时如果第一次的消息写入成功,那么这两个批次的消息就出现了错序。一般而言,在需要保证消息顺序且的场合建议把参数 max.in.flight.requests.per.connection 配置为 1,而不是把 acks 配置为0,不过这样也会影响整体的吞吐。
用来指定分区器,默认:org.apache.kafka.internals.DefaultPartitioner
自定义 partitioner 需要实现org.apache.kafka.clients.producer.Partitioner 接口
-------------------------------------------------------------------------------------------------
package com.ws; import cn.hutool.core.util.RandomUtil; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.HashMap; import java.util.Map; import java.util.Random; public class KafkaProducer1 { public static void main(String[] args) { // (1)配置生产者客户端参数 Map prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"dream1:9092,dream2:9092,dream3:9092"); prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //(2)创建相应的生产者实例; KafkaProducer for (int i = 0; i < 10; i++) { // (3)创建消息 // (4)发送消息 int i1 = RandomUtil.randomInt(0, 3)+2; // producer.send(new ProducerRecord<>("K1", i1, null, "所在分区:"+i1+" data,record"+i)); producer.send(new ProducerRecord<>("K1", "key"+i1, "key: key"+i1+" data:record"+i)); // producer.send(new ProducerRecord<>("K1", "key: key"+i+" data:record"+i)); } // (5)关闭生产者实例。 producer.close(); } } |
三种方式
指定key,不指定partition
指定partition,不指定key
不指定
bootstrap.servers
key.deserializer
value.deserializer
group.id
auto.offset.reset
etch.min.bytes=1B
一次拉取的最小字节数
fetch.max.bytes=50M
一次拉取的最大数据量
fetch.max.wait.ms=500ms
拉取时的最大等待时长
max.partition.fetch.bytes = 1MB
每个分区一次拉取的最大数据量
max.poll.records=500
一次拉取的最大条数
connections.max.idle.ms=540000ms
网络连接的最大闲置时长
request.timeout.ms=30000ms
一次请求等待响应的最大超时时间
metadata.max.age.ms=300000
元数据在限定时间内没有进行更新,则会被强制更新
reconnect.backoff.ms=50ms
尝试重新连接指定主机之前的退避时间
retry.backoff.ms=100ms
尝试重新拉取数据的重试间隔
isolation.level=read_uncommitted
隔离级别! 决定消费者能读到什么样的数据
read_uncommitted:可以消费到 LSO(LastStableOffset)位置;
read_committed:可以消费到 HW(High Watermark)位置
max.poll.interval.ms
超过时限没有发起 poll 操作,则消费组认为该消费者已离开消费组
enable.auto.commit=true
开启消费位移的自动提交
auto.commit.interval.ms=5000
自动提交消费位移的时间间隔
不可以指定消费分区
优点:在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移, assign是固定的消费分区所以无法调整。
KafkaConsumer
1、 指定kafka的topic名,可以指定多个
consumer.subscribe(Arrays.asList("K1"));
2、指定一个正则表达式,可以动态匹配新增的topic
consumer.subscribe(Patten.compile(K*"));
可以指定消费分区
KafkaConsumer
// (2)订阅主题;只要1号分区、2号分区的数据
TopicPartition k1 = new TopicPartition("K1", 1);
TopicPartition k2 = new TopicPartition("K1", 2);
List
consumer.assign(topicPartitions);
一个消费者组中,subscribe和assign不能同时使用。
以下三种方式都可以。
consumer.unsubscribe();
consumer.subscribe(new ArrayList
consumer.assign(new ArrayList
一般来说指定偏移量的话都是要指定分区的,否则意义不大,因为每个分区的offset都是从零开始的,所以offset全局是无意义的。一般需要指定消费哪个分区。
// (2)订阅主题;只要0号分区的数据
TopicPartition k1 = new TopicPartition("K1", 0);
List
consumer.assign(topicPartitions);
// 0号分区 从80开始消费包含80
consumer.seek(k1, 80);
Kafka 中默认的消费位移的提交方式是自动提交,这个由消费者客户端参数enable.auto.commit 配置, 默认值为 true 。当然这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端参数 auto.commit.interval.ms 配置,默认值为 5 秒,此参数生效的前提是 enable. auto.commit 参数为 true。
此参数生效的前提是 enable. auto.commit 参数为 false。
kafka提供了两种手动提交方式
consumer.commitSync() 阻塞同步提交
consumer.commitAsync() 阻塞异步提交,这个方法可以写一个回调函数
想实现exactly once必须要手动控制偏移量提交,指定偏移量消费
在开始之前,指定偏移量消费,然后消费者拉取一次数据,计算一次结果,提交一次偏移量,如果把结果和偏移量同时写入mysql,那么就可以保证不重复消费,exactly once,除了mysql还有hbase(行级事务)、redis (支持事务的组件),就是让消费数据的计算结果和偏移量(提交offset时要offset+1) 提交再一个事务内,要成功都成功要失败都失败
kafka groupCodingruangde
/** * 创建主题 * @param admin */ private static void create(AdminClient admin,String topic) { // 创建 admin client 对象 // 由服务端 controller 自行分配分区及副本所在 broker NewTopic tpc_2 = new NewTopic(topic, 2, (short) 2); CreateTopicsResult result = admin.createTopics(Arrays.asList(tpc_2)); // 从 future 中等待服务端返回 try { result.all().get(); } catch (Exception e) { e.printStackTrace(); } System.out.println("------------------完成!---------------------"); }
|
/** * 创建主题,手动指定分区及副本的 broker 分配 * @param admin */ private static void create2(AdminClient admin,String topic) { // 手动指定分区及副本的 broker 分配 HashMap // 分区 0,分配到 broker0,broker1 replicaAssignments.put(0, Arrays.asList(0,1)); // 分区 1,分配到 broker0,broker2 replicaAssignments.put(1,Arrays.asList(1,2)); NewTopic tpc_3 = new NewTopic("K2", replicaAssignments); CreateTopicsResult result = admin.createTopics(Arrays.asList(tpc_3)); // 从 future 中等待服务端返回 try { result.all().get(); } catch (Exception e) { e.printStackTrace(); } System.out.println("------------------完成!---------------------"); }
|
/** * 删除主题 * @param admin */ private static void delete(AdminClient admin,List DeleteTopicsResult deleteTopicsResult = admin.deleteTopics(tp); System.out.println(deleteTopicsResult.all().get()); System.out.println("------------------完成!---------------------"); }
|
/** * 列出主题 * @param admin * @throws InterruptedException * @throws ExecutionException */ private static void list(AdminClient admin) throws InterruptedException, ExecutionException { // 列出主题 ListTopicsResult listTopics = admin.listTopics(); // 列出主题对象 // KafkaFuture // Collection // for (TopicListing topicListing : topicListings) { // System.out.println(topicListing); // } // 列出主题名称 KafkaFuture Set for (String string : strings) { System.out.println(string); } System.out.println("------------------完成!---------------------"); }
|
kafka作为source
kafka作为channel
kafka作为sink
如果想把数据写入kafka,除了把kafka当作source,还可以把kafka作为channel。只需要source+channel就行
日志大小log.segment.bytes 的默认值为 1073741824,即 1G
滚动时间如果同时配置了 log.roll.ms 和 log.roll.hours 参数,那么 log.roll.ms 的优先级高 默认情况下,只配置了 log.roll.hours参数,其值为 168,即 7 天
索引文件大小log.index.size .max.bytes 的默认值为10485760,即 10MB
bin/kafka-producer-perf-test.sh --topic tpc_3 --num-records 100000--record-size 1024 --throughput -1 --producer-propsbootstrap.servers=doitedu01:9092 ack
topic 用来指定生产者发送消息的目标主题
num-records 用来指定发送消息的总条数
record-size 用来设置每条消息的字节数
producer-props 参数用来指定生产者的配置,可同时指定多组配置,各组配置之间以空格分隔
producer-props 参数对应的还有一个 producer-config 参数,它用来指定生产者的配置文件
throughput 用来进行限流控制,当设定的值小于 0 时不限流,当设定的值大于 0 时,当发送的吞 吐量大于该值时就会被阻塞一段时间
经验:如何把 kafka 服务器的性能利用到最高,一般是让一台机器承载( cpu 线程数*2~3 )个分区
测试环境: 节点 3 个,cpu 2 核 2 线程,内存 8G ,每条消息 1k
测试结果: topic 在 12 个分区时,写入、读取的效率都是达到最高
写入: 75MB/s ,7.5 万条/s
读出: 310MB/s ,31 万条/s
bin/kafka-consumer-perf-test.sh --topic tpc_3 --messages 100000--broker-list doitedu01:9092 --consumer.config x.properties
结果数据个字段含义:
start.time, end.time, data.consumed.in.MB,MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms,fetch.MB.sec, fetch.nMsg.sec
2020-11-14 15:43:42:422, 2020-11-1415:43:43:347, 98.1377, 106.0948, 100493, 108641.0811, 13, 912, 107.6071,110189.6930
结果中包含了多项信息,分别对应起始运行时间(start. time)、结束运行时 end.time)、消息总 量(data.consumed.in.MB ,单位为 MB ),按字节大小计算的消费吞吐量(单位为 MB )、消费的 消息总数( data. consumed.in nMsg )、按消息个数计算的吞吐量(nMsg.sec)、再平衡的时间( rebalance time.ms 单位为 MB/s)、拉取消息的持续时间(fetch.time.ms,单位为 ms)、每秒 拉取消息的字节大小(fetch.MB.sec 单位 MB/s)、每秒拉取消息的个数( fetch.nM.sec)。其中 fetch.time.ms= end.time -start.time - rebalance.time.ms
经验
如果一定要给一个准则,则建议将分区数设定为集群中 broker 的倍数,即假定集群中有 3 个 broker 节 点,可以设定分区数为 3/6/9 等,至于倍数的选定可以参考预估的吞吐量。 不过,如果集群中的 broker节点数有很多,比如大几十或上百、上千,那么这种准则也不太适用。
flume-kafka-sparkStreaming/flink-mysql/hbase/ck/redis
如果spark处理完的数据是一个结果数据,不是累加的数据,那么可以考虑mysql/hbase/ck/redis之类的。
flume-kafka- sparkStreaming/flink-kafka-mysql/hbase/ck/redis
如果spark处理完的数据还是一些逐渐增加的数据,那么我们可以考虑当作中间topic再写入到kafka,然后再导入mysql/hbase/ck/redis
topic的0号分区 0号副本是随机选择的,比如在broker2。那么1号分区的0号副本,在0号分区的brokerid上追加 就放在broker3,2号分区0号副本放在brokerid4
每个分区的1号副本就根据0号分区偏移一个随机数,然后234副本追加
kafka的一个topic有多个副本,其中一个是leader,另外的副本叫做fllower,fllower会定期想leader同步信息,这样保证了数据存在多个副本
ack=0,不需要broker的确认,发完就不管了
很有可能会丢数据,吞吐量很高
ack=1,需要分区的leader副本确认
有一定概率丢数据,当leader同步接收完数据后,fllower副本没来的急同步数据leader就挂掉了,那么最近一次的数据就会丢失
ack=-1/all,需要isr副本确认
min.insync.replicas参数决定了需要几个副本确认,所以就算是ack=-1,如果min.insync.replicas参数等于1,那就相当于ack=1,没有意义,所以一般需要>=2
处于同步状态的副本,判断是否处于同步状态,有两个参数确认“消息差(4000)”和“心跳通信(10s),正常情况下只有处于isr的副本才有作为leader的机会,就算发生leader宕机,数据丢失也少
当leader因故障发送切换时,新的leader数据和旧的leader提供的数据是一致的,因为所有副本之间有个高水位线的概念,所有的副本的LEO(Log End Offset)的最小值作为高水位线,标志所有的副本最小同步数据的offset,此水位线之后的数据不会提供给consumer去消费,当宕机的副本重启后,同步数据时会先截断水位线之后的数据,然后再同步,就能保证所有的分区HW以前的数据都是一致的(提供给消费者的数据都是一致的)
如果 unclean.leader.election.enable 参数设置为 true,就有可能发生数据丢失和数据 不一致的情况,Kafka 的可靠性就会降低;而如果 unclean.leader.election.enable 参数设置为 false
综上所述,为了保证数据的可靠性与一致性,我们最少需要配置一下几个参数: 设置
producer 的: acks=all(或者 request.required.acks= -1)
topic 的: replication.factor>=3,
min.insync.replicas>=2;
unclean.leader.election.enable=false
为了防止数据重试或其他故障导致数据多次写入。
开启幂等性功能的方式很简单,只需要显式地将生产者客户端参数enable.idempotence 设置为 true 即 可(这个参数的默认值为 false),
参考如下: [i d m p ten s]
properties.put("enable.idempotence",true);
retries > 0
max.in.flight.requests.per.connection<=5 (默认配置)
acks = -1
每一个 producer 在初始化时会生成一个 producer_id,并为每个目标 partition 维护一个“序列号”;
producer 每发送一条消息,会将对应的“序列号”加 1
broker 端会为每一对维护一个序列号,对于每收到的一条消息,会判断服务端的 SN_old 和接收到的消息中的 SN_new 进行对比:
如果 SN_OLD+1 = SN_NEW,正常情况
如果 SN_old+1>SN_new,说明是重复写入的数据,直接丢弃
如果 SN_old+1
producer.send(“aaa”) 消息 aaa 就拥有了一个唯一的序列号 如果这条消息发送失败,producer 内部自动重试(retry),此时序列号不变; producer.send(“aaa”) 消息 aaa 此时又拥有一个新的序列号
事务场景是有规定的,只有这个场景下才能使用事务
数据采集到kafka之后,简单的业务场景,又写回到kafka
创建kafka生产者
创建kafka消费者
properties.put("transactional.id","transactionid00001"); 指定事务id
properties.put ("enable.idempotence",true); 开启幂等性
initTransactions 初始化事务
beginTransactions 开启事务
poll
业务
send
producer提交offset
commitTransactions 提交事务
abordTransactions 如果异常回滚事务
api
package com.ws; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import java.time.Duration; import java.util.*; public class TransactionTest { private static final String BOOTSTRAP_SERVER = "dream1:9092,dream2:9092,dream3:9092"; private static final String TRANSACTION_GROUP = "transactional_01"; public static void main(String[] args) { // 创建消费者 Map propc.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); propc.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); propc.put(ConsumerConfig.GROUP_ID_CONFIG, "transaction_g01"); propc.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); propc.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER); // 创建生产者 Map propp.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER); propp.put(ProducerConfig.ACKS_CONFIG, "-1"); propp.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); propp.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 唯一的事务id propp.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, TRANSACTION_GROUP); KafkaProducer KafkaConsumer // 订阅topic List consumer.subscribe(transaction_topic); // 初始化事务 producer.initTransactions(); while (true) { // 开启事务 producer.beginTransaction(); try { // 消费者拉去数据 ConsumerRecords // 获取topic的分区信息 Set // 装载分区的偏移量信息 Map // 按照分区读取数据 for (TopicPartition topicPartition : assignment) { // 获取分区数据 List if (!records.isEmpty()) { // 读取数据 做业务,然后 发送数据 for (ConsumerRecord /* * 业务,我的业务是变成大写 */ String s = record.value().toUpperCase(); producer.send(new ProducerRecord } // 组装分区偏移量信息 offset一定要+1 offset.put(topicPartition, new OffsetAndMetadata(records.get(records.size() - 1).offset() + 1)); } } // 提交偏移量 producer.sendOffsetsToTransaction(offset, TRANSACTION_GROUP); // 提交事务 producer.commitTransaction(); } catch (ProducerFencedException e) { // 回滚事务 producer.abortTransaction(); e.printStackTrace(); } } }
} |