8

ReactiveStreamsの近くにある自家製ログ処理ライブラリをio.projectreactorに置き換えることを検討しています。目的は、私たちが維持しているコードを減らし、コミュニティーが追加した新しい機能を利用することです(オペレーター融合を目指す)。リアクティブ・ストリーム - タイムアウト付きバッチ処理

まず、stdioを消費して、複数行のログエントリをパイプラインを流れるテキストのBLOBにマージする必要があります。ユースケースについては、Filebeatのドキュメントのmultiline log entriesの章で詳細に説明しています

これまでのところ、私が持っているコードは次のとおりです。

BufferedReader input = new BufferedReader(new InputStreamReader(System.in)); 
Flux<String> lines = Flux.generate(sink -> rethrow(() -> { while (true) sink.next(input.readLine()); })); 
Flux<String> logRecordsStr = lines.concatMap(new LogRecordJoiner()); 
Flux<LogRecord> logRecords = logRecordsStr.map(new LogRecordMapper()); 
logRecords.doOnEach(r -> System.out.printf("%s payload: %d chars\n", r.timestamp, r.payload.length())) 
      .subscribe();   

新しいログ・ヘッダーが検出されたときにこれは、複数行のマージについての世話をするが、既存のライブラリで、我々はまた、タイムアウト後に蓄積されたラインをフラッシュ(つまり、テキストが5秒以内に受信されない場合は、レコードをフラッシュします)。

これをReactorでモデル化する正しい方法は何でしょうか?私自身の演算子を書く必要がありますか、既存のものをカスタマイズすることはできますか?

Project ReactorまたはRxJavaでこのユースケースを達成するための関連するサンプルとドキュメントへのポインタは、非常に高く評価されます。

+1

buffer(long timespan、TimeUnit unit)演算子(rxjava)がありますか? – zella

+0

バッファは本当に近くに見えますが、過負荷のどれも私が必要とするものと一致しません - どちらか先に起こるどちらかの "bufferClosingSelector"と "timespan"閉じる戦略の組み合わせが必要です。 – ddimitrov

答えて

3

それは、各バッファの開始と終了を識別するため、次のRxJava 2コードはヒントとして意図される方法に依存

TestScheduler scheduler = new TestScheduler(); 
PublishProcessor<String> pp = PublishProcessor.create(); 

Function<Flowable<String>, Flowable<List<String>>> f = o -> 
     o.buffer(o.filter(v -> v.contains("Start")), 
       v -> Flowable.merge(o.filter(w -> w.contains("End")), 
            Flowable.timer(5, TimeUnit.MINUTES, scheduler))); 

pp.publish(f) 
.subscribe(System.out::println); 

pp.onNext("Start"); 
pp.onNext("A"); 
pp.onNext("B"); 
pp.onNext("End"); 

pp.onNext("Start"); 
pp.onNext("C"); 

scheduler.advanceTimeBy(5, TimeUnit.MINUTES); 

pp.onNext("Start"); 
pp.onNext("D"); 
pp.onNext("End"); 
pp.onComplete(); 

プリント:

[Start, A, B, End] 
[Start, C] 
[Start, D, End] 
バッファのゲートを開閉する主な情報源の値を使用する方法について

publishでソースを共有することで、複数のソースコピーを一度に実行することなく、上流から同じ値を再利用することができます。開始は、行の "Start"文字列の検出によって制御されます。終了は、 "End"ストリングの検出または猶予期間の後のタイマーの起動のいずれかによって制御されます。

編集:

「スタート」また、次のバッチのための指標であるならば、それは含まれていますから、あなたは、「スタート」をチェックして、バッファの内容を変更し、「終了」を置き換えることができますそれ以外の場合は前のバッファの新しいヘッダ:

pp.publish(f) 
.doOnNext(v -> { 
    int s = v.size(); 
    if (s > 1 && v.get(s - 1).contains("Start")) { 
     v.remove(s - 1); 
    } 
}) 
.subscribe(System.out::println); 
+0

ENDがない場合はどうでしょうが、次のSTARTが見えるときにバッファが閉じているか、またはtumeoutの有効期限が切れていますか?私は私のコミュニケーションに疑念を持ち始めています。質問には明確なものはありませんか? – ddimitrov

1

bufferオペレータが私にとって最も適していると思われます。

サイズと時間ベースの戦略があります。 ログがあるので、バッファのサイズとして行数を解釈できると思います。ここ

例 - 4または5秒のタイムスパンでグループ化されたアイテムを発する方法:

Observable<String> lineReader = Observable.<String>create(subscriber -> { 
     try { 
      BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); 
      for (String line = br.readLine(); line != null; line = br.readLine()) { 
       subscriber.onNext(line); 
      } 
     } catch (IOException e) { 
      throw new UncheckedIOException(e); 
     } 
    }).subscribeOn(Schedulers.newThread()); 

    lineReader 
     .buffer(5, TimeUnit.SECONDS,4) 
     .filter(lines -> !lines.isEmpty()) 
     .subscribe(System.out::println); 
+0

私はtumeoutでログヘッダでグループ化する必要があります。私。私は2行のメッセージを記録し、その後に1行のメッセージが続き、スタックトレースが続き、別の1行が混乱してから、tumeout期間は何もなかった。stacktraceまで3つのメッセージを即座に取得し、tumeoutの後には4つ目のメッセージを取得することが期待されます。 – ddimitrov