2017-09-05 8 views
0

私は動的スキーマ生成を使用してDataframeを作成しようとしています。ここでは、コードスニペットです:RDD用DataFrameを作成できません

def mapMetricList(row: Row): Seq[Metric] = ??? 

val fields = Seq("Field1", "Field2") 

case class Metric(name: String, count: Long) 
def convertMetricList(df: DataFrame): DataFrame = { 
    val outputFields = df.schema.fieldNames.filter(f => fields.contains(f)) 

    val rdd = df.rdd.map(row => { 
    val schema = row.schema 
    val metrics = mapMetricList(row) 
    val s = outputFields.map(name => row.get(schema.fieldIndex(name))) 
    Row.fromSeq(s ++ Seq(metrics)) 
    }) 

    val nonMetricsSchema = outputFields.map(f => df.schema.apply(f)) 
    val metricField = StructField("total",ArrayType(ScalaReflection.schemaFor[Metric].dataType.asInstanceOf[StructType]),nullable=true) 
    val schema = StructType(nonMetricsSchema ++ Seq(metricField)) 
    schema.printTreeString() 
    val dff = spark.createDataFrame(rdd, schema) 
    dff 
} 

私は実行時にこれらの例外を得続けるしかし:

Caused by: java.lang.RuntimeException: Metric is not a valid external type for schema of struct<name:string,count:bigint> 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalIfCondExpr3$(Unknown Source) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalIfFalseExpr4$(Unknown Source) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) 
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290) 

私はスパーク2.1.0

+0

"Metric"クラスが内部にある場合、このようなエラーが発生することがあります。クラス「メトリック」を自分のファイルに移動します。 – pasha701

+0

私はケースクラスを別のファイルに移動しようとしましたが、エラーはまだあります。 –

答えて

0

を使用しています私のコンピュータ上でスパーク1.6で正常に働いていた、私は"convertMetricList"関数の結果を出力します。 「metricField」フィールドの「count」タイプで問題が発生する可能性があります。言及したあなたのトレース "BIGINT" では、私のenvタイプの "LongType" です:

StructField(total,ArrayType(
    StructType(StructField(name,StringType,true), 
    StructField(count,LongType,false) 
),true),true) 

あなたはあなたのENVの "METRICFIELD" タイプを確認することができます。異なる場合、回避策は、メトリック構造をハードコードすることです。

+0

あなたの答えをありがとう、私はスパーク1.6で自分のコードをテストし、それが動作します。私は2.0で何が変わったのか分かりません。 –

+0

明らかに、これは誤って1.6で実装され、2.0から削除されました。 https://issues.apache.org/jira/browse/SPARK-15507 –

+0

私も同じ問題に直面しています。これらの問題を解決するためのソリューションまたは回避策 –