2016-11-28 4 views
2

Apache Apexでバッチ処理アプリケーションを作成するにはどうすればよいですか?Apexでバッチ処理を行う方法は?

私が見つけたすべての例はストリーミングアプリケーションでした。つまり、終了していないことを意味し、すべてのデータを処理してからアプリを終了したいと考えています。

ありがとうございました

答えて

2

アプリを実行する前に終了条件を追加できます。完全なコードの例

public void testMapOperator() throws Exception 
{ 
    LocalMode lma = LocalMode.newInstance(); 
    DAG dag = lma.getDAG(); 

    NumberGenerator numGen = dag.addOperator("numGen", new NumberGenerator()); 
    FunctionOperator.MapFunctionOperator<Integer, Integer> mapper 
    = dag.addOperator("mapper", new FunctionOperator.MapFunctionOperator<Integer, Integer>(new Square())); 
    ResultCollector collector = dag.addOperator("collector", new ResultCollector()); 

    dag.addStream("raw numbers", numGen.output, mapper.input); 
    dag.addStream("mapped results", mapper.output, collector.input); 

// Create local cluster 
    LocalMode.Controller lc = lma.getController(); 
    lc.setHeartbeatMonitoringEnabled(false); 

//Condition to exit the application 
    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() 
    { 
    @Override 
    public Boolean call() throws Exception 
    { 
     return TupleCount == NumTuples; 
    } 
    }); 

    lc.run(); 

    Assert.assertEquals(sum, 285); 
} 

ため は

+0

実行時環境の面でより一般的な解決方法はありますか?私はそれがローカルかクラスタ環境かどうかを選択する可能性を持っていたいと思います。 – Krever

4

あなたのユースケースは何ですかhttps://github.com/apache/apex-malhar/blob/master/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.javaを参照してください?基本的にバッチをサポートすることはロードマップ上で行われており、現在作業中です。

これまでの処理が完了したら、入力オペレータはShutdownException()として信号を送信し、DAGを通過してDAGをシャットダウンします。

詳細が必要な場合はお知らせください。

+0

私はmscの論文として、ほとんどすべてのオープンソースのビッグデータ処理エンジンの比較を書いています。私はそれの頂点バッチ部分(Mapreduce、Flink、Sparkに沿って)を作成したかったのです。私はおそらくそれをスキップし、ストリームの比較を続けるでしょう。 – Krever

+0

それを使用する方法は次のとおりです。endWindow()コールで、タスクが完了しているかどうかを確認します。カスタムロジックが必要です。タスクが完了している場合、ShuddownException()を呼び出すと、パイプライン全体がシャットダウンします。 –

関連する問題