2016-05-21 7 views
0

火花の初心者でデータフレームにmap関数に関する問題があります。pysparkのマップ関数によってsparkのsqlデータフレームの数を数えます。

temp = sqlContext.sql("SELECT * FROM df WHERE tag = 'A00000000001'") 
temp.show(1) 

はその後、我々が持っている:私はコマンドを使用してタグ値に基づいて、それらの一部を選択することができます

+----------+------------+------+ 
|  time|   tag| value| 
+----------+------------+------+ 
|1399766400|A00000000001|1000.0| 
|1399766401|A00000000002|1001.0| 
+----------+------------+------+ 

:私は次のようにそれがあると仮定すると、dfという名前のスパークのSQLデータフレームを、持っている

+----------+------------+------+ 
|  time|   tag| value| 
+----------+------------+------+ 
|1399766400|A00000000001|1000.0| 
+----------+------------+------+ 

現在、私はリスト

x = ["SELECT * FROM df WHERE tag = 'A00000000001'", "SELECT * FROM df WHERE tag = 'A00000000002'"] 
を持って

ようRDD変数として格納されていると私はそれらに基づいて選択されたデータフレームの数をカウントすることでmap関数を適用したいと思いますどの、私は機能を試してみました:私は、戻り値がすべきことが想定

y = x.map(lambda x: sqlContext.sql(x).count()) 
y.take(2) 

[1, 1]なるが、それはエラーを与える:

TypeError: 'JavaPackage' object is not callable 

が、それはこの方法でデータフレームにマップ機能を実行することは可能ですか?もしそうでなければ、私はどうすればいいですか?

答えて

2

すでに述べたように、分散データ構造に対してネストされた操作を実行することはできません。より一般的な意味で、スパークはデータベースではありません。 DataFramesを含むスパークデータ構造は、単一レコード検索などのタスク用に設計されていません。

tags = sc.parallelize([("A00000000001",), ("A00000000002",)]).toDF(["tag"]) 
tags.join(df, ["tag"]).groupBy("tag").count() 
0

これはできません。リスト内包表記を使用することができます。

>>> xs = ["SELECT * FROM df WHERE tag = 'A00000000001'", "SELECT * FROM df WHERE tag = 'A00000000002'"] 
>>> [sqlContext.sql(x).count() for x in xs] 
+0

ので、RDD変数にリストなら、私が持っている:すべてのクエリを使用すると、列によって単純なフィルタを使用するのと同じパターンに従っている場合

はそれは、単純な集計の問題だけとし、参加しています最初に「収集」しますか?収集に時間がかかる。 –

+0

あなたのRDDが "少数"の要素に束縛されていない限り、すべてのデータをマスターにして収集することを避けることをお勧めします。私は間違いなくクリーナーソリューションとして@ zero323答えを検討するだろう。 – eliasah

関連する問題