2017-08-05 16 views
0

何らかの高価なオブジェクトを使ってRDDをマップしたいとします。私は、このワーカー/スレッドごとにこのオブジェクトの1つを持っていて、各ワーカーのRDDパーティションの項目を処理する前に作成する必要があります。Apache Sparkでワーカー1人1人を作成する

私の解決策だった。この場合、

final Function0<ModelEvaluator> f =() -> { 

     if (ModelEvaluator.getInstance() == null) { 
      ModelEvaluator m = new ModelEvaluator(script); 
      ModelEvaluator.setInstance(m); 
     } 

     return ModelEvaluator.getInstance(); 
    }; 

    JavaPairRDD<Double, List<Service>> results = cartesian.mapToPair(
      (t) -> { 
       try { 
        double val = f.call().evaluateModel(t); 
        return new Tuple2<>(val, t); 
       } catch (Exception ex) { 
        return null; 
       } 
      } 
    ); 



public class ModelEvaluator { 

    private static ModelEvaluator instance; 

    public static void setInstance(ModelEvaluator instance) { 
    ModelEvaluator.instance = instance; 
    } 

    public static ModelEvaluator getInstance() { 
     return instance; 
    } 
... 

、「ModelEvaluator」オブジェクトは、のために関連する応答メトリックを計算するために、モデルパラメータを設定するには、「サービス」のオブジェクトのリストを使用して、スクリプトを解析し、そのパラメータ設定。しかし、RDDの行が処理されるたびにスクリプトを解析する必要はありません。

クラスタごとにプロセスを作成するようにクラスタを構成しましたが、同じプロセス内の複数の作業者が同時に変更可能な状態のシングルトンインスタンスにアクセスすることは問題になります。

私の問題に対してより洗練されたソリューションがありますか?

答えて

1

これは、Broadcast変数で達成できます。これにより、ドライバーにオブジェクトを作成することができ、必要に応じてワーカーごとに1回送信されます。

final Broadcast<ModelEvaluator> model = jsc.broadcast(new ModelEvaluator(script)); 

JavaPairRDD<Double, List<Service>> results = cartesian.mapToPair(
     (t) -> { 
      try { 
       double val = model.value().evaluateModel(t); 
       return new Tuple2<>(val, t); 
      } catch (Exception ex) { 
       return null; 
      } 
     } 
); 
+1

ありがとうございます、それは魅力的でした。私は "ModelEvaluator"クラスをSerializableにして、問題を避けるために一時的なフィールドをいくつか設定する必要がありました。そして、コンストラクタで初期化するのではなく、オブジェクトの遅延初期化を実行するためにいくつかのロジックを使用する必要がありました。 –

関連する問題