Apache Flink Streaming APIに問題があります。カスタムデータシンク(Flink CEP)のフィールドを初期化する
私はCEP-Environment全体をカスタムDataSourceでセットアップできました。そして、 "print()"のようなソースで標準シンクを使用すると、すべてうまく動作します。
これが私のシンクは、今のように見えるものです:私は、ISTを達成しようと何
@RequiredArgsConstructor
public class EventDataConsumer extends RichSinkFunction<EventData>{
private final transient Consumer<EventData> consumer;
@Override
public void invoke(EventData eventData) throws Exception {
consumer.accept(eventData);
}
}
は私でDataStreamの各要素に対して実行されなければならないこのSinkFunctionにメソッド参照を渡します。
は、これは私がSinkFunctionを初期化する方法である:
EventDataConsumer consumer = new EventDataConsumer(someService::handleEventData);
outStream.addSink(consumer);
私の問題は、私は私のカスタムシンクの「起動」メソッドにブレークポイントを設定すると、消費者は、私が呼んでいてもNULLに表示されていること、ですコンシューマを明示的に指定するコンストラクタ。
本当に問題の理解に役立ちます。どのようにしてメソッドのリファレンスをシリアライズ可能にするかについてのアイディアはありますか? –
私はConsumer-Fieldを意味しました。私はConsumerとSerializable-Interfaceを拡張するSerializableConsumer-Interfaceを作成しようとしましたが、うまくいかないようです。 –
これはうまくいくはずです。 'transient'キーワードを削除してもよろしいですか?また、メソッドが実際にシリアライズ可能であることを確認してください。ラムダには通常、シリアライズ可能なクロージャがあります。あなたの例では 'someService'もまたシリアライズ可能でなければなりません。 –