2017-07-17 2 views
1

enter image description herespoutクラスの私の失敗メソッドは、最初のボルトに対してのみ働き、2番目のボルト以降では機能しません。

注:
Bolt1は、最初の3つの素数(2,3,5)のリストが含まれています。
Bolt2には、素数(7,11,13)の2番目の3つのセットのリストが含まれています。
Bolt3では、番号がプライムであるかどうかをチェックするだけです。
最初のボルトから、私はspoutクラスからFail()を呼び出すことができますが、2番目のボルト以降では、spoutクラスからFail()を呼び出すことができません。

トポロジクラス:

 ...... 
     TopologyBuilder builder = new TopologyBuilder(); 
     builder.setSpout("spout", new SpoutClass(), 1); 
     builder.setBolt("bolt1", new Bolt1(), 1).shuffleGrouping("spout"); 
     builder.setBolt("bolt2", new Bolt2(), 1).shuffleGrouping("bolt1"); 
     builder.setBolt("bolt3", new Bolt3(), 1).shuffleGrouping("bolt2"); 

スパウトクラス:

SpoutClass implements IRichSpout{ 
    private SpoutOutputCollector collector; 
    private TopologyContext context; 

    public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, SpoutOutputCollector collector) { 
     this.context = context; 
     this.collector = collector; 
     } 

    public void nextTuple() { 
     try { 
      //messageQueue is blocking queue which contains data 
      String msg = messageQueue.take(); 
      String ackId = msg; 
      this.collector.emit(new Values(msg), ackId); 

     }catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 
    public void ack(Object msgId) { 

     System.out.println("Acknowledges that this tuple has been processed ........... " + msgId); 

    } 

    public void fail(Object msgId) { 

     System.out.println("FAILED To Process Message :-" + msgId); 

    } 
} 

Bolt1クラス:

public class Bolt1 extends BaseRichBolt { 
private OutputCollector collector; 
ArrayList<Integer> firstthreePrime = new ArrayList<Integer>(); 
     firstthreePrime.add(2); 
     firstthreePrime.add(3); 
     firstthreePrime.add(5); 
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 
     this.collector = collector; 
    } 
    public void execute(Tuple tuple) { 

     String message = (String) tuple.getValueByField("msg"); 

     System.out.println("Received " + message + " in Bolt1."); 
     Integer number = Integer.valueOf(message); 
     if (check this number contains bolt1 or not) { 
      //if number is contains 
      System.out.println(" Number is prime ............." + number + " and Throw from Bolt1"); 
      this.collector.fail(tuple); 
     } else { 
      collector.emit(new Values(message)); 
      collector.ack(tuple); 
     } 
    } 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     declarer.declare(new Fields("msg")); 
    } 
} 

Bolt2クラス:

public class Bolt2 extends BaseRichBolt { 
private OutputCollector collector; 
ArrayList<Integer> secondthreePrime = new ArrayList<Integer>(); 
     secondthreePrime.add(7); 
     secondthreePrime.add(11); 
     secondthreePrime.add(13); 
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 
     this.collector = collector; 

    } 
    public void execute(Tuple tuple) { 

     String message = (String) tuple.getValueByField("msg"); 

     System.out.println("Received " + message + " in Bolt2."); 
     Integer number = Integer.valueOf(message); 
     if (check this number contains bolt2 or not) { 
      //if number is contains 
      System.out.println(" Number is prime ............." + number + " and Throw from Bolt2"); 
      this.collector.fail(tuple); 
     } else { 
      collector.emit(new Values(message)); 
      collector.ack(tuple); 
     } 
    } 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     declarer.declare(new Fields("msg")); 
    } 
} 

Bolt3クラス:

public class Bolt3 extends BaseRichBolt { 
private OutputCollector collector; 

public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 
     this.collector = collector; 

    } 
    public void execute(Tuple tuple) { 

     String message = (String) tuple.getValueByField("msg"); 

     System.out.println("Received " + message + " in Bolt3."); 
     Integer number = Integer.valueOf(message); 
     if (check this number is prime or not) { 
      //if number is prime 
      System.out.println(" Number is prime ............." + number + " and Throw from Bolt3"); 
      this.collector.fail(tuple); 
     } else { 
      collector.emit(new Values(message)); 
      collector.ack(tuple); 
     } 
    } 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
    } 
} 

答えて

1

あなたがBaseRichBoltを使用しているので、あなたは、発信タプルを固定したくないですか?

_collector.emit(tuple, new Values(message)); 

アンカーしないと、スパウトからのタプルへのリンクがありません。ドキュメントをチェックしてください:Guaranteeing Message Processing

+0

spoutクラスでは、タプルでは出力できません。これは、ボルトクラスに追加した後に動作します: collector.emit(タプル、新しい値(メッセージ)); – Ashish

+0

はい、申し訳ありませんが、タイプミスはBaseRichBoltを意味していました。あなたの問題を解決したら、私の答えを受け入れてください。 –

関連する問題