2016-10-04 5 views
3

私は、フリンクストリーミングでルールエンジンを作成する予定です。flinkクラスタにたくさんのジョブを作成するのがよい方法ですか?

実行に関するいくつかの要件があります

  • がルールセットに対して実行されるすべてのイベントは、カフカから読み取る必要があります。
  • すべてのルールは、限られた時間内に実行する必要があります。

問題は、実行時にルールを追加できるため、ルールを実行できる最大時間を超える可能性があるため、すべての受信メッセージを処理するためのジョブをたくさん作成できません。

私は期限内に1つのルールを実行できるという保証があります。

ルールごとに1つのジョブを作成し、新しいルールが来たときにジョブを追加するのがよいのでしょうか? (これは数百のルールでもよい)。

私は、これが問題を処理する方法ではなく、理由を説明するのに本当に合理的なものではないという直感を持っています。

第2の方法は、ルールがどのイベントに対して実行されたかについてのトレースを保持するために、(例えば、飼い猫のような)待ち行列を維持することである。だから、各ジョブの作業のみに構成されています

  • は、すべてのルールがイベント
+0

次のようになります。 – goodie

+0

@goodie CEPエンジンを正しく理解していれば、自分の仕事に新しいルールを動的に追加することはできません。 ジョブが実行されたときに実行グラフが作成され、その後に変更することはできないため、 – cju

+0

私が知っているすべてのCEPエンジンでは、実行時にルールを追加/管理できます。私はそれがCEPエンジンであることの全体的なポイントだと思う。 Esperでは、 "epAdministrator.createEPL(...)"でそれを行います。 – goodie

答えて

0
に対して実行されるまで、再びそれを行うイベント
  • に対してそれを実行
  • キュー内のルールを選択

    プログラムロジックを動的に変更する場合は、co-flatmap演算子を使用できます。 co-flatmap演算子には2つの入力があり、1つは通常のイベント入力で、もう1つはルール入力です。内部的にルールを保存し、それを他の入力からの着信イベントに適用します。あなたはまた、CEPエンジンを使用してFLINKにそれを統合することができ

    DataStream<Input> input = ... 
    DataStream<Rule> rules = ... 
    
    input 
        .connect(rules) 
        .keyBy(keySelector1, keySelector2) 
        .flatMap(new MyCoFlatMap()); 
    
    public static class MyCoFlatMap implements CoFlatMapFunction<Input, Rule, Output> { 
    
        @Override 
        public void flatMap1(Input input, Collector<Output> collector) throws Exception { 
         // process input 
        } 
    
        @Override 
        public void flatMap2(Rule rule, Collector<Output> collector) throws Exception { 
         // store rules 
        } 
    } 
    
  • +0

    あなたの答えをありがとう、私はできるだけ早くそれを試みる – cju

    関連する問題