2016-05-24 20 views
0

スパークストリーミングアプリケーションでは、行ブロックの後にlines.map()関数を実行する方法。 foreachRDD()は実行を完了します。私は私が何をしたいの最小限の例を含めています:sparkストリーミングアプリケーションでは、lines.foreachRDD()ブロックが実行を完了した後にlines.map()関数を実行する方法

public class Stackoverflow implements Serializable { 

    public static List<DummyClass> list = null; 

    public void init(String str) throws Exception { 
     if (list == null) { 
      synchronized (Stackoverflow.class) { 
       if (list == null) { 
        list = new ArrayList<>(); 
        for (int i = 0; i < 3; i++) { 
         list.add(new DummyClass()); 
        } 
       } 
      } 
     } 
    } 

    public JavaDStream<DataTuple> initFunction(JavaDStream<DataTuple> lines, final String str) throws Exception { 

     lines.foreachRDD(
       new VoidFunction<JavaRDD<DataTuple>>() { 
        @Override 
        public void call(JavaRDD<DataTuple> dataTupleJavaRDD) throws Exception { 
         init(str); 
        } 
       } 
     ); 

     lines.map(new FinalTransformation(list)); 
     return lines; 
    } 

} 

それは実行時にそのリストがnullでないので、私はその上のセクションの後に実行するコードのlines.map()の部分をしたいです。

+0

達成したいことは明確ではありません。 いくつか例を挙げて説明してください。 上記の例から 'list'オブジェクトを一度だけ初期化し、初期化するときに' list'を使って 'lines'にある種の変換を適用したいと思うようです。 –

答えて

1

foreachrddを実行する代わりに、入力を目的の文字列と別のマップに変更するマップを実行します。 Foreachrddはrddの変換のためのものではなく、それはマップのためのものです。

input.map(init).map(whatever) 
関連する問題