2017-02-02 5 views
1

私はSparkを使用して郵便番号プレフィックスを抽出しようとしていますが、Sparkで生成されたコードはjava.lang.Doubleをorg.apache.sparkで初期化しようとしているためコンパイルできません。 unsafe.types.UTF8Stringを引数として指定します。これがSparkの問題であるかどうか、私がどのように使用しているかは、私にはっきりと分かりません。私はJava 1.8とspark-mllib_2.10をローカルモードで使用しています。失敗コード:org.codehaus.commons.compiler.CompileException:によって引き起こさdataset.groupByKey()内のSpark CompileException

public static void read(Dataset<ZipCodeLatLon> zipCodes) { 
    zipCodes.groupByKey(new MapFunction<ZipCodeLatLon, String>() { 
     @Override 
     public String call(ZipCodeLatLon value) throws Exception { 
      return value.getZip().substring(0, 3); 
     } 
    }, Encoders.STRING()).keys().show(); 
} 

結果ファイル 'generated.java'、ライン50、コラム58:見つかりません該当するコンストラクタ/メソッド実際のパラメータは "org.apache.spark.unsafe.types.UTF8String"; "java.lang.Double(double)"、 "java.lang.Double(java.lang.String)"

生成されたコードはかなり長いので、ここでは全体を入れません。しかし、それは失敗の原因となっている重要な部分である:

private UTF8String argValue; 
final alex.floyd.lc.geocode.ZipCodeLatLon value1 = false ? null : new alex.floyd.lc.geocode.ZipCodeLatLon(); 
... 
public java.lang.Object apply(java.lang.Object _i) { 
... 
    resultIsNull = false; 
    if (!resultIsNull) { 
     boolean isNull3 = i.isNullAt(1); 
     UTF8String value3 = isNull3 ? null : (i.getUTF8String(1)); 
     resultIsNull = isNull3; 
     argValue = value3; 
    } 

    final java.lang.Double value2 = resultIsNull ? null : new java.lang.Double(argValue); 
    javaBean.setLat(value2); 
... 
} 

エラーがgroupByKey機能(Iは整数であり、代わりに文字列のJava Beanを試してみました)の戻り値の型とは独立しているようです。しかし、入力データセットタイプをZipCodeLatLonの代わりにStringのように変更すると、このコードが機能します。しかし、私がZipCodeLatLonに必要なすべてのJava Bean規約に従っているようだから、変更するために何をする必要があるのか​​分かりません。私はSparkを使ってCSVからZipCodeLatLonを読み込んだので、SparkはgroupByKeyメソッドのコンテキストではなくクラスを処理できます。

public class ZipCodeLatLon implements Serializable{ 
private String zip; 
private Double lat; 
private Double lng; 
public String getZip() { 
    return zip; 
} 
public void setZip(String zip) { 
    this.zip = zip; 
} 
public Double getLat() { 
    return lat; 
} 
public void setLat(Double lat) { 
    this.lat = lat; 
} 
public Double getLng() { 
    return lng; 
} 
public void setLng(Double lng) { 
    this.lng = lng; 
} 
} 

いくつかの追加情報:これは、ZipCodeLatLonがCSVから読み込まれる方法に関連しているようです。手動でデータセットを作成すると、コードは正常に動作します。

完全に罰金:完全に壊れ

ZipCodeLatLon l = new ZipCodeLatLon(); 
l.setZip("12345"); 
l.setLat(0.0); 
l.setLng(0.0); 
read(spark.createDataset(Lists.newArrayList(l, l), Encoders.bean(ZipCodeLatLon.class))); 

Dataset<ZipCodeLatLon> dataset = spark.read() 
    .option("header", true) 
    .csv(zipCodeData.getAbsolutePath()) 
    .as(Encoders.bean(ZipCodeLatLon.class)); 
dataset.show(); // works - reading in the CSV succeeds 
read(dataset); // fails on groupByKey 

答えて

1

はそれを考え出しました。 csvリーダー用のスキーマを作成する必要があります。私は、エンコーダがスキーマを提供すると仮定しましたが、そうではありません。エラーメッセージがもっと役に立つと願っています!前

public static Dataset<ZipCodeLatLon> read(SparkSession spark) { 
    return spark.read() 
      .option("header", true) 
      .csv(ZIP_CODE_DATA.getAbsolutePath()) 
      .as(Encoders.bean(ZipCodeLatLon.class)); 
} 

後:

public static Dataset<ZipCodeLatLon> read(SparkSession spark) { 
    return spark.read() 
      .option("header", true) 
      .option("inferSchema", "true") 
      .csv(ZIP_CODE_DATA.getAbsolutePath()) 
      .as(Encoders.bean(ZipCodeLatLon.class)); 
} 
関連する問題