2017-01-04 12 views
0

これは私のコードです。 、ストリームを分割しています上記のコードでは30 ...SplitStream for dynamic output key(選択)

:80、圧力:70、湿度:80、一時

SplitStream<MonitoringEvent> splitStream = inputStream.split(new OutputSelector<MonitoringEvent>() { 

    @Override 
    public Iterable<String> select(MonitoringEvent me) { 

     List<String> ml = new ArrayList<String>();    
     ml.add(me.getEventType());        
     return ml; 
} 

私はランダムな順序で 一時来るイベントの監視の流れを持っていますイベントタイプは、すなわち、temperatureStream、pressureStreamです。

問題は、私はのeventTypeを知っていれば、私は

splitStream.select('temperatureStream') 

ようsplitStreamからそれを選択することができますが、eventTypeをは動的であり、事前に定義されていない、です。

この動的ストリームにCEPを適用する方法を教えてください。私が間違っている場合

temperate is > 90 for past 10 minutes ... 

pressure is > 90 for past 10 minutes ... 
+0

あなたのイベントタイプは有限で小さいので、圧力、湿度など)、複数のストリームを持つことができ、それらの個々のストリームに特定の処理を入力できます。 eventTypesが大きく増加した場合は、管理するのが難しくなります。 – Aurvoir

+0

、またはソース/プロデューサでのイベントを先に分割するか、メッセージングのようなキーベースのルーティングを使用してください。 – Aurvoir

答えて

0

が私を修正した場合CEPは、のようなものだろうが、私はそれを選択によるのFLINKのparallism上の動的な検索を行うことが可能ではないと思います。あなたのプログラムは、タスク管理者のためのパラレル命令に変換され、ジョブマネージャーはこれらのアクションを調整します。あなたの抽象構文木についての全体的な知識がなければ、すべてのメッセージが共通であることが分かります...

+0

これをどのように達成できますか?複数のイベントタイプを持つ単一のストリーム。それに応じてストリームを分割してCEPを適用する必要がありました – madhairsilence

+0

解決策がまだ見つかりませんでした。ソース上でDeserializationSchemaを使用して、着信メッセージを指定してクラスに解析する必要があります。ここでいくつかのspeudoコードは:class mySchema extends AbstractDeserializationSchema {public LogEvent deserialize(byte [] message)throws IOException {return(Temperature)message;}}この受信メッセージはすべてTemperature Classとして型指定されています。 .select()。型(Temperature.class).dosomething with CEP-Rulesを使用して – bl4ckbird