私のプロジェクトにStorm(see here)を統合しようとしています。私はトポロジー、スパウト、ボルトの概念を突き止めました。しかし、今、私はいくつかの実際の実装を理解しようとしています。Storm> Howto JavaコールバックをSpoutに統合
A)私は、JavaとClojureを使用する多言語環境を使用しています。私のJavaコードは、ストリーミングデータを起動するメソッドを持つコールバッククラスです。これらのメソッドにプッシュされたイベントデータは、私がスパウトとして使用したいものです。
最初の質問は、これらのメソッドに入ってくるデータをスパウトに接続する方法です。私はbacktype.storm.topology.IRichSpoutを渡すI)にしようとしている、そしてII)は(その注ぎ口のオープン機能にbacktype.storm.spout.SpoutOutputCollector(see here)を渡しますsee here)。しかし、私は実際にどのような種類の地図やリストを渡す方法も見当たりません。
B)私のプロジェクトの残りの部分はすべてClojureです。これらの方法では多くのデータが得られます。各イベントのIDは1〜100です。Clojureでは、スパウトからのデータを別の実行スレッドに分割したいと考えています。それらはボルトとなります。
Clojureボルトを設定してスパウトからイベントデータを取得し、受信イベントのIDに基づいてスレッドを分割するにはどうすればよいですか?事前 ティム
[EDIT 1]で
おかげで
私は実際にこの問題を過ぎてもらいました。私は1)私自身のIRichSpoutを実装しました。私は2)は、Javaコールバッククラスの受信ストリームデータにそのスパウトの内部タプルを接続しました。これが慣用かどうかは分かりません。しかし、それはコンパイルされ、エラーなく実行されます。ただし、3)printstuffボルトを介して入ってくる着信ストリームデータは表示されません(間違いありません)。
イベントデータが確実に伝達されるようにするには、吐き出しやボルトの実装やトポロジ定義で何か特別なことがありますか?ありがとう。
;; tie Java callbacks to a Spout that I created (.setSpout java-callback ibspout) (storm/defbolt printstuff ["word"] [tuple collector] (println (str "printstuff --> tuple["tuple"] > collector["collector"]")) ) (storm/topology { "1" (storm/spout-spec ibspout) } { "3" (storm/bolt-spec { "1" :shuffle } printstuff ) })
[EDIT 2]
SOメンバーAnkurのアドバイスで、私は私のトポロジをrejiggingよ。 Javaコールバックを作成した後、(.setTuple ibspout (.getTuple java-callback))
を使用して、それを下のIBSpoutに渡します。 NotSerializableエラーが発生するため、Javaコールバックオブジェクト全体を渡すことはありません。すべてがコンパイルされ、エラーなく実行されます。しかし、再び、私にはデータがありません。printstuffボルト。うーん。パートBへ
public class IBSpout implements IRichSpout { /** * Storm spout stuff */ private SpoutOutputCollector _collector; private List _tuple = new ArrayList(); public void setTuple(List tuple) { _tuple = tuple; } public List getTuple() { return _tuple; } /** * Storm ISpout interface functions */ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; } public void close() {} public void activate() {} public void deactivate() {} public void nextTuple() { _collector.emit(_tuple); } public void ack(Object msgId) {} public void fail(Object msgId) {} public void declareOutputFields(OutputFieldsDeclarer declarer) {} public java.util.Map getComponentConfiguration() { return new HashMap(); } }
ねえ、これを見ていただきありがとうございます。私は実際に作業負荷のバランスを取るために**:シャッフル**を指定しました。私が今行っている問題は、私のイベントデータが私のボルトに伝わるのを見ていないということです(編集を参照)。どんな洞察もありがとうございます。 – Nutritioustim
@Nutritioustimあなたは実際に何が問題だったか把握しましたか? – Vor
@ Vor、いいえ。ストームは私がやろうとしていることに対してはあまりにもうまくいかないようです。今のところ*** [Lamina](https://github.com/ztellman/lamina)***が私のニーズを満たしています。 HTH。 – Nutritioustim