2017-08-24 5 views
0

Apache Flink Streaming APIに問題があります。カスタムデータシンク(Flink CEP)のフィールドを初期化する

私はCEP-Environment全体をカスタムDataSourceでセットアップできました。そして、 "print()"のようなソースで標準シンクを使用すると、すべてうまく動作します。

これが私のシンクは、今のように見えるものです:私は、ISTを達成しようと何

@RequiredArgsConstructor 
public class EventDataConsumer extends RichSinkFunction<EventData>{ 

private final transient Consumer<EventData> consumer; 

    @Override 
    public void invoke(EventData eventData) throws Exception { 
     consumer.accept(eventData); 
    } 
} 

は私でDataStreamの各要素に対して実行されなければならないこのSinkFunctionにメソッド参照を渡します。

は、これは私がSinkFunctionを初期化する方法である:

EventDataConsumer consumer = new EventDataConsumer(someService::handleEventData); 
outStream.addSink(consumer); 

私の問題は、私は私のカスタムシンクの「起動」メソッドにブレークポイントを設定すると、消費者は、私が呼んでいてもNULLに表示されていること、ですコンシューマを明示的に指定するコンストラクタ。

答えて

1

シンクがシンクの並列性と同じくらい多くのインスタンスに分散されるので、シンクはシリアライズ可能でなければなりません。クラスタ上で実行する場合、SinkはシリアライズされてTaskManagersに送信され、そこでデシリアライズされます。

例ではconsumerフィールドはtransientです。そのため、シリアル化後はnullになります。

+0

本当に問題の理解に役立ちます。どのようにしてメソッドのリファレンスをシリアライズ可能にするかについてのアイディアはありますか? –

+0

私はConsumer-Fieldを意味しました。私はConsumerとSerializable-Interfaceを拡張するSerializableConsumer-Interfaceを作成しようとしましたが、うまくいかないようです。 –

+0

これはうまくいくはずです。 'transient'キーワードを削除してもよろしいですか?また、メソッドが実際にシリアライズ可能であることを確認してください。ラムダには通常、シリアライズ可能なクロージャがあります。あなたの例では 'someService'もまたシリアライズ可能でなければなりません。 –

関連する問題