2016-04-01 7 views
10

例えば方法pysparkにおけるデータフレームの各行をループ

sqlContext = SQLContext(sc) 

sample=sqlContext.sql("select Name ,age ,city from user") 
sample.show() 

に上記のステートメントは、端末上のテーブル全体を印刷するが、私は、さらに計算を実行するまたはしばらくを使用して、そのテーブルの各行にアクセスしたいです。

+0

私は正しい答えを提供したと信じています。改善のために選択することができますか、フィードバックを提供できますか? – aaronsteers

答えて

13

あなたは単純にできません。 DataFramesは、他の分散データ構造と同じで、iterableではなく、専用の上位関数および/またはSQLメソッドのみを使用してアクセスできます。

もちろんcollecttoLocalIteratorを変換し、ローカル

for row in df.rdd.collect(): 
    do_something(row) 

を繰り返すが、それはスパークを使用して、すべての目的を打つことができますか。

2

DataFrameオブジェクトの各行に何かしたい場合は、mapを使用します。これにより、各行に対してさらに計算を実行できます。これは、0からlen(dataset)-1までのデータセット全体をループするのと同じです。

これは、DataFrameではなくPipelinedRDDを返します。

21

カスタム関数を定義してマップを使用します。

def customFunction(row): 

    return (row.name, row.age, row.city) 

sample2 = sample.rdd.map(customFunction) 

又は

sample2 = sample.rdd.map(lambda x: (x.name, x.age, x.city)) 

カスタム関数は、データフレームの各行に適用されます。 sample2はデータフレームではなくRDDになります。

さらに複雑な計算を実行する場合は、マップが必要です。派生カラムを追加するだけでよい場合は、withColumnを使用して、データフレームを返します。

Pythonでリストの内包表記を使用して
sample3 = sample.withColumn('age2', sample.age + 2) 
2

、あなただけの二行使用して、リストへの値の列全体を収集することができます。上記の例で

df = sqlContext.sql("show tables in default") 
tableList = [x["tableName"] for x in df.rdd.collect()] 

を、私たちは、「データベース内のテーブルのリストを返しますデフォルト 'ですが、sql()で使用されているクエリを置き換えることで同様に変更できます。

以上の省略:

tableList = [x["tableName"] for x in sqlContext.sql("show tables in default").rdd.collect()] 

3列のあなたの例のために、私たちは辞書のリストを作成することができ、その後のためのループでそれらを反復処理。

tupleList = [{name:x["name"], age:x["age"], city:x["city"]} 

上記

sql_text = "select name, age, city from user" 
tupleList = [{name:x["name"], age:x["age"], city:x["city"]} 
      for x in sqlContext.sql(sql_text).rdd.collect()] 
for row in tupleList: 
    print("{} is a {} year old from {}".format(
     row["name"], 
     row["age"], 
     row["city"])) 
0

nameため

tupleList = [{'name':x["name"], 'age':x["age"], 'city':x["city"]} 

ageであるべき、とcityは変数単に辞書のキーではありません。

関連する問題