2017-05-08 5 views
1

私は、整数の範囲のペアを含むRDDをとり、各ペアがその範囲内の可能な値を反復する第3項を持つように変換しようとしています。SparkのLambdasによるリスト変換

[[1,10], [11,20], [21,30]] 

そして、私はこれで終わるしたいと思います:基本的に、私はこれ持って

[[1,1,10], [2,1,10], [3,1,10], [4,1,10], [5,1,10]...] 

私が変換したいファイルは、私は「理由である、非常に大きいですローカルマシン上のPythonだけでなく、PySparkでこれを行うことを望んでいます(私はCSVファイルでローカルに行う方法がありますが、ファイルサイズが与えられてから数時間かかる)。これまでのところ、私はこれ持っている:私は、次のステップは、拡張範囲を反復処理するために、ここからにする必要があるかを把握し、することはできません

>>> c.collect() 
[[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 1, 10], [[11, 12, 13, 14, 15, 16, 17, 18, 19, 20], 11, 20], [[21, 22, 23, 24, 25, 26, 27, 28, 29, 30], 21, 30]] 

:得

a = [[1,10], [11,20], [21,30]] 
b = sc.parallelize(a) 
c = b.map(lambda x: [range(x[0], x[1]+1), x[0], x[1]]) 
c.collect() 

それらのそれぞれを範囲区切り文字とペアにします。

アイデア? CSV入力で動作します

EDIT 2017年5月8日午後3時

ローカルPythonの技法がある:

import csv 
import gzip 
csvfile_expanded = gzip.open('C:\output.csv', 'wb') 
ranges_expanded = csv.writer(csvfile_expanded, delimiter=',', quotechar='"') 
csvfile = open('C:\input.csv', 'rb') 
ranges = csv.reader(csvfile, delimiter=',', quotechar='"') 
for row in ranges: 
    for i in range(int(row[0]),int(row[1])+1): 
     ranges_expanded.writerow([i,row[0],row[1]) 

私はPySparkスクリプトが尋問は、CSVファイルで始まりますすでにHDFSにロードされ、RDDとしてキャストされています。

答えて

2

これを試してみてください:

c = b.flatMap(lambda x: ([y, x[0], x[1]] for y in xrange(x[0], x[1]+1))) 

flatMap()はあなたが範囲の要素ごとに1つの出力レコードを取得することを保証します。外側()xrangeと一緒に注意してください。これは、エグゼキュータのメモリ内の全範囲のマテリアライゼーションを避けるジェネレータ式です。

注:xrange()はPython2です。 Python3を使用している場合、range()

+0

これは完璧に動作します!ヘルプと説明に感謝します。私はラムダに 'for'ループを挿入する方法を理解できませんでしたが、あなたの解決策を見ても意味があります。 – nxl4

関連する問題