Does GCP Dataflow support Kafka IO in Python?
I am trying to read data from a Kafka topic using the kafka.ReadFromKafka() method in Python. My code looks like this:
from apache_beam.io.external import kafka
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions()

with beam.Pipeline(options=options) as p:
    plants = (
        p
        | 'read' >> kafka.ReadFromKafka({'bootstrap.servers': 'public_ip:9092'}, ['topic1']))
But I am getting the following error message:

ERROR:apache_beam.runners.runner:Error while visiting read
Traceback (most recent call last):
  File "test_file.py", line 16, in
Is it because the Apache Beam Dataflow runner doesn't support KafkaIO?
The Python SDK for Beam does support connecting to Kafka. Below is a code snippet:
from __future__ import print_function
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import kafkaio

kafka_topic = "notifications"
kafka_config = {
    "topic": kafka_topic,
    "bootstrap_servers": "localhost:9092",
    "group_id": "notification_consumer_group",
}

with beam.Pipeline(options=PipelineOptions()) as p:
    notifications = p | "Reading messages from Kafka" >> kafkaio.KafkaConsume(kafka_config)
    notifications | 'Writing to stdout' >> beam.Map(print)
bootstrap_servers is a comma-separated list of host:port pairs where your brokers are deployed. You will get this information from your Kafka cluster configuration.
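To illustrate the format, the small helper below (hypothetical, not part of Beam or beam_nuggets) splits a multi-broker bootstrap_servers value into its individual host and port pairs; the broker hostnames are made up:

```python
def parse_bootstrap_servers(value):
    """Split a comma-separated bootstrap_servers string into (host, port) tuples."""
    brokers = []
    for entry in value.split(","):
        # rpartition handles hosts that themselves contain ':' (e.g. IPv6)
        host, _, port = entry.strip().rpartition(":")
        brokers.append((host, int(port)))
    return brokers

# Two brokers in one config value (hypothetical addresses):
print(parse_bootstrap_servers("kafka-1.internal:9092, kafka-2.internal:9092"))
# → [('kafka-1.internal', 9092), ('kafka-2.internal', 9092)]
```

Each pair must be reachable from the workers; on Dataflow that means the brokers need addresses routable from the worker VMs, not just from your local machine.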