0
の集約データにSessionWindowsを使用しています。私はKafka(0.11)の集計関数でSessionWindowsを使用しようとしていますが、なぜエラーが出るのか理解できません。ここでKafkaStreams(0.11)
は私のコード・スニペットです:
// defining some values:
public static final Integer SESSION_TIMEOUT_MS = 6000000;
public static final String INTOPIC = "input";
public static final String HOST = "host";
// setting up serdes:
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
// some more code to build up the streams
KStreamBuilder builder = new KStreamBuilder();
KStream<String, JsonNode> dataStream = builder.stream(Serdes.String(), jsonSerde, INTOPIC);
// constructing the initalMessage ObjectNode:
ObjectNode initialMessage = JsonNodeFactory.instance.objectNode();
initialMessage.put("count", 0);
initialMessage.put("endTime", "");
// transforming data to KGroupedStream<String,JsonNode>
KGroupedStream<String, JsonNode> data = dataStream.map((key, value) ->{return new KeyValue<>(value.get(HOST).asText(), value); }).groupByKey(Serdes.String(), jsonSerde);
// finally aggregate the data usind SessionWindows
KTable<Windowed<String>, JsonNode> aggregatedData = data.aggregate(
() -> initialMessage,
(key, incomingMessage, initialMessage) -> countData(incomingMessage, initialMessage),
SessionWindows.with(SESSION_TIMEOUT_MS),
jsonSerde,
"aggregated-data");
private static JsonNode countData(JsonNode incomingMessage, JsonNode initialMessage){
// some dataprocessing
}
私は
KTable<String, JsonNode>
に
KTable<Windowed<String>,JsonNode>
を変更し、
SessionWindows.with(SESSION_TIMEOUT_MS)
を削除する場合集計関数からの
はすべて正常です。
私がいない場合は、日食が
KTable<Windowed<String>, JsonNode> aggregatedData = data.aggregate([...])
タイプKGroupedStreamのメソッドの集計(初期化子、アグリゲータ、Windowsの、Serde、文字列)((引数には適用されませんラインのために私に指示) - > {}、(キー、incomingMessage、initialMessage) - > {}、SessionWindows、Serde、String)を
とライン
() -> initialMessage
用
型の不一致:VR
にObjectNodeから変換できない
と:タイプDataWindowedにおける方法countData(JsonNode、JsonNode)は引数に適用されない
(key, incomingMessage, initialMessage) -> countData(incomingMessage, initialMessage),
( JsonNode、VR)
実際には、タイプが紛失することはありません! ヒントは素晴らしいでしょう!
Thxを:D
はそれだけで誤植 '(key.valueです) ' - >'(key、value) '(ドットではなくカンマ)? –
はい、申し訳ありません。ちょうどこのポストにそれを修正しました。しかし、それは解決策ではありませんでした。 他のアイデア、この問題を解決する方法はありますか? – sunjazz
コードの主演者ではありません。たぶん私たちの例のレポが助けてくれるかもしれません:https://github.com/confluentinc/kafka-streams-examplesラムダを使ったいくつかの例があります。 –