-2
私はパラメータとしてデータフレーム全体を取り込むプログラムを実装しています。私はこれがスパークのサポートではないかもしれないが、私の問題を解決する良い方法があることを知りたいと思います。Sparkの関数パラメータとしてデータフレームを渡す方法
私はこのようなスパークデータフレームを持っている:
Item_sale_table
item_id date Sale Amount
aaa 3-11 20
aaa 3-12 21
aaa 3-13 28
... ... ...
bbb 3-11 17
bbb 3-12 12
... ... ...
ccc 3-11 9
... ... ...
Item_List
item_id description
aaa xxxx
bbb xxxyx
ccc zxsa
...
私が何をしたいのか、それはitem_list
テーブルから各項目を取得し、item_sale
テーブルから履歴データを収集していますその項目に対して、関数(ここでは単純なカウント関数)を適用します。
だから、アイテム処理機能が
def ItemProcess (item_id: String, Dataset: DataFrame) = {
val item_count = Dataset.filter(Dataset("item_id").contains(item_id)).count()
println(item_id,item_count)
}
のように見えるされており、この関数を呼び出す主な機能は、その後、私は
ERROR Executor: Exception in task 4.0 in stage 11.0 (TID 504)
java.lang.NullPointerException
at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:151)
at org.apache.spark.sql.DataFrame.col(DataFrame.scala:664)
at org.apache.spark.sql.DataFrame.apply(DataFrame.scala:652)
は、だから私は、全体を
val item_count_collection = item_list.select("item_id").foreach(x => ItemProcess(x(0).toString, item_sale_table))
渡しましたさforeach関数のデータフレーム私はここに問題があると思う。しかし、それを修正する方法は?
========更新=======
私は私も私はこのようなアイテムの処理機能を組み込む場合でもNullPointerException
val item_count_collection = item_list.select("item_id").foreach(x => Item_sale_table.filter(Item_sale_table("item_id").contains(x(0).toString)).count())
私はあなたのメソッドを使用する場合はUDFを使用していない限り、私は、代わりに、カウントのカスタマイズされた機能を実装することはできません。私は正しい? – lserlohn