こんにちは、次のように着信データを処理する必要があるFlinkジョブを実行しようとしています。 keyBy()
の直後のプロセス演算子では、データのプロパティによっては時間がかかる場合があります。入ってくるデータが異なるID(ストリームに使用されています)を持っていても、処理機能の長い処理コードは他の入力データをブロックします。私はストリーム全体を意味します。 RuleProcessFunction.java
でFlinkで非ブロッキングストリーミング
SingleOutputStreamOperator<Envelope> processingStream = deviceStream
.map(e -> (Envelope) e)
.keyBy((KeySelector<Envelope, String>) value -> value.eventId) // key by scenarios
.process(new RuleProcessFunction());
:
...
@Override
public void processElement(Envelope value, Context ctx, Collector<Envelope> out) throws Exception {
//handleEvent(value, ctx, out);
if (value.getEventId().equals("I")) {
System.out.println("hello i");
for (long i = 0; i < 10000000000L; i++) {
}
}
out.collect(value);
}
私は長時間実行コードブロックは、ストリーム全体をブロックするべきではありません期待しています。私はIOの状況をブロックするためのAsyncFunctionがあることを知っていますが、私はそれがこのための正しい解決であるとは知らない。
ありがとう@Jicaar、私は、私のProcessFunctionのprocessElement(..)メソッドでListenableFutureまたはCompletableFutureを固定スレッドプールExecutionServiceで使用します。それは合理的でしょうか? – ccobanoglu
ListenableFutureはあなたの望むものを与えるはずです。正直なところ、CompletableFutureを使用していないので、2つの間の長所と短所は分かりません。そして、はい、それは合理的だと思います。 – Jicaar
州(ValueState、MapState)はスレッドセーフですか?私が非同期のアプローチを使用する場合、プロセスオペレータの状態にアクセスする必要があるため、競合状態の問題の場合には近い必要があります。 – ccobanoglu