2017-12-29 28 views
1

Javaスパークアプリケーションのフィルタと型付きデータセットのマップでラムダ関数を使用する際に問題があります。Spark CSV - 実際のパラメータに該当するコンストラクタ/メソッドが見つかりません

私は以下のクラスと火花2.2.0を使用しています。このランタイムエラーに

ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 130, Column 126: No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public static java.sql.Date org.apache.spark.sql.catalyst.util.DateTimeUtils.toJavaDate(int)" 

を取得しています。サンプルデータと 全例がhttps://gitlab.com/opencell/test-bigdata

Dataset<CDR> cdr = spark 
      .read() 
      .format("csv") 
      .option("header", "true") 
      .option("inferSchema", "true") 
      .option("delimiter", ";") 
      .csv("CDR_SAMPLE.csv") 
      .as(Encoders.bean(CDR.class)); 

    long v = cdr.filter(x -> (x.timestamp != null && x.getAccess().length()>0)).count(); 

    System.out.println("validated entries :" + v); 

CDRファイルの定義で利用可能ですgitlab link

EDIT

val cdrCSVSchema = StructType(Array(
    StructField("timestamp", DataTypes.TimestampType), 
    StructField("quantity", DataTypes.DoubleType), 
    StructField("access", DataTypes.StringType), 
    StructField("param1", DataTypes.StringType), 
    StructField("param2", DataTypes.StringType), 
    StructField("param3", DataTypes.StringType), 
    StructField("param4", DataTypes.StringType), 
    StructField("param5", DataTypes.StringType), 
    StructField("param6", DataTypes.StringType), 
    StructField("param7", DataTypes.StringType), 
    StructField("param8", DataTypes.StringType), 
    StructField("param9", DataTypes.StringType), 
    StructField("dateParam1", DataTypes.TimestampType), 
    StructField("dateParam2", DataTypes.TimestampType), 
    StructField("dateParam3", DataTypes.TimestampType), 
    StructField("dateParam4", DataTypes.TimestampType), 
    StructField("dateParam5", DataTypes.TimestampType), 
    StructField("decimalParam1", DataTypes.DoubleType), 
    StructField("decimalParam2", DataTypes.DoubleType), 
    StructField("decimalParam3", DataTypes.DoubleType), 
    StructField("decimalParam4", DataTypes.DoubleType), 
    StructField("decimalParam5", DataTypes.DoubleType), 
    StructField("extraParam", DataTypes.StringType))) 

であり、私はCSVドキュメントをロードするには、このコマンドを使用

val cdr = spark.read.format("csv").option("header", "true").option("delimiter", ";").schema(cdrCSVSchema).csv("CDR_SAMPLE.csv") 
その後、

とエンコードとラムダ関数を実行するには、このコマンドを試してみましたが、私はまだTLエラー

cdr.as[CDR].filter(c => c.timestamp != null).show 

答えて

0

を取得しています。入力データセットは、(のための型を推論する値を持っていないので、DRは、明示的にスキーマを定義しますjava.sql.Dateフィールド)。あなたのケースでは

、型指定されていないデータセットのAPIを使用すると、(おそらく回避策と正直私は内部行形式から不要な直列化復元を避けるためにそれをお勧めします)解決策が考えられます。

cdr.filter(!$"timestamp".isNull).filter(length($"access") > 0).count 

(それはスカラ座と私の」私は家庭での運動としてJavaに翻訳しています)。

inferSchemaオプションを使用すると、ほとんどのフィールドが入力文字CDR_SAMPLE.csvで使用できなくなり、ほとんどのフィールドタイプがStringになります(より具体的なタイプを推測するための値がない場合のデフォルトタイプ)。

java.sql.DateのフィールドをdateParam1までdateParam5のString型のフィールドにします。

import org.opencell.spark.model.CDR 
import org.apache.spark.sql.Encoders 
implicit val cdrEnc = Encoders.bean(classOf[CDR]) 
val cdrs = spark.read. 
    option("inferSchema", "true"). 
    option("delimiter", ";"). 
    option("header", true). 
    csv("/Users/jacek/dev/sandbox/test-bigdata/CDR_SAMPLE.csv") 
scala> cdrs.printSchema 
root 
|-- timestamp: timestamp (nullable = true) 
|-- quantity: integer (nullable = true) 
|-- access: string (nullable = true) 
|-- param1: string (nullable = true) 
|-- param2: string (nullable = true) 
|-- param3: string (nullable = true) 
|-- param4: string (nullable = true) 
|-- param5: string (nullable = true) 
|-- param6: string (nullable = true) 
|-- param7: string (nullable = true) 
|-- param8: string (nullable = true) 
|-- param9: string (nullable = true) 
|-- dateParam1: string (nullable = true) 
|-- dateParam2: string (nullable = true) 
|-- dateParam3: string (nullable = true) 
|-- dateParam4: string (nullable = true) 
|-- dateParam5: string (nullable = true) 
|-- decimalParam1: string (nullable = true) 
|-- decimalParam2: string (nullable = true) 
|-- decimalParam3: string (nullable = true) 
|-- decimalParam4: string (nullable = true) 
|-- decimalParam5: string (nullable = true) 
|-- extraParam: string (nullable = true) 

dateParam5に、すなわちdateParam1興味のある分野は、すべての文字列であること。

|-- dateParam1: string (nullable = true) 
|-- dateParam2: string (nullable = true) 
|-- dateParam3: string (nullable = true) 
|-- dateParam4: string (nullable = true) 
|-- dateParam5: string (nullable = true) 

あなたは「ふり」問題の表面が言うCDRクラスで定義されているフィールドの種類は、エンコーダを使用することにより異なります。問題の根本的な原因だ

private Date dateParam1; 
private Date dateParam2; 
private Date dateParam3; 
private Date dateParam4; 
private Date dateParam5; 

を。 Sparkがクラスから推論できるものには違いがあります。変換せずにコードが働いてきただろうが、あなたが主張...

cdrs.as[CDR]. // <-- HERE is the issue = types don't match 
    filter(cdr => cdr.timestamp != null). 
    show // <-- trigger conversion 

ので、それは本当に問題であなたがfilterオペレータにアクセスするものをフィールドしません。問題は、変換が正しく実行されないこと(および全段階のJavaコード生成)につながることです。

inferSchemaにタイプ推論に使用する値のないデータセットをリクエストしてから、Sparkがそれについて多くのことを行うことはできません。最善の策は、スキーマを明示的に定義し、schema(...)演算子を使用して設定することです。

+1

私の編集を確認してください、私は同じエラーが発生しています... –

関連する問題