2016-10-25 7 views
0

私は結果として通常のファイルと他のものをリストするメインファイルを生成します。マルチスレッド+ RxJavaは、条件で観察します

スケジューラは、このファイルを1日に1回cronで再生成します。

タスクフローはrx-javaを使用して実装されます。

問題は、1つの要求が入ってタスクを開始するか、タスクがスケジューラによって実行された後、タスクが進行中に別の要求が来て、タスクが完了するのを待たずに、別の実行を開始します。

タスクの実行を同期する方法が問題なので、1回だけ実行されますか?

これはサンプルコードです:

@Service 
public class FileService { 
    @Autowired FileRepository fileRepository; 
    @Autowired List<Pipeline> pipelines; 

    public Observable<File> getMainFile() { 
     if (fileRepository.isMainFileExists()) 
      return Observable.just(fileRepository.getMainFile()); 
     else 
      return generate(() -> fileRepository.getMainFile()); 
    } 

    public Observable<File> getFile(String fileName) { 
     if (fileRepository.isMainFileExists()) 
      return Observable.just(fileRepository.getFile(fileName)); 
     else 
      return generate(() -> fileRepository.getFile(fileName)); 
    } 

    Observable<File> generate(Func0<File> whenGenerated) { 
     return Observable.from(pipelines) 
       // other business logic goes here 
       // after task execution finished just get needed file 
       .map(isAllPipelinesSuccessful -> { 
        return whenGenerated.call(); 
       }); 
    } 

    @Scheduled(cron = "0 0 4 * * ?") 
    void scheduleGeneration() { 
     generate(() -> fileRepository.getMainFile()).subscribe(); 
    } 
} 

そして、それは以下のサンプルコード、コントローラから呼ばれています:

@RestController 
public class FileController { 
    private static final Long TIMEOUT = 1_000 * 60 * 10L; //ten mins 
    @Autowired FileService fileService; 

    @RequestMapping(value = "/mainfile", produces = "application/xml") 
    public DeferredResult<ResponseEntity<InputStreamResource>> getMainFile() { 
     DeferredResult<ResponseEntity<InputStreamResource>> deferredResult = new DeferredResult<>(TIMEOUT); 
     Observable<File> observableMainFile = fileService.getMainFile(); 
     observableMainFile 
       .map(this::fileToInputStreamResource) 
       .map(resource -> ResponseEntity.ok().cacheControl(CacheControl.maxAge(1, TimeUnit.HOURS).cachePublic()).body(resource)) 
       .subscribe(deferredResult::setResult, ex -> { 
       deferredResult.setErrorResult(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(null)); 
       }); 
     return deferredResult; 
    } 
    @RequestMapping(value = "/files/{filename:.+}", produces = "application/xml") 
    public DeferredResult<ResponseEntity<InputStreamResource>> getFile(@PathVariable("filename") String filename) { 
     DeferredResult<ResponseEntity<InputStreamResource>> deferredResult = new DeferredResult<>(TIMEOUT); 
     Observable<File> observableFile = fileService.getFile(filename); 
     observableFile 
       .map(this::fileToInputStreamResource) 
       .map(resource -> ResponseEntity.ok().cacheControl(CacheControl.maxAge(1, TimeUnit.HOURS).cachePublic()).body(resource)) 
       .subscribe(deferredResult::setResult, ex -> { 
        boolean isFileNotFound = FileNotFoundException.class.isInstance(ex.getCause()); 
        HttpStatus status = isFileNotFound ? HttpStatus.NOT_FOUND : HttpStatus.INTERNAL_SERVER_ERROR; 
        deferredResult.setErrorResult(ResponseEntity.status(status).body(null)); 
       }); 
     return deferredResult; 
    } 
} 

答えて

1

私は次のようなものを持っているが、私はこれまでの方法より良い解決策があると思います。私はRxJava2-RC5を使用しています。

  1. 回答がないと、そのタスクが実行されました。 https://gist.github.com/anonymous/7b4717cea7ddce270a2e39850a3bd2a4

私が間違っている場合はUPDATE ::

interface FileRepository { 
     String getFile(); 

     Boolean isMainFileExists(); 
} 

private static Scheduler executorService = Schedulers.from(Executors.newFixedThreadPool(1)); 

@org.junit.Test 
public void schedulerTest123() throws Exception { 
     FileRepository fRepo = mock(FileRepository.class); 

     when(fRepo.getFile()).thenReturn(""); 
     when(fRepo.isMainFileExists()).thenReturn(false); 

     Thread t1 = new Thread(() -> { 
      getFile(fRepo, executorService).subscribe(); 
     }); 

     Thread t2 = new Thread(() -> { 
      getFile(fRepo, executorService).subscribe(); 
     }); 

     t1.start(); 
     t2.start(); 

     Thread.sleep(3_000); 

     when(fRepo.getFile()).thenReturn("DasFile"); 
     when(fRepo.isMainFileExists()).thenReturn(true); 

     Thread t3 = new Thread(() -> { 
      getFile(fRepo, executorService).subscribe(); 
     }); 

     t3.start(); 

     Thread.sleep(5_000); 
} 

private Observable<String> getFile(FileRepository fileRepo, Scheduler scheduler) { 
     return Observable.defer(() -> { 
      try { 
       if (fileRepo.isMainFileExists()) { 
        return Observable.fromCallable(fileRepo::getFile) 
          .subscribeOn(Schedulers.io()) 
          .doOnNext(s -> printCurrentThread("Get File from Repo")); 
       } else { 
        return startLongProcess().doOnNext(s -> printCurrentThread("Push long processValue")); 
       } 

      } catch (Exception ex) { 
       return Observable.error(ex); 
      } 
     }).subscribeOn(scheduler).doOnSubscribe(disposable -> printCurrentThread("SUB")); 
    } 

private Observable<String> startLongProcess() { 
     return Observable.fromCallable(() -> { 
      printCurrentThread("Doing LongProcess"); 

      Thread.sleep(5_000); 

      return "leFile"; 
     }); 
} 

private void printCurrentThread(String additional) { 
     System.out.println(additional + "_" + Thread.currentThread()); 
} 
+0

は私を修正:この方法によって生成するすべてのコールがスレッドが限定されますが、1が完了すると、彼らはそう、エンキューされていませんもう1つは呼ばれますか? – marknorkin

+0

ええ、そうです。私は私の答えを編集します。私はあなたのチェックをCallableから追加しますが。 –

+0

コードはちょっと面倒ですが、アイデアは1つのスレッドにサブスクリプションをエンキューして実行を延期することです。したがって、それぞれがキューから引き出されるときに追加チェックが行われますか? – marknorkin

関連する問題