Does GCP Dataflow support Kafka IO in Python?

Problem description:

I am trying to read data from a Kafka topic using the kafka.ReadFromKafka() method in Python. My code looks like this:

import apache_beam as beam
from apache_beam.io.external import kafka
from apache_beam.options.pipeline_options import PipelineOptions  # missing import added

options = PipelineOptions()

with beam.Pipeline(options=options) as p:
    plants = (
        p
        | 'read' >> kafka.ReadFromKafka({'bootstrap.servers': 'public_ip:9092'}, ['topic1']))

But I received the following error message:

ERROR:apache_beam.runners.runner:Error while visiting read
Traceback (most recent call last):
  File "test_file.py", line 16, in <module>
    | 'read' >> kafka.ReadFromKafka({'bootstrap.servers': 'localhost:9092'}, ['topic1'])
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/pipeline.py", line 547, in __exit__
    self.run().wait_until_finish()
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/pipeline.py", line 526, in run
    return self.runner.run_pipeline(self, self._options)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 565, in run_pipeline
    self.visit_transforms(pipeline, options)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/runner.py", line 224, in visit_transforms
    pipeline.visit(RunVisitor(self))
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/pipeline.py", line 572, in visit
    self._root_transform().visit(visitor, self, visited)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/pipeline.py", line 1075, in visit
    part.visit(visitor, pipeline, visited)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/pipeline.py", line 1078, in visit
    visitor.visit_transform(self)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/runner.py", line 219, in visit_transform
    self.runner.run_transform(transform_node, options)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/runner.py", line 249, in run_transform
    (transform_node.transform, self))
NotImplementedError: Execution of [<ReadFromKafka(PTransform) label=[ReadFromKafka(beam:external:java:kafka:read:v1)]>] not implemented in runner <apache_beam.runners.dataflow.dataflow_runner.DataflowRunner object at 0x7f72463344a8>.

Is it because the Apache Beam Dataflow runner doesn't support KafkaIO?

The Python SDK for Beam does support connecting to Kafka. Below is a code snippet:

from __future__ import print_function
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import kafkaio

# Consumer configuration: which topic to read, where the brokers are,
# and the consumer group this pipeline joins.
kafka_topic = "notifications"
kafka_config = {"topic": kafka_topic,
                "bootstrap_servers": "localhost:9092",
                "group_id": "notification_consumer_group"}

with beam.Pipeline(options=PipelineOptions()) as p:
    notifications = p | "Reading messages from Kafka" >> kafkaio.KafkaConsume(kafka_config)
    notifications | 'Writing to stdout' >> beam.Map(print)

The bootstrap_servers value is a comma-separated list of host:port pairs where your brokers are deployed. You will get this information from your Kafka cluster configuration.
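To illustrate the format, here is a minimal sketch that splits such a comma-separated string into individual (host, port) pairs; the broker hostnames are hypothetical placeholders, not part of the original question:

```python
# Split a comma-separated bootstrap_servers string into (host, port) pairs.
# The broker addresses below are made-up examples.
bootstrap_servers = "broker1.example.com:9092,broker2.example.com:9092"

brokers = []
for entry in bootstrap_servers.split(","):
    host, _, port = entry.strip().partition(":")
    brokers.append((host, int(port)))

print(brokers)
# [('broker1.example.com', 9092), ('broker2.example.com', 9092)]
```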