0
私は結果として通常のファイルと他のものをリストするメインファイルを生成します。マルチスレッド+ RxJavaは、条件で観察します
スケジューラは、このファイルを1日に1回cronで再生成します。
タスクフローはrx-java
を使用して実装されます。
問題は、1つの要求が入ってタスクを開始するか、タスクがスケジューラによって実行された後、タスクが進行中に別の要求が来て、タスクが完了するのを待たずに、別の実行を開始します。
タスクの実行を同期する方法が問題なので、1回だけ実行されますか?
これはサンプルコードです:
@Service
public class FileService {
@Autowired FileRepository fileRepository;
@Autowired List<Pipeline> pipelines;
public Observable<File> getMainFile() {
if (fileRepository.isMainFileExists())
return Observable.just(fileRepository.getMainFile());
else
return generate(() -> fileRepository.getMainFile());
}
public Observable<File> getFile(String fileName) {
if (fileRepository.isMainFileExists())
return Observable.just(fileRepository.getFile(fileName));
else
return generate(() -> fileRepository.getFile(fileName));
}
Observable<File> generate(Func0<File> whenGenerated) {
return Observable.from(pipelines)
// other business logic goes here
// after task execution finished just get needed file
.map(isAllPipelinesSuccessful -> {
return whenGenerated.call();
});
}
@Scheduled(cron = "0 0 4 * * ?")
void scheduleGeneration() {
generate(() -> fileRepository.getMainFile()).subscribe();
}
}
そして、それは以下のサンプルコード、コントローラから呼ばれています:
@RestController
public class FileController {
private static final Long TIMEOUT = 1_000 * 60 * 10L; //ten mins
@Autowired FileService fileService;
@RequestMapping(value = "/mainfile", produces = "application/xml")
public DeferredResult<ResponseEntity<InputStreamResource>> getMainFile() {
DeferredResult<ResponseEntity<InputStreamResource>> deferredResult = new DeferredResult<>(TIMEOUT);
Observable<File> observableMainFile = fileService.getMainFile();
observableMainFile
.map(this::fileToInputStreamResource)
.map(resource -> ResponseEntity.ok().cacheControl(CacheControl.maxAge(1, TimeUnit.HOURS).cachePublic()).body(resource))
.subscribe(deferredResult::setResult, ex -> {
deferredResult.setErrorResult(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(null));
});
return deferredResult;
}
@RequestMapping(value = "/files/{filename:.+}", produces = "application/xml")
public DeferredResult<ResponseEntity<InputStreamResource>> getFile(@PathVariable("filename") String filename) {
DeferredResult<ResponseEntity<InputStreamResource>> deferredResult = new DeferredResult<>(TIMEOUT);
Observable<File> observableFile = fileService.getFile(filename);
observableFile
.map(this::fileToInputStreamResource)
.map(resource -> ResponseEntity.ok().cacheControl(CacheControl.maxAge(1, TimeUnit.HOURS).cachePublic()).body(resource))
.subscribe(deferredResult::setResult, ex -> {
boolean isFileNotFound = FileNotFoundException.class.isInstance(ex.getCause());
HttpStatus status = isFileNotFound ? HttpStatus.NOT_FOUND : HttpStatus.INTERNAL_SERVER_ERROR;
deferredResult.setErrorResult(ResponseEntity.status(status).body(null));
});
return deferredResult;
}
}
は私を修正:この方法によって生成するすべてのコールがスレッドが限定されますが、1が完了すると、彼らはそう、エンキューされていませんもう1つは呼ばれますか? – marknorkin
ええ、そうです。私は私の答えを編集します。私はあなたのチェックをCallableから追加しますが。 –
コードはちょっと面倒ですが、アイデアは1つのスレッドにサブスクリプションをエンキューして実行を延期することです。したがって、それぞれがキューから引き出されるときに追加チェックが行われますか? – marknorkin