2017-09-28 7 views
1

CSVデータをJavaストリームにダンプする前に効率的にCSVデータを前処理する方法を探しています。通常の状況下ではCSVデータを並列ストリーミング前またはストリーミング前に効率的に前処理する

私は、ファイルを処理するために、このような何かをするだろう。それらをストリーミングし、私のコレクション内の各項目が依存する可能性がありながら、私は、レコードの前または前処理する必要がある。この現在のケースではしかし

File input = new File("helloworld.csv"); 
InputStream is = new FileInputStream(input); 
BufferedReader br = new BufferedReader(new InputStreamReader(is)); 
br.lines().parallel().forEach(line -> { 
    System.out.println(line); 
}); 

を以前。ここでは、問題を実証するための簡単な例であるCSVファイル:それはレコードからレコードを変更したときに私の例CSVで

species, breed, name 
dog, lab, molly 
, greyhound, stella 
, beagle, stanley 
cat, siamese, toby 
, persian, fluffy 

種の欄にのみ移入されます。私は単純な答えが私のCSV出力を修正することであることを知っているが、この場合は不可能です。

私はCSVからレコードを処理し、前のレコードの種別値を空白にしてコピーし、前処理後にパラレルストリームに渡すという合理的で効率的な方法を探しています。

ダウンストリーム処理には長い時間がかかるため、最終的には前処理が完了すると同時に処理する必要があります。私のCSVファイルも大きくすることができますので、まずファイル全体をメモリ内のオブジェクトにロードしないようにします。

私はを以下(警告悪い擬似コード)のような何かをするいくつかの方法があった期待していた。

parallelStream.startProcessing 

while read line { 
    if (line.doesntHaveSpecies) { 
     line.setSpecies 
    } 
    parallelStream.add(line) 
} 

私の現在のソリューションは、ファイル全体を処理し、それをストリーム「それは修正する」ことです。ファイルが大きくなる可能性があるので、レコードが「固定」され、ファイル全体が処理される直前にレコードの処理を開始するとよいでしょう。

答えて

2

状態をSpliteratorにカプセル化する必要があります。

try(Reader r = new FileReader(input); 
    BufferedReader br = new BufferedReader(r)) { 

    getStream(br).forEach(System.out::println); 
} 

Spliteratorとして使用することができる

private static Stream<String> getStream(BufferedReader br) { 
    return StreamSupport.stream(
     new Spliterators.AbstractSpliterator<String>(
              100, Spliterator.ORDERED|Spliterator.NONNULL) { 
      String prev; 
      public boolean tryAdvance(Consumer<? super String> action) { 
       try { 
        String next = br.readLine(); 
        if(next==null) return false; 
        final int ix = next.indexOf(','); 
        if(ix==0) { 
         if(prev==null) 
          throw new IllegalStateException("first line without value"); 
         next = prev+next; 
        } 
        else prev=ix<0? next: next.substring(0, ix); 
        action.accept(next); 
        return true; 
       } catch (IOException ex) { 
        throw new UncheckedIOException(ex); 
       } 
      } 
     }, false); 
} 

は常に順次横断します。並列処理がオンになっている場合、Stream実装はtrySplitを呼び出すことによって、他のスレッドに対して新しいSpliteratorインスタンスを取得しようとします。その操作のための効率的な戦略を提供することはできないので、配列ベースのバッファリングを行うAbstractSpliteratorからデフォルトを継承します。これは、常に正しく動作しますが、後続のストリームパイプラインで計算量が多い場合には、その恩恵を受けるだけです。それ以外の場合は、単に順次実行を続けることができます。

+0

私はこの質問と答えを週末にかけてカスタムSpliteratorで取ると思っていました。 – Eugene

+0

時間のかかる操作では必ずしも計算上重大ではない(例えばREST終点)? –

+0

I/O操作と環境によっては、利点があるかもしれませんが、Stream APIは計算に合わせた構成を使用します。つまり、CPUコアの数に一致するターゲット並行性を使用します。 I/O操作。 – Holger

1

前の行から種を取得するためには、順次実行する必要があるため、並列ストリームで開始することはできません。したがって、副作用マッパーを導入することができます:

final String[] species = new String[1]; 
final Function<String, String> speciesAppending = l -> { 
    if (l.startsWith(",")) { 
     return species[0] + l; 
    } else { 
     species[0] = l.substring(0, l.indexOf(',')); 
     return l; 
    } 
}; 

try (Stream<String> stream = Files.lines(new File("helloworld.csv").toPath())) { 
    stream.map(speciesAppending).parallel()... // TODO 
} 
+0

それで、マッパーへの後続の呼び出しで参照される先の種を追跡/保存する最良の方法は何ですか?私はあなたが種をどこで定義しているのか尋ねていると思いますか? –

+1

これは単純な文字列配列で、答えをコードに入れて更新しました。 –

関連する問題