Kafka 学习笔记 (I) Kafka 基准测试、闲置和事务、Java 编程操作 Kafka-7 Kafka 事务
最编程
2024-10-01 18:55:52
...
7.1 Kafka事务介绍
通过事务机制,Kafka可以实现对多个Topic的多个Partition的原子性的写入,即处于同一个事务内的所有消息,不管最终需要落地到哪个Topic 的哪个Partition,最终结果都是要么全部写成功,要么全部写失败。
开启事务,必须开启幂等性,Kafka的事务机制,在底层依赖于幂等生产者。
7.2 事务操作API
要开启Kafka事务,生产者需要添加以下配置:
// 配置事务的id,开启了事务会默认开启幂等性
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional");
消费者则需要添加以下配置:
// 设置隔离级别
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
// 关闭自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
Producer接口中定义了以下5个事务相关方法:
- 1.
initTransactions
(初始化事务):要使用Kafka事务,必须先进行初始化操作 - 2.
beginTransaction
(开始事务):启动一个Kafka事务 - 3.
sendOffsetsToTransaction
(提交偏移量):批量地将分区对应的offset发送到事务中,方便后续一块提交 - 4.
commitTransaction
(提交事务):提交事务 - 5.
abortTransaction
(放弃事务):取消事务
7.3 Kafka事务编程
7.3.1 需求
在Kafka的Topic[ods_user
]中有一些用户数据,数据格式如下:
姓名,性别,出生日期
张三,1,1980-10-09
李四,0,1985-11-01
现在要编写程序,将用户的性别转换为男、女(1-男,0-女),转换后将数据写入到Topic[dwd_user
]中。要求使用事务保障,要么消费了数据的同时写入数据到新Topic,提交offset;要么全部失败。
7.3.2 创建Topic
启动生产者控制台程序,准备发送消息到Topic[ods_user
]:
[root@node-01 kafka01]$ bin/kafka-console-producer.sh --broker-list 192.168.245.130:9092 --topic ods_user
>
启动消费者控制台程序,准备从新Topic[dwd_user
]消费消息:
[root@node-01 kafka01]$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.245.130:9092 --topic dwd_user --from-beginning --isolation-level read_committed
7.3.3 编写生产者
private static KafkaProducer<String, String> createProducer() {
// 1.创建用于连接Kafka的Properties配置
Properties props = new Properties();
// 配置事务的id,开启了事务会默认开启幂等性
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional");
props.put("bootstrap.servers", "192.168.245.130:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 2.创建一个生产者对象KafkaProducer
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
return producer;
}
7.3.4 创建消费者
private static KafkaConsumer<String, String> createConsumer() {
// 1.创建Kafka消费者配置
Properties props = new Properties();
props.setProperty("bootstrap.servers", "192.168.245.130:9092,192.168.245.131:9092,192.168.245.132:9092");
props.setProperty("group.id", "my_group");
// 关闭自动提交offset
props.setProperty("enable.auto.commit", "false");
// 事务隔离级别
props.put("isolation.level", "read_committed");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 2.创建Kafka消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 3.订阅要消费的主题
consumer.subscribe(Arrays.asList("ods_user"));
return consumer;
}
7.3.5 消费旧Topic数据并生产到新Topic
public static void main(String[] args) {
KafkaProducer<String, String> producer = createProducer();
KafkaConsumer<String, String> consumer = createConsumer();
// 1.初始化事务
producer.initTransactions();
while (true) {
try {
// 2.开启事务
producer.beginTransaction();
// 定义Map结构,用于保存分区对应的offset
Map<TopicPartition, OffsetAndMetadata> offsetCommits = new HashMap<>();
// 3.拉取ods_user的消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
for (ConsumerRecord<String, String> record : records) {
System.out.println("原始消息:" + record.value());
// 4.保存偏移量
offsetCommits.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));
// 5.进行转换处理
String[] fields = record.value().split(",");
fields[1] = fields[1].equalsIgnoreCase("1") ? "男" : "女";
String newMsg = fields[0] + "," + fields[1] + "," + fields[2];
// 6.将新消息生产到dwd_user
producer.send(new ProducerRecord<>("dwd_user", newMsg));
System.out.println("新消息:" + newMsg);
}
// 7.提交偏移量到事务
producer.sendOffsetsToTransaction(offsetCommits, "my_group");
// 8.提交事务
producer.commitTransaction();
} catch (ProducerFencedException e) {
// 9.放弃事务
producer.abortTransaction();
}
}
}
7.3.6 测试
执行上述main()
方法,然后向Topic[ods_user
]发送消息:
main()
方法日志打印:
从新Topic[dwd_user
]消费消息:
7.3.7 模拟异常测试事务
假设在进行转换处理的时候出现异常。再次向Topic[ods_user
]发送消息,程序会读取到消息,但转换报错:
再次重启main()
方法,还是可以读取到消息,但转换报错。说明消息一直都没有被消费成功,offset没有被提交,Kafka事务生效了。
…
本节完,更多内容请查阅分类专栏:微服务学习笔记
感兴趣的读者还可以查阅我的另外几个专栏:
- SpringBoot源码解读与原理分析
- MyBatis3源码深度解析
- Redis从入门到精通
- MyBatisPlus详解
- SpringCloud学习笔记