2017-11-12 19 views
3

Stream.forEach()を使用している場合、ストリームが空でないときに実行される事前アクションと事後アクションを追加できないと考えていました。たとえば、Listを印刷するときに、ストリームが空のときに何かを前置きするか、何かを書くことができます。 Javaストリーム - 事前アクションとポストアクションのある

は、今私はシーケンシャルストリームのために

private static <T> void forEach(Stream<T> stream, Consumer<? super T> action, 
    Runnable preAction, Runnable postAction, Runnable ifEmpty) { 
    AtomicBoolean hasElements = new AtomicBoolean(false); 
    Consumer<T> preActionConsumer = x -> { 
     if (hasElements.compareAndSet(false, true)) { 
      preAction.run(); 
     } 
    }; 
    stream.forEach(preActionConsumer.andThen(action)); 
    if (hasElements.get()) { 
     postAction.run(); 
    } else { 
     ifEmpty.run(); 
    } 
} 

ような何かを思い付いた、これはいけない、動作するはず? この方法は正しいですか?それは「良いアイデア」がそのような方法を持っているのか、それとも何か警告がありますか?は、actionを実行している別のスレッドよりも遅いかもしれませんが、並列ストリームの目的を無効にする同期またはその他の並行性ユーティリティに頼らずに正しく実装するため、並列ストリームでは機能しません。 ..

編集:追加ユースケース。正規表現を使用してファイルから整数を検索し、別のファイルに書き出します。このアプローチを使用すると、メモリ内に文字列を作成する必要はなく、その後にファイルに書き込むことができます。 (もちろん、私の本当の仕事のために、私はより複雑な正規表現を使用しています。)

public static void main(String[] args) throws IOException { 
    Stream<String> lines = Files.lines(Paths.get("foo.txt")); 

    Pattern findInts = Pattern.compile("(\\d+)"); 
    Path barFile = Paths.get("bar.txt"); 
    try (BufferedWriter writer = Files.newBufferedWriter(barFile , StandardOpenOption.CREATE_NEW)) { 
     lines.flatMap(x -> findInts.matcher(x).results()) 
       .forEach(x-> convertCheckedIOException(() -> { 
          writer.write(x.group(1)); 
          writer.newLine(); 
         }) 
       ); 
    } 
} 

public static void convertCheckedIOException(Run r) { 
    try { 
     r.run(); 
    } catch (IOException e) { 
     throw new UncheckedIOException(e); 
    } 
} 

interface Run { 
    void run() throws IOException; 
} 
+2

Streamが_not_空であれば 'isEmpty'は' true'です... –

+0

真実ですが、変数名は質問の要点ではないので、私はそれを変更すると思います – user140547

+0

私はちょっと試してみました...もしかしたら、プレアクションがとにかく終了するまでアクションを止めたいのであれば、ポストアクションのようにストリームの外で実行する方が簡単でしょうか? preActionConsumerは、要素があるかどうかをチェックするために使用されます(また、ブール値へのアクセスを毎回禁止するためにデリゲートを切り替えることもできます)。 –

答えて

2

あなたの仕事のための適切なツールを使用してください:1つの警告は、並列スレッドの最初のバッチは、彼らが​​に実行されるよう終えることが彼らの最初のを待たなければならないということです。このタスクはStream APIの恩恵を受けません。

Pattern intPattern = Pattern.compile("\\d+"); 
try(Scanner scanner = new Scanner(Paths.get("foo.txt")); 
    BufferedWriter writer = Files.newBufferedWriter(Paths.get("bar.txt"), CREATE_NEW)) { 

    String s = scanner.findWithinHorizon(intPattern, 0); 
    if(s == null) { 
     // perform empty action 
    } else { 
     // perform pre action 
     do { 
      writer.append(s); 
      writer.newLine(); 
     } while((s=scanner.findWithinHorizon(intPattern, 0)) != null); 
     // perform post action 
    } 
} 

ストリーム操作を引き続き行うことができます。

Pattern intPattern = Pattern.compile("\\d+"); 
try(Scanner scanner = new Scanner(Paths.get("foo.txt")); 
    BufferedWriter writer = Files.newBufferedWriter(Paths.get("bar.txt"), CREATE_NEW)) { 

    String firstLine = scanner.findWithinHorizon(intPattern, 0); 
    if(firstLine == null) { 
     // perform empty action 
    } else { 
     // perform pre action 
     Stream.concat(Stream.of(firstLine), 
         scanner.findAll(intPattern).map(MatchResult::group)) 
      .forEach(line -> convertCheckedIOException(() -> { 
        writer.write(line); 
        writer.newLine(); 
       }) 
      ); 
     // perform post action 
    } 
} 

が、確認IOExceptionに対処することは、単に無利益のためにコードを複雑にします。

1

私はこのようなutilのを持っていることのアイデアが好きです。最初は、プレアクションによって設定/設定解除された第2のフラグを使用して、アクションが停止していると考えました。しかし、それをより複雑にするのは、preActionが最初のアクションコールだけでなくすべてのアクションコールの前に置かれるという事実です。

私は、preactionspost/emptyの順番を強制する同期化ソリューションを考え出しました。

private static <T> void forEach(Stream<T> stream, Consumer<? super T> action, Runnable preAction, Runnable postAction, Runnable ifEmpty) 
{ 
    AtomicBoolean hasElements = new AtomicBoolean(false); 

    stream.forEach(new Consumer<T>() 
    { 
     private Consumer<? super T> delegate = new Consumer<T>() 
     { 
      private Consumer<? super T> delegate2 = new Consumer<T>() 
      { 
       @Override 
       public void accept(T x) 
       { 
        System.out.println("check"); 
        hasElements.set(true); 
        preAction.run(); 
        action.accept(x); 
        delegate2 = action; // rest of first batch won't run preAction anymore 
        delegate = action; // next batches won't even synchronize anymore 
       } 
      }; 

      @Override 
      public void accept(T x) 
      { 
       synchronized (this) 
       { 
        delegate2.accept(x); 
       } 
      } 
     }; 

     @Override 
     public void accept(T x) 
     { 
      delegate.accept(x); 
     } 
    }); 

    if (hasElements.get()) { postAction.run(); } else { ifEmpty.run(); } 
} 

public static void main(String[] args) 
{ 
    Stream<Integer> s = Stream.generate(() -> 1).limit(1000).parallel(); 
    forEach(s, i -> System.out.println(Thread.currentThread().getId()),() -> System.out.println("pre"), 
      () -> System.out.println("post"),() -> System.out.println("empty")); 
} 

Output: 
check 
pre 
... 
many thread IDs 
... 
post 
+0

このような複雑なソリューションを作成する前に、実際の作業に関するOPからの詳細をご希望です。たとえば、並列サポートが意図されている場合、 'forEach'は順番に実行されませんが、' forEachOrdered'は同期を廃止します。それで逐次実行もそうです。しかし、異なる順序で実行することが問題ではない場合、「コレクタ」ベースのソリューションはより簡単になります。 – Holger

+0

コレクタがフィニッシャで 'postAction'と' empty'をどのように扱うことができるかを見ることができますが、 'preAction'をどう扱うのでしょうか?それと同様の状態を保つ必要はありませんか? –

+0

おそらく私はそれについてもう少し考えなければならないかもしれませんが、最初の質問は、そのような解決策が適切であるかどうかは明らかではないと考えているかどうかということです。 – Holger

関連する問題