2017-08-30 4 views
0

rでcbindを行うように、PySparkに2つのデータフレームをバインドする具体的な方法はありますか?PySparkカラムワイズバインド

例:

  1. データフレーム1は、10列
  2. データフレーム2を有するIは、データフレームの両方をCBINDとPySparkにおける1つのデータフレームとして作成する必要が1列

を有しています。

答えて

0

最初に私たちのデータフレームを作成してみましょう:

df1 = spark.createDataFrame(sc.parallelize([10*[c] for c in range(10)]), ["c"+ str(i) for i in range(10)]) 
df2 = spark.createDataFrame(sc.parallelize([[c] for c in range(10, 20, 1)]), ["c10"]) 
    +---+---+---+---+---+---+---+---+---+---+ 
    | c0| c1| c2| c3| c4| c5| c6| c7| c8| c9| 
    +---+---+---+---+---+---+---+---+---+---+ 
    | 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 
    | 1| 1| 1| 1| 1| 1| 1| 1| 1| 1| 
    | 2| 2| 2| 2| 2| 2| 2| 2| 2| 2| 
    | 3| 3| 3| 3| 3| 3| 3| 3| 3| 3| 
    | 4| 4| 4| 4| 4| 4| 4| 4| 4| 4| 
    | 5| 5| 5| 5| 5| 5| 5| 5| 5| 5| 
    | 6| 6| 6| 6| 6| 6| 6| 6| 6| 6| 
    | 7| 7| 7| 7| 7| 7| 7| 7| 7| 7| 
    | 8| 8| 8| 8| 8| 8| 8| 8| 8| 8| 
    | 9| 9| 9| 9| 9| 9| 9| 9| 9| 9| 
    +---+---+---+---+---+---+---+---+---+---+ 

    +---+ 
    |c10| 
    +---+ 
    | 10| 
    | 11| 
    | 12| 
    | 13| 
    | 14| 
    | 15| 
    | 16| 
    | 17| 
    | 18| 
    | 19| 
    +---+ 

はその後、我々は、我々はそれらを結合することができ、最後にこのzipWithIndex

from pyspark.sql.types import LongType 
from pyspark.sql import Row 
def zipindexdf(df): 
    schema_new = df.schema.add("index", LongType(), False) 
    return df.rdd.zipWithIndex().map(lambda l: list(l[0]) + [l[1]]).toDF(schema_new) 

df1_index = zipindexdf(df1) 
df1_index.show() 
df2_index = zipindexdf(df2) 
df2_index.show() 

    +---+---+---+---+---+---+---+---+---+---+-----+ 
    | c0| c1| c2| c3| c4| c5| c6| c7| c8| c9|index| 
    +---+---+---+---+---+---+---+---+---+---+-----+ 
    | 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 
    | 1| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1| 
    | 2| 2| 2| 2| 2| 2| 2| 2| 2| 2| 2| 
    | 3| 3| 3| 3| 3| 3| 3| 3| 3| 3| 3| 
    | 4| 4| 4| 4| 4| 4| 4| 4| 4| 4| 4| 
    | 5| 5| 5| 5| 5| 5| 5| 5| 5| 5| 5| 
    | 6| 6| 6| 6| 6| 6| 6| 6| 6| 6| 6| 
    | 7| 7| 7| 7| 7| 7| 7| 7| 7| 7| 7| 
    | 8| 8| 8| 8| 8| 8| 8| 8| 8| 8| 8| 
    | 9| 9| 9| 9| 9| 9| 9| 9| 9| 9| 9| 
    +---+---+---+---+---+---+---+---+---+---+-----+ 

    +---+-----+ 
    |c10|index| 
    +---+-----+ 
    | 10| 0| 
    | 11| 1| 
    | 12| 2| 
    | 13| 3| 
    | 14| 4| 
    | 15| 5| 
    | 16| 6| 
    | 17| 7| 
    | 18| 8| 
    | 19| 9| 
    +---+-----+ 

を行うことができますRDDする機能があり、行を一意に識別するためにしたいです:

df = df1_index.join(df2_index, "index", "inner") 

    +-----+---+---+---+---+---+---+---+---+---+---+---+ 
    |index| c0| c1| c2| c3| c4| c5| c6| c7| c8| c9|c10| 
    +-----+---+---+---+---+---+---+---+---+---+---+---+ 
    | 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 10| 
    | 7| 7| 7| 7| 7| 7| 7| 7| 7| 7| 7| 17| 
    | 6| 6| 6| 6| 6| 6| 6| 6| 6| 6| 6| 16| 
    | 9| 9| 9| 9| 9| 9| 9| 9| 9| 9| 9| 19| 
    | 5| 5| 5| 5| 5| 5| 5| 5| 5| 5| 5| 15| 
    | 1| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1| 11| 
    | 3| 3| 3| 3| 3| 3| 3| 3| 3| 3| 3| 13| 
    | 8| 8| 8| 8| 8| 8| 8| 8| 8| 8| 8| 18| 
    | 2| 2| 2| 2| 2| 2| 2| 2| 2| 2| 2| 12| 
    | 4| 4| 4| 4| 4| 4| 4| 4| 4| 4| 4| 14| 
    +-----+---+---+---+---+---+---+---+---+---+---+---+ 
+0

どうもありがとう..その細かい作業:) –

+0

をこれはしていません異なるパーティションにまたがって格納される2つの別々の大きなDataFramesで動作し、各DataFrameは異なる行のパーティション間で分離されます。 [documentation](http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.functions.monotonically_increasing_id)から "現在の実装では、パーティションIDは上位31ビット、下位33ビットの各パーティション内のレコード番号 "*** – Clay

+1

あなたは正しいです、私はそれを書いたとは思えません..." MonotonicallyIncreasingID "のカウントは、すべてのタスクの起点 – MaFF

0

IDが単調に増加する列を取得するには、一意のが連続している場合は、各DataFrameで次のように使用します。colNameは、各DataFrameを並べ替える列名です。

import pyspark.sql.functions as F 
from pyspark.sql.window import Window as W 

window = W.orderBy('colName').rowsBetween(W.unboundedPreceding, W.currentRow) 

df = df\ 
.withColumn('int', F.lit(1))\ 
.withColumn('consec_id', F.sum('int').over(window))\ 
.drop('int')\ 

すべてが尾を見て、次のコードを使用し、正しく並んで、またはデータフレームの最後rownumsされていることを確認するには。

rownums = 10 
df.where(F.col('consec_id')>df.count()-rownums).show() 

データフレームのend_rowstart_rowから行を見て、次のコードを使用します。

start_row = 20 
end_row = 30 
df.where((F.col('consec_id')>start_row) & (F.col('consec_id')<end_row)).show() 

更新

働くもう一つの方法は、RDD方式zipWithIndex()です。私だけで、このRDD方式を使用して、連続したIDのカラムで既存のデータフレームを修正するには、:

  1. はRDDにDFを変換し、
  2. zipWithIndex()法を適用し、
  3. は、データフレームに戻っRDDを変換
  4. RDDにデータフレームを変換し、
  5. は、インデックスの元のデータフレームのRDD行オブジェクトを組み合わせることRDDラムダ関数をマッピングし
  6. を用いてデータフレームに最終RDD変換元の列名+ zipWithIndex()で作成された整数のID列。

Iはまた@MaFFがしたものと同様zipWithIndex()の出力を含むインデックス列と元のデータフレームを修正する方法を試みたが、結果も遅かったです。ウィンドウ関数は、これらのいずれかよりも約1桁高速です。この時間の増加の大部分は、DataFrameをRDDに変換して元に戻すことにあるようです。

zipWithIndex() RDDメソッドの出力を元のDataFrameの列として追加する方が速い方法がある場合はお知らせください。

42,000行の90列のDataFrameでのテストでは、次の結果が得られます。

import time 

def test_zip(df): 
    startTime = time.time() 
    df_1 = df \ 
    .rdd.zipWithIndex().toDF() \ 
    .rdd.map(lambda row: (row._1) + (row._2,)) \ 
    .toDF(df_all_indexed.columns +['consec_id']) 

    start_row = 20000 
    end_row = 20010 
    df_1.where((F.col('consec_id')>start_row) & (F.col('consec_id')<end_row)).show() 
    endTime = time.time() - startTime 
    return str(round(endTime,3)) + " seconds" 

[test_zip(df) for _ in range(5)] 

['59 0.813秒、 '39 0.574秒、 '36 0.074秒、 '35 0.436秒、 '35 0.636秒 ']

import time 
import pyspark.sql.functions as F 
from pyspark.sql.window import Window as W 

def test_win(df): 
    startTime = time.time() 
    window = W.orderBy('colName').rowsBetween(W.unboundedPreceding, W.currentRow) 
    df_2 = df \ 
    .withColumn('int', F.lit(1)) \ 
    .withColumn('IDcol', F.sum('int').over(window)) \ 
    .drop('int') 

    start_row = 20000 
    end_row = 20010 
    df_2.where((F.col('consec_id')>start_row) & (F.col('consec_id')<end_row)).show() 
    endTime = time.time() - startTime 
    return str(round(endTime,3)) + " seconds" 

[test_win(df) for _ in range(5)] 

[' 4.19秒、 '4.508秒'、 '4.099秒'、 '4.012秒'、 '4.045秒']

import time 
from pyspark.sql.types import StructType, StructField 
import pyspark.sql.types as T 

def test_zip2(df): 
    startTime = time.time() 
    schema_new = StructType(list(df.schema) + [StructField("consec_id", T.LongType(), False)]) 
    df_3 = df.rdd.zipWithIndex().map(lambda l: list(l[0]) + [l[1]]).toDF(schema_new) 

    start_row = 20000 
    end_row = 20010 
    df_3.where((F.col('IDcol')>start_row) & (F.col('consec_id')<end_row)).show() 
    endTime = time.time() - startTime 
    return str(round(endTime,3)) + " seconds" 

[test_zip2(testdf) for _ in range(5)] 

['82 0.795秒、 '61 0.689秒、 '58 0.181秒、 '58 0.01秒、 '57 0.765秒]