2017-04-25 5 views
0

AxonFrameworkでJGroupsを実装していますが、私はthisリンクを参照しています。 Dockerを使わずにコードを変更してプロジェクトを実行しました。CommandGatewayからすべてのイベントを単一のイベントハンドラにルーティングします

メインクラス - - 以下は、私のコードです

public class ClusterRunner { 

    public static void main(String[] args) { 

     Thread t1 = new Thread(new PrimaryNode()); 
     Thread t2 = new Thread(new SecondaryNode()); 

     t1.start(); 
     t2.start(); 
    } 
} 

プライマリノード -

import org.axonframework.commandhandling.AggregateAnnotationCommandHandler; 
import org.axonframework.commandhandling.CommandBus; 
import org.axonframework.commandhandling.SimpleCommandBus; 
import org.axonframework.commandhandling.distributed.AnnotationRoutingStrategy; 
import org.axonframework.commandhandling.distributed.DistributedCommandBus; 
import org.axonframework.commandhandling.distributed.commandfilter.AcceptAll; 
import org.axonframework.commandhandling.gateway.CommandGateway; 
import org.axonframework.commandhandling.gateway.DefaultCommandGateway; 
import org.axonframework.commandhandling.model.Repository; 
import org.axonframework.eventsourcing.EventSourcingRepository; 
import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore; 
import org.axonframework.eventsourcing.eventstore.EventStore; 
import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine; 
import org.axonframework.jgroups.commandhandling.JGroupsConnector; 
import org.axonframework.serialization.xml.XStreamSerializer; 
import org.jgroups.JChannel; 

public class PrimaryNode implements Runnable { 

    private JGroupsConnector connector; 

    private CommandGateway commandGateway; 

    private EventStore eventStore; 

    private CommandBus commandBus; 

    public PrimaryNode() { 

     eventStore = new EmbeddedEventStore(new InMemoryEventStorageEngine()); 

     try { 

      commandBus = configureDistributedCommandBus(); 

     } catch (Exception e) { 

      e.printStackTrace(); 
     } 

     Repository<Item> repository = new EventSourcingRepository<>(Item.class, eventStore); 

     new AggregateAnnotationCommandHandler<>(Item.class, repository).subscribe(commandBus); 

     commandGateway = new DefaultCommandGateway(commandBus); 
    } 

    public void run() { 

     for (int a = 0; a < 5; a++) { 

      System.out.println("Primary Node Created item " + a + " id: " + System.currentTimeMillis()); 
      commandGateway.sendAndWait(new CreateItem(Long.toString(a), Long.toString(System.currentTimeMillis()))); 
     } 
    } 

    private CommandBus configureDistributedCommandBus() throws Exception { 

     CommandBus commandBus = new SimpleCommandBus(); 

     JChannel channel = new JChannel(getClass().getClassLoader().getResourceAsStream("tcp.xml")); 

     connector = new JGroupsConnector(commandBus, channel, "axon-jgroups-demo", new XStreamSerializer(), 
      new AnnotationRoutingStrategy()); 
     connector.updateMembership(100, AcceptAll.INSTANCE); 

     connector.connect(); 
     connector.awaitJoined(); 

     return new DistributedCommandBus(connector, connector); 
    } 
} 

Secondayノード -

import org.axonframework.commandhandling.AggregateAnnotationCommandHandler; 
import org.axonframework.commandhandling.CommandBus; 
import org.axonframework.commandhandling.SimpleCommandBus; 
import org.axonframework.commandhandling.distributed.AnnotationRoutingStrategy; 
import org.axonframework.commandhandling.distributed.DistributedCommandBus; 
import org.axonframework.commandhandling.distributed.commandfilter.AcceptAll; 
import org.axonframework.commandhandling.gateway.CommandGateway; 
import org.axonframework.commandhandling.gateway.DefaultCommandGateway; 
import org.axonframework.commandhandling.model.Repository; 
import org.axonframework.eventhandling.EventListener; 
import org.axonframework.eventhandling.SimpleEventHandlerInvoker; 
import org.axonframework.eventhandling.SubscribingEventProcessor; 
import org.axonframework.eventsourcing.EventSourcingRepository; 
import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore; 
import org.axonframework.eventsourcing.eventstore.EventStore; 
import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine; 
import org.axonframework.jgroups.commandhandling.JGroupsConnector; 
import org.axonframework.serialization.xml.XStreamSerializer; 
import org.jgroups.JChannel; 

public class SecondaryNode implements Runnable { 

    private JGroupsConnector connector; 

    private EventStore eventStore; 

    public SecondaryNode() { 

     eventStore = new EmbeddedEventStore(new InMemoryEventStorageEngine()); 

     CommandBus commandBus = null; 

     try { 
      commandBus = configureDistributedCommandBus(); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 

     Repository<Item> repository = new EventSourcingRepository<>(Item.class, eventStore); 

     new AggregateAnnotationCommandHandler<>(Item.class, repository).subscribe(commandBus); 

     @SuppressWarnings("unused") 
     CommandGateway commandGateway = new DefaultCommandGateway(commandBus); 
    } 

    public void run() { 

     new SubscribingEventProcessor("processor", new SimpleEventHandlerInvoker((EventListener) event -> { 

      System.out.println("Secondary Node -- " + event.getPayload()); 
     }), eventStore).start(); 
    } 

    private CommandBus configureDistributedCommandBus() throws Exception { 

     CommandBus commandBus = new SimpleCommandBus(); 

     JChannel channel = new JChannel(getClass().getClassLoader().getResourceAsStream("tcp_test.xml")); 

     connector = new JGroupsConnector(commandBus, channel, "axon-jgroups-demo", new XStreamSerializer(), 
       new AnnotationRoutingStrategy()); 
     connector.updateMembership(100, AcceptAll.INSTANCE); 

     connector.connect(); 
     connector.awaitJoined(); 

     return new DistributedCommandBus(connector, connector); 
    } 
} 

項目は - 私はメインクラスを実行すると

import org.axonframework.commandhandling.CommandHandler; 
import org.axonframework.commandhandling.TargetAggregateIdentifier; 
import org.axonframework.commandhandling.model.AggregateIdentifier; 
import org.axonframework.eventhandling.EventHandler; 

import static org.axonframework.commandhandling.model.AggregateLifecycle.apply; 

class CreateItem { 

    @TargetAggregateIdentifier 
    private final String itemId; 
    private final String name; 

    public CreateItem(String itemId, String naam) { 
     this.itemId = itemId; 
     this.name = naam; 
    } 

    public String getItemId() { 
     return itemId; 
    } 

    public String getName() { 
     return name; 
    } 
} 

class ItemCreated { 
    private final String itemId; 
    private final String name; 

    public ItemCreated(String itemId, String naam) { 
     this.itemId = itemId; 
     this.name = naam; 
    } 

    public String getItemId() { 
     return itemId; 
    } 

    public String getName() { 
     return name; 
    } 

    @Override 
    public String toString() { 

     return itemId + " " + name; 
    } 
} 

class Item { 
    @AggregateIdentifier 
    private String itemId; 
    private String name; 

    public Item() { 

    } 

    @CommandHandler 
    public Item(CreateItem createItem) { 
     apply(new ItemCreated(createItem.getItemId(), createItem.getName())); 
    } 

    @EventHandler 
    public void itemCreated(ItemCreated itemCreated) { 
     itemId = itemCreated.getItemId(); 
     name = itemCreated.getName(); 
    } 
} 

は今、私の問題は、プライマリノードが5のイベントを生成しているが、二次ノードは、すべてのイベントを取得されていません。それは2つまたは3つまたは4つのイベントを得ることができるが、すべてではない。私はすべてのイベントをセカンダリノードに配信したい。私はAxonFrameworkとJGroupsの新機能です。ここで何が問題なのか理解してください。

答えて

0

すべてを試した後、ルーティング戦略を試してみることにしました。決定的な宛先を持たないコマンドメッセージの意思決定に基本的に役立つAbstractRoutingStrategyを使用することに決めました。以下は、JGroupのプライマリノード(送信者)にある作業コードです。 PrimaryNodeクラスからconfigureDistributedCommandBus()メソッドを変更します - 私はJGroupsのを使用しておりますので

private CommandBus configureDistributedCommandBus() throws Exception { 

    CommandBus commandBus = new SimpleCommandBus(); 

    channel = new JChannel(getClass().getClassLoader().getResourceAsStream("tcp.xml")); 

    RoutingStrategy rs = new AbstractRoutingStrategy(UnresolvedRoutingKeyPolicy.STATIC_KEY) { 

     @Override 
     protected String doResolveRoutingKey(CommandMessage<?> cmdMsg) { 

      View view = channel.getView(); 

      if (view.getMembers().size() == 2) { 

       return "secondary"; 

      } else if (view.getMembers().size() == 1) { 

      } 

      return cmdMsg.getIdentifier(); 
     } 
    }; 

    connector = new JGroupsConnector(commandBus, channel, "axon-jgroups-demo", new XStreamSerializer(), rs); 
    connector.updateMembership(100, AcceptAll.INSTANCE); 

    connector.connect(); 
    connector.awaitJoined(); 

    return new DistributedCommandBus(connector, connector); 
} 

、私はそこにある、すなわちどのように多くのノード、クラスタのビューを取得することができます。それに基づいて、私はコマンドメッセージルーティングの決定を下します。

1

デフォルトでは、Axonは各イベントハンドラをイベントバス(あなたの場合はEmbeddedEventStore)に登録します。つまり、特定のローカルインスタンスがイベントをパブリッシュするときにハンドラが呼び出されます。そのイベントはコマンドを処理するときに公開されます。したがって、本質的に、イベントハンドラは、コマンドを処理するノードで呼び出されます。

また、イベントハンドラを「追跡」モードで実行するように設定することもできます。その場合、イベントストアへの接続が開かれます。その場合、正確な構成に応じて、各ノードは、発行された場所に関係なく、イベントの独自のコピーを取得できます。

関連する問題