欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

深入理解Kafka的生产者:消息发送的高级特性

最编程 2024-01-27 15:12:45
...

消息生产流程

在这里插入图片描述

整个流程如下:

  1. Producer创建时,会创建一个Sender线程并设置为守护线程。
  2. 生产消息时,内部其实是异步流程;生产的消息先经过拦截器->序列化器->分区器,然后将消息缓存在缓冲区(该缓冲区也是在Producer创建时创建)。
  3. 批次发送的条件为:缓冲区数据大小达到batch.size或者linger.ms达到上限,哪个先达到就算哪个。
  4. 批次发送后,发往指定分区,然后落盘到broker;如果生产者配置了retrires参数大于0并且失败原因允许重试,那么客户端内部会对该消息进行重试。
  5. 落盘到broker成功,返回生产元数据给生产者。
  6. 元数据返回有两种方式:一种是通过阻塞直接返回,另一种是通过回调返回。

ProducerRecord

在生产发送消息前,会将信息封装成ProducerRecord对象。主要由以下几部分组成:

private final String topic;
private final Integer partition;
private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;

其中主要有要发送的Topic名称,要发送至那个分区,以及要发送的数据和key。

其他的都比较好理解,key的作用是如果key存在的话,就会对key进行hash,然后根据不同的结果发送至不同的分区,这样当有相同的key时,所有相同的key都会发送到同一个分区,我们之前也提到,所有的新消息都会被添加到分区的尾部,进而保证了数据的顺序性。

序列化器

在这里插入图片描述

由于Kafka中的数据都是字节数组,在将消息发送到Kafka之前需要先将数据序列化为字节数组。

序列化器的作用就是用于序列化要发送的消息的。

Kafka使用org.apache.kafka.common.serialization.Serializer 接口用于定义序列化器,将泛型指定类型的数据转换为字节数组,如下:

public interface Serializer<T> extends Closeable {
    default void configure(Map<String, ?> configs, boolean isKey) {
    }

    byte[] serialize(String var1, T var2);

    default byte[] serialize(String topic, Headers headers, T data) {
        return this.serialize(topic, data);
    }

    default void close() {
    }
}

其中kafka提供了许多实现类。

在这里插入图片描述

除了上述提供的,还可以自定义序列化器,只要实现Serializer接口即可。

假如我们有如下实体类:

public class User {
    private Integer userId;
    private String username;

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }
}

序列化类:

import com.lagou.kafka.demo.entity.User;
import org.apache.kafka.common.errors.SerializationException; 
import org.apache.kafka.common.serialization.Serializer; 
import java.io.UnsupportedEncodingException;
import java.nio.Buffer; 
import java.nio.ByteBuffer; 
import java.util.Map;


public class UserSerializer implements Serializer<User> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // do nothing
    }

    @Override
    public byte[] serialize(String topic, User data) {
        try {
            // 如果数据是null,则返回null 
            if (data == null) return null;
            Integer userId = data.getUserId();
            String username = data.getUsername();
            int length = 0;
            byte[] bytes = null;
            if (null != username) {
                bytes = username.getBytes("utf-8");
                length = bytes.length;
            }
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);
            buffer.putInt(userId);
            buffer.putInt(length);
            buffer.put(bytes);
            return buffer.array();
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("序列化数据异常");
        }
    }

    @Override
    public void close() {
        // do nothing
    }
}

分区器

在这里插入图片描述

分区器来计算消息该发送到哪个分区中。

默认(DefaultPartitioner)分区计算:

  1. 如果record提供了分区号,则使用record提供的分区号
  2. 如果record没有提供分区号,则使用key的序列化后的值的hash值对分区数量取模
  3. 如果record没有提供分区号,也没有提供key,则使用轮询的方式分配分区号。

也可以自定义分区器,需要

  1. 首先开发Partitioner接口的实现类
  2. 在KafkaProducer中进行设置:configs.put(“partitioner.class”, “xxx.xx.Xxx.class”)

拦截器

在这里插入图片描述

Producer拦截器(interceptor)和Consumer端Interceptor是在Kafka 0.10版本被引入的,主要用于实现Client端的定制化控制逻辑。

对于Producer而言,Interceptor使得用户在消息发送前以及Producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,Producer允许用户指定多个Interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。

注意拦截器发生异常抛出的异常会被忽略;此外,也要注意一种情况:如果某个拦截器依赖上一个拦截器的结果,但是当上一个拦截器异常,则该拦截器可能也不会正常工作,因为他接受到的是上一个成功返回的结果,而不是期望的上一个拦截器结果。

Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:

  • onSend(ProducerRecord):该方法封装进KafkaProducer.send方法中,即运行在用户主线程中。Producer确保在消息被序列化以计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。
  • onAcknowledgement(RecordMetadata, Exception):该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在Producer回调逻辑触发之前。onAcknowledgement运行在Producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢Producer的消息发送效率。
  • close:关闭Interceptor,主要用于执行一些资源清理工作。

如前所述,Interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个Interceptor,则Producer将按照指定顺序调用它们。

自定义拦截器步骤:

  1. 实现ProducerInterceptor接口
  2. 在KafkaProducer的设置中设置自定义的拦截器

案例:

消息实体类:

public class User {
    private Integer userId;
    private String username;

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }
}

还是用上面的序列化器。

自定义拦截器1:

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class InterceptorOne implements ProducerInterceptor<Integer, String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorOne.class);

    @Override
    public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
        System.out.println("拦截器1 -- go");


        // 消息发送的时候,经过拦截器,调用该方法

        // 要发送的消息内容
        final String topic = record.topic();
        final Integer partition = record.partition();
        final Integer key = record.key();
        final String value = record.value();
        final Long timestamp = record.timestamp();
        final Headers headers = record.headers();


        // 拦截器拦下来之后根据原来消息创建的新的消息
        // 此处对原消息没有做任何改动
        ProducerRecord<Integer, String> newRecord = new ProducerRecord<Integer, String>(
                topic,
                partition,
                timestamp,
                key,
                value,
                headers
        );
        // 传递新的消息
        return newRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.out.println("拦截器1 -- back");
        // 消息确认或异常的时候,调用该方法,该方法中不应实现较重的任务
        // 会影响kafka生产者的性能。
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {
        final Object classContent = configs.get("classContent");
        System.out.println(classContent);
    }
}

自定义拦截器2:

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class InterceptorTwo implements ProducerInterceptor<Integer, String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorTwo.class);

    @Override
    public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
        System.out.println("拦截器2 -- go");


        // 消息发送的时候,经过拦截器,调用该方法

        // 要发送的消息内容
        final String topic = record.topic();
        final Integer partition = record.partition();
        final Integer key = record.key();
        final String value = record.value();
        final Long timestamp = record.timestamp();
        final Headers headers = record.headers();


        // 拦截器拦下来之后根据原来消息创建的新的消息
        // 此处对原消息没有做任何改动
        ProducerRecord<Integer, String> newRecord = new ProducerRecord<Integer, String>(
                topic,
                partition,
                timestamp,
                key,
                value,
                headers
        );
        // 传递新的消息
        return newRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.out.println("拦截器2 -- back");
        // 消息确认或异常的时候,调用该方法,该方法中不应实现较重的任务
        // 会影响kafka生产者的性能。
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {
        final Object classContent = configs.get("classContent");
        System.out.println(classContent);
    }
}

正常那个运行会打印:

拦截器1 -- go
拦截器2 -- go
拦截器1 -- back
拦截器2 -- back

即发送和回调都是按照链的顺序正序来的。

推荐阅读