Streaming CSV data in Kafka-Python

Problem description:

I am sending CSV data to a Kafka topic using Kafka-Python, and the consumer receives the data successfully. Now I am trying to stream a CSV file continuously: any new entry added to the file should automatically be sent to the Kafka topic. Any suggestions on continuously streaming a CSV file would be helpful.

Below is my existing code:

    from kafka import KafkaProducer
    import logging
    from json import dumps
    import csv

    logging.basicConfig(level=logging.INFO)

    # Serialize each row to JSON before sending it to the topic
    producer = KafkaProducer(
        bootstrap_servers='127.0.0.1:9092',
        value_serializer=lambda k: dumps(k).encode('utf-8'))

    with open('C:/Hadoop/Data/Job.csv', 'r') as file:
        reader = csv.reader(file, delimiter='\t')
        for messages in reader:
            producer.send('Jim_Topic', messages)
        producer.flush()
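
In other words, the goal is to tail the file: start at its current end and produce every newly appended row. A minimal sketch of that idea in kafka-python (reusing the Job.csv path and Jim_Topic topic from above; the one-second poll interval is arbitrary, and it assumes rows are appended as complete lines):

    from kafka import KafkaProducer
    from json import dumps
    import csv
    import time

    producer = KafkaProducer(
        bootstrap_servers='127.0.0.1:9092',
        value_serializer=lambda k: dumps(k).encode('utf-8'))

    with open('C:/Hadoop/Data/Job.csv', 'r') as file:
        file.seek(0, 2)  # jump to the end: only entries added from now on are sent
        while True:
            line = file.readline()
            if not line:
                time.sleep(1)  # no new data yet; poll again (interval is an assumption)
                continue
            # parse the appended line with the same tab delimiter as the original code
            row = next(csv.reader([line], delimiter='\t'))
            producer.send('Jim_Topic', row)
            producer.flush()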

Kafka Connect (part of Apache Kafka) is a good way to ingest and egress data between Kafka and other systems, including flat files.

You can use the Kafka Connect SpoolDir connector to stream CSV files into Kafka. Install it from Confluent Hub, and then provide it with configuration for your source file:

curl -i -X PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" \
    http://localhost:8083/connectors/source-csv-spooldir-00/config \
    -d '{
        "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
        "topic": "orders_spooldir_00",
        "input.path": "/data/unprocessed",
        "finished.path": "/data/processed",
        "error.path": "/data/error",
        "input.file.pattern": ".*\\.csv",
        "schema.generation.enabled": "true",
        "csv.first.row.as.header": "true"
        }'
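
Once the connector is running, a quick way to check that rows are landing on the topic is a throwaway kafka-python consumer. A sketch (the broker address is an assumption, matching the local one used earlier; values are printed as raw bytes because their exact format depends on the converter configured for the Connect worker):

    from kafka import KafkaConsumer

    # Subscribe to the topic the SpoolDir connector writes to, from the earliest offset
    consumer = KafkaConsumer(
        'orders_spooldir_00',
        bootstrap_servers='127.0.0.1:9092',
        auto_offset_reset='earliest')

    for message in consumer:
        print(message.value)  # raw bytes; format depends on the worker's value converter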

See this blog for more examples and details.