Reading Kafka Connect JSONConverter messages with a schema using Spark Structured Streaming
I am trying to read messages from a Kafka topic. The messages are in the format below (sample):
{"schema":{"type":"struct","name":"emp_table","fields":[{"field":"emp_id","type":"string"},{"field":"emp_name","type":"String"},{"field":"city","type":"string"},{"field":"emp_sal","type":"string"},{"field":"manager_name","type":"string"}]},"payload":{"emp_id":"1","emp_name":"abc","city":"NYK","emp_sal":"100000","manager_name":"xyz"}}
Also, please note that the topic contains messages from several different tables, not just one.
What I am trying to achieve is to read the above message from the Kafka topic using Spark Structured Streaming and create a dataframe whose column names and values both come from the JSON message itself.
I don't want to explicitly define a schema using a case class or StructType.
I have tried this:
val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", brokers).option("subscribe", "topic1").option("startingOffsets", "earliest").load()
val y = df.select(get_json_object($"value", "$.payload").alias("payload"))
When I view y (which is a dataframe), it has a single column named payload, and each value in that column is the payload JSON as a string.
How do I get the individual fields as separate columns in a dataframe? I have not been able to achieve this.
(To reiterate, I cannot use a generic case class or StructType for the schema, because the messages coming through Kafka are from different tables, so I want a more dynamic schema created from the JSON itself at runtime.)
Option 1: Change the Kafka Connect source to set value.converter.schemas.enable=false. This will give you only the unwrapped payload to begin with, and then you can skip to the post below.
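As a rough illustration of Option 1, the converter settings might look like the fragment below. Whether they belong in the worker configuration or in the individual connector configuration depends on your deployment, so treat the placement as an assumption.

```properties
# Use the JSON converter for record values
value.converter=org.apache.kafka.connect.json.JsonConverter
# Write only the payload, without the {"schema": ..., "payload": ...} envelope
value.converter.schemas.enable=false
```

Note that this only affects records produced after the change; messages already in the topic keep the envelope.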
Otherwise, after you strip the Connect schema, you would need to use from_json() to apply a schema:
val y = df.select(get_json_object($"value", "$.payload").alias("payload"))
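// The payload column lives in y, not df; expand the parsed struct into individual columns
val z = y.select(from_json($"payload", schema).alias("data")).select("data.*")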
All your fields are strings, so the schema would look like this:
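import org.apache.spark.sql.types.{StringType, StructField, StructType}

// In Scala, StringType is an object, so it is referenced without parentheses
val schema: StructType = StructType(Seq(
  StructField("emp_id", StringType),
  StructField("emp_name", StringType),
  StructField("city", StringType),
  StructField("emp_sal", StringType),
  StructField("manager_name", StringType)
))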
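If writing a StructType by hand for every table is not practical, one possible direction is to build it from the Connect schema that each message already carries. The following is only a sketch under several assumptions: the object ConnectSchema and its methods are hypothetical names, the type mapping covers primitive Connect types only (anything else falls back to string), and json4s is used because it ships with Spark. It is not the only way to do this.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_json, get_json_object}
import org.apache.spark.sql.types._
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Hypothetical helper object, not part of Spark or Kafka Connect
object ConnectSchema {

  // Assumed mapping from Connect JSON type names to Spark types.
  // Matching is case-insensitive because the sample message contains "String".
  private def toSparkType(connectType: String): DataType =
    connectType.toLowerCase match {
      case "string"  => StringType
      case "boolean" => BooleanType
      case "int8"    => ByteType
      case "int16"   => ShortType
      case "int32"   => IntegerType
      case "int64"   => LongType
      case "float"   => FloatType
      case "double"  => DoubleType
      case _         => StringType // nested or unknown types are kept as raw strings
    }

  // Builds a StructType from the "schema.fields" array of one sample message
  def fromSampleMessage(message: String): StructType = {
    implicit val formats: Formats = DefaultFormats
    val fields = (parse(message) \ "schema" \ "fields").children.map { f =>
      StructField((f \ "field").extract[String], toSparkType((f \ "type").extract[String]))
    }
    StructType(fields)
  }

  // Keeps only the messages of one table and expands the payload into columns
  def expandTable(kafkaDf: DataFrame, table: String, schema: StructType): DataFrame = {
    val json = col("value").cast("string")
    kafkaDf
      .where(get_json_object(json, "$.schema.name") === table)
      .select(from_json(get_json_object(json, "$.payload"), schema).alias("data"))
      .select("data.*")
  }
}
```

Because a streaming query needs its schema when it starts, you would still have to obtain one sample message per table up front (for example from a bounded batch read of the topic with spark.read), pass it to fromSampleMessage, and then run expandTable once per table name on the streaming dataframe.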
Related