2017-12-06 7 views
2

こんにちは、次のように着信データを処理する必要があるFlinkジョブを実行しようとしています。 keyBy()の直後のプロセス演算子では、データのプロパティによっては時間がかかる場合があります。入ってくるデータが異なるID(ストリームに使用されています)を持っていても、処理機能の長い処理コードは他の入力データをブロックします。私はストリーム全体を意味します。 RuleProcessFunction.javaFlinkで非ブロッキングストリーミング

SingleOutputStreamOperator<Envelope> processingStream = deviceStream 
    .map(e -> (Envelope) e) 
    .keyBy((KeySelector<Envelope, String>) value -> value.eventId) // key by scenarios 
    .process(new RuleProcessFunction()); 

... 
@Override 
public void processElement(Envelope value, Context ctx, Collector<Envelope> out) throws Exception { 
    //handleEvent(value, ctx, out); 
    if (value.getEventId().equals("I")) { 
     System.out.println("hello i"); 
     for (long i = 0; i < 10000000000L; i++) { 

     } 
    } 
    out.collect(value); 
} 

私は長時間実行コードブロックは、ストリーム全体をブロックするべきではありません期待しています。私はIOの状況をブロックするためのAsyncFunctionがあることを知っていますが、私はそれがこのための正しい解決であるとは知らない。

答えて

0

Cassandraのような外部データベースからデータを取得しているわけではないので、私はAsyncFunctionを使う必要はないと思います。

あなたは、単一の並列処理でフリンクジョブを実行している可能性があります。並列処理を強化して、1つのコアがすべての処理とデータの受信を担当しないようにしてください。確かに、あなたがこれを行うなら、背圧が依然としてあるかもしれません。ソースからのデータの取り込みを担当するコアが、processFunctionを実行しているコアよりも速くデータを読み込んでいる場合、Flinkの背圧処理は取り込み速度を遅くします。

+0

ありがとう@Jicaar、私は、私のProcessFunctionのprocessElement(..)メソッドでListenableFutureまたはCompletableFutureを固定スレッドプールExecutionServiceで使用します。それは合理的でしょうか? – ccobanoglu

+0

ListenableFutureはあなたの望むものを与えるはずです。正直なところ、CompletableFutureを使用していないので、2つの間の長所と短所は分かりません。そして、はい、それは合理的だと思います。 – Jicaar

+0

州(ValueState、MapState)はスレッドセーフですか?私が非同期のアプローチを使用する場合、プロセスオペレータの状態にアクセスする必要があるため、競合状態の問題の場合には近い必要があります。 – ccobanoglu

関連する問題