ReactiveStreamsの近くにある自家製ログ処理ライブラリをio.projectreactor
に置き換えることを検討しています。目的は、私たちが維持しているコードを減らし、コミュニティーが追加した新しい機能を利用することです(オペレーター融合を目指す)。リアクティブ・ストリーム - タイムアウト付きバッチ処理
まず、stdioを消費して、複数行のログエントリをパイプラインを流れるテキストのBLOBにマージする必要があります。ユースケースについては、Filebeatのドキュメントのmultiline log entriesの章で詳細に説明しています
これまでのところ、私が持っているコードは次のとおりです。
BufferedReader input = new BufferedReader(new InputStreamReader(System.in));
Flux<String> lines = Flux.generate(sink -> rethrow(() -> { while (true) sink.next(input.readLine()); }));
Flux<String> logRecordsStr = lines.concatMap(new LogRecordJoiner());
Flux<LogRecord> logRecords = logRecordsStr.map(new LogRecordMapper());
logRecords.doOnEach(r -> System.out.printf("%s payload: %d chars\n", r.timestamp, r.payload.length()))
.subscribe();
新しいログ・ヘッダーが検出されたときにこれは、複数行のマージについての世話をするが、既存のライブラリで、我々はまた、タイムアウト後に蓄積されたラインをフラッシュ(つまり、テキストが5秒以内に受信されない場合は、レコードをフラッシュします)。
これをReactorでモデル化する正しい方法は何でしょうか?私自身の演算子を書く必要がありますか、既存のものをカスタマイズすることはできますか?
Project ReactorまたはRxJavaでこのユースケースを達成するための関連するサンプルとドキュメントへのポインタは、非常に高く評価されます。
buffer(long timespan、TimeUnit unit)演算子(rxjava)がありますか? – zella
バッファは本当に近くに見えますが、過負荷のどれも私が必要とするものと一致しません - どちらか先に起こるどちらかの "bufferClosingSelector"と "timespan"閉じる戦略の組み合わせが必要です。 – ddimitrov