2016-06-29 6 views
2

Apache Toreeを使用すると、Sparkで任意の式を実行できます。 sqlContext.sql(..)Apache Toreeジョブの進捗状況

このようなSQLクエリで進捗状況(Zeppelinなど)を取得することは可能ですか?たぶんToreeはいくつかのクエリメトリック(X tasks from N are doneなど)を提供することができますか?

+1

Spark REST APIがおそらく役に立ちます –

答えて

1

Apache Zeppelinの使用方法はsc.dagSchedulerです。

REST APIは、SparkContextに直接アクセスできない場合は、より適切な選択肢にする必要があります。

package org.apache.zeppelin.spark 

class SparkInterpreter { 
    @Override 
    public int getProgress(InterpreterContext context) { 
    String jobGroup = getJobGroup(context); 
    int completedTasks = 0; 
    int totalTasks = 0; 

    DAGScheduler scheduler = sc.dagScheduler(); 
    if (scheduler == null) { 
     return 0; 
    } 
    HashSet<ActiveJob> jobs = scheduler.activeJobs(); 
    if (jobs == null || jobs.size() == 0) { 
     return 0; 
    } 
    Iterator<ActiveJob> it = jobs.iterator(); 
    while (it.hasNext()) { 
     ActiveJob job = it.next(); 
     String g = (String) job.properties().get("spark.jobGroup.id"); 
     if (jobGroup.equals(g)) { 
     int[] progressInfo = null; 
     try { 
      Object finalStage = job.getClass().getMethod("finalStage").invoke(job); 
      if (sparkVersion.getProgress1_0()) { 
      progressInfo = getProgressFromStage_1_0x(sparkListener, finalStage); 
      } else { 
      progressInfo = getProgressFromStage_1_1x(sparkListener, finalStage); 
      } 
     } catch (IllegalAccessException | IllegalArgumentException 
      | InvocationTargetException | NoSuchMethodException 
      | SecurityException e) { 
      logger.error("Can't get progress info", e); 
      return 0; 
     } 
     totalTasks += progressInfo[0]; 
     completedTasks += progressInfo[1]; 
     } 
    } 

    if (totalTasks == 0) { 
     return 0; 
    } 
    return completedTasks * 100/totalTasks; 
    } 
} 
+0

sc.dagSchedulerとは何ですか? – felansu

関連する問題