How to make Serdes work with multi-step Kafka Streams

Problem description:

I am new to Kafka and I'm building a starter project using the Twitter API as a data source. I have created a Producer which queries the Twitter API and sends the data to my Kafka topic, using the string serializer for both the key and the value. My Kafka Streams application reads this data and does a word count, but also groups by the date of the tweet. This part is done through a KTable called wordCounts to make use of its upsert functionality. The structure of this KTable is:

Key: {word: exampleWord, date: exampleDate}, Value: numberOfOccurences
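
For context, a minimal sketch of the kind of producer described above, assuming the plain Kafka producer API; the topic name and broker address are taken from the streams code further down, while fetchTweetJson() and the field names in the sample payload are placeholders of mine:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TweetProducerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "myIPAddress");
        // string serializer for both key and value, as described above
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // fetchTweetJson() stands in for the actual Twitter API call
            producer.send(new ProducerRecord<>("test-topic2", null, fetchTweetJson()));
        }
    }

    private static String fetchTweetJson() {
        // placeholder payload; field names are guesses based on how Tweet is used below
        return "{\"tweetText\": \"hello kafka streams\", \"createdAt\": \"2021-05-01\"}";
    }
}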

I then attempt to restructure the data in the KTable stream into a flat structure so I can later send it to a database. You can see this in the wordCountsStructured KStream object. This restructures the data to look like the structure below. The value is initially a JsonObject, but I convert it to a string to match the Serdes which I set.

Key: null, Value: {word: exampleWord, date: exampleDate, Counts: numberOfOccurences}

However, when I try to send this to my second Kafka topic, I get the error below.

A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: com.google.gson.JsonObject / value type: com.google.gson.JsonObject). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

I'm confused by this since the KStream I am sending to the topic is of type <String, String>. Does anyone know how I might fix this?

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class TwitterWordCounter {

    private final JsonParser jsonParser = new JsonParser();

    public Topology createTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> textLines = builder.stream("test-topic2");
        KTable<JsonObject, Long> wordCounts = textLines
                //parse each tweet as a Tweet object
                .mapValues(tweetString -> new Gson().fromJson(jsonParser.parse(tweetString).getAsJsonObject(), Tweet.class))
                //map each tweet object to a list of json objects, each containing a word from the tweet and the date of the tweet
                .flatMapValues(TwitterWordCounter::tweetWordDateMapper)
                //update the key so it matches the word-date combination so we can do a groupBy and count instances
                .selectKey((key, wordDate) -> wordDate)
                .groupByKey()
                .count(Materialized.as("Counts"));

        /*
            In order to structure the data so that it can be ingested into SQL, the value of each item in the stream must be straightforward: property, value
            so we have to:
             1. take the columns which include the dimensional data and put this into the value of the stream.
             2. label the count with 'count' as the column name
         */
        KStream<String, String> wordCountsStructured = wordCounts.toStream()
                .map((key, value) -> new KeyValue<>(null, MapValuesToIncludeColumnData(key, value).toString()));

        KStream<String, String> wordCountsPeek = wordCountsStructured.peek(
                (key, value) -> System.out.println("key: " + key + " value: " + value)
        );

        wordCountsStructured.to("test-output2", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application1111");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "myIPAddress");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        TwitterWordCounter wordCountApp = new TwitterWordCounter();

        KafkaStreams streams = new KafkaStreams(wordCountApp.createTopology(), config);
        streams.start();

        // shutdown hook to correctly close the streams application
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

    //this method takes a tweet and transforms it into one JsonObject per word, each containing the word and the date of the tweet
    public static List<JsonObject> tweetWordDateMapper(Tweet tweet) {
        try {
            List<String> words = Arrays.asList(tweet.tweetText.split("\\W+"));
            List<JsonObject> tweetsJson = new ArrayList<JsonObject>();
            for (String word : words) {
                JsonObject tweetJson = new JsonObject();
                tweetJson.add("date", new JsonPrimitive(tweet.formattedDate().toString()));
                tweetJson.add("word", new JsonPrimitive(word));
                tweetsJson.add(tweetJson);
            }

            return tweetsJson;
        } catch (Exception e) {
            System.out.println(e);
            System.out.println(tweet.serialize().toString());
            return new ArrayList<JsonObject>();
        }
    }

    //copies the count into the key's JsonObject under the 'count' column name so the value carries all columns
    public JsonObject MapValuesToIncludeColumnData(JsonObject key, Long countOfWord) {
        key.addProperty("count", countOfWord);
        return key;
    }
}
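
The Tweet class isn't shown in the question. A minimal sketch of what the code above appears to assume (a public tweetText field plus formattedDate() and serialize() methods; the createdAt field and the date format are guesses of mine) would be something like:

import java.text.SimpleDateFormat;
import java.util.Date;

import com.google.gson.Gson;

public class Tweet {

    // referenced as tweet.tweetText in tweetWordDateMapper
    public String tweetText;

    // assumed: the tweet's creation timestamp as parsed from the Twitter API payload
    public Date createdAt;

    // referenced as tweet.formattedDate() when building the word/date JsonObjects
    public String formattedDate() {
        return new SimpleDateFormat("yyyy-MM-dd").format(createdAt);
    }

    // referenced as tweet.serialize() in the catch block for debugging
    public String serialize() {
        return new Gson().toJson(this);
    }
}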

Because you are performing a key-changing operation (selectKey) before the groupByKey(), Kafka Streams creates a repartition topic, and for that topic it relies on the default key and value serdes, which you have set to the String serde. At that point in the topology, however, both the key and the value are JsonObject, which is exactly what the error message is telling you.

You can modify the groupByKey() call to groupByKey(Grouped.with(jsonSerde, jsonSerde)), passing a serde that can handle JsonObject for both the key and the value, and this should fix it.
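
Kafka does not ship a serde for Gson's JsonObject, so you have to supply one yourself. A minimal sketch built with Serdes.serdeFrom (the class name is mine; the lambdas assume Kafka 2.x, where Serializer and Deserializer are functional interfaces):

import java.nio.charset.StandardCharsets;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

public class JsonObjectSerde {

    // serializer: write the JsonObject as UTF-8 JSON text; deserializer: parse it back
    public static Serde<JsonObject> serde() {
        return Serdes.serdeFrom(
                (topic, json) -> json == null ? null : json.toString().getBytes(StandardCharsets.UTF_8),
                (topic, bytes) -> bytes == null ? null
                        : new JsonParser().parse(new String(bytes, StandardCharsets.UTF_8)).getAsJsonObject());
    }
}

With that in place, the grouping step in createTopology() would pass the serde explicitly, for example:

    Serde<JsonObject> jsonSerde = JsonObjectSerde.serde();

    KTable<JsonObject, Long> wordCounts = textLines
            .mapValues(tweetString -> new Gson().fromJson(jsonParser.parse(tweetString).getAsJsonObject(), Tweet.class))
            .flatMapValues(TwitterWordCounter::tweetWordDateMapper)
            .selectKey((key, wordDate) -> wordDate)
            // both the key and the value are JsonObject on the repartition topic, so override the String defaults here
            .groupByKey(Grouped.with(jsonSerde, jsonSerde))
            .count(Materialized.as("Counts"));

Depending on your Kafka Streams version you may also need to give the "Counts" state store matching serdes via Materialized (withKeySerde/withValueSerde); if the same error reappears after the groupByKey change, that is the next place to look.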