欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

Kafka 学习笔记 (I) Kafka 基准测试、闲置和事务、Java 编程操作 Kafka-7 Kafka 事务

最编程 2024-10-01 18:55:52
...

7.1 Kafka事务介绍

通过事务机制,Kafka可以实现对多个Topic的多个Partition的原子性的写入,即处于同一个事务内的所有消息,不管最终需要落地到哪个Topic 的哪个Partition,最终结果都是要么全部写成功,要么全部写失败

开启事务,必须开启幂等性,Kafka的事务机制,在底层依赖于幂等生产者。

7.2 事务操作API

要开启Kafka事务,生产者需要添加以下配置:

// 配置事务的id,开启了事务会默认开启幂等性
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional");

消费者则需要添加以下配置:

// 设置隔离级别
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
// 关闭自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

Producer接口中定义了以下5个事务相关方法:

  • 1.initTransactions(初始化事务):要使用Kafka事务,必须先进行初始化操作
  • 2.beginTransaction(开始事务):启动一个Kafka事务
  • 3.sendOffsetsToTransaction(提交偏移量):批量地将分区对应的offset发送到事务中,方便后续一块提交
  • 4.commitTransaction(提交事务):提交事务
  • 5.abortTransaction(放弃事务):取消事务

7.3 Kafka事务编程

7.3.1 需求

在Kafka的Topic[ods_user]中有一些用户数据,数据格式如下:

姓名,性别,出生日期
张三,1,1980-10-09
李四,0,1985-11-01

现在要编写程序,将用户的性别转换为男、女(1-男,0-女),转换后将数据写入到Topic[dwd_user]中。要求使用事务保障,要么消费了数据的同时写入数据到新Topic,提交offset;要么全部失败。

7.3.2 创建Topic

启动生产者控制台程序,准备发送消息到Topic[ods_user]:

[root@node-01 kafka01]$ bin/kafka-console-producer.sh --broker-list 192.168.245.130:9092 --topic ods_user
>

启动消费者控制台程序,准备从新Topic[dwd_user]消费消息:

[root@node-01 kafka01]$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.245.130:9092 --topic dwd_user --from-beginning  --isolation-level read_committed

7.3.3 编写生产者

private static KafkaProducer<String, String> createProducer() {
    // 1.创建用于连接Kafka的Properties配置
    Properties props = new Properties();
    // 配置事务的id,开启了事务会默认开启幂等性
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional");
    props.put("bootstrap.servers", "192.168.245.130:9092");
    props.put("acks", "all");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // 2.创建一个生产者对象KafkaProducer
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    return producer;
}

7.3.4 创建消费者

private static KafkaConsumer<String, String> createConsumer() {
    // 1.创建Kafka消费者配置
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "192.168.245.130:9092,192.168.245.131:9092,192.168.245.132:9092");
    props.setProperty("group.id", "my_group");
    // 关闭自动提交offset
    props.setProperty("enable.auto.commit", "false");
    // 事务隔离级别
    props.put("isolation.level", "read_committed");
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    // 2.创建Kafka消费者
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    // 3.订阅要消费的主题
    consumer.subscribe(Arrays.asList("ods_user"));
    return consumer;
}

7.3.5 消费旧Topic数据并生产到新Topic

public static void main(String[] args) {
    KafkaProducer<String, String> producer = createProducer();
    KafkaConsumer<String, String> consumer = createConsumer();
    // 1.初始化事务
    producer.initTransactions();
    while (true) {
        try {
            // 2.开启事务
            producer.beginTransaction();
            // 定义Map结构,用于保存分区对应的offset
            Map<TopicPartition, OffsetAndMetadata> offsetCommits = new HashMap<>();
            // 3.拉取ods_user的消息
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("原始消息:" + record.value());
                // 4.保存偏移量
                offsetCommits.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));
                // 5.进行转换处理
                String[] fields = record.value().split(",");
                fields[1] = fields[1].equalsIgnoreCase("1") ? "男" : "女";
                String newMsg = fields[0] + "," + fields[1] + "," + fields[2];
                // 6.将新消息生产到dwd_user
                producer.send(new ProducerRecord<>("dwd_user", newMsg));
                System.out.println("新消息:" + newMsg);
            }
            // 7.提交偏移量到事务
            producer.sendOffsetsToTransaction(offsetCommits, "my_group");
            // 8.提交事务
            producer.commitTransaction();
        } catch (ProducerFencedException e) {
            // 9.放弃事务
            producer.abortTransaction();
        }

    }
}

7.3.6 测试

执行上述main()方法,然后向Topic[ods_user]发送消息:

main()方法日志打印:

从新Topic[dwd_user]消费消息:

7.3.7 模拟异常测试事务

假设在进行转换处理的时候出现异常。再次向Topic[ods_user]发送消息,程序会读取到消息,但转换报错:

再次重启main()方法,还是可以读取到消息,但转换报错。说明消息一直都没有被消费成功,offset没有被提交,Kafka事务生效了。

本节完,更多内容请查阅分类专栏:微服务学习笔记

感兴趣的读者还可以查阅我的另外几个专栏:

  • SpringBoot源码解读与原理分析
  • MyBatis3源码深度解析
  • Redis从入门到精通
  • MyBatisPlus详解
  • SpringCloud学习笔记