2012-05-25 7 views
11

最近、私はAmazon Web Services(AWS)で作業していましたが、その件に関する資料があまりないことに気付きましたので、私のソリューションを追加しました。JavaアプリケーションでElastic MapReduceジョブフローの完了を待つ方法はありますか?

私はAmazon Elastic MapReduce(Amazon EMR)を使用してアプリケーションを作成していました。計算が終了した後、私はそれらによって作成されたファイルについていくつかの作業を実行する必要があったので、ジョブフローがいつ仕事を完了したかを知る必要がありました。

これはあなたのジョブフローが完了したかどうかをチェックする方法です。

AmazonElasticMapReduce mapReduce = new AmazonElasticMapReduceClient(credentials); 

DescribeJobFlowsRequest jobAttributes = new DescribeJobFlowsRequest() 
    .withJobFlowStates("COMPLETED"); 

List<JobFlowDetail> jobs = mapReduce.describeJobFlows(jobAttributes).getJobFlows(); 
JobFlowDetail detail = jobs.get(0); 

detail.getJobFlowId(); //the id of one of the completed jobs 

ます。またDescribeJobFlowsRequestに特定のジョブIDを探すことができますし、そのジョブは失敗で終了しているかどうかを確認します。

他人に役立つことを願っています。

+5

すぐしかし、希望のアプローチは見、質問とまだ答えにこれを分割することです、ここは非常に歓迎されている[それはあなた自身の質問を尋ね、回答するOKです](HTTP ://blog.stackoverflow.com/2011/07/its-ok-to-ask-and-answer-your-own-questions/)これは物事を適切に分類/分類するのに役立ちます。適用、ありがとう! –

+0

ありがとう、私はそれを将来の参考資料として書きます。 – siditom

+0

完了した他の状態も含める必要があります。与えられたように 'jobAttributes'を初期化すれば、これを読んでいる人々はいつまでもループするかもしれません。 'DescribeJobFlowsRequest jobAttributes = new DescribeJobFlowRequest().JobFlowStates(" COMPLETED "、" TERMINATED "、" FAILED ");' –

答えて

1

ジョブフローが完了すると、クラスタは停止し、HDFSパーティションは失われます。 データの損失を防ぐため、Amazon S3に結果を格納するジョブフローの最後のステップを設定します。

JobFlowInstancesDetail場合:KeepJobFlowAliveWhenNoStepsパラメータがTRUEに設定され、ジョブフローが待機状態への遷移が 意志ではなく、手順が完了したら、シャットダウン。

各ジョブフローで最大256ステップが許可されます。

あなたの仕事に時間がかかる場合は、結果を定期的に保存することをお勧めします。

短いストーリー:それがいつ行われたかを知る方法はありません。 代わりに、ジョブの一部としてデータを保存する必要があります。

1

ジョブフローを作成するときは、--wait-for-stepsオプションを使用してください。

./elastic-mapreduce --create \ 
... 
--wait-for-steps \ 
... 
3

私もこの問題に遭遇しました。ここで私が思いついた解決策があります。完璧ではありませんが、うまくいけば助けになります。参考までに、私はJava 1.7とAWS Java SDKバージョン1.9.13を使用しています。 ;このコードは、あなたが厳密に言えばの手順ではなく、終了するクラスタを待っていることを前提としていることを

注意すべてのステップが完了した時点でクラスタが終了しても問題ありませんが、ステップの完了後に生き残っているクラスタを使用している場合は、あまり役に立ちません。

また、このコードでは、クラスタの状態の変化を監視および記録し、さらにクラスタがエラーで終了したかどうかを診断し、エラーが発生した場合に例外をスローします。あなたの問題に独自のソリューションを提出

private void yourMainMethod() { 
    RunJobFlowRequest request = ...; 

    try { 
     RunJobFlowResult submission = emr.runJobFlow(request); 
     String jobFlowId = submission.getJobFlowId(); 
     log.info("Submitted EMR job as job flow id {}", jobFlowId); 

     DescribeClusterResult result = 
      waitForCompletion(emr, jobFlowId, 90, TimeUnit.SECONDS); 
     diagnoseClusterResult(result, jobFlowId); 
    } finally { 
     emr.shutdown(); 
    } 
} 

private DescribeClusterResult waitForCompletion(
      AmazonElasticMapReduceClient emr, String jobFlowId, 
      long sleepTime, TimeUnit timeUnit) 
     throws InterruptedException { 
    String state = "STARTING"; 
    while (true) { 
     DescribeClusterResult result = emr.describeCluster(
       new DescribeClusterRequest().withClusterId(jobFlowId) 
     ); 
     ClusterStatus status = result.getCluster().getStatus(); 
     String newState = status.getState(); 
     if (!state.equals(newState)) { 
      log.info("Cluster id {} switched from {} to {}. Reason: {}.", 
        jobFlowId, state, newState, status.getStateChangeReason()); 
      state = newState; 
     } 

     switch (state) { 
      case "TERMINATED": 
      case "TERMINATED_WITH_ERRORS": 
      case "WAITING": 
       return result; 
     } 

     timeUnit.sleep(sleepTime); 
    } 
} 

private void diagnoseClusterResult(DescribeClusterResult result, String jobFlowId) { 
    ClusterStatus status = result.getCluster().getStatus(); 
    ClusterStateChangeReason reason = status.getStateChangeReason(); 
    ClusterStateChangeReasonCode code = 
     ClusterStateChangeReasonCode.fromValue(reason.getCode()); 
    switch (code) { 
    case ALL_STEPS_COMPLETED: 
     log.info("Completed EMR job {}", jobFlowId); 
     break; 
    default: 
     failEMR(jobFlowId, status); 
    } 
} 

private static void failEMR(String jobFlowId, ClusterStatus status) { 
    String msg = "EMR cluster run %s terminated with errors. ClusterStatus = %s"; 
    throw new RuntimeException(String.format(msg, jobFlowId, status)); 
} 
関連する問題