多くの列を格納する寄木細工ファイルからロードされたDataFrame
があります。これらのうちの2つはユーザー識別子の配列で、もう1つは彼が訪れた状態です。ユーザー識別子列は、配列の配列(WrappedArray
がSparkのように)として格納されます。すべてのサブ配列は、最初の要素として識別子の型を持ち、2番目の要素としてその値を持ちます。たとえば、ID 1045のJon Smithというユーザーは、WrappedArray(WrappedArray("name","Jon Smith"), WrappedArray("id","1045"))
という名前で保存されます。 (サブ配列はString
sの配列です)。Spark SQL計算行
ので、表には、次のようになります。上の
uid | state
---------------------------------------------------------------------------------------
WrappedArray(WrappedArray("name","Jon Smith"), WrappedArray("id","1045")) | TX
WrappedArray(WrappedArray("name","Jon Smith"), WrappedArray("id","1045")) | ND
WrappedArray(WrappedArray("name","Jane Katz"), WrappedArray("id","1056")) | IO
ので。このため、私はこのようになりますuid
配列、からIDを解析し、新しいUDFを作成しましたので、
id | states
--------------------
1045 | 2
1056 | 1
:私は、各ユーザーのIDと彼のあった状態の数を持つテーブルをしたいです:
import scala.collection.mutable
def IDfromUID(uid: mutable.WrappedArray[mutable.WrappedArray[String]]): String = {
val ID = uid.filter(_(0) == "id")
ID.length match {
case 0 => null
case _ => ID(0)(1) }
}
と、私は次のクエリでそれを使用します。
sqlContext.udf.register("IDfromUID",(uid: String) => IDfromUID(uid))
df.registerTempTable("RawData")
sqlContext.sql("with Data as (select IDfromUID(uid) as id, state from RawData where uid is not null) select id, count(state) as states from Data where id not null group by id")
しかし、私はwhere uid is not null
に言及しているにもかかわらず、私はまだNullPointerException
が来ますIDfromUID
。私はあることをUDFを変更する場合にのみ停止します。
import scala.collection.mutable
def IDfromUID(uid: mutable.WrappedArray[mutable.WrappedArray[String]]): String = {
if (uid==null) null
else {
val ID = uid.filter(_(0) == "id")
ID.length match {
case 0 => null
case _ => ID(0)(1) } }
}
質問で私を残している - スパークは、データの行を計算しようとしない理由、それは厳密にはないように言われますか?
私はSpark 1.6.2を使用していますが、同じUDFを使用して複数のクエリが並行して実行されています。
あなたが唯一のサブクエリを実行する場合、同じことが、uidがNULL'ないのRawDataから状態、すなわち 'IDとしてIDfromUID(UID)を選択し、発生しますか? –
@TzachZohar初めに 'id id not null'を削除しましたが、試してみませんでした。 – shakedzy