2017-10-10 9 views
0

の集約データにSessionWindowsを使用しています。私はKafka(0.11)の集計関数でSessionWindowsを使用しようとしていますが、なぜエラーが出るのか理解できません。ここでKafkaStreams(0.11)

は私のコード・スニペットです:

// defining some values: 
public static final Integer SESSION_TIMEOUT_MS = 6000000; 
public static final String INTOPIC = "input"; 
public static final String HOST = "host"; 

// setting up serdes: 
final Serializer<JsonNode> jsonSerializer = new JsonSerializer(); 
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer(); 
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer); 

// some more code to build up the streams 
KStreamBuilder builder = new KStreamBuilder(); 
KStream<String, JsonNode> dataStream = builder.stream(Serdes.String(), jsonSerde, INTOPIC); 

// constructing the initalMessage ObjectNode: 
ObjectNode initialMessage = JsonNodeFactory.instance.objectNode(); 
initialMessage.put("count", 0); 
initialMessage.put("endTime", ""); 

// transforming data to KGroupedStream<String,JsonNode> 
KGroupedStream<String, JsonNode> data = dataStream.map((key, value) ->{return new KeyValue<>(value.get(HOST).asText(), value); }).groupByKey(Serdes.String(), jsonSerde); 

// finally aggregate the data usind SessionWindows 
KTable<Windowed<String>, JsonNode> aggregatedData = data.aggregate(
      () -> initialMessage, 

      (key, incomingMessage, initialMessage) -> countData(incomingMessage, initialMessage), 

      SessionWindows.with(SESSION_TIMEOUT_MS), 

      jsonSerde, 

      "aggregated-data"); 

private static JsonNode countData(JsonNode incomingMessage, JsonNode initialMessage){ 
// some dataprocessing 
} 

私は

KTable<String, JsonNode> 

KTable<Windowed<String>,JsonNode> 

を変更し、

SessionWindows.with(SESSION_TIMEOUT_MS) 
を削除する場合集計関数からの

はすべて正常です。

私がいない場合は、日食が

KTable<Windowed<String>, JsonNode> aggregatedData = data.aggregate([...]) 

タイプKGroupedStreamのメソッドの集計(初期化子、アグリゲータ、Windowsの、Serde、文字列)((引数には適用されませんラインのために私に指示) - > {}、(キー、incomingMessage、initialMessage) - > {}、SessionWindows、Serde、String)を

とライン

() -> initialMessage 

型の不一致:VR

にObjectNodeから変換できない

と:タイプDataWindowedにおける方法countData(JsonNode、JsonNode)は引数に適用されない

(key, incomingMessage, initialMessage) -> countData(incomingMessage, initialMessage), 

( JsonNode、VR)

実際には、タイプが紛失することはありません! ヒントは素晴らしいでしょう!

Thxを:D

+0

はそれだけで誤植 '(key.valueです) ' - >'(key、value) '(ドットではなくカンマ)? –

+0

はい、申し訳ありません。ちょうどこのポストにそれを修正しました。しかし、それは解決策ではありませんでした。 他のアイデア、この問題を解決する方法はありますか? – sunjazz

+0

コードの主演者ではありません。たぶん私たちの例のレポが助けてくれるかもしれません:https://github.com/confluentinc/kafka-streams-examplesラムダを使ったいくつかの例があります。 –

答えて

1

私は本当に合併実現するために必要な:

Merger<? super String, JsonNode>tmpMerger = new MergerClass<String, JsonNode>(); 

をし、集約関数に追加します。

KTable<Windowed<String>, JsonNode> aggregatedData = data.aggregate(
     () -> initialMessage, 

     (key, incomingMessage, initialMessage) -> countData(incomingMessage, initialMessage), 

     tmpMerger, 

     SessionWindows.with(SESSION_TIMEOUT_MS), 

     jsonSerde, 

     "aggregated-data"); 
関連する問題