2016-08-23 12 views
1

私はStormの初心者です。私は大学のプロジェクトに使っています。Stormでタプル処理を停止して他のコードを実行する方法

MySqlデータベースにリンクされたSpoutと2つのボルトでトポロジを作成しました。スパウトにリンクされた最初のボルトは、タプルに必要でない情報を準備し、削除します。 2番目は、タプルのフィルタリングを行います。

私はローカルモードで作業しています。

私の質問は なぜトポロジを実行した後、私のコンソールに以下のような出力が表示されるのですか?

38211 [Thread-14-movie-SPOUT] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30] 
67846 [Thread-10-__acker] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60] 
67846 [Thread-8-cleaning-genre-bolt] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60] 
67852 [Thread-10-__acker] INFO backtype.storm.daemon.task - Emitting: __acker __metrics [#<TaskInfo [email protected]> [#<DataPoint [__emit-count = {}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__receive = {read_pos=0, write_pos=1, capacity=1024, population=1}]> #<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__sendqueue = {read_pos=-1, write_pos=-1, capacity=1024, population=0}]> #<DataPoint [__execute-count = {}]>]] 
67853 [Thread-8-cleaning-genre-bolt] INFO backtype.storm.daemon.task - Emitting: cleaning-genre-bolt __metrics [#<TaskInfo [email protected]> [#<DataPoint [__emit-count = {default=1680}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__receive = {read_pos=1621, write_pos=1622, capacity=1024, population=1}]> #<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count = {default=1680}]> #<DataPoint [__execute-latency = {movie-SPOUT:default=0.15476190476190477}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__sendqueue = {read_pos=1680, write_pos=1680, capacity=1024, population=0}]> #<DataPoint [__execute-count = {movie-SPOUT:default=1680}]>]] 
67854 [Thread-13-filtering-genre-BOLT] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60] 
67855 [Thread-13-filtering-genre-BOLT] INFO backtype.storm.daemon.task - Emitting: filtering-genre-BOLT __metrics [#<TaskInfo [email protected]> [#<DataPoint [__emit-count = {}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__receive = {read_pos=1681, write_pos=1682, capacity=1024, population=1}]> #<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {cleaning-genre-bolt:default=0.08333333333333333}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__sendqueue = {read_pos=-1, write_pos=-1, capacity=1024, population=0}]> #<DataPoint [__execute-count = {cleaning-genre-bolt:default=1680}]>]] 

私は最後のタプルが処理された後、これらの行は、通常考慮されるべきであることを読みました。ではない?

トポロジの送信後に他のコードを実行するにはどうすればよいですか?たとえば、2番目のボルトで行われたフィルタリングの結果を、HashMapに保存して印刷したいとします。 submitTopology()メソッドを含む行の後にコードを置くと、コードはタプルの完了前に実行されます。

秒と最後の質問です:なぜ嵐のすべての例では、私はスパウト

「(1000)のThread.sleep」で参照してください?

多分私の最初の質問にリンクしています。

私の質問がはっきりしていることを願います。 ありがとうございます!

答えて

0

処理された最後のタプルの後のこれらの行は通常とみなされます。ではない?

これらはただメッセージINFOです。だから、心配する必要はありません。

submitTopology()メソッドを含む行の後にコードを挿入すると、コードはタプルの完了前に実行されます。

トポロジを送信すると、トポロジはバックグラウンドで(つまりマルチスレッドで)実行されます。これは、トポロジが「永遠に」実行されるときに必要です(明示的に停止するか、ローカルモードを実行しているときにJavaアプリケーションが終了するまで)。

ストロームはストリーミングシステムであり、「処理中は終了しません」(無限に入力ストリームが存在するため、処理は永遠に実行されるため)、「トポロジが完成した後」というコードはストームの概念と一致しません。有限のデータセットを処理したい場合は、FlinkやSparkのようなバッチ処理フレームワークを考慮する必要があります。

このように、Stormでこの機能を使用するには、すべてのデータがいつ処理されたかを判断できる必要があります。したがって、トポロジの送信後、すべてのデータが処理された後、明示的にブロックして待機します。

しかし、あなたのユースケースでは、最後のボルト内から結果を印刷しないのはなぜですか?

Thread.sleep()あなたが参照している例がわかりません。なぜ誰かが生産のためにそれを入れなければならない理由は考えられません。処理を人為的に遅くする目的をデモするためにあるかもしれません。

+0

徹底的な返信ありがとう、マティアス! –

+0

これがあなたの質問に答えるなら、私の答えを受け入れるか自由にしてください。 –

関連する問題