2017-03-15 8 views
2

さまざまなプロセス(またはノード)で動作しているアクターに、フォールトトレランスとロードバランシングを維持しながら、異なるプロセス(またはノード)から実行されている他のアクターにメッセージを送信したいと考えています。私は現在Akka.ClusterのSharding機能を使用してこれを達成しようとしています。1つのプロセスでアクターを実行させるにはどうしたら別のプロセスで実行されている別のアクターにメッセージを送信できますか?

しかし、私は私が私のシードノードを反映して、次のコードが...

これを実現する方法がわからないです::出力は次のようになります

let configurePort port = 
    let config = Configuration.parse (""" 
     akka { 
      actor { 
       provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster" 
       serializers { 
       hyperion = "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion" 
       } 
       serialization-bindings { 
       "System.Object" = hyperion 
       } 
      } 
      remote { 
      helios.tcp { 
       public-hostname = "localhost" 
       hostname = "localhost" 
       port = """ + port.ToString() + """ 
      } 
      } 
      cluster { 
      auto-down-unreachable-after = 5s 
      seed-nodes = [ "akka.tcp://[email protected]:2551/" ] 
      } 
      persistence { 
      journal.plugin = "akka.persistence.journal.inmem" 
      snapshot-store.plugin = "akka.persistence.snapshot-store.local" 
      } 
     } 
     """) 
    config.WithFallback(ClusterSingletonManager.DefaultConfig()) 

let consumer (actor:Actor<_>) msg = printfn "\n%A received %s" (actor.Self.Path.ToStringWithAddress()) msg |> ignored 

// spawn two separate systems with shard regions on each of them 
let system1 = System.create "cluster-system" (configurePort 2551) 
let shardRegion1 = spawnSharded id system1 "shardRegion1" <| props (actorOf2 consumer) 
System.Threading.Thread.Sleep(1000) 

let system2 = System.create "cluster-system" (configurePort 2552) 
let shardRegion2 = spawnSharded id system2 "shardRegion2" <| props (actorOf2 consumer) 
System.Threading.Thread.Sleep(1000) 

let system3 = System.create "cluster-system" (configurePort 2553) 
let shardRegion3 = spawnSharded id system3 "shardRegion3" <| props (actorOf2 consumer) 
System.Threading.Thread.Sleep(3000) 


// NOTE: Even thou we sent all messages through single shard region, 
//  some of them will be executed on the second and third one thanks to shard balancing 
System.Threading.Thread.Sleep(3000) 
shardRegion1 <! ("shard-1", "entity-1", "hello world 1") 
shardRegion1 <! ("shard-1", "entity-2", "hello world 2") 
shardRegion1 <! ("shard-2", "entity-3", "hello world 3") 
shardRegion1 <! ("shard-2", "entity-4", "hello world 4") 

System.Threading.Thread.Sleep(1000) 

let printShards shardRegion = 
    async { 
     let! (reply:AskResult<ShardRegionStats>) = (retype shardRegion) <? GetShardRegionStats.Instance 
     let (stats: ShardRegionStats) = reply.Value 
     for kv in stats.Stats do 
      printfn "\tShard '%s' has %d entities on it" kv.Key kv.Value 
    } |> Async.RunSynchronously 

let printNodes() = 
    printfn "\nShards active on node 'localhost:2551':" 
    printShards shardRegion1 
    printfn "\nShards active on node 'localhost:2552':" 
    printShards shardRegion2 
    printfn "\nShards active on node 'localhost:2553':" 
    printShards shardRegion3 

printNodes() 

Shards active on node 'localhost:2551': 
    Shard 'shard-1' has 2 entities on it 
    Shard 'shard-2' has 2 entities on it 

ノード 'localhost:2552'で有効なシャード:

Iは、次のコードを実行する別のプロセスがあります

let configurePort port = 
    let config = Configuration.parse (""" 
     akka { 
      actor { 
       provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster" 
       serializers { 
       hyperion = "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion" 
       } 
       serialization-bindings { 
       "System.Object" = hyperion 
       } 
      } 
      remote { 
      helios.tcp { 
       public-hostname = "localhost" 
       hostname = "localhost" 
       port = "0" 
      } 
      } 
      cluster { 
      auto-down-unreachable-after = 5s 
      seed-nodes = [ "akka.tcp://[email protected]:2551/" ] 
      } 
      persistence { 
      journal.plugin = "akka.persistence.journal.inmem" 
      snapshot-store.plugin = "akka.persistence.snapshot-store.local" 
      } 
     } 
     """) 
    config.WithFallback(ClusterSingletonManager.DefaultConfig()) 

let consumer (actor:Actor<_>) msg = printfn "\n%A received %s" (actor.Self.Path.ToStringWithAddress()) msg |> ignored 

// spawn two separate systems with shard regions on each of them 
let system1 = System.create "cluster-system" (configurePort 2554) 
let shardRegion1 = spawnSharded id system1 "printer" <| props (actorOf2 consumer) 
System.Threading.Thread.Sleep(1000) 

let system2 = System.create "cluster-system" (configurePort 2555) 
let shardRegion2 = spawnSharded id system2 "printer" <| props (actorOf2 consumer) 
System.Threading.Thread.Sleep(1000) 

let system3 = System.create "cluster-system" (configurePort 2556) 
let shardRegion3 = spawnSharded id system3 "printer" <| props (actorOf2 consumer) 

を(別のプロセスで実行されている)私のクラスタシステムが参加している新しいノードを認識し

> [INFO][3/15/2017 9:12:13 PM][Thread 0054][[akka://cluster-system/system/cluster/core/daemon#2086121649]] Node [akka.tcp://[email protected]:52953] is JOINING, roles [] 
[INFO][3/15/2017 9:12:14 PM][Thread 0006][[akka://cluster-system/system/cluster/core/daemon#2086121649]] Node [akka.tcp://[email protected]:52956] is JOINING, roles [] 
[INFO][3/15/2017 9:12:15 PM][Thread 0054][[akka://cluster-system/system/cluster/core/daemon#2086121649]] Node [akka.tcp://[email protected]:52961] is JOINING, roles [] 
[INFO][3/15/2017 9:12:18 PM][Thread 0055][[akka://cluster-system/system/cluster/core/daemon#2086121649]] Leader is moving node [akka.tcp://[email protected]:52953] to [Up] 
[INFO][3/15/2017 9:12:18 PM][Thread 0055][[akka://cluster-system/system/cluster/core/daemon#2086121649]] Leader is moving node [akka.tcp://[email protected]:52956] to [Up] 
[INFO][3/15/2017 9:12:18 PM][Thread 0055][[akka://cluster-system/system/cluster/core/daemon#2086121649]] Leader is moving node [akka.tcp://[email protected]:52961] to [Up] 

結論:

さまざまなプロセス(またはノード)上で動作するアクタに、フォールトトレランスとロードバランシングを維持しながら、異なるプロセス(またはノード)から実行されている他のアクターにメッセージを送信させたいと考えています。私は現在Akka.ClusterのSharding機能を使用してこれを達成しようとしています。

付録:プロセスのすべてに表示されるデータベースを指している必要があり永続的なバックエンドをAkka.Cluster.Sharding、破片及びそれらの位置の一貫したビューを維持するために

open System 
open System.IO 
#if INTERACTIVE 
let cd = Path.Combine(__SOURCE_DIRECTORY__, "../src/Akkling.Cluster.Sharding/bin/Debug") 
System.IO.Directory.SetCurrentDirectory(cd) 
#endif 

#r "../src/Akkling.Cluster.Sharding/bin/Debug/System.Collections.Immutable.dll" 
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.dll" 
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Hyperion.dll" 
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Newtonsoft.Json.dll" 
#r @"C:\Users\Snimrod\Documents\Visual Studio 2015\Projects\Temp\packages\Akka.FSharp.1.1.3\lib\net45\Akka.FSharp.dll" 
#r "../src/Akkling.Cluster.Sharding/bin/Debug/FSharp.PowerPack.Linq.dll" 
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Helios.dll" 
#r "../src/Akkling.Cluster.Sharding/bin/Debug/FsPickler.dll" 
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Google.ProtocolBuffers.dll" 
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Google.ProtocolBuffers.Serialization.dll" 
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Remote.dll" 
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Google.ProtocolBuffers.dll" 
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Persistence.dll" 
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Cluster.dll" 
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Cluster.Tools.dll" 
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Cluster.Sharding.dll" 
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Serialization.Hyperion.dll" 
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akkling.dll" 
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akkling.Persistence.dll" 
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akkling.Cluster.Sharding.dll" 


open Akka.Actor 
open Akka.Configuration 
open Akka.Cluster 
open Akka.Cluster.Tools.Singleton 
open Akka.Cluster.Sharding 
open Akka.Persistence 

open Akkling 
open Akkling.Persistence 
open Akkling.Cluster 
open Akkling.Cluster.Sharding 
open Hyperion 

答えて

4

。あなたの設定では、メモリ内のデータストアであるakka.persistence.journal.inmemを使用しています(テストと開発にのみ使用されます)。他のプロセスからは表示されません。

異なるマシン/プロセスに存在するノード間でシャードが見えるように、永続的なバックエンドを設定する必要があります。つまり、Akka.Persistence.SqlServerまたは他のプラグインを使用して行うことができます。

akka.persistence { 
    journal { 
     plugin = "akka.persistence.journal.sql-server" 
     sql-server { 
      connection-string = "<connection-string>" 
      auto-initialize = on 
     } 
    } 
    snapshot-store { 
     plugin = "akka.persistence.snapshot-store.sql-server" 
     sql-server { 
      connection-string = "<connection-string>" 
      auto-initialize = on 
     } 
    } 
} 

は、より実用的なものについては、this articleを参照してください。これは、シャーディングでのみ使用され、あなたの永続性のバックエンドのための最も基本的な構成です。

Akka.Cluster.ShardingとAkka.Persistenceの両方のプラグインは、プレリリースモードでのみ使用できるため(-preフラグを指定してパッケージをインストールする必要があります)、注意してください。

関連する問題