2016-08-16 16 views
1

特定のキーを使用してパーティションを分割し、各状態を使用して複数の変換を実行するストリームがあります。私がkeyBy()を呼び出すと、KeyedStreamとなり、次の変換は正しくパーティション化された状態にアクセスできますが、それ以降は別の変換がチェーン化されて、パーティション化された状態にアクセスしようとすると例外が発生します。例外は次のとおりです。複数の変換にわたってキー状態を維持する

状態キーシリアライザが設定で構成されていません。この操作では、パーティション化された状態を使用できません。

キー情報は最初の変換に渡され、チェーンをさらに下回らないようです。

私が実行しようとするコードは、このコードの行に沿っている(しかし、実際に何かをする):

DataStream<Long> newStream = eventsStream 
    .keyBy("username") 
    .filter(new RichFilterFunction<Event>() { 
     private ValueState<Boolean> stateStore; 

     @Override 
     public void open(Configuration parameters) throws Exception { 
      super.open(parameters); 
      stateStore = getRuntimeContext().getState(new ValueStateDescriptor<>("VALUE1", Boolean.class, Boolean.TRUE)); 
     } 

     @Override 
     public boolean filter(Event value) throws Exception { 
      return stateStore.value(); 
     } 
    }) 
    .map(new RichMapFunction<Event, Long>() { 
     private ValueState<Long> stateStore; 

     @Override 
     public void open(Configuration parameters) throws Exception { 
      super.open(parameters); 
      stateStore = getRuntimeContext().getState(new ValueStateDescriptor<>("VALUE2", Long.class, 0L)); 
     } 

     @Override 
     public Long map(Event value) throws Exception { 
      return Long.parseLong(value.data) + stateStore.value(); 
     } 
    }); 

このコードでは、第二getState()の呼び出しで例外がスローされます。

もう一度keyBy()と呼ぶことができますが、操作を連鎖させる機能を削除します。ストリームグラフのオブジェクトを手動で操作して、キー情報が渡されるようにすることはできますか?または、このような連鎖はサポートされていませんか?

答えて

2

できません。

keyBy()をもう一度呼び出す(または、何らかの形で "キーード"情報を下流に渡す)場合でも、状態は1人のオペレータにのみ関連付けられているため、新しい状態になります。

回避策として、両方の演算子を1つにマージする必要があります。

この機能が役立つと思われる場合は、[email protected]にお気軽にお問い合わせください。

+0

ありがとうございますが、それは例でした。操作はreduce()とmap()でも行うことができます。理論的には、それらをflatMap()に組み合わせることもできますが、個々の操作としてそれらを保持する方が理にかなっています。 –

+0

私の答えを延長しました。 –

関連する問題