Flink: how to set parallelism correctly when using multiple Kafka sources?

Problem description:

I still cannot get a clear idea of parallelism. Let's say we have a Flink cluster with enough slots. In our Flink job, we consume 3 Kafka topics from 3 different Kafka clusters, and each topic has 10 partitions.

If we want to consume the messages as quickly as possible, what should the parallelism be?

If we set parallelism to 10, then 10 slots will be used, which, according to my understanding, means 10 threads, right? If these 10 threads all "connect" to topic_1, then no thread is reading topic_2 or topic_3.

If we set parallelism to 30, then there are 30 threads, but will these 30 threads be smart enough that 10 of them go to topic_1, 10 go to topic_2, and the remaining 10 go to topic_3?

Kafka consumer groups

Each Kafka consumer belongs to a consumer group, which can be thought of as a logical container/namespace for a bunch of consumers. A consumer group can receive messages from one or more topics. The instances in a consumer group can receive messages from zero, one, or more partitions within each topic (depending on the number of partitions and consumer instances).
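
For reference, here is a minimal sketch of building a Flink KafkaSource that joins a consumer group, using the newer KafkaSource builder API; the broker address, topic, and group id are placeholder assumptions:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class GroupedSourceSketch {
    public static void main(String[] args) {
        // Minimal sketch (Flink 1.14+ KafkaSource API); all names are placeholders.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("kafka-cluster-1:9092")       // assumed broker address
                .setTopics("topic_1")                              // topic from the question
                .setGroupId("my-flink-consumer-group")             // the consumer group this source belongs to
                .setStartingOffsets(OffsetsInitializer.earliest()) // where to start reading
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }
}
```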

How are Kafka partitions assigned to Flink workers?

In Kafka, each consumer from the same consumer group is assigned one or more partitions. Note that two consumers cannot consume from the same partition. The number of Flink consumers depends on the Flink parallelism, meaning that each Flink task (roughly: one Flink task = one Flink slot = one unit of Flink parallelism = one available CPU core) can act as a separate consumer in a consumer group. Also note that topics are just an abstraction for grouping partitions and data; internally, only partitions are assigned to Flink's parallel task instances, according to the following pattern.
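
To make that pattern concrete, here is a simplified round-robin sketch; this illustrates only the modulo idea, not the connector's exact internal assignment code:

```java
// Simplified illustration of round-robin partition-to-subtask assignment.
// The real connector's assignment is more involved, but the modulo idea is the same.
public class AssignmentSketch {
    static int assignPartition(int partitionId, int parallelism) {
        return partitionId % parallelism; // index of the subtask that reads this partition
    }

    public static void main(String[] args) {
        int partitions = 10, parallelism = 4;
        for (int p = 0; p < partitions; p++) {
            System.out.printf("partition %d -> subtask %d%n", p, assignPartition(p, parallelism));
        }
        // With parallelism 4 and 10 partitions, subtasks 0 and 1 read 3 partitions
        // each, subtasks 2 and 3 read 2 each; with parallelism 15, subtasks 10-14 idle.
    }
}
```

This modulo picture also previews the three cases below: equal counts give one partition per subtask, more subtasks than partitions leave some subtasks idle, and more partitions than subtasks stack several partitions onto one subtask.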

Three possible cases:

1. Kafka partitions == Flink parallelism

This case is ideal, since each consumer takes care of exactly one partition. If your messages are balanced across partitions, the work is evenly spread across the Flink operators.

2. Kafka partitions < Flink parallelism

When there are more Flink tasks than Kafka partitions, some of the Flink consumers will just idle, not reading any data:

In this case, where the parallelism is higher than the number of partitions (because you want to make use of it in a later operator), you can do a .rebalance() after the Kafka source. This ensures that all operators after the Kafka source get an even load, at the cost of having to redistribute the data (so there is de/serialization + network overhead).
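
A sketch of that pattern, assuming a 10-partition topic, an overall parallelism of 30, and placeholder broker/topic/group names:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RebalanceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(30); // default parallelism for everything downstream

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("kafka-cluster-1:9092")   // assumed broker address
                .setTopics("topic_1")                          // assumed 10-partition topic
                .setGroupId("rebalance-sketch")                // placeholder group id
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "topic_1 source")
           .setParallelism(10) // match the partition count so no source subtask idles
           .rebalance()        // round-robin records across all 30 downstream subtasks
           .print();           // stands in for real processing at parallelism 30

        env.execute("rebalance-sketch");
    }
}
```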

3. Kafka partitions > Flink parallelism

When there are more Kafka partitions than Flink tasks, each Flink consumer instance subscribes to multiple partitions at the same time:

In all cases, Flink assigns tasks to partitions optimally.

In your case, you can create the Kafka consumer group using the Flink Kafka connector and assign one or more topics to it (using a regex, for example). So if Kafka has 3 topics with 10 partitions each, assigning 30 slots (cores) to the Flink job (the slots are provided by the TaskManagers, not the JobManager) achieves the ideal case: each consumer (Flink slot) consumes exactly one Kafka partition.
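
Sketched end to end for the question's setup, with one source per cluster (a single source cannot span clusters, so a regex subscription would only apply within one cluster); broker addresses, topic names, and group ids are assumptions:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ThreeClusterSketch {
    // Helper for building one source per cluster; all names are placeholders.
    static KafkaSource<String> sourceFor(String brokers, String topic) {
        return KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics(topic)
                .setGroupId("flink-" + topic)  // assumed per-topic group id
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // One source per cluster, each with parallelism 10 to match its 10 partitions.
        DataStream<String> s1 = env.fromSource(sourceFor("kafka-cluster-1:9092", "topic_1"),
                WatermarkStrategy.noWatermarks(), "topic_1").setParallelism(10);
        DataStream<String> s2 = env.fromSource(sourceFor("kafka-cluster-2:9092", "topic_2"),
                WatermarkStrategy.noWatermarks(), "topic_2").setParallelism(10);
        DataStream<String> s3 = env.fromSource(sourceFor("kafka-cluster-3:9092", "topic_3"),
                WatermarkStrategy.noWatermarks(), "topic_3").setParallelism(10);

        // 30 source subtasks in total: one consumer per partition, the ideal case.
        s1.union(s2, s3).print().setParallelism(30);

        env.execute("three-cluster-sketch");
    }
}
```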
