kafka-streamsアプリケーションを開発するのは初めてです。私のストリームプロセッサは、入力jsonメッセージのユーザキーの値に基づいてjsonメッセージをソートすることを意図しています。kafka-streamsを使用してjson入力ストリームを条件付きでソートする
Message 1: {"UserID": "1", "Score":"123", "meta":"qwert"}
Message 2: {"UserID": "5", "Score":"780", "meta":"mnbvs"}
Message 3: {"UserID": "2", "Score":"0", "meta":"fghjk"}
私はダイナミックな解決策がないことをここにDynamically connecting a Kafka input stream to multiple output streams読みました。
私の場合、私は入力ストリームをソートするために必要なユーザーキーと出力トピックを知っています。そこで私は、各プロセッサアプリケーションが異なるUserIDと一致する各ユーザーに固有のプロセッサアプリケーションを個別に作成しています。
すべての異なるストリームプロセッサアプリケーションは、kafkaの同じjson入力トピックから読み込みますが、プリセットされたユーザー条件が満たされている場合は、それぞれのユーザーが出力トピックにメッセージを書き込みます。
public class SwitchStream extends AbstractProcessor<String, String> {
@Override
public void process(String key, String value) {
HashMap<String, String> message = new HashMap<>();
ObjectMapper mapper = new ObjectMapper();
try {
message = mapper.readValue(value, HashMap.class);
} catch (IOException e){}
// User condition UserID = 1
if(message.get("UserID").equals("1")) {
context().forward(key, value);
context().commit();
}
}
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sort-stream-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("Source", "INPUT_TOPIC");
builder.addProcessor("Process", SwitchStream::new, "Source");
builder.addSink("Sink", "OUTPUT_TOPIC", "Process");
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
}
}
質問1: は、それがあれば代わりに低レベルプロセッサのAPIを高レベルストリームDSLを使用して簡単に同じ機能を実現することは可能ですか? (高水準ストリームDSLの他のオンライン例を理解するのは難しいと思っています)
質問2: 入力されたjsonトピックは、20K-25K EPSの高レートで入力されています。私のプロセッサアプリケーションは、この入力ストリームに追いつくことができないようです。私は各プロセスの複数のインスタンスを展開しようとしましたが、結果はどこにでもありたいです。理想的には、各プロセッサインスタンスは3-5K EPSを処理できる必要があります。
ハイレベルストリームDSLを使用してプロセッサロジックを改善したり、同じプロセッサロジックを書き込む方法はありますか?それは違いをもたらすだろうか?
ありがとうマティアス!私が低EPSで抱えていた問題は、Kafka +環境の古いバージョンに関連していました。 – iron3rd