2016-11-07 11 views
1

コンフィグレーションでストリーミング環境を作成し、RichMapFunctionopen()メソッドでこのコンフィグレーションにアクセスしようとしました。DataStreamプログラムでユーザー機能を設定する方法は?

例:open()メソッドをデバッグするとき

Configuration conf = new Configuration(); 
    conf.setBoolean("a", true); 
    StreamExecutionEnvironment env = 
     StreamExecutionEnvironment.createLocalEnvironment(8, conf); 

    DataStreamSource<Integer> source = env.fromElements(5,5,5,5,5); 
    source.map(new RichMapFunction<Integer, Integer>() { 

     @Override 
     public void open(Configuration parameters) throws Exception { 
      boolean a = parameters.getBoolean("a", false); 
      super.open(parameters); 
     } 

     @Override 
     public Integer map(Integer value) throws Exception { 
      return value; 
     } 
    }).print(); 

    env.execute(); 

は、しかし、私は設定が空であることがわかります。

私は間違っていますか?ストリーミング環境のRichFunctionに設定を正しく渡すにはどうすればよいですか?

答えて

1

FlinkのDataStream APIとDataSet APIは、同じ例ではRichMapFucntionのようなユーザー関数インターフェイスを共有しています。

openメソッドのFlinkのRichFunctionのパラメータは、DataSet APIの最初のバージョンのレガシーであり、DataStream APIでは使用されていません。 Flinkは、map()コールで指定したオブジェクトをシリアル化し、それをパラレルワーカーに送ります。したがって、オブジェクト内のパラメータを通常のフィールドとして直接設定することができます。

+0

この情報でドキュメントを更新することを検討してください。最新のドキュメンテーションはすべて、 'withParameters'メソッドが利用可能であることを示していますが、見つかる場所はありません。 https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_practices.html –

+0

はい、良い点です。これは、次のバージョン(1.4)https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/best_practices.htmlのために既に更新されています。修正が1.3ブランチにバックポートされなかった理由は不明です。 –

関連する問題