2013-07-18 3 views
5

java Files.walkFileTreeまたは類似のものと同等のものを知っている人はいますか?これは、JavaまたはScalaライブラリにすることができます。Files.walkFileTree(javaまたはscala)のパラレルバージョン

+2

すべての並列スレッドが同じボトルネック(HDD)を持つため、意味をなさないと思います。そして、それはネットワークIO操作と並行することはできません。 – aim

+0

なぜあなたのファイルツリーを並行して歩くのが良いアイデアですか?これは、通常はIOバウンドであり、CPUバウンドではありません。 –

+0

私の場合、ファイル処理はCPUに依存し、I/Oの利用率は10%〜20%程度です。 – matt

答えて

3

各ファイルでコールバックを実行すれば十分だとします。

このコードは、ファイルシステム内でループを処理しません。そのためのレジストリが必要です(例:java.util.concurrent.ConcurrentHashMap)。追加することができるあらゆる種類の改善があります。例外を黙って無視するのではなく、例外を報告するなどです。

import java.io.File 
import scala.util._ 
def walk(f: File, callback: File => Unit, pick: File => Boolean = _ => true) { 
    Try { 
    val (dirs, fs) = f.listFiles.partition(_.isDirectory) 
    fs.filter(pick).foreach(callback) 
    dirs.par.foreach(f => walk(f, callback, pick)) 
    } 
} 

倍を使用してファイルを収集する代わりにforeach大幅に難しくはありませんが、私は読者への課題としてそのままにしておきます。 (ConcurrentLinkedQueueは、あなたが本当に遅いスレッドと素晴らしいファイルシステムを持っていない限り、とにかくコールバックでそれらすべてを受け入れるために、おそらく十分な速さである。)この演習では、Scalaの答えとしても、Javaの答えとしてのJava-などもないように簡単です

+0

実際に私はそれをし、いくつかの追加の未来を持っている '成熟した'ライブラリへのリンクを得ることを望んだが、あなたの例は私の現在のニーズに十分です。ありがとう! – matt

7

他の人も指摘しているように、ファイルツリーを歩くことはCPUバインディングではなくIOバインドであるため、マルチスレッドファイルツリーウォークの利点は疑問です。しかし、あなたが本当にしたいのであれば、おそらくForkJoinPoolまたはそれに類するもので自分自身をロールバックすることができます。

import java.io.IOException; 
import java.nio.file.FileVisitResult; 
import java.nio.file.Files; 
import java.nio.file.Path; 
import java.nio.file.Paths; 
import java.nio.file.SimpleFileVisitor; 
import java.nio.file.attribute.BasicFileAttributes; 
import java.util.ArrayList; 
import java.util.List; 
import java.util.concurrent.ForkJoinPool; 
import java.util.concurrent.RecursiveAction; 

public class MultiThreadedFileTreeWalk { 
    private static class RecursiveWalk extends RecursiveAction { 
     private static final long serialVersionUID = 6913234076030245489L; 
     private final Path dir; 

     public RecursiveWalk(Path dir) { 
      this.dir = dir; 
     } 

     @Override 
     protected void compute() { 
      final List<RecursiveWalk> walks = new ArrayList<>(); 
      try { 
       Files.walkFileTree(dir, new SimpleFileVisitor<Path>() { 
        @Override 
        public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException { 
         if (!dir.equals(RecursiveWalk.this.dir)) { 
          RecursiveWalk w = new RecursiveWalk(dir); 
          w.fork(); 
          walks.add(w); 

          return FileVisitResult.SKIP_SUBTREE; 
         } else { 
          return FileVisitResult.CONTINUE; 
         } 
        } 

        @Override 
        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { 
         System.out.println(file + "\t" + Thread.currentThread()); 
         return FileVisitResult.CONTINUE; 
        } 
       }); 
      } catch (IOException e) { 
       e.printStackTrace(); 
      } 

      for (RecursiveWalk w : walks) { 
       w.join(); 
      } 
     } 
    } 

    public static void main(String[] args) throws IOException { 
     RecursiveWalk w = new RecursiveWalk(Paths.get(".").toRealPath()); 
     ForkJoinPool p = new ForkJoinPool(); 
     p.invoke(w); 
    } 
} 

この例では、各ディレクトリを別のスレッドで処理します。 Java 7のfork/joinライブラリのチュートリアルです。

+0

各要素で実行する機能がある場合、過去の経験から、ファイルツリーを歩いて各ノードでタスクを並行して同時に実行すると、かなりのパフォーマンスが得られます。 – Hazok

+0

@ハゾク機能によります。機能がCPUを大量に使用する場合は、ファイルツリーを歩くIO境界よりも重要です。そうであれば、コードを並行させることは価値があるかもしれません。しかし、これは常にそうであるとは限りません。 – Jeffrey

+0

合意しました。それが私がこの声明を承認した理由です。答えに「疑わしい」と述べられて以来、パフォーマンスの向上が達成できるケースがあることを指摘したかっただけです。 – Hazok

3

ここでのアイデアは、デバイスごとにスレッドのようなもので平行歩行を開始することでした。

ウォーカーはForkJoinPoolスレッド上にあります。したがって、各パステストで未来を切り開くときは、プール上のフォークされたタスクです。ディレクトリテストでは、ディレクトリを読み込んでファイルを検索するときにマネージブロックが使用されます。

結果は、将来のパステストによって約束を完了することによって返されます。

さらに興味深いテストには、zipファイルの読み込みが含まれています。これは、圧縮解除によってCPUが一部使用されるためです。

paulp will do something clever with deep listingかと思います。

import util._ 
import collection.JavaConverters._ 
import concurrent.{ TimeoutException => Timeout, _ } 
import concurrent.duration._ 
import ExecutionContext.Implicits._ 
import java.io.IOException 
import java.nio.file.{ FileVisitResult => Result, _ } 
import Result.{ CONTINUE => Go, SKIP_SUBTREE => Prune, TERMINATE => Stop } 
import java.nio.file.attribute.{ BasicFileAttributes => BFA } 

object Test extends App { 
    val fileSystem = FileSystems.getDefault 
    val starts = (if (args.nonEmpty) args.toList else mounts) map (s => (fileSystem getPath s)) 
    val p = Promise[(Path, BFA)] 

    def pathTest(path: Path, attrs: BFA) = 
    if (attrs.isDirectory) { 
     val entries = blocking { 
     val res = Files newDirectoryStream path 
     try res.asScala.toList finally res.close() 
     } 
     List("hello","world") forall (n => entries exists (_.getFileName.toString == n)) 
    } else { 
     path.getFileName.toString == "enough" 
    } 

    def visitor(root: Path) = new SimpleFileVisitor[Path] { 
    def stopOrGo = if (p.isCompleted) Stop else Go 
    def visiting(path: Path, attrs: BFA) = { 
     future { pathTest(path, attrs) } onComplete { 
     case Success(true) => p trySuccess (path, attrs) 
     case Failure(e) => p tryFailure e 
     case _    => 
     } 
     stopOrGo 
    } 
    override def preVisitDirectory(dir: Path, attrs: BFA) = (
     if ((starts contains dir) && dir != root) Prune 
     else visiting(dir, attrs) 
    ) 
    override def postVisitDirectory(dir: Path, e: IOException) = { 
     if (e != null) p tryFailure e 
     stopOrGo 
    } 
    override def visitFile(file: Path, attrs: BFA) = visiting(file, attrs) 
    } 
    //def walk(p: Path): Path = Files walkFileTree (p, Set().asJava, 10, visitor(p)) 
    def walk(p: Path): Path = Files walkFileTree (p, visitor(p)) 

    def show(store: FileStore) = { 
    val ttl = store.getTotalSpace/1024 
    val used = (store.getTotalSpace - store.getUnallocatedSpace)/1024 
    val avail = store.getUsableSpace/1024 
    Console println f"$store%-40s $ttl%12d $used%12d $avail%12d" 
    store 
    } 
    def mounts = { 
    val devs = for { 
     store <- fileSystem.getFileStores.asScala 
     if store.name startsWith "/dev/" 
     if List("ext4","fuseblk") contains store.`type` 
    } yield show(store) 
    val devstr = """(\S+) \((.*)\)""".r 
    (devs.toList map (_.toString match { 
     case devstr(name, dev) if devs.toList exists (_.name == dev) => Some(name) 
     case s => Console println s"Bad dev str '$s', skipping" ; None 
    })).flatten 
    } 

    starts foreach (f => future (walk(f))) 

    Try (Await result (p.future, 20.seconds)) match { 
    case Success((name, attrs)) => Console println s"Result: ${if (attrs.isDirectory) "dir" else "file"} $name" 
    case Failure(e: Timeout) => Console println s"No result: timed out." 
    case Failure(t)    => Console println s"No result: $t." 
    } 
} 
+0

このコードを書いてくれてありがとうございました。私はRex Kerrソリューションを受け入れることに決めました。なぜなら、それは簡単でデバッグしやすいからです。 – matt

+0

@lucek Rexが最高です。 Thxの質問には、APIを探索するのは楽しいことでした。私は他の答えも上げました。 –

関連する問題