5

私はスパークで2つのデータセットを結ぶ少し問題を抱えている、私はこれを持っている:2つのsparkデータセットをjavaオブジェクトで結合するにはどうすればいいですか?

SparkConf conf = new SparkConf() 
    .setAppName("MyFunnyApp") 
    .setMaster("local[*]"); 

SparkSession spark = SparkSession 
    .builder() 
    .config(conf) 
    .config("spark.debug.maxToStringFields", 150) 
    .getOrCreate(); 
//... 
//Do stuff 
//... 
Encoder<MyOwnObject1> encoderObject1 = Encoders.bean(MyOwnObject1.class); 
Encoder<MyOwnObject2> encoderObject2 = Encoders.bean(MyOwnObject2.class); 

Dataset<MyOwnObject1> object1DS = spark.read() 
    .option("header","true") 
    .option("delimiter",";") 
    .option("inferSchema","true") 
    .csv(pathToFile1) 
    .as(encoderObject1); 

Dataset<MyOwnObject2> object2DS = spark.read() 
    .option("header","true") 
    .option("delimiter",";") 
    .option("inferSchema","true") 
    .csv(pathToFile2) 
    .as(encoderObject2); 

私は、スキーマを印刷し、それを正しく表示することができます。

//Here start the problem 
Dataset<Tuple2<MyOwnObject1, MyOwnObject2>> joinObjectDS = 
    object1DS.join(object2DS, object1DS.col("column01") 
    .equalTo(object2DS.col("column01"))) 
    .as(Encoders.tuple(MyOwnObject1,MyOwnObject2)); 

最後の行は、このエラーに参加して私を得ることはできません。

Exception in thread "main" org.apache.spark.sql.AnalysisException: Try to map struct<"LIST WITH ALL VARS FROM TWO OBJECT"> to Tuple2, but failed as the number of fields does not line up.; 

Tuple2(object2は)は、すべてのVARSを持っていないので、本当のこと...

次にIこれを試してみました:

Dataset<Tuple2<MyOwnObject1, MyOwnObject2>> joinObjectDS = object1DS 
    .joinWith(object2DS, object1DS 
     .col("column01") 
     .equalTo(object2DS.col("column01"))); 

うまくいきます!しかし、私のタプルことなく、新しいデータセットを必要とする、私はオブジェクト1とオブジェクト2からいくつかのVARSを持つオブジェクト3を、持っているが、その後、私はこの問題を持っている:!

Encoder<MyOwnObject3> encoderObject3 = Encoders.bean(MyOwnObject3.class); 
Dataset<MyOwnObject3> object3DS = joinObjectDS.map(tupleObject1Object2 -> { 
    MyOwnObject1 myOwnObject1 = tupleObject1Object2._1(); 
    MyOwnObject2 myOwnObject2 = tupleObject1Object2._2(); 
    MyOwnObject3 myOwnObject3 = new MyOwnObject3(); //Sets all vars with start values 
    //... 
    //Sets data from object 1 and 2 to 3. 
    //... 
    return myOwnObject3; 
}, encoderObject3); 

は失敗...ここにエラーがある:

17/05/10 12:17:43 ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 593, Column 72: A method named "toString" is not declared in any enclosing class nor any supertype, nor through a static import 

と何千ものエラー行...

どうすればよいですか?私が試した:

  • は(これ以上)のみのString、int型(または整数)、ダブル(またはダブル)でkryoまたはjavaSerialization
  • 使用JavaRDD(作品のよう
  • 使用貴様エンコーダを私のオブジェクトを作成していません!しかし、非常にゆっくり)とデータフレームを使用して行(作品が、私は多くのオブジェクトを変更する必要があります)
  • 私のすべてのJavaオブジェクトはシリアライズ可能です
  • 私は2.1.1と2.1.1、 pom.xml

JavaRDDのデータフレームとオブジェクトsintaxの速度を使用するには、データセットを使用します。

ヘルプ?

おかげ

答えて

-1

は、最後に私は私のコードは、データセットを作成していた時、私はオプションinferSchemaに問題があった、

を解決策を見つけました。私はすべての値が "数値"であるため、オプションのinferSchemaがIntegerカラムを返すStringカラムを持っていますが、文字列( "0001"、 "0002"など)として使用する必要があります。私は多くのヴァールを持っています。それから私はすべての私のクラスでこれを書いています。

List<StructField> fieldsObject1 = new ArrayList<>(); 
for (Field field : MyOwnObject1.class.getDeclaredFields()) { 
    fieldsObject1.add(DataTypes.createStructField(
     field.getName(), 
     CatalystSqlParser.parseDataType(field.getType().getSimpleName()), 
     true) 
    ); 
} 
StructType schemaObject1 = DataTypes.createStructType(fieldsObject1); 

Dataset<MyOwnObject1> object1DS = spark.read() 
    .option("header","true") 
    .option("delimiter",";") 
    .schema(schemaObject1) 
    .csv(pathToFile1) 
    .as(encoderObject1); 

うまく動作します。

「最善の」ソリューションは、このようになります:

Dataset<MyOwnObject1> object1DS = spark.read() 
    .option("header","true") 
    .option("delimiter",";") 
    .schema(encoderObject1.schema()) 
    .csv(pathToFile1) 
    .as(encoderObject1); 

しかしencoderObject1.schema()が私にアルファベット順ではなく、元の順序でVARSとスキーマを返し、私が読んだとき、このオプションは失敗しますcsv。おそらく、エンコーダは、アルファベット順ではなく、元の順序でvarsを持つスキーマを返す必要があります

関連する問題