Spark Streaming: reading data from Kafka with multiple schemas
I am struggling with a Spark Streaming implementation.
The messages from Kafka look like this, but with more fields:
{"event":"sensordata", "source":"sensors", "payload": {"actual data as a json}}
{"event":"databasedata", "mysql":"sensors", "payload": {"actual data as a json}}
{"event":"eventApi", "source":"event1", "payload": {"actual data as a json}}
{"event":"eventapi", "source":"event2", "payload": {"actual data as a json}}
I am trying to read messages from a Kafka topic that carries multiple schemas. I need to inspect each message, look at its event and source fields, and decide where to store it as a Dataset. The actual data sits in the payload field as JSON, and each message holds only a single record.
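Conceptually, this is what I am after; a rough sketch using the Structured Streaming Kafka source (the broker address, topic name, and the spark-sql-kafka dependency are assumptions on my side):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.get_json_object

val spark = SparkSession.builder.appName("MultiSchemaReader").getOrCreate()
import spark.implicits._

// Read the raw Kafka messages as strings (brokers and topic are placeholders)
val raw = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "mixed-topic")
  .load()
  .selectExpr("CAST(value AS STRING) AS json")

// Extract the routing fields and the embedded payload as plain strings
val routed = raw.select(
  get_json_object($"json", "$.event").as("event"),
  get_json_object($"json", "$.source").as("source"),
  get_json_object($"json", "$.payload").as("payload"))

// Split the stream by event type so each branch can be stored separately
val sensorData = routed.filter($"event" === "sensordata")
val apiEvents  = routed.filter($"event" === "eventapi")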
Can someone help me implement this, or suggest an alternative approach?
Also, is it good practice to send messages with multiple schemas to the same topic and consume them this way?
Thanks in advance.
You can create a DataFrame from the incoming JSON objects:
Collect the JSON strings into a Seq[String].
Read them with val df = spark.read.json(jsonSeq.toDS()) (spark.read.json accepts a Dataset[String]).
Perform the operations of your choice on the DataFrame df.
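A minimal sketch of those steps, assuming the messages have already been collected into a Seq[String] (the sample field values here are made up):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("JsonToDataFrame").getOrCreate()
import spark.implicits._

// Hypothetical batch of raw messages as strings
val messages: Seq[String] = Seq(
  """{"event":"sensordata", "source":"sensors", "payload": {"temp": 21.5}}""",
  """{"event":"eventapi", "source":"event1", "payload": {"id": 42}}""")

// spark.read.json accepts a Dataset[String] (Spark 2.2+); toDS comes from spark.implicits._
val df = spark.read.json(messages.toDS())

df.printSchema()

// Route by the event column and flatten the nested payload struct
df.filter($"event" === "sensordata").select("payload.*").show()

Note that schema inference merges the payload fields of all message types into one struct, with nulls where a field is absent, which is one reason mixing many schemas in a single topic can get awkward.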