2016-06-20 3 views
0

トライデントをトライデント内で動的に作成する方法はありますか? 例を教えてください。Apache storm Trident - トポロジを動的に作成する

+0

あなたは、いくつかのプロパティファイル(JSON)でトポロジの設定を保存し、あなたは、あなたがそのファイルからそれを読むことができトポロジを展開することができます。しかし、いったん展開すれば、それを動的に変更することはできません –

答えて

0

まず、トポロジの作成はトライデントの一部ではないことがわかります。トライデントは、マイクロバッチ処理用の単なるAPIです。

新しいトポロジの作成は定義上動的です。これは、TopologyBuilderクラスがやっていることです。

質問に答えるには、はい、トライデントから、または単純なストームスパウトとボルトから新しいトポロジを作成することは可能です。必要なのは、トポロジ作成ロジックがStormクラスター(クラスおよびその他のリソース)にアクセスできることです。これは、Stormでロジックを実行する場合にも、定義によって満たされます。

最後に、新たに作成されたトポロジを送信する方法を見つけることです。これはStormSubmitterクラスが作成したものです(これは再び驚きです:))あなたのクラスパストライデントや通常のスパウト/ボルトの中であなたのロジックを実行するとき。

好奇心で、なぜこれを行う予定ですか?あなたの要件は何ですか?

例:

import java.util.Map; 

import org.apache.storm.Config; 
import org.apache.storm.StormSubmitter; 
import org.apache.storm.generated.StormTopology; 
import org.apache.storm.spout.SpoutOutputCollector; 
import org.apache.storm.task.TopologyContext; 
import org.apache.storm.topology.OutputFieldsDeclarer; 
import org.apache.storm.topology.TopologyBuilder; 
import org.apache.storm.topology.base.BaseRichSpout; 
import org.apache.storm.trident.operation.TridentCollector; 
import org.apache.storm.trident.spout.IBatchSpout; 
import org.apache.storm.tuple.Fields; 

public class DynamicTopologySpout implements IBatchSpout { 

    private static final long serialVersionUID = -3269435263455830842L; 

    @Override 
    @SuppressWarnings("rawtypes") 
    public void open(Map conf, TopologyContext context) {} 

    @Override 
    public void emitBatch(long batchId, TridentCollector collector) { 
     if (newTopologyNeeded()) { 
      TopologyBuilder builder = new TopologyBuilder(); 
      builder 
      .setSpout("spout", new BaseRichSpout() { 
       private static final long serialVersionUID = 1L; 
       @Override public void declareOutputFields(OutputFieldsDeclarer declarer) {} 
       @Override @SuppressWarnings("rawtypes") public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {} 
       @Override public void nextTuple() {} 
      }, 1) 
      .setMaxSpoutPending(15) 
      .setNumTasks(1); 
      StormTopology topology = builder.createTopology(); 
      Config config = new Config(); 
      try { 
       StormSubmitter.submitTopology("dynamic-topology", config, topology); 
      } catch (Exception e) { 
       e.printStackTrace(); 
       collector.reportError(e); 
      } 
     } 
    } 

    private boolean newTopologyNeeded() { 
     // Check if topology needed ... 
     return false; 
    } 

    @Override 
    public void ack(long batchId) {} 

    @Override 
    public void close() {} 

    @Override 
    public Map<String, Object> getComponentConfiguration() { return null; } 

    @Override 
    public Fields getOutputFields() { return null; } 

} 
関連する問題