2017-06-15 9 views
0

ウォッチボルトにTikaボルトを接続するために新しいストリームを設定しようとしています。次のように私は私の新しい「WARC」ストリーム定義するoutputDeclarerFields機能を変更しているティカ定義でウォークボルトの新しいストリームの設定が失敗しました

import com.digitalpebble.stormcrawler.tika.ParserBolt; 
import com.digitalpebble.stormcrawler.warc.WARCHdfsBolt; 

builder.setBolt("tika", new ParserBolt(), numWorkers) 
    .localOrShuffleGrouping("shunt","tika"); 

WARCHdfsBolt warcbolt = getWarcBolt("XX"); 

builder.setBolt("warc", warcbolt, numWorkers) 
    .localOrShuffleGrouping("tika", "warc"); 

@Override 
public void declareOutputFields(OutputFieldsDeclarer declarer) { 
    declarer.declare(new Fields("url", "content", "metadata", "text")); 
    declarer.declareStream(StatusStreamName, new Fields("url", "metadata", "status")); 
    declarer.declareStream("warc", new Fields("url", "content", "metadata", "text")); 
} 

を私はローカルモードでトポロジを起動したときしかし、私が取得:

14308 [メイン] oasdsSlot WARN - SLOTのdebian8:1028 STARTIN:1027空 状態で開始 - SLOT debian8 - 割り当てヌル14308 [主に] oasdsSlotに警告状態EMPTYにおけるG - 割り当てヌル14308 [メイン] oasdsSlotをWARN - SLOTのdebian8:1029が空の状態で開始 - 割り当てヌル14309 [メイン] INFO oaslAsyncLocalizer - に アップ未使用のトポロジを洗浄/ TMP/a1e3b7f5-e251- 40ae-a032-b0839ca103c8/supervisor/stormdist 14318 [main] INFO oasdsSupervisor - ホストdebian8にid f42c64cd-7c36-40ab-9f85-4b7751ed2d6aのスーパーバイザを起動しています。 15030 [main] WARN o.a.s.d.nimbus - トポロジ送信例外。 (トポロジー NAME = 'xxCrawler')#エラー{:原因ゼロ:タイプ org.apache.storm.generated.InvalidTopologyException:[{経由で[org.apache.storm.daemon.common $ validate_structure_BANG_:メッセージ
ゼロトレース [[org.apache.storm.daemon.common $ validate_structure_BANG_ [org.apache.storm.daemon.common $ system_topology_BANG_共通 を呼び出す common.clj 185]
を呼び出す: common.clj 185]}]起動します。 CLJ 378]
[org.apache.storm.daemon.nimbus $ mk_reified_nimbus $ reify__10782 submitTopologyWithOpts nimbus.clj 1694]
[org.apache.storm.daemon.nimbus $ mk_reified_nimbus $再-2]
[sun.reflect.NativeMethodAccessorImpl NativeMethodAccessorImpl.java 62]
[sun.reflect.DelegatingMethodAccessorImpl を呼び出すDelegatingMethodAccessorImplを呼び出す NativeMethodAccessorImpl.java invoke0 ify__10782 submitTopologyのnimbus.clj 1726]
[sun.reflect.NativeMethodAccessorImpl .java 43] [java.lang.reflect.Method invoke Method.java 498] [clojure.lang.Reflector invokeMatchingMethod Reflector.java 93] [clojure.lang.Reflector invokeInstanceMethod Reflector.java 28] [org.apache。 storm.testing $ submit_local_topology invoke testing.clj 310]
[org.apache.storm.LocalCluster $ _submitTopology LocalCluster.clj を呼び出す49] [org.apache.storm.LocalCluster submitTopologyゼロ-1]
[com.digitalpebble.stormcrawler.ConfigurableTopology提出 ConfigurableTopology.java 76]

[ を開始com.digitalpebble.stormcrawler.ConfigurableTopology 50 ConfigurableTopology.java]を[111 xx.xx.xx.xx.xxTopology xxTopology.javaを実行する] [com.digitalpebble.stormcrawler.ConfigurableTopology ConfigurableTopology.java 65を提出します] [xx.xx.xx.xx.xxトポロジメイン xxTopology.java 53]]} 15035 [main] ERROR oassoazsNIOServerCnxnFactory - スレッドスレッド[main、5、main]が死んだ org.apache.storm.generated。InvalidTopologyException:null at org.apache.storm.daemon.common $ validate_structure_BANG_.invoke(common.clj:185) 〜[storm-core-1.1.0.jar:1.1.0] at org.apache.storm。 daemon.common $ system_topology_BANG_.invoke(common.clj:378) 〜[storm-core-1.1.0.jar:1.1.0] at org.apache.storm.daemon.nimbus $ mk_reified_nimbus $ reify__10782.submitTopologyWithOpts(nimbus) .clj:1694) 〜[storm-core-1.1.0.jar:1.1.0] (org.apache.storm.daemon.nimbus)$ mk_reified_nimbus $ reify__10782.submitTopology(nimbus.clj:1726) 〜[storm -core-1.1.0.jar:1.1.0]at sun.reflect.NativeMethodAccessorImpl.invoke0(ネイティブメソッド)〜[?:1.8.0_131] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java :62) 〜[:?1.8.0_131] [?:1.8.0_131] sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 〜で java.lang.reflect.Method.invoke(方法で.java:498)〜[?:1.8.0_131] at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) 〜[clojure-1.7.0.jar :?] at clojure.lang.Reflector。 [storm-core-1.1]。[storm-core-1.1]。[storm-core-1.1]をクリックしてください。 0.jar:1.1.0] 、org.apache.storm.LocalCluster $ _submitTopology.invoke(LocalCluster.clj:49) 〜[storm-core-1.1.0.jar:1.1.0] at org.apache.storm.LocalCluster.submitTopology〜[storm-core-1.1.0.jar:1.1.0] at com.digitalpebble.stormcrawler.ConfigurableTopology.submit(ConfigurableTopology.java:76) 〜[xx-crawler-1.1.jar :?] at com.digitalpebble.stormcrawler.ConfigurableTopology.submit(ConfigurableTopology.java:65) 〜[xx-1.1.jar :?] at xx.xx.xx.xx .xxTopology.run(xxTopology.java:111)〜[xx-crawler-1.1.jar :?] com.digitalpebble.stormcrawler.ConfigurableTopology.start(ConfigurableTopology.java:50) 〜[xx-crawler-1.1。 jar :?] at xx.xx.xx.xx.xxTopology.main(xxTopology.java:53)〜[xx-crawler-1.1.jar :?]

ご協力いただければ幸いです!

StatusStreamName( "status")ストリームを使用してtikaとwarcのボルトを接続すると、正常に動作することに注意してください。

は、

エティエンヌありがとう

答えて

0

WARCsが生、非解析されたコンテンツから生成されています。パーサーボルトの代わりにFetchcherの出力にWARCを接続する必要があります。

ワーカ用に新しいストリームを宣言する必要はありません。ウォーカーボルトをTikaボルトから出てくるデフォルトのストリームに単に接続するだけです。私は

輸入com.digitalpebble.stormcrawler.tika.ParserBoltあなたのコード内で参照

これは、デフォルトの実装( 'warc'ストリームを生成しない)に依存していることを示します。あなたはそれをあなたの変更された実装と置き換えるのを忘れていますか?

+0

Dear Julien、 ありがとう、役に立つコメントがあります。 私が望むのは、選択したWebページのみをWARCファイルにアーカイブすることです。選択は、パーサーにある正規表現に基づいています。その後、一致した場合は、ページの内容をアーカイブします(「warc」ストリームを使用)。したがって、パーサーとワルクとの間の接続はボルトで締め付けられます。 このアプローチは正しいと思いますか?それとも、それを実装するために別の方法をとるべきですか? もう一度ありがとうございます! エティエンヌ – EJO

+0

こんにちはエティエンヌ。上記の私の答えを編集しました。パーサーボルトがバイナリコンテンツを生成すると私が最初に答えたときは忘れていたので、tikaボルトをまったく変更する必要はありません。あなたは、メタデータの内容をチェックし、解析中に設定したK/Vを持つタプルだけを渡すために別注ボルトを書くことができます。 HTMLページのみを扱う場合は、JsoupParserボルトに固執してください。 インポートが正しくないため、最初の問題が発生しました。もしそうなら、私の答えを正しいものとしてマークしてください。ありがとう! –

関連する問題