2017-01-25 1 views
1

私は分散したpub-subを異なるクラスタシステムに作成しようとしていますが、何を試みても機能しません。エラー:ActorからActorへのメッセージが配信されませんでした。[1]デッドレターが発生しました。分散したpub-subは動作しないクラスタで動作します

私がしようとしているのは、ここで簡単な例を作成することです。

1)私はトピック、例えば "content"を作成します。

2)jvm Aの1つのノードがトピックを作成し、サブスクリプションを作成し、パブリッシャーにも公開します。

3)別のノードで、別のポートのjvm Bと言うと、私はサブスクライバを作成します。

4)私はjvm Aからトピックにメッセージを送信したとき、jvm Bのサブスクライバも同じトピックを購読してもらいたいと思います。

Javaの異なるポートにある異なるクラスタシステムのサブスクライバとパブリッシャとの分散pubサブの簡単な実例です。

ここにapp1とその設定ファイルのコードがあります。

public class App1{ 

    public static void main(String[] args) { 

    System.setProperty("akka.remote.netty.tcp.port", "2551"); 
    ActorSystem clusterSystem = ActorSystem.create("ClusterSystem"); 
    ClusterClientReceptionist clusterClientReceptionist1 = ClusterClientReceptionist.get(clusterSystem); 
    ActorRef subcriber1=clusterSystem.actorOf(Props.create(Subscriber.class), "subscriber1"); 
    clusterClientReceptionist1.registerSubscriber("content", subcriber1); 
    ActorRef publisher1=clusterSystem.actorOf(Props.create(Publisher.class), "publisher1"); 
    clusterClientReceptionist1.registerSubscriber("content", publisher1); 
    publisher1.tell("testMessage1", ActorRef.noSender()); 

    } 
} 

app1.confi

akka { 
loggers = ["akka.event.slf4j.Slf4jLogger"] 
loglevel = "DEBUG" 
stdout-loglevel = "DEBUG" 
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" 
actor { 
provider = "akka.cluster.ClusterActorRefProvider" 
} 
remote { 
log-remote-lifecycle-events = off 
enabled-transports = ["akka.remote.netty.tcp"] 
netty.tcp { 
    hostname = "127.0.0.1" 
    port = 2551 
    } 
} 
cluster { 
seed-nodes = [ 
    "akka.tcp://[email protected]:2551" 
] 
auto-down-unreachable-after = 10s 
} 
akka.extensions = ["akka.cluster.pubsub.DistributedPubSub", 
"akka.contrib.pattern.ClusterReceptionistExtension"] 
    akka.cluster.pub-sub { 
name = distributedPubSubMediator 
role = "" 
routing-logic = random 
gossip-interval = 1s 
removed-time-to-live = 120s 
max-delta-elements = 3000 
use-dispatcher = "" 
} 

akka.cluster.client.receptionist { 
name = receptionist 
role = "" 
number-of-contacts = 3 
response-tunnel-receive-timeout = 30s 
use-dispatcher = "" 
heartbeat-interval = 2s 
acceptable-heartbeat-pause = 13s 
failure-detection-interval = 2s 
    } 
} 

APP2ためのコードと、その設定ファイル

public class App 
{ 
    public static Set<ActorPath> initialContacts() { 
    return new HashSet<ActorPath>(Arrays.asList(   
    ActorPaths.fromString("akka.tcp://[email protected]:2551/system/receptionist"))); 
} 

public static void main(String[] args) { 
    System.setProperty("akka.remote.netty.tcp.port", "2553"); 
    ActorSystem clusterSystem = ActorSystem.create("ClusterSystem2"); 
    ClusterClientReceptionist clusterClientReceptionist2 = ClusterClientReceptionist.get(clusterSystem); 
    final ActorRef clusterClient = clusterSystem.actorOf(ClusterClient.props(ClusterClientSettings.create(
      clusterSystem).withInitialContacts(initialContacts())), "client"); 
    ActorRef subcriber2=clusterSystem.actorOf(Props.create(Subscriber.class), "subscriber2"); 
    clusterClientReceptionist2.registerSubscriber("content", subcriber2); 
    ActorRef publisher2=clusterSystem.actorOf(Props.create(Publisher.class), "publisher2"); 
    publisher2.tell("testMessage2", ActorRef.noSender()); 
    clusterClient.tell(new ClusterClient.Send("/user/publisher1", "hello", true), null); 

} 
}    

app2.confi

akka { 
loggers = ["akka.event.slf4j.Slf4jLogger"] 
loglevel = "DEBUG" 
stdout-loglevel = "DEBUG" 
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" 
actor { 
provider = "akka.cluster.ClusterActorRefProvider" 
} 
remote { 
log-remote-lifecycle-events = off 
enabled-transports = ["akka.remote.netty.tcp"] 
netty.tcp { 
    hostname = "127.0.0.1" 
    port = 2553 
    } 
} 
cluster { 
seed-nodes = [ 
    "akka.tcp://[email protected]:2553" 
] 
auto-down-unreachable-after = 10s 
} 
akka.extensions = ["akka.cluster.pubsub.DistributedPubSub", 
"akka.contrib.pattern.ClusterReceptionistExtension"] 
    akka.cluster.pub-sub { 
name = distributedPubSubMediator 
role = "" 
routing-logic = random 
gossip-interval = 1s 
removed-time-to-live = 120s 
max-delta-elements = 3000 
use-dispatcher = "" 
} 

akka.cluster.client.receptionist { 
name = receptionist 
role = "" 
number-of-contacts = 3 
response-tunnel-receive-timeout = 30s 
use-dispatcher = "" 
heartbeat-interval = 2s 
acceptable-heartbeat-pause = 13s 
failure-detection-interval = 2s 
    } 
} 

パブリッシャとサブスクライバクラスの両方のアプリケーションのために同じです以下に示す。

出版社:

public class Publisher extends UntypedActor { 
private final ActorRef mediator = 
     DistributedPubSub.get(getContext().system()).mediator(); 

@Override 
public void onReceive(Object msg) throws Exception { 
    if (msg instanceof String) { 
     mediator.tell(new DistributedPubSubMediator.Publish("events", msg), getSelf()); 
    } else { 
     unhandled(msg); 
    } 
} 

} 

サブスクライバ:

public class Subscriber extends UntypedActor { 
private final LoggingAdapter log = Logging.getLogger(getContext().system(), this); 

public Subscriber(){ 

    ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator(); 
    mediator.tell(new DistributedPubSubMediator.Subscribe("events", getSelf()), getSelf()); 

} 

public void onReceive(Object msg) throws Throwable { 
    if (msg instanceof String) { 
     log.info("Got: {}", msg); 
    } else if (msg instanceof DistributedPubSubMediator.SubscribeAck) { 
     log.info("subscribing"); 
    } else { 
     unhandled(msg); 
    } 
} 
} 

両方のアプリを実行している間、私は、受信側のアプリでこのエラーを得ました。 デッド文字は

[ClusterSystem-akka.actor.default-dispatcher-21] INFO akka.actor.RepointableActorRef - Message [java.lang.String] from Actor[akka://ClusterSystem/system/receptionist/akka.tcp%3A%2F%2FClusterSystem2%40127.0.0.1%3A2553%2FdeadLetters#188707926] to Actor[akka://ClusterSystem/system/distributedPubSubMediator#1119990682] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 

と送信側のアプリのメッセージでは、正常に送信encounterdログに表示されています。

[ClusterSystem2-akka.actor.default-dispatcher-22] DEBUG akka.cluster.client.ClusterClient - Sending buffered messages to receptionist 

答えて

0

本当に意味がないという方法でClusterClientを使用して、分散パブサブを使用して、両方のあなたのノードがクラスタの一部であるとして、あなただけの分散パブのサブを使用することができるとは何の関係もありません。 APIを直接。ここで

は期待通りに動作するあなたの正確なパブリッシャーとサブスクライバーの俳優を使って2つのノードクラスタを作成する簡単なメイン含むconfigです:配達のいずれかの保証を与えるものではありませんパブサブを分散

public static void main(String[] args) throws Exception { 

    final Config config = ConfigFactory.parseString(
    "akka.actor.provider=cluster\n" + 
    "akka.remote.netty.tcp.port=2551\n" + 
    "akka.cluster.seed-nodes = [ \"akka.tcp://[email protected]:2551\"]\n"); 

    ActorSystem node1 = ActorSystem.create("ClusterSystem", config); 
    ActorSystem node2 = ActorSystem.create("ClusterSystem", 
    ConfigFactory.parseString("akka.remote.netty.tcp.port=2552") 
     .withFallback(config)); 

    // wait a bit for the cluster to form 
    Thread.sleep(3000); 

    ActorRef subscriber = node1.actorOf(
    Props.create(Subscriber.class), 
    "subscriber"); 

    ActorRef publisher = node2.actorOf(
    Props.create(Publisher.class), 
    "publisher"); 

    // wait a bit for the subscription to be gossiped 
    Thread.sleep(3000); 

    publisher.tell("testMessage1", ActorRef.noSender()); 
} 

注、そうであればメディエータが互いに連絡を取る前にメッセージを送信すると、メッセージは単純に失われます(したがって、実際のコードで行うべきではありません。Thread.sleepステートメント)。

関連する問題