2017-02-06 6 views
2

私はsparkクラスタ内の単一ノードのメモリに収まる大きな文字列IDのデータセットを持っています。問題は、単一ノードのメモリの大部分を消費することです。PySparkでメモリ効率の良いデカルト結合

これらのIDは約30文字です。例:

ids 
O2LWk4MAbcrOCWo3IVM0GInelSXfcG 
HbDckDXCye20kwu0gfeGpLGWnJ2yif 
o43xSMBUJLOKDxkYEQbAEWk4aPQHkm 

私はすべてのIDのペアのリストをファイルに書きます。例:

id1,id2 
O2LWk4MAbcrOCWo3IVM0GInelSXfcG,HbDckDXCye20kwu0gfeGpLGWnJ2yif 
O2LWk4MAbcrOCWo3IVM0GInelSXfcG,o43xSMBUJLOKDxkYEQbAEWk4aPQHkm 
HbDckDXCye20kwu0gfeGpLGWnJ2yif,O2LWk4MAbcrOCWo3IVM0GInelSXfcG 
# etc... 

したがって、データセット自体をクロスジョインする必要があります。私はPySparkで10ノードクラスタを使ってこれを行うことを望んでいましたが、メモリ効率が必要です。

+0

いくつのレコードデータセットに含まれていますか?どのノードにどのくらいのメモリがありますか?純粋なRDDまたはDataframes APIを使用していますか? – Mariusz

+0

@Mariusz今のところ、これはマスター上のテキストファイルですが、メモリ内のPythonリストに読み込むと、8GBのRAMの約80%が消費されます。リストの長さは約100Mレコードです。 RDDまたはDataframeのいずれかにデータセットを置くことができます。 – mgoldwasser

答えて

3

pySparkはあなたのデータセットを簡単かつ効率的に処理しますが、10^8 * 10^8レコード(これはクロスジョイン結果の推定サイズ)を処理するのに時間がかかります。サンプルコード:

from pyspark.sql.types import * 
df = spark.read.csv('input.csv', header=True, schema=StructType([StructField('id', StringType())])) 
df.withColumnRenamed('id', 'id1').crossJoin(df.withColumnRenamed('id', 'id2')).show() 
関連する問題