2016-05-12 6 views
0

私はこのようなファイルにドライバ/メインクラスを持っています。 (基本的に私はSTORM & AKKAを混ぜようとしています)。 TenderEventSpout2クラスでは、俳優との間でメッセージの送受信をしようとしています。Akka-Javaは依頼待ちでDeadLetterExceptionをスローしています

public class TenderEventSpout2 extends BaseRichSpout { 
     ActorSystemHandle actorSystemHandle; 
     ActorSystem _system; 
     ActorRef eventSpoutActor; 
     Future<Object> future; 
     Timeout timeout; 
     String result; 

    @Override 
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { 
     //String[] message = {"WATCH_DIR"}; 
     timeout = new Timeout(Duration.create(60, "seconds")); 
     List<Object> messageList = new ArrayList<Object>(); 

     messageList.add("WATCH_DIR"); 

     messageList.add(this.inputDirName); 

     actorSystemHandle = new ActorSystemHandle(); 
     _system = actorSystemHandle.getActorSystem(); 
     eventSpoutActor = _system.actorOf(Props.create(EventSpoutActor.class)); 


     future = Patterns.ask(eventSpoutActor, messageList, timeout); 

    } 

    @Override 
    public void nextTuple() { 
     String result = null; 
     try{ 
      result = (String) Await.result(future, timeout.duration()); 
     } 
     catch(Exception e){ 
      e.printStackTrace(); 
     } 
} 

私の俳優である:

public class EventSpoutActor extends UntypedActor { 
public ConcurrentLinkedQueue<String> eventQueue = new ConcurrentLinkedQueue<>(); 
Inbox inbox; 
@Override 
public void onReceive(Object message){// throws IOException { 
    if (message instanceof List<?>) { 
     System.out.println(((List<Object>)message).get(0)+"*******************"); 
     if(((List<Object>)message).get(0).equals("WATCH_DIR")){ 
      final List<Object> msg = (List<Object>)message; 
      Thread fileWatcher = new Thread(new Runnable(){ 

       @Override 
       public void run() { 

         System.out.println(msg.get(1)+"*******************"); 
         try { 

          String result = "Hello"; 
          System.out.println("Before Sending Message *******************"); 
          getSender().tell(result, getSelf()); 
          } 
         catch (Exception e) { 
          getSender().tell(new akka.actor.Status.Failure(e), getSelf()); 
          throw e; 
          } 

       } 
      }); 
      fileWatcher.setDaemon(true); 
      fileWatcher.start(); 
      System.out.println("Started file watcher"); 
     } 
    } 
    else{ 
     System.out.println("Unhandled !!"); 
     unhandled(message); 
    } 
} 

}

私はEventSpoutActorにメッセージを送信することができますよ。しかし、メッセージを受け取ることに直面する問題。何故ですか??コンソールに次のメッセージが表示されます。

[EventProcessorActorSystem-akka.actor.default-dispatcher-3] 
[akka://EventProcessorActorSystem/deadLetters] Message [java.lang.String]  
from Actor[akka://EventProcessorActorSystem/user/$a#-1284357486] to 
Actor[akka://EventProcessorActorSystem/deadLetters] was not delivered. [1] 
dead letters encountered. 
This logging can be turned off or adjusted with configuration settings 
'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 

答えて

0

メッセージが配信されなかった理由がわかりました。

getSender().tell(result, getSelf()); 

送信者にメッセージを送信することになっている。このラインは、それはそれはスレッド・コードの内部で使用されたときに、コンテキストデータです失わ:

Thread fileWatcher = new Thread(new Runnable(){ 

      @Override 
      public void run() { 

        System.out.println(msg.get(1)+"*******************"); 
        try { 

         String result = "Hello"; 
         System.out.println("Before Sending Message *******************"); 
         getSender().tell(result, getSelf()); 

私は外に「伝える」のコードを移動した場合スレッド、それは働いた。

関連する問題