2016-11-24 196 views
3

ファイルを追加するディレクトリを監視するコードがあります。新しいファイルがディレクトリに追加されるたびに、ファイルの内容が選択され、kafkaに公開され、ファイルが削除されます。java - プロセスが別のプロセスで使用されているため、ファイルにアクセスできません。

これは私が1回のリクエストをしたときに動作しますが、コードをjMeterから5または10のユーザー要求に従わせると、内容はkafkaに正常に公開されますが、コードはファイルを削除できません。私はThe process cannot access the file because it is being used by another process.というメッセージでFileSystemExceptionを得る。

私は見ることができないいくつかの並行性の問題があると思います。助けてください !

public void monitor() throws IOException, InterruptedException { 
    Path faxFolder = Paths.get(TEMP_FILE_LOCATION); 
    WatchService watchService = FileSystems.getDefault().newWatchService(); 
    faxFolder.register(watchService, StandardWatchEventKinds.ENTRY_CREATE); 
    boolean valid = true; 
    do { 
     WatchKey watchKey = watchService.take(); 
     for (WatchEvent<?> event : watchKey.pollEvents()) { 
      if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) { 
       String fileName = event.context().toString(); 
       publishToKafka(new File(TEMP_FILE_LOCATION + fileName).toPath(), "topic"); 
      } 
     } 
     valid = watchKey.reset(); 
    } while (valid); 
} 

private void publishToKafka(Path path, String topic) { 
    try (BufferedReader reader = Files.newBufferedReader(path)) { 
     String input = null; 
     while ((input = reader.readLine()) != null) { 
      kafkaProducer.publishMessageOnTopic(input, topic); 
     } 
    } catch (IOException e) { 
     LOG.error("Could not read buffered file to send message on kafka.", e); 
    } finally { 
     try { 
      Files.deleteIfExists(path); // This is where I get the exception 
     } catch (IOException e) { 
      LOG.error("Problem in deleting the buffered file {}.", path.getFileName(), e); 
     } 
    } 
} 

例外ログ:

java.nio.file.FileSystemException: D:\upload\notif-1479974962595.csv: The process cannot access the file because it is being used by another process. 

    at sun.nio.fs.WindowsException.translateToIOException(Unknown Source) 
    at sun.nio.fs.WindowsException.rethrowAsIOException(Unknown Source) 
    at sun.nio.fs.WindowsException.rethrowAsIOException(Unknown Source) 
    at sun.nio.fs.WindowsFileSystemProvider.implDelete(Unknown Source) 
    at sun.nio.fs.AbstractFileSystemProvider.deleteIfExists(Unknown Source) 
    at java.nio.file.Files.deleteIfExists(Unknown Source) 
    at com.panasonic.mdw.core.utils.MonitorDirectory$FileContentPublisher.publishToKafka(MonitorDirectory.java:193) 
    at com.panasonic.mdw.core.utils.MonitorDirectory$FileContentPublisher.sendData(MonitorDirectory.java:125) 
    at com.panasonic.mdw.core.utils.MonitorDirectory$FileContentPublisher.run(MonitorDirectory.java:113) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
    at java.lang.Thread.run(Unknown Source) 
+1

ファイルを作成して削除しようとする前に、ファイルを作成するプロセスが作成を完了していない(閉じていない)ことはありますか? –

+0

ところで、WatchServiceはクローズ可能ですので、try-with-resourcesの中に 'newWatchService()'を入れてください。また、あなたはどこにでもnioを使っているので、 'new File(...)の代わりに一貫性を持たせるためにtoPath()'は 'faxFolder.resolve(filename)'を実行します。 –

+3

@Klitos Kyriacou:ファイルシステムを見ているときに 'WatchEvent'のコンテキストが既に' Path'であるため、stringへの変換も廃止され、 'faxFolder.resolve((Path)event.context())'で十分です。私はあなたが正しいと思う、作成イベントに反応することは、すぐにクリエイターにファイルを閉じるのに十分な時間を与えるものではない。興味深いことに、私のシステムでは、これとは逆の効果があります。ファイルの読み込みは失敗しますが、ファイルの読み込みは失敗しますが、実際の削除がポイントに実際に延期されるので、クリエイターはファイルを閉じます。 – Holger

答えて

0

あなたはそれを削除する前に、そのファイルにアクセスしているすべての接続を閉じる必要があります。

+1

'try-with-resources'は' finally'コードを実行する前にそれをやっていると思います。 –

0

あなたのコードを見ると、1つのファイルがもう1つのスレッドが別のスレッドを公開するためにスレッドによって選択されたときに、そのスレッドが公開用に選択されたようです。だから誰もそれを削除することができない。 同時実行の問題でなければなりません。同時に実行できる手順とできない手順を基準にコードを再設計する必要があります。スレッドと呼ばれる

  1. は、ファイル(メインスレッドがそれを行う必要があります)
  2. を拾うファイルを公開(それを行うには、他のスレッドを呼び出す)
  3. ファイルを削除します(必要があります。 だから、全体のプロセスのステップがありますまた、任意のファイルの存在する場合のチェック(再びメインスレッドがそれを行うことができます)

ファイルが選択された瞬間

  • )それを削除し、あなたは、バッファにそれを読んで、それを削除して公開を続行することができます。これは、メインスレッドがこのファイルを他のスレッドに割り当てていないことを確認します。

  • +1

    OPの投稿にスレッドが何も表示されません。 –

    0

    動的に作成されたファイルをキューサービスにアップロードしようとすると、解決するのに2日かかっていたときに、次のスレッドと同様の問題が発生しました。Multithreading on Queue Holger上記のように答えを出してくれたおかげで、ロックが発生したのは、別のスレッドが読み込んだときに作成が完全に行われなかったことが原因である可能性があります。

    WatchEvent<Path> ev = cast(event); 
    Path name = ev.context(); 
    Path child = dir.resolve(name); 
    //queueUploadFile(child); 
    if (kind == ENTRY_CREATE) { 
        uploadToQueue(this.queueId, child); 
    } 
    

    は、私はそれを変更:

    インターネットから見つかったたくさんのように私の最初の溶液であったが、

    WatchEvent<Path> ev = cast(event); 
    Path name = ev.context(); 
    Path child = dir.resolve(name); 
    //queueUploadFile(child); 
    if (kind == ENTRY_MODIFY) { 
        uploadToQueue(this.queueId, child); 
    } 
    

    、すべてが完璧に動作します。 「何らかの形で」複数のENTRY_MODIFYイベントを処理するために、アップロードされたuploadToQueue()メソッド内のファイルに対して削除を実行します。

    上記の貢献に基づいて私のアプローチを取っても、同様の問題を持つ他の人に役立つことを望みます。

    関連する問題