私は2つのRDDs
PySparkで持っているの日付データに基づいて行の集計を行う方法:単一RDDからの2 RDDSの列を追加し、その後PySpark
RDD1:
[(u'2013-01-31 00:00:00', u'a', u'Pab', u'abc', u'd'),(u'2013-01-31 00:00:00', u'a', u'ab', u'abc', u'g'),.....]
RDD2:
[(u'41',u'42.0'),(u'24',u'98.0'),....]
どちらRDDs
は同じ番号または行を持っています。ここでは、各行のすべての列をRDD1(unicode
を標準string
に変換)からRDD2の各行(unicode string
からfloat
に変換)の各行から取り出し、新しいRDDを作成します。だから、新しいRDDは、次のようになります。
RDD3:
[('2013-01-31 00:00:00', 'a', 'Pab', 'abc', 'd',42.0),('2013-01-31 00:00:00', 'a', 'ab', u'abc', 'g',98.0),.....]
それは、私がすることで、この新しいRDD3
の各行(float値)の最後の値のaggregation
をしたい完了したら第1列の値はdate
です。それはdate
が2013-01-31 00:00:00
であるすべての行を処理し、最後の数値を追加する必要があります。
どのように私はPySparkでこれを行うことができますか?あなたの質問の最初の部分については
あなたは、このように私はあなたがzip圧縮しなければならない... –
@AlbertoBonsantoはあなたが私はそれを行うことができますどのように示すことができると考え、それらを結合するための鍵を持っていませんか? –
@AlbertoBonsantoは 'rdd3 = izip(rdd1。toLocalIterator()、rdd2.toLocalIterator()) 'で十分ですか? –