例えば方法pysparkにおけるデータフレームの各行をループ
sqlContext = SQLContext(sc)
sample=sqlContext.sql("select Name ,age ,city from user")
sample.show()
に上記のステートメントは、端末上のテーブル全体を印刷するが、私は、さらに計算を実行するまたはしばらくを使用して、そのテーブルの各行にアクセスしたいです。
例えば方法pysparkにおけるデータフレームの各行をループ
sqlContext = SQLContext(sc)
sample=sqlContext.sql("select Name ,age ,city from user")
sample.show()
に上記のステートメントは、端末上のテーブル全体を印刷するが、私は、さらに計算を実行するまたはしばらくを使用して、そのテーブルの各行にアクセスしたいです。
あなたは単純にできません。 DataFrames
は、他の分散データ構造と同じで、iterableではなく、専用の上位関数および/またはSQLメソッドのみを使用してアクセスできます。
もちろんcollect
のtoLocalIterator
を変換し、ローカル
for row in df.rdd.collect():
do_something(row)
を繰り返すが、それはスパークを使用して、すべての目的を打つことができますか。
DataFrameオブジェクトの各行に何かしたい場合は、map
を使用します。これにより、各行に対してさらに計算を実行できます。これは、0
からlen(dataset)-1
までのデータセット全体をループするのと同じです。
これは、DataFrameではなくPipelinedRDDを返します。
カスタム関数を定義してマップを使用します。
def customFunction(row):
return (row.name, row.age, row.city)
sample2 = sample.rdd.map(customFunction)
又は
sample2 = sample.rdd.map(lambda x: (x.name, x.age, x.city))
カスタム関数は、データフレームの各行に適用されます。 sample2はデータフレームではなくRDD
になります。
さらに複雑な計算を実行する場合は、マップが必要です。派生カラムを追加するだけでよい場合は、withColumn
を使用して、データフレームを返します。
sample3 = sample.withColumn('age2', sample.age + 2)
、あなただけの二行使用して、リストへの値の列全体を収集することができます。上記の例で
df = sqlContext.sql("show tables in default")
tableList = [x["tableName"] for x in df.rdd.collect()]
を、私たちは、「データベース内のテーブルのリストを返しますデフォルト 'ですが、sql()で使用されているクエリを置き換えることで同様に変更できます。
以上の省略:
tableList = [x["tableName"] for x in sqlContext.sql("show tables in default").rdd.collect()]
3列のあなたの例のために、私たちは辞書のリストを作成することができ、その後のためのループでそれらを反復処理。
tupleList = [{name:x["name"], age:x["age"], city:x["city"]}
上記
sql_text = "select name, age, city from user"
tupleList = [{name:x["name"], age:x["age"], city:x["city"]}
for x in sqlContext.sql(sql_text).rdd.collect()]
for row in tupleList:
print("{} is a {} year old from {}".format(
row["name"],
row["age"],
row["city"]))
はname
ため
tupleList = [{'name':x["name"], 'age':x["age"], 'city':x["city"]}
、age
であるべき、とcity
は変数単に辞書のキーではありません。
私は正しい答えを提供したと信じています。改善のために選択することができますか、フィードバックを提供できますか? – aaronsteers