2
foreachループ内でさまざまなSELECT文を使用してDFの内容を繰り返し処理し、出力をテキストファイルに書き出す必要があります。 foreachループ内のSELECT文は、NullPointerExceptionを返します。私はなぜこれが見えないのですか? "for"ループ内のSELECTステートメントはこのエラーを返しません。spark scala:foreachループのSELECTはjava.lang.NullPointerExceptionを返します
これはテストケースです。
// step 1 of 6: create the table and load two rows
vc.sql(s"""CREATE TEMPORARY TABLE TEST1 (
c1 varchar(4)
,username varchar(5)
,numeric integer) USING com.databricks.spark.csv OPTIONS (path "/tmp/test.txt")""")
// step 2 of 6: confirm that the data is queryable
vc.sql("SELECT * FROM TEST1").show()
+----+--------+-------+
| c1|username|numeric|
+----+--------+-------+
|col1| USER1| 0|
|col1| USER2| 1|
+----+--------+-------+
// Step 3 of 6: create a dataframe for the table
var df=vc.sql("""SELECT * FROM TEST1""")
// step 4 of 6: create a second dataframe that we will use as a loop iterator
var df_usernames=vc.sql(s"""SELECT DISTINCT username FROM TEST1 """)
// step 5 of 6: first foreach loop works ok:
df_usernames.foreach(t =>
{
println("(The First foreach works ok: loop iterator t is " + t(0).toString())
}
)
(The First foreach works ok: loop iterator t is USER1
(The First foreach works ok: loop iterator t is USER2
// step 6 of 6: second foreach with any embedded SQL returns an error
df_usernames.foreach(t =>
{
println("(The second foreach dies: loop iterator t is " + t(0).toString())
vc.sql("""SELECT c1 FROM TEST1""").show()
}
)
The second foreach dies: loop iterator t is USER1
org.apache.spark.SparkException: Job aborted due to stage failure: Task 158 in stage 94.0 failed 1 times, most recent failure: Lost task 158.0 in stage 94.0 (TID 3525, localhost): java.lang.NullPointerException
at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:195)
最初これはOPを使用していたものと同じforeachの思想ではありません、それはカーディナリティとデータのサイズを知らなくても収集することをお勧めではありません収集呼び出すことなく、foreachの内部でSQLクエリを起動することはできません。 1つの例につき2Mのユーザーがいるとすれば、規模は変わりません。 – eliasah
収集を使用せずにこれを達成する方法はありますか? RDDの各「行」について、私は既存のデータ(SparkSession.sqlからロードできる)と比較する必要があります。 – KangarooWest