Kafka的生产者

Kafka的生产者

一、数据生产流程

  Kafka的生产者

  1、创建ProducerRecord对象,该对象出来包括要发送的数据,还必须指定topic,也可以指定key,value和分区,发送ProducerRecord的时候,生产者做的第一件事就是把key和value序列化成ByteArrays,以便他们可以通过网络发送。

  2、接下来,数据会被发送到分区器,如果ProducerRecord中指定了分区,则分区器直接返回指定分区,否则,分区器通常会基于ProducerRecord的key值计算出一个分区,一旦分区被确认,生产者就知道数据会被发送到那个topic和分区,然后数据会被添加到同一批次发送到相同的topic和分区的数据里面,一个单独的线程会负责把那批数据发送到对应的broker上

  3、当broker接收到数据之后,如果数据已被成功写入到kafka,会返回一个包含topic,分区和偏移量offset的recordMetdata对象,如果写入失败,会返回异常,当生产者接受到异常信息,会尝试重新发送,如果尝试失败则抛出异常

 二、创建生产者

  必须指定三个属性:

    1.bootstarp.server:集群地址

    2.key.serializer:用于序列化keys的类名,Kafka brokers期待key和value的类型为byte数组,但是也允许使用参数化的Java对象作为key和value。这使得代码非常易读,但也意味着生产者必须知道如何把这些对象转换为byte数组。key.serializer应设为实现了org.apache.kafka.common.serialization.Serializer接口的类名,生产者将会使用这个类来把key对象序列化为byte数组。Kafka内置实现了ByteArraySerializer、StringSerializer和IntegerSerializer。注意,即使生产者发送的数据没有指定key,也必须设置key.serializer这个属性。

    3.value.serializer:用于序列化value的类名。类似于key.serializer,生产者将会使用指定的类来把value对象序列化为byte数组。

Kafka的生产者

     也可以通过ProducerConfig提取

  Kafka的生产者

     点击ProducerConfig可以看看已经设置了相关配置

    Kafka的生产者

 三、发送消息:

  有两种类型,同步和异步
    同步发送:通过send()发送完之后会返回一个Future对象,然后调用Future对象的get()方法等待kafka的响应,如果kafka正常响应,返回一个RecordMetadata对象,该对象存储消息的偏移量

Kafka的生产者

 Kafka的生产者

     异步发送:异步相当于重新启动一个线程,不阻塞当前主流业务处理,异步发送,在调用send()方法的时候指定一个callback函数,当broker接收到返回的时候,该callback函数会被触发执行。

Kafka的生产者

 Kafka的生产者

     要使用callback函数,先要实现org.apache.kafka.clients.producer.Callback接口,该接口只有一个onCompletion方法。如果发送异常,onCompletion的参数Exception e会为非空。

四、生产者的其他配置属性

  1、acks: 指定分区中必须有多少个副本接收到数据。此配置是生产者中非常重要的参数,它涉及到消息的可靠性和吞吐量之间的权衡,默认值为1

      acks=0,生产者不会等待broker任何确认,消息会被立即添加到缓冲区并被认为已经发送。在这种情况下,不能保证服务器已经接收到信息,并且重试配置不会生效(因为客户端不知道任何异常),每条消息返回的         偏移量始终是-1(因为生产者不等待broker确认,以最快的速度发送消息,所以适用于实现非常高的吞吐量)

      acks=1,在leader服务器的副本收到消息的同一时间,生产者也会接受到broker的确认。如果消息不能写入leader的副本(例如:learder宕机并且还没有选出新的leader),生产者将会接收到异常,然后可以重新发送书数据,防止丢失,如果通过不确定的learder选举宕机并且消息没有被写入到新的leader,该消息仍然会消失,在这种情况下,吞吐量取决于消息是同步还是异步,如果用同步,明显会延迟,如果用异步,延迟不那么明显,但吞吐量将受到正在发送消息数量的限制

      acks=-1,一旦所有的同步副本都接收到消息,生产者才会接收到broker的确认。这是最安全的,但也是最慢的

  2、retries: 生产者从服务器收到的错误有可能是临时性的错误(例如:分区找不到首领)在这种情况下,如果达到了retries设置的次数,生产者会放弃重试,并返回错误,默认等行100ms,可以通过retry.backoff.ms参数来修改这个时间间隔   

  3、batch.size:   发送到同一个partition的消息会被先放在同一个批次中,该参数指定一个批次可以使用的内存大小,默认为16384=16KB,单位是byte。不一定需要等到batch被填满才能发送

  4.max.request.size:用于控制生产者发送的请求大小,他可以指定能发送的单个消息的最大值,也可以指单个请求里所有的消息的总大小,broker对可接收的消息最大值也有自己的限制(message.max.size),所以两边的配置最好匹配,避免生产者发送的消息被broker拒绝

  5、buffer.memory:设置生产者内缓存区域的大小,生产者用它缓冲要发送到服务器的消息。

  6、linger.ms:生产者在发送消息前等待linger.ms,从而等待更多的消息加入到batch中。如果batch被填满或者linger.ms达到上限,就把batch中的消息发送出去

五、序列化器

    1、作用:消息要到网络.上进行传输,必须进行序列化,而序列化器的作用就是如此。

      Kafka提供了默认的字符串序列化器(org. apache.kafka.commonselalzation.StringSerillzer) .还有整型(IntegerSerializer)和字节数组(BytesSerializer) 序列化器,这些序列化器都实现了接口(org.apache.kafka.common.serialization.Serializer)基本上能够满足大部分场景的需求。

    2、自定义序列化器

      举例:kafka在消息传递的时候,消息主题是一个自定义的类

      Kafka的生产者

      Kafka的生产者

 Kafka的生产者

   

  在创建ProducerRecord时,必须指定序列化器,推荐使用序列化框架Avro、Thrift、ProtoBuf等,不推荐自己创建序列化器。

    在使用 Avro 之前,需要先定义模式(schema),模式通常使用 JSON 来编写。(该例子来自: https://www.cnblogs.com/sodawoods-blogs/p/8969513.html,方便学习)

  举例2:

(1)创建一个类代表客户,作为消息的value

  Kafka的生产者

(2)定义schema

Kafka的生产者

 (3)生成Avro对象发送到Kafka

Kafka的生产者

 六、分区器

    本身kafka有自己的分区策略的,如果未指定,就会使用默认的分区策略。

    Kafka根据传递消息的key来进行分区的分配,即hash(key) % numPartitions.如果Key相同的话,那么就会分配到统一分区。一般情况采用默认

Kafka的生产者

 Kafka的生产者

 七、拦截器

  Producer拦截器(interceptor)是个相当新的功能,它和consumer端interceptor是 在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。

  生产者拦截器可以用在消息发送前做一些准备工作。

    使用场景:

      1.按照某个规则过滤掉不符合要求的消息

      2.修改消息的内容

      3.统计类需求

  举例:发送所有的消息加前缀:

Kafka的生产者

 Kafka的生产者

发送方:

Kafka的生产者

 消费方结果:

Kafka的生产者