2013-04-02 6 views
5

私のプロジェクトにStorm(see here)を統合しようとしています。私はトポロジー、スパウト、ボルトの概念を突き止めました。しかし、今、私はいくつかの実際の実装を理解しようとしています。Storm> Howto JavaコールバックをSpoutに統合

A)私は、JavaとClojureを使用する多言語環境を使用しています。私のJavaコードは、ストリーミングデータを起動するメソッドを持つコールバッククラスです。これらのメソッドにプッシュされたイベントデータは、私がスパウトとして使用したいものです。

最初の質問は、これらのメソッドに入ってくるデータをスパウトに接続する方法です。私はbacktype.storm.topology.IRichSpoutを渡すI)にしようとしている、そしてII)は(その注ぎ口のオープン機能にbacktype.storm.spout.SpoutOutputCollectorsee here)を渡しますsee here)。しかし、私は実際にどのような種類の地図やリストを渡す方法も見当たりません。

B)私のプロジェクトの残りの部分はすべてClojureです。これらの方法では多くのデータが得られます。各イベントのIDは1〜100です。Clojureでは、スパウトからのデータを別の実行スレッドに分割したいと考えています。それらはボルトとなります。

Clojureボルトを設定してスパウトからイベントデータを取得し、受信イベントのIDに基づいてスレッドを分割するにはどうすればよいですか?事前 ティム

[EDIT 1]で

おかげで

私は実際にこの問題を過ぎてもらいました。私は1)私自身のIRichSpoutを実装しました。私は2)は、Javaコールバッククラスの受信ストリームデータにそのスパウトの内部タプルを接続しました。これが慣用かどうかは分かりません。しかし、それはコンパイルされ、エラーなく実行されます。ただし、3)printstuffボルトを介して入ってくる着信ストリームデータは表示されません(間違いありません)。

イベントデータが確実に伝達されるようにするには、吐き出しやボルトの実装やトポロジ定義で何か特別なことがありますか?ありがとう。

 

     ;; tie Java callbacks to a Spout that I created 
     (.setSpout java-callback ibspout) 

     (storm/defbolt printstuff ["word"] [tuple collector] 
     (println (str "printstuff --> tuple["tuple"] > collector["collector"]")) 
    ) 
     (storm/topology 
     { "1" (storm/spout-spec ibspout) 
     } 
     { "3" (storm/bolt-spec { "1" :shuffle } 
           printstuff 
      ) 
     }) 

[EDIT 2]

SOメンバーAnkurのアドバイスで、私は私のトポロジをrejiggingよ。 Javaコールバックを作成した後、(.setTuple ibspout (.getTuple java-callback))を使用して、それを下のIBSpoutに渡します。 NotSerializableエラーが発生するため、Javaコールバックオブジェクト全体を渡すことはありません。すべてがコンパイルされ、エラーなく実行されます。しかし、再び、私にはデータがありません。printstuffボルト。うーん。パートBへ

 

    public class IBSpout implements IRichSpout { 

     /** 
     * Storm spout stuff 
     */ 
     private SpoutOutputCollector _collector; 

     private List _tuple = new ArrayList(); 
     public void setTuple(List tuple) { _tuple = tuple; } 
     public List getTuple() { return _tuple; } 

     /** 
     * Storm ISpout interface functions 
     */ 
     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { 
     _collector = collector; 
     } 
     public void close() {} 
     public void activate() {} 
     public void deactivate() {} 
     public void nextTuple() { 
     _collector.emit(_tuple); 
     } 
     public void ack(Object msgId) {} 
     public void fail(Object msgId) {} 


     public void declareOutputFields(OutputFieldsDeclarer declarer) {} 
     public java.util.Map getComponentConfiguration() { return new HashMap(); } 

    } 

答えて

0

回答:あなたはIDによる実行時に一緒にグループ化されますどのように動作を制御できるように、フィールドのグループ化を探しているよう

簡単な答えは私に聞こえます。

あなたはなぜこのようにしようとしているのかわからないので、これは本当に完全な答えではないと私は確信しています。バランスのとれたワークロードが必要な場合は、シャッフルグループ化が最適です。

+0

ねえ、これを見ていただきありがとうございます。私は実際に作業負荷のバランスを取るために**:シャッフル**を指定しました。私が今行っている問題は、私のイベントデータが私のボルトに伝わるのを見ていないということです(編集を参照)。どんな洞察もありがとうございます。 – Nutritioustim

+0

@Nutritioustimあなたは実際に何が問題だったか把握しましたか? – Vor

+0

@ Vor、いいえ。ストームは私がやろうとしていることに対してはあまりにもうまくいかないようです。今のところ*** [Lamina](https://github.com/ztellman/lamina)***が私のニーズを満たしています。 HTH。 – Nutritioustim

3

ちょっと変わったようなコールバッククラスにスパウトを渡しているようです。トポロジが実行されると、ストームは定期的にスパウトnextTupleメソッドを呼び出します。したがって、あなたが行う必要があるのは、カスタムスパウト実装にJavaコールバックを渡すことです。これにより、スパウトがあなたのスパウトを呼び出すとき、スパウトはJavaコールバックを呼び出してトポロジに供給されるタプル。

理解するための重要な概念は、噴出は、嵐によって要求されたとき、あなたは注ぎ口にデータをプッシュしていないデータを引っ張るということです。あなたのコールバックはデータをプッシュするためにスパウトを呼び出すことはできません。スパウトのnextTupleメソッドが呼び出されると、あなたのスパウトが(いくつかのJavaメソッドまたはメモリバッファから)データをプルする必要があります。

+0

ああいいね。洞察に感謝します。しかし、私はまだデータがスパウトを通して私のボルトに来るのを見ていない。私はより良い説明を与えました。たぶん、私はSpoutで特定のことをするべきでしょうか?データ構造がSpoutに渡される特別な方法はありますか?ありがとう。 – Nutritioustim

+0

@Nutritioustimあなたは答えを得ましたか? – hawkeye

+0

Heyya。 abouveを参照してください。私は欲しいものをするために嵐を得ることができませんでした。そして、*** [Lamina](https://github.com/ztellman/lamina)***は、私の問題を解決するより軽量のツールでした。 HTH。 – Nutritioustim

関連する問題