2016-05-31 9 views
0

大きなデータファイルをフィルタリングするためにスパークに問題があります。スパークによるCSVパーティショニング、マップ、フィルタ

CSVファイル内にNxM次元の行列(ヘッダーやその他のタグを除く)があるため、フィルタリング機能がtrueの場合は、各列をフィルタリングして行番号を取得する必要があります。

まず、databricksパッケージを使用してファイルをロードしてから、その列をマップしてフィルタリングします。私が小さなテストケースを使用するときは動作しますが、実際のケースでは決して終了しません。

sparkで実行を高速化するためには、パーティションを取得する必要があります。そのため、各エグゼキュータはタスクを並行して実行できます。これを念頭に置いて、私はエスガナリオの中で、各エグゼキュータ(Mパーティション)にカラムを割り当てることで、メモリ内に完全なcsvをロードする必要がないと考えました。それは可能ですか?

さらに簡単にすると、15k x 5kの0と1でいっぱいのNxM行列を想像してみてください。私は各列にどれくらいの1があるかを数えたいと思う。

1 0 0 0 1 0 0 0 1 
0 1 0 0 1 0 1 0 1 
1 0 0 0 1 0 0 0 1 
0 1 0 0 1 0 1 0 1 
1 0 0 0 1 0 0 0 1 
0 1 0 0 1 0 1 0 1

databricksのデータフレームdfをので、私はこのように1列にフィルタをかけることができます:

df.rdd.map(lambda r: r.C0).filter(lambda x: x == str(1)).count()
はまだ、これはすべてのデータをロードし、決して私のクラスタで終了します。

答えて

0

整数スキーマを使用してデータをロードする場合は、合計を使用すると文字列比較よりも効率的になります。

sc.textFile(yourfile).map(lambda line: [int(x) for x in line.split(";")]) 
関連する問題