rでcbindを行うように、PySparkに2つのデータフレームをバインドする具体的な方法はありますか?PySparkカラムワイズバインド
例:
- データフレーム1は、10列
- データフレーム2を有するIは、データフレームの両方をCBINDとPySparkにおける1つのデータフレームとして作成する必要が1列
を有しています。
rでcbindを行うように、PySparkに2つのデータフレームをバインドする具体的な方法はありますか?PySparkカラムワイズバインド
例:
を有しています。
最初に私たちのデータフレームを作成してみましょう:
df1 = spark.createDataFrame(sc.parallelize([10*[c] for c in range(10)]), ["c"+ str(i) for i in range(10)])
df2 = spark.createDataFrame(sc.parallelize([[c] for c in range(10, 20, 1)]), ["c10"])
+---+---+---+---+---+---+---+---+---+---+
| c0| c1| c2| c3| c4| c5| c6| c7| c8| c9|
+---+---+---+---+---+---+---+---+---+---+
| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|
| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1|
| 2| 2| 2| 2| 2| 2| 2| 2| 2| 2|
| 3| 3| 3| 3| 3| 3| 3| 3| 3| 3|
| 4| 4| 4| 4| 4| 4| 4| 4| 4| 4|
| 5| 5| 5| 5| 5| 5| 5| 5| 5| 5|
| 6| 6| 6| 6| 6| 6| 6| 6| 6| 6|
| 7| 7| 7| 7| 7| 7| 7| 7| 7| 7|
| 8| 8| 8| 8| 8| 8| 8| 8| 8| 8|
| 9| 9| 9| 9| 9| 9| 9| 9| 9| 9|
+---+---+---+---+---+---+---+---+---+---+
+---+
|c10|
+---+
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+
はその後、我々は、我々はそれらを結合することができ、最後にこのzipWithIndex
from pyspark.sql.types import LongType
from pyspark.sql import Row
def zipindexdf(df):
schema_new = df.schema.add("index", LongType(), False)
return df.rdd.zipWithIndex().map(lambda l: list(l[0]) + [l[1]]).toDF(schema_new)
df1_index = zipindexdf(df1)
df1_index.show()
df2_index = zipindexdf(df2)
df2_index.show()
+---+---+---+---+---+---+---+---+---+---+-----+
| c0| c1| c2| c3| c4| c5| c6| c7| c8| c9|index|
+---+---+---+---+---+---+---+---+---+---+-----+
| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|
| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1|
| 2| 2| 2| 2| 2| 2| 2| 2| 2| 2| 2|
| 3| 3| 3| 3| 3| 3| 3| 3| 3| 3| 3|
| 4| 4| 4| 4| 4| 4| 4| 4| 4| 4| 4|
| 5| 5| 5| 5| 5| 5| 5| 5| 5| 5| 5|
| 6| 6| 6| 6| 6| 6| 6| 6| 6| 6| 6|
| 7| 7| 7| 7| 7| 7| 7| 7| 7| 7| 7|
| 8| 8| 8| 8| 8| 8| 8| 8| 8| 8| 8|
| 9| 9| 9| 9| 9| 9| 9| 9| 9| 9| 9|
+---+---+---+---+---+---+---+---+---+---+-----+
+---+-----+
|c10|index|
+---+-----+
| 10| 0|
| 11| 1|
| 12| 2|
| 13| 3|
| 14| 4|
| 15| 5|
| 16| 6|
| 17| 7|
| 18| 8|
| 19| 9|
+---+-----+
を行うことができますRDD
する機能があり、行を一意に識別するためにしたいです:
df = df1_index.join(df2_index, "index", "inner")
+-----+---+---+---+---+---+---+---+---+---+---+---+
|index| c0| c1| c2| c3| c4| c5| c6| c7| c8| c9|c10|
+-----+---+---+---+---+---+---+---+---+---+---+---+
| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 10|
| 7| 7| 7| 7| 7| 7| 7| 7| 7| 7| 7| 17|
| 6| 6| 6| 6| 6| 6| 6| 6| 6| 6| 6| 16|
| 9| 9| 9| 9| 9| 9| 9| 9| 9| 9| 9| 19|
| 5| 5| 5| 5| 5| 5| 5| 5| 5| 5| 5| 15|
| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1| 11|
| 3| 3| 3| 3| 3| 3| 3| 3| 3| 3| 3| 13|
| 8| 8| 8| 8| 8| 8| 8| 8| 8| 8| 8| 18|
| 2| 2| 2| 2| 2| 2| 2| 2| 2| 2| 2| 12|
| 4| 4| 4| 4| 4| 4| 4| 4| 4| 4| 4| 14|
+-----+---+---+---+---+---+---+---+---+---+---+---+
IDが単調に増加する列を取得するには、一意のとが連続している場合は、各DataFrameで次のように使用します。colName
は、各DataFrameを並べ替える列名です。
import pyspark.sql.functions as F
from pyspark.sql.window import Window as W
window = W.orderBy('colName').rowsBetween(W.unboundedPreceding, W.currentRow)
df = df\
.withColumn('int', F.lit(1))\
.withColumn('consec_id', F.sum('int').over(window))\
.drop('int')\
すべてが尾を見て、次のコードを使用し、正しく並んで、またはデータフレームの最後rownums
されていることを確認するには。
rownums = 10
df.where(F.col('consec_id')>df.count()-rownums).show()
データフレームのend_row
にstart_row
から行を見て、次のコードを使用します。
start_row = 20
end_row = 30
df.where((F.col('consec_id')>start_row) & (F.col('consec_id')<end_row)).show()
働くもう一つの方法は、RDD方式zipWithIndex()
です。私だけで、このRDD方式を使用して、連続したIDのカラムで既存のデータフレームを修正するには、:
zipWithIndex()
法を適用し、zipWithIndex()
で作成された整数のID列。Iはまた@MaFFがしたものと同様zipWithIndex()
の出力を含むインデックス列と元のデータフレームを修正する方法を試みたが、結果も遅かったです。ウィンドウ関数は、これらのいずれかよりも約1桁高速です。この時間の増加の大部分は、DataFrameをRDDに変換して元に戻すことにあるようです。
zipWithIndex()
RDDメソッドの出力を元のDataFrameの列として追加する方が速い方法がある場合はお知らせください。
42,000行の90列のDataFrameでのテストでは、次の結果が得られます。
import time
def test_zip(df):
startTime = time.time()
df_1 = df \
.rdd.zipWithIndex().toDF() \
.rdd.map(lambda row: (row._1) + (row._2,)) \
.toDF(df_all_indexed.columns +['consec_id'])
start_row = 20000
end_row = 20010
df_1.where((F.col('consec_id')>start_row) & (F.col('consec_id')<end_row)).show()
endTime = time.time() - startTime
return str(round(endTime,3)) + " seconds"
[test_zip(df) for _ in range(5)]
['59 0.813秒、 '39 0.574秒、 '36 0.074秒、 '35 0.436秒、 '35 0.636秒 ']
import time
import pyspark.sql.functions as F
from pyspark.sql.window import Window as W
def test_win(df):
startTime = time.time()
window = W.orderBy('colName').rowsBetween(W.unboundedPreceding, W.currentRow)
df_2 = df \
.withColumn('int', F.lit(1)) \
.withColumn('IDcol', F.sum('int').over(window)) \
.drop('int')
start_row = 20000
end_row = 20010
df_2.where((F.col('consec_id')>start_row) & (F.col('consec_id')<end_row)).show()
endTime = time.time() - startTime
return str(round(endTime,3)) + " seconds"
[test_win(df) for _ in range(5)]
[' 4.19秒、 '4.508秒'、 '4.099秒'、 '4.012秒'、 '4.045秒']
import time
from pyspark.sql.types import StructType, StructField
import pyspark.sql.types as T
def test_zip2(df):
startTime = time.time()
schema_new = StructType(list(df.schema) + [StructField("consec_id", T.LongType(), False)])
df_3 = df.rdd.zipWithIndex().map(lambda l: list(l[0]) + [l[1]]).toDF(schema_new)
start_row = 20000
end_row = 20010
df_3.where((F.col('IDcol')>start_row) & (F.col('consec_id')<end_row)).show()
endTime = time.time() - startTime
return str(round(endTime,3)) + " seconds"
[test_zip2(testdf) for _ in range(5)]
['82 0.795秒、 '61 0.689秒、 '58 0.181秒、 '58 0.01秒、 '57 0.765秒]
どうもありがとう..その細かい作業:) –
をこれはしていません異なるパーティションにまたがって格納される2つの別々の大きなDataFramesで動作し、各DataFrameは異なる行のパーティション間で分離されます。 [documentation](http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.functions.monotonically_increasing_id)から "現在の実装では、パーティションIDは上位31ビット、下位33ビットの各パーティション内のレコード番号 "*** – Clay
あなたは正しいです、私はそれを書いたとは思えません..." MonotonicallyIncreasingID "のカウントは、すべてのタスクの起点 – MaFF