2017-02-21 7 views
0

私のトポロジにtikaパーサーを含める必要があります。私は設定でjsoup.treat.non.html.as.errorfalseと設定しました。そして、私は嵐のクローラの文書で説明されているようにtikaトポロジーを設定しました。次のようにクロールトポロジの再帰的クロール用のTikaを使用したStorm Crawlerの設定

設定がされています

builder.setSpout("spout", new MemorySpout(testURLs)); 

builder.setBolt("partitioner", new URLPartitionerBolt()).shuffleGrouping("spout"); 

builder.setBolt("fetch", new FetcherBolt()).fieldsGrouping("partitioner", new Fields("key")); 

builder.setBolt("sitemap", new SiteMapParserBolt()).localOrShuffleGrouping("fetch"); 

builder.setBolt("jsoup", new JSoupParserBolt()).localOrShuffleGrouping("sitemap"); 

builder.setBolt("shunt", new RedirectionBolt()).localOrShuffleGrouping("jsoup"); 

builder.setBolt("tika", new ParserBolt()).localOrShuffleGrouping("shunt", "tika"); 

builder.setBolt("indexer", new HBaseIndexerBolt(), numWorkers).localOrShuffleGrouping("shunt") 
        .localOrShuffleGrouping("tika"); 

builder.setBolt("status", new MemoryStatusUpdater()).localOrShuffleGrouping(Constants.StatusStreamName) 
        .localOrShuffleGrouping("sitemap", Constants.StatusStreamName) 
        .localOrShuffleGrouping("shunt", Constants.StatusStreamName) 
        .localOrShuffleGrouping("tika", Constants.StatusStreamName) 
        .localOrShuffleGrouping("indexer", Constants.StatusStreamName); 

return submit("crawl", conf, builder); 

このトポロジでは、私は、無効なトポロジの例外を受け取りました。この問題はステータスボルトによって引き起こされたようです。ステータスボルトを除外すると、クロールトポロジが問題なく動作するためです。ステータスボルトはどのように設定する必要がありますか?

答えて

1

最初の接続には 'fetch'がありません。また、 'shunt'ではなく 'jsoup'に接続する必要があります。後者はステータスストリームへのアウトリンクを生成しません。JSoupが処理できなかったタプルをTika使用する。いくつかの背景については、StatusStream wikiを参照してください。

以下の定義が有効です。

builder.setBolt("status", new MemoryStatusUpdater()). 
    .localOrShuffleGrouping("fetch", Constants.StatusStreamName) 
    .localOrShuffleGrouping("sitemap", Constants.StatusStreamName) 
    .localOrShuffleGrouping("jsoup", Constants.StatusStreamName) 
    .localOrShuffleGrouping("tika", Constants.StatusStreamName) 
    .localOrShuffleGrouping("indexer", Constants.StatusStreamName); 

これはあなたのHBaseIndexerがAbstractIndexerBoltを拡張し、状態ストリームにタプルを送信することを想定しています。

MemoryStatusUpdaterは主にテストとデバッグのためのものです。複数のワーカーがいる場合は必ずしも機能しません。また、ワーカープロセスが再起動されるとトポロジが失われます。

+0

ありがとうございました。できます。 – isspek

関連する問題