0
トライデントをトライデント内で動的に作成する方法はありますか? 例を教えてください。Apache storm Trident - トポロジを動的に作成する
トライデントをトライデント内で動的に作成する方法はありますか? 例を教えてください。Apache storm Trident - トポロジを動的に作成する
まず、トポロジの作成はトライデントの一部ではないことがわかります。トライデントは、マイクロバッチ処理用の単なるAPIです。
新しいトポロジの作成は定義上動的です。これは、TopologyBuilder
クラスがやっていることです。
質問に答えるには、はい、トライデントから、または単純なストームスパウトとボルトから新しいトポロジを作成することは可能です。必要なのは、トポロジ作成ロジックがStormクラスター(クラスおよびその他のリソース)にアクセスできることです。これは、Stormでロジックを実行する場合にも、定義によって満たされます。
最後に、新たに作成されたトポロジを送信する方法を見つけることです。これはStormSubmitter
クラスが作成したものです(これは再び驚きです:))あなたのクラスパストライデントや通常のスパウト/ボルトの中であなたのロジックを実行するとき。
好奇心で、なぜこれを行う予定ですか?あなたの要件は何ですか?
例:
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.tuple.Fields;
public class DynamicTopologySpout implements IBatchSpout {
private static final long serialVersionUID = -3269435263455830842L;
@Override
@SuppressWarnings("rawtypes")
public void open(Map conf, TopologyContext context) {}
@Override
public void emitBatch(long batchId, TridentCollector collector) {
if (newTopologyNeeded()) {
TopologyBuilder builder = new TopologyBuilder();
builder
.setSpout("spout", new BaseRichSpout() {
private static final long serialVersionUID = 1L;
@Override public void declareOutputFields(OutputFieldsDeclarer declarer) {}
@Override @SuppressWarnings("rawtypes") public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {}
@Override public void nextTuple() {}
}, 1)
.setMaxSpoutPending(15)
.setNumTasks(1);
StormTopology topology = builder.createTopology();
Config config = new Config();
try {
StormSubmitter.submitTopology("dynamic-topology", config, topology);
} catch (Exception e) {
e.printStackTrace();
collector.reportError(e);
}
}
}
private boolean newTopologyNeeded() {
// Check if topology needed ...
return false;
}
@Override
public void ack(long batchId) {}
@Override
public void close() {}
@Override
public Map<String, Object> getComponentConfiguration() { return null; }
@Override
public Fields getOutputFields() { return null; }
}
あなたは、いくつかのプロパティファイル(JSON)でトポロジの設定を保存し、あなたは、あなたがそのファイルからそれを読むことができトポロジを展開することができます。しかし、いったん展開すれば、それを動的に変更することはできません –