2015-12-07 3 views
5

私は2つのRDDs PySparkで持っているの日付データに基づいて行の集計を行う方法:単一RDDからの2 RDDSの列を追加し、その後PySpark

RDD1:

[(u'2013-01-31 00:00:00', u'a', u'Pab', u'abc', u'd'),(u'2013-01-31 00:00:00', u'a', u'ab', u'abc', u'g'),.....] 

RDD2:

[(u'41',u'42.0'),(u'24',u'98.0'),....] 

どちらRDDsは同じ番号または行を持っています。ここでは、各行のすべての列をRDD1(unicodeを標準stringに変換)からRDD2の各行(unicode stringからfloatに変換)の各行から取り出し、新しいRDDを作成します。だから、新しいRDDは、次のようになります。

RDD3:

[('2013-01-31 00:00:00', 'a', 'Pab', 'abc', 'd',42.0),('2013-01-31 00:00:00', 'a', 'ab', u'abc', 'g',98.0),.....] 

それは、私がすることで、この新しいRDD3の各行(float値)の最後の値のaggregationをしたい完了したら第1列の値はdateです。それはdate2013-01-31 00:00:00であるすべての行を処理し、最後の数値を追加する必要があります。

どのように私はPySparkでこれを行うことができますか?あなたの質問の最初の部分については

+1

あなたは、このように私はあなたがzip圧縮しなければならない... –

+0

@AlbertoBonsantoはあなたが私はそれを行うことができますどのように示すことができると考え、それらを結合するための鍵を持っていませんか? –

+0

@AlbertoBonsantoは 'rdd3 = izip(rdd1。toLocalIterator()、rdd2.toLocalIterator()) 'で十分ですか? –

答えて

0

、それは各行が7のタプルである1に2 RDDSを組み合わせることで、あなたはこれを行うことができます。

rdd3 = rdd1.zip(rdd2).map(lambda ((a,b,c,d,e), (f,g)): (a,b,c,d,e,f,g)) 

を私はあなたが最終的に必要なものはよく分かりませんそれはちょうど日付と2番目の値の合計ですか?もしそうなら、あなたはすべての値を必要としない:

rdd3 = rdd1.zip(rdd2).map(lambda ((a,b,c,d,e), (f,g)): (a,g)) 
rdd4 = rdd3.reduceByKey(lambda x, y: x+y) 
+0

はい、私はあなたのために働いて、この答えは、そう集約 –

+0

後の日付と最後の値を必要とする、またはあなたが必要これをもっと助けてくれますか? –

2

をあなたのRDDszipWithIndexする必要があり、この方法は、したがって、あなたが参加することができ、あなたのデータとし、そのエントリのインデックスを表し、別の値でタプルを作成し、両方ともRDDsによってindexである。

あなたのアプローチは(私は、より効率的な方法がある賭け)のようになります。

rdd1 = sc.parallelize([u"A", u"B", u"C", u"A", u"Z"]) 
rdd2 = sc.parallelize(xrange(5)) 

zdd1 = rdd1.zipWithIndex().map(lambda (v, k): (k, v)) 
zdd2 = rdd2.zipWithIndex().map(lambda (v, k): (k, v)) 

print zdd1.join(zdd2).collect() 

出力は次のようになります。 [(0, (u'A', 0)), (4, (u'Z', 4)), (1, (u'B', 1)), (2, (u'C', 2)), (3, (u'A', 3))]、これだけmapは、データを再構成するために必要とされた後。例えば。以下:

combinedRDD = zdd1.join(zdd2).map(lambda (k, v): v) 
print combinedRDD.collect() 

# You can use the .zip method combinedRDD = rdd1.zip(rdd2) 

出力は次のようになります。 [(u'A', 0), (u'Z', 4), (u'B', 1), (u'C', 2), (u'A', 3)]

データ型の変換については、私は前にその問題を抱えていると私はthis snippetを使用し、これを解決するために。

import unicodedata 

convert = lambda (v1, v2): (unicodedata.normalize('NFKD', v1) 
             .encode('ascii','ignore'), v2) 

combinedRDD = combinedRDD.map(convert) 
print combinedRDD.collect() 

ウィル出力:[('A', 0), ('Z', 4), ('B', 1), ('C', 2), ('A', 3)]

関連する問題