2016-04-20 8 views
0

私のSpark 1.6アプリケーションでは、パーティションを選択して特定のパーティションのみをクエリするいくつかのコードがあります。私はこの使用してください:Apache Spark 1.6:単一パーティションをクエリすると、属性にエラーが発生する

val rdd = df.rdd.mapPartitionsWithIndex((idx, iter) => if (idx == 0) iter else Iterator(), true) 
val newDF = sqlContext.createDataFrame(rdd, df.schema) 

newDF.withColumn("newField", myUDF(df("oldField")).mapPartitions(...) 

に、私は私に

resolved attribute(s) oldField#36 missing from idField#51L,oldField#52 in operator !Project [idField#51L,oldField#52,UDF(oldField#36) AS newField#53]; 

をそれを得るように、新しいmapPartitionsは、フィールド上で呼び出すと、私はその後、一緒にUDFを呼び出す場合"oldField"というフィールドが何らかの形で現れているようですが、新しいDataFrameを作成したためでしょうか? - 間違ったID(oldField#52oldField#36を比較)。私の古いDataFrameとnewDFのスキーマを印刷すると、どちらも同じように見えます。

このエラーを回避するにはどうすればよいですか(コード内の操作の順序を変更することはできませんが、現行の構造はあまり役に立ちません)。

答えて

2

スコープに含まれていないDataFrameに名前をバインドしないでください。あなたはcol functionを使用することができます。

newDF.withColumn("newField", myUDF(col("oldField")) 

implicit conversions

newDF.withColumn("newField", myUDF($"oldField")) 

または現在DataFrame

newDF.withColumn("newField", myUDF(newDF("oldField")) 
+0

はい!ありがとう!完璧に動作します!どうすれば... – navige

関連する問題