2016-12-09 7 views
-2

私はパラメータとしてデータフレーム全体を取り込むプログラムを実装しています。私はこれがスパークのサポートではないかもしれないが、私の問題を解決する良い方法があることを知りたいと思います。Sparkの関数パラメータとしてデータフレームを渡す方法

私はこのようなスパークデータフレームを持っている:

Item_sale_table 
    item_id date Sale Amount 
    aaa  3-11  20 
    aaa  3-12  21 
    aaa  3-13  28 
    ...  ...  ... 
    bbb  3-11  17 
    bbb  3-12  12 
    ...  ...  ... 
    ccc  3-11  9 
    ...  ...  ... 

Item_List

item_id description 
aaa   xxxx 
bbb   xxxyx 
ccc   zxsa 
... 

私が何をしたいのか、それはitem_listテーブルから各項目を取得し、item_saleテーブルから履歴データを収集していますその項目に対して、関数(ここでは単純なカウント関数)を適用します。

だから、アイテム処理機能が

def ItemProcess (item_id: String, Dataset: DataFrame) = { 

     val item_count = Dataset.filter(Dataset("item_id").contains(item_id)).count() 

     println(item_id,item_count) 

    } 

のように見えるされており、この関数を呼び出す主な機能は、その後、私は

ERROR Executor: Exception in task 4.0 in stage 11.0 (TID 504) 
java.lang.NullPointerException 
    at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:151) 
    at org.apache.spark.sql.DataFrame.col(DataFrame.scala:664) 
    at org.apache.spark.sql.DataFrame.apply(DataFrame.scala:652) 

は、だから私は、全体を

val item_count_collection = item_list.select("item_id").foreach(x => ItemProcess(x(0).toString, item_sale_table)) 

渡しましたさforeach関数のデータフレーム私はここに問題があると思う。しかし、それを修正する方法は?

========更新=======

私は私も私はこのようなアイテムの処理機能を組み込む場合でもNullPointerException

val item_count_collection = item_list.select("item_id").foreach(x => Item_sale_table.filter(Item_sale_table("item_id").contains(x(0).toString)).count()) 

答えて

0

集計になるだろう発見し、 (任意に)参加:

val item_counts = item_sale_table.groupBy("item_id").count() 

は任意に参加:

item_list.join(item_counts, Seq("item_id")) 

またはcontains(以下効率的な方法)と:

item_list.join(
    item_counts, 
    item_counts("item_id").contains(item_list("item_id"))), 
    "left" 
) 
+1

私はあなたのメソッドを使用する場合はUDFを使用していない限り、私は、代わりに、カウントのカスタマイズされた機能を実装することはできません。私は正しい? – lserlohn

関連する問題