2017-07-20 9 views
0

私は入力ファイルが "1行に1つの文書"で構成されていないので、非常に大きなコーパスを処理しようとしていますので、単にsc.textFileを使ってファイルを直接読み込むことはできません。pysparkの入力として動的ジェネレータを使用

代わりに、私は停止シーケンスに遭遇するたびに文書化するジェネレータ機能を使用してファイルをロードしています。私はsc.parallelizeを使ってこのジェネレータをラップすることができますが、pysparkはすべてのデータを一度にRAMにロードすることになります。

これを回避する方法はありますか?または、私は間違いなく私のテキストファイルを変換する必要がありますか?ここで

は、私が実行したいものは基本的である:

def repaired_corpus(path): 
    _buffer = "" 
    for line in open(path): 
     doc_end = line.find(doc_end_pattern) 
     if doc_end != -1: 
      _buffer += line[:doc_end + len(doc_end_pattern)] 
      yield _buffer 
      _buffer = "" 
     else: 
      _buffer += line 

some_state = sc.broadcast(my_state) 
in_rdd = spark.sparkContext.parallelize(repaired_corpus(path)) 
json_docs = in_rdd.map(
    lambda item: process_element(
     item, some_state.value 
    ) 
).saveAsTextFile("processed_corpus.out") 
+0

オプションがありますあなたのニーズをサポートするファイル全体を読む(または以前のバージョンのテキストファイル全体を読むことができますか?) –

+0

データセット全体を一度にRAMに読み込まないようにしているので、そうは思わない。 – pdowling

+0

ファイルが大きすぎるとHDFS(またはノード間でファイルを分割するもの)がブロックサイズに分割され、2つのノード間に分割線があるとします。 –

答えて

2

あなたは基本的に答えhere

を使用して試すことができます少し古いものの:スパーク2.2で

rdd = sc.newAPIHadoopFile(path, "org.apache.hadoop.mapreduce.lib.input.TextInputFormat", 
      "org.apache.hadoop.io.LongWritable", "org.apache.hadoop.io.Text", 
      conf={"textinputformat.record.delimiter": doc_end_pattern}).map(lambda l:l[1]) 
関連する問題