2016-06-28 6 views
0

がサポートされていません:あなたは、列の記述が これを持って見ることができるようにここでスパークデータフレーム:型ユニットのスキーマが、私はスパーク1.5.0を使用していますが、私はこの問題を持っている

val df = paired_rdd.reduceByKey { 
    case (val1, val2) => val1 + "|" + val2 
}.toDF("user_id","description") 

は、DFのサンプルデータですフォーマット(text1#text3#weight | text1#text3#weight|....)

user1の

book1#author1#0.07841217886795074|tool1#desc1#0.27044260397331488|song1#album1#-0.052661673730870676|item1#category1#-0.005683148395350108

私はここに降順で重量に基づいてDFこれをソートしたい

は、私が試したものです。

最初にコンテンツを「|」に分割します。その後、それらの文字列ごとに、「#」でそれらを分割し、重量で3番目の文字列を取得し、その後、計量値に基づいて、double値にその変換

val getSplitAtWeight = udf((str: String) => { 
    str.split("|").foreach(_.split("#")(2).toDouble) 
}) 

ソート降順に(UDFによって返さやり方)

val df_sorted = df.sort(getSplitAtWeight(col("description")).desc) 

私は次のエラーを取得:あなたのudfにして

Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type Unit is not supported at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:153) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29) at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:64) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29) at org.apache.spark.sql.functions$.udf(functions.scala:2242)

答えて

0

変更foreachを次のようには、例外を排除します:

def getSplitAtWeight = udf((str: String) => { 
    str.split('|').map(_.split('#')(2).toDouble) 
}) 

あなたの方法に問題がListforeach方法は、すなわち、その結果はあなたがExceptionを得る理由ですタイプUnitであり、何も返さないということです。 foreachの詳細については、this blogをご確認ください。

+0

スレッド「main」の例外org.apache.spark.sql.AnalysisException:データ型の不一致により 'UDF(注釈)DESC'を解決できません:データ型配列をソートできません。 \t(org.apache.spark.sql.catalyst.analysis.package)$ AnalysisErrorAt.failAnalysis(package.scala:42) – user3803714

+0

正確にはわかりません。スパーク1.6.0では、例外はありません。しかし、私はそれが意味することは、分割後の記述内のすべての文字列に対して、 'array 'がどこに由来するかという4つの数字を得ることです。そして、sparkは、配列を要素として含む列をどのようにソートするかを知らない。また、どの番号を並べ替えるかは、先に進む前に聞くのが良い質問です。 – Psidom

関連する問題