0
私は入力ファイルが "1行に1つの文書"で構成されていないので、非常に大きなコーパスを処理しようとしていますので、単にsc.textFile
を使ってファイルを直接読み込むことはできません。pysparkの入力として動的ジェネレータを使用
代わりに、私は停止シーケンスに遭遇するたびに文書化するジェネレータ機能を使用してファイルをロードしています。私はsc.parallelize
を使ってこのジェネレータをラップすることができますが、pysparkはすべてのデータを一度にRAMにロードすることになります。
これを回避する方法はありますか?または、私は間違いなく私のテキストファイルを変換する必要がありますか?ここで
は、私が実行したいものは基本的である:
def repaired_corpus(path):
_buffer = ""
for line in open(path):
doc_end = line.find(doc_end_pattern)
if doc_end != -1:
_buffer += line[:doc_end + len(doc_end_pattern)]
yield _buffer
_buffer = ""
else:
_buffer += line
some_state = sc.broadcast(my_state)
in_rdd = spark.sparkContext.parallelize(repaired_corpus(path))
json_docs = in_rdd.map(
lambda item: process_element(
item, some_state.value
)
).saveAsTextFile("processed_corpus.out")
オプションがありますあなたのニーズをサポートするファイル全体を読む(または以前のバージョンのテキストファイル全体を読むことができますか?) –
データセット全体を一度にRAMに読み込まないようにしているので、そうは思わない。 – pdowling
ファイルが大きすぎるとHDFS(またはノード間でファイルを分割するもの)がブロックサイズに分割され、2つのノード間に分割線があるとします。 –