2016-04-13 5 views
2

foreachループ内でさまざまなSELECT文を使用してDFの内容を繰り返し処理し、出力をテキストファイルに書き出す必要があります。 foreachループ内のSELECT文は、NullPointerExceptionを返します。私はなぜこれが見えないのですか? "for"ループ内のSELECTステートメントはこのエラーを返しません。spark scala:foreachループのSELECTはjava.lang.NullPointerExceptionを返します

これはテストケースです。

// step 1 of 6: create the table and load two rows 
vc.sql(s"""CREATE TEMPORARY TABLE TEST1 (
c1  varchar(4) 
,username varchar(5) 
,numeric integer) USING com.databricks.spark.csv OPTIONS (path "/tmp/test.txt")""") 

// step 2 of 6: confirm that the data is queryable 
vc.sql("SELECT * FROM TEST1").show() 
+----+--------+-------+ 
| c1|username|numeric| 
+----+--------+-------+ 
|col1| USER1|  0| 
|col1| USER2|  1| 
+----+--------+-------+ 

// Step 3 of 6: create a dataframe for the table 
var df=vc.sql("""SELECT * FROM TEST1""") 


// step 4 of 6: create a second dataframe that we will use as a loop iterator 
var df_usernames=vc.sql(s"""SELECT DISTINCT username FROM TEST1 """) 

// step 5 of 6: first foreach loop works ok: 
df_usernames.foreach(t => 
    { 
     println("(The First foreach works ok: loop iterator t is " + t(0).toString()) 
    } 
) 
(The First foreach works ok: loop iterator t is USER1 
(The First foreach works ok: loop iterator t is USER2 

// step 6 of 6: second foreach with any embedded SQL returns an error 
df_usernames.foreach(t => 
    { 
     println("(The second foreach dies: loop iterator t is " +  t(0).toString()) 
     vc.sql("""SELECT c1 FROM TEST1""").show() 
    } 
)  
The second foreach dies: loop iterator t is USER1 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 158  in stage 94.0 failed 1 times, most recent failure: Lost task 158.0 in stage 94.0 (TID 3525, localhost): java.lang.NullPointerException 
    at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:195) 

答えて

1

これはできません。あなたは

>>> df_usernames.collect.foreach(
... lambda x: sqlContext.sql("""SELECT c1 FROM TEST1""").show()) 
+1

最初これはOPを使用していたものと同じforeachの思想ではありません、それはカーディナリティとデータのサイズを知らなくても収集することをお勧めではありません収集呼び出すことなく、foreachの内部でSQLクエリを起動することはできません。 1つの例につき2Mのユーザーがいるとすれば、規模は変わりません。 – eliasah

+0

収集を使用せずにこれを達成する方法はありますか? RDDの各「行」について、私は既存のデータ(SparkSession.sqlからロードできる)と比較する必要があります。 – KangarooWest

関連する問題