初识kafka-connect 一、kakfak-connect简介 二、独立模式 三、问题 四、REST API

kafka-connet是一个工具,用来在kafka和外部数据存储系统之间移动数据,kafka-connect可以简单快捷地将数据从kafka导入或导出,数据范围涵盖了关系型数据库、日志、度量数据、Hadoop、数据仓库、NoSql数据存储、ES等。

kafka-connect架构如下(图片来源:百度):

初识kafka-connect
一、kakfak-connect简介
二、独立模式
三、问题
四、REST API

kafka-connect有两个核心概念:Source和Sink。Source:负责导入数据到kafka,Sink负责从kafka导出数据,它们统称Connector,即连接器。

另外还有两个重要概念:Task和Worker,每一个Connector都会协调一系列的task去执行任务,Connector把一项工作任务分割成许多的task,然后把task分发到各个worker进程中去执行。task不保存自己的状态信息,而是交给特定的kafka主题去保存。

kafka-connect提供了以下特性:

初识kafka-connect
一、kakfak-connect简介
二、独立模式
三、问题
四、REST API

即:

  • 通用性:规范化其他数据系统与kafka的继集成,简化了连接器的开发、部署和管理
  • 支持独立模式(standalone)分布式模式(distributed)
  • REST接口:使用Rest API提交和管理Connector
  • 自动位移管理:自动管理位移的提交,不需要开发人员干预,降低了开发成本
  • 分布式和可扩展性:Kafka Connect 基于现有的组管理协议来实现扩展Kafka Connect 集群
  • 流式计算和批处理的集成

kafka中通过connect-standalone.sh和connect-distributed.sh命令来实现独立模式和分布式模式运行的Kafka Connect,可以在kafka的/bin目录下看到:

初识kafka-connect
一、kakfak-connect简介
二、独立模式
三、问题
四、REST API

二、独立模式

在独立模式中,所有操作都是在一个进程中完成的,它比较适合测试和功能验证的场景,但是无法充分利用kafka自身所提供的负载均衡和高容错特性。

下面来演示一下使用独立模式将一个文件中的内容导入到kafka中。

2.1 Source连接器用法

1、修改配置文件

  • connect-standalone.properties:用于Work进程运行的配置文件
  • connect-file-source.properties:Source连接器的配置文件

connect-standalone.properties内容如下(一般情况下使用默认配置即可):

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets

connect-file-source.properties

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/tmp/kafka/test-connect-source.txt
topic=topic-connect

要修改:

file:该连接器数据源文件路径

topic:设置连接器将数据导入哪个主题,如果该主题不存在则会自动创建,当然也可以自己提前创建好(推荐)

2、创建topic

我选择手动创建topic-connect,创建命令如下:

./kafka-topics --zookeeper localhost:2181 --create --topic topic-connect --replication-factor 1 --partitions 1

创建完成后查看以下topic的创建结果:

./kafka-topics --zookeeper localhost:2181 --describe --topic topic-connect

创建结果如下:

初识kafka-connect
一、kakfak-connect简介
二、独立模式
三、问题
四、REST API

 3、启动source连接器

./bin/connect-standalone ./libexec/config/connect-standalone.properties ./libexec/config/connect-file-source.properties

4、向test-connect-source.txt文件写入数据

echo "hello world">>/tmp/kafka/test-connect-source.txt

5、查看结果

查看的方式有两种,一种是通过kafka-console-consumer.sh脚本,一种是kafka-dump-log.sh脚本,前者可以实时查看效果,后者每次写入后都要执行命令才能看到,下面演示通过这两种方式查看的效果:

5.1 kafka-console-consumer.sh
./kafka-console-consumer --bootstrap-server localhost:9092 --topic topic-connect

结果如下:

初识kafka-connect
一、kakfak-connect简介
二、独立模式
三、问题
四、REST API

 5.2 kafka-dump-log.sh
./kafka-dump-log --files /usr/local/var/lib/kafka-logs/topic-connect-0/00000000000000000000.log --print-data-log

结果如下:

 初识kafka-connect
一、kakfak-connect简介
二、独立模式
三、问题
四、REST API

 以上是Source连接器的用法,下面再来探索一下Sink连接器的用法.

2.2 Sink连接器用法

1、修改配置文件

  • connect-standalone.properties:用于Work进程运行的配置文件
  • connect-file-sink.properties:Sink连接器的配置文件

connect-standalone.properties内容如下(需要修改key和value的converter方式):

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets

connect-file-sink.properties内容如下:

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/tmp/kafka/test-connect-sink.txt
topic=topic-connect-sink

2、创建topic

我选择手动创建topic-connect,创建命令如下:

./kafka-topics --zookeeper localhost:2181 --create --topic topic-connect-sink --replication-factor 1 --partitions 1

创建完成后查看以下topic的创建结果:

./kafka-topics --zookeeper localhost:2181 --describe --topic topic-connect-sink

 初识kafka-connect
一、kakfak-connect简介
二、独立模式
三、问题
四、REST API

 3、启动sink连接器

./bin/connect-standalone ./libexec/config/connect-standalone.properties ./libexec/config/connect-file-sink.properties

4、发送消息

发送消息到topic-connect-sink

./bin/kafka-console-producer --broker-list localhost:9092 --topic topic-connect-sink

初识kafka-connect
一、kakfak-connect简介
二、独立模式
三、问题
四、REST API

5、查看sink文件

cat test-connect-sink.txt

初识kafka-connect
一、kakfak-connect简介
二、独立模式
三、问题
四、REST API

 可以看到发送到test-connect-sink 这个topic的消息成功存储到sink文件test-connect-sink.txt中了。

三、问题

在实践sink的过程中,我本来是想通过Source将一条消息从source文件导入kafka,同时通过Sink将该条消息从kafka中导出到sink文件,配置如下:

只要将connect-file-source.properties和connect-file-sink.properties这两个配置文件中的topic改成相同的即可,但是执行之后却发现:

  • 当向test-connect-source.txt文件写入消息时,并不会在test-connect-sink.txt文件中写入,也不会在通过kafka-console-consumer.sh消费到;
  • 当通过命令向topic-connect发送消息时,不会写入test-connect-sink.txt文件,但是能通过kafka-console-consumer.sh消费到;

后来经过一下午的排查,居然发现一直改的是connect-console-source.propertiesconnect-console-sink.properties文件,而不是connect-file-source.propertiesconnect-file-properties,

真的也是被自己蠢哭的一下午,然后重新修改了配置文件,通过以下命令重启connecter之后,在conncect-source.txt写入,在connect-sink.txt中就能看到了。

四、REST API

可以通过Kafka Connect 提供的基于REST风格的API接口来管理连接器,默认端口号是8083,可以通过Worker进程配置文件中的rest.port参数来修改端口号。

如:

curl localhost:8083/connectors

初识kafka-connect
一、kakfak-connect简介
二、独立模式
三、问题
四、REST API

方法请求类型 REST API 接口说明
GET / 查看kafka集群版本信息
GET /connectors 查看当前活跃的连接器列表,显示连接器的名字,即配置connector配置文件中的name属性
POST /connectors 根据指定配置,创建一个新的连接器
GET /connectors/{name} 查看指定连接器的信息
GET /connectors/{name}/config 查看指定连接器的配置信息
PUT /connectors/{name}/config 修改指定连接器的配置信息
GET /connectors/{name}/state 查看指定连接器的状态
POST /connectors/{name}/restart 重启指定的连接器
PUT /connectors/{name}/pause 暂停指定的连接器
GET /connectors/{name}/tasks 查询指定连接器正在运行的task
POST /connectors/name}/tasks 修改指定连接器的Task配置
GET /connectors/{name}/tasks/{taskId}/status 查询指定连接器中指定Task的状态
POST /connectors/{name}/tasks/{taskId}/restat 重启指定连接器中指定的Task
DELETE /connectors/{name} 删除指定的连接器

大周末的,写了一天博客,这效率太低了,出门透透气~~~

参考文献:

朱忠华 《深入理解Kafka核心设计与实践原理》