2016-11-08 12 views
0

タスクがSparkでシリアル化できないというエラーが表示されます。私は検索し、いくつかの投稿で示唆されているように静的関数を使用しようとしましたが、それでも同じエラーが発生します。タスクがシリアル化可能ではない - Spark Java

コードは以下の通りです:

public class Rating implements Serializable { 
    private SparkSession spark; 
    private SparkConf sparkConf; 
    private JavaSparkContext jsc; 
    private static Function<String, Rating> mapFunc; 

    public Rating() { 
     mapFunc = new Function<String, Rating>() { 
      public Rating call(String str) { 
       return Rating.parseRating(str); 
      } 
     }; 
    } 

    public void runProcedure() { 
     sparkConf = new SparkConf().setAppName("Filter Example").setMaster("local"); 
     jsc = new JavaSparkContext(sparkConf); 
     SparkSession spark = SparkSession.builder().master("local").appName("Word Count") 
      .config("spark.some.config.option", "some-value").getOrCreate();   

     JavaRDD<Rating> ratingsRDD = spark.read().textFile("sample_movielens_ratings.txt") 
       .javaRDD() 
       .map(mapFunc); 
    } 

    public static void main(String[] args) { 
     Rating newRating = new Rating(); 
     newRating.runProcedure(); 
    } 
} 

エラーが得られます。 enter image description here

がどのように私はこのエラーを解決するのですか? ありがとうございます。

答えて

7

明らかに、Ratingは、属性としてSpark構造(つまり、SparkSession,など)への参照を含んでいるため、Serializableにはなりません。

問題ここでは、あなたがmapFuncの定義を見れば、あなたはRatingオブジェクトを返します

JavaRDD<Rating> ratingsRD = spark.read().textFile("sample_movielens_ratings.txt") 
      .javaRDD() 
      .map(mapFunc); 

です。

mapFunc = new Function<String, Rating>() { 
    public Rating call(String str) { 
     return Rating.parseRating(str); 
    } 
}; 

この関数は、(スパークに関して変換map内で使用されています。 の変換は、ドライバノードではなくワーカーノードに直接実行されるため、コードがシリアライズ可能である必要があります。これにより、SparkはRatingクラスをシリアル化しようとしますが、これは不可能です。

Ratingから必要な機能を抽出し、Spark構造を持たない別のクラスに配置してください。最後に、この新しいクラスをmapFunc関数の戻り値の型として使用します。

+0

評価と手順を2つのクラスに分けました。ありがとう:) – Fleur

関連する問題