特定のキーを使用してパーティションを分割し、各状態を使用して複数の変換を実行するストリームがあります。私がkeyBy()
を呼び出すと、KeyedStream
となり、次の変換は正しくパーティション化された状態にアクセスできますが、それ以降は別の変換がチェーン化されて、パーティション化された状態にアクセスしようとすると例外が発生します。例外は次のとおりです。複数の変換にわたってキー状態を維持する
状態キーシリアライザが設定で構成されていません。この操作では、パーティション化された状態を使用できません。
キー情報は最初の変換に渡され、チェーンをさらに下回らないようです。
私が実行しようとするコードは、このコードの行に沿っている(しかし、実際に何かをする):
DataStream<Long> newStream = eventsStream
.keyBy("username")
.filter(new RichFilterFunction<Event>() {
private ValueState<Boolean> stateStore;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
stateStore = getRuntimeContext().getState(new ValueStateDescriptor<>("VALUE1", Boolean.class, Boolean.TRUE));
}
@Override
public boolean filter(Event value) throws Exception {
return stateStore.value();
}
})
.map(new RichMapFunction<Event, Long>() {
private ValueState<Long> stateStore;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
stateStore = getRuntimeContext().getState(new ValueStateDescriptor<>("VALUE2", Long.class, 0L));
}
@Override
public Long map(Event value) throws Exception {
return Long.parseLong(value.data) + stateStore.value();
}
});
このコードでは、第二getState()
の呼び出しで例外がスローされます。
もう一度keyBy()
と呼ぶことができますが、操作を連鎖させる機能を削除します。ストリームグラフのオブジェクトを手動で操作して、キー情報が渡されるようにすることはできますか?または、このような連鎖はサポートされていませんか?
ありがとうございますが、それは例でした。操作はreduce()とmap()でも行うことができます。理論的には、それらをflatMap()に組み合わせることもできますが、個々の操作としてそれらを保持する方が理にかなっています。 –
私の答えを延長しました。 –