2016-07-24 4 views
0

サービスファブリック用のAkka.Persistenceの実装を書いていますが、スナップショット機能を使用できないようです。状態を回復しようとすると、最新のスナップショットが取得されますが、最新のスナップショット以降のイベントは再生されません。私には分かりませんコンポーネントを正しく接続していない場合や、パーシスタンスライブラリの実装が正しくない場合です。 私の俳優は簡単なカウンターです、私の状態はちょうど現在のカウントです。 私は、Recoverが最初に呼び出されるはずで、最後のスナップショットと最も高いシーケンス番号の間の各ジャーナルエントリに対してRecoverが呼び出されることを期待しています。ジャーナルにReplayMessagesAsync(...)という関数がありますが、これは実行する必要がありますが呼び出されることはありません。 私のカウンターのためのコードの下で、私のコードの残りの部分は次のとおりです。 1)私は渡されたものを返すために必要な私のSnapshotEntryは、ありません:CodeAkka.Persistanceがジャーナルエントリを再生しないのはなぜですか?

using Akka.Actor; 
using Akka.Persistence; 
using Akka.Persistence.ServiceFabric.Journal; 
using Akka.Persistence.ServiceFabric.Snapshot; 
using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Threading.Tasks; 

namespace AkkaPersistence.Actors 
{ 
    public class Counter : ReceivePersistentActor 
    { 
     public class GetCount { } 

     private int counter; 

     private CounterState State = new CounterState(); 

     private int _msgsSinceLastSnapshot = 0; 

     public Counter() 
     { 
      Recover<Evt>(evt => 
      { 
       State.Update(evt); 
      }); 

      Recover<SnapshotOffer>(offer => { 
       var snapshotEntry = offer.Snapshot as SnapshotEntry; 
       if (snapshotEntry != null) 
       { 
        State = (CounterState)snapshotEntry.Snapshot; 
       } 
      }); 

      Command<string>(str => Persist(str, s => 
      { 
       ++counter; 
       var evt = new Evt(s); 
       State.Update(evt); 

       if (++_msgsSinceLastSnapshot % 10 == 0) 
       { 
        //time to save a snapshot 
        SaveSnapshot(State.Copy()); 
       } 
      })); 

      Command<GetCount>(get => Sender.Tell(State.Count)); 

      Command<SaveSnapshotSuccess>(success => 
      { 
       ServiceEventSource.Current.Message($"Saved snapshot"); 
       DeleteMessages(success.Metadata.SequenceNr); 
      }); 

      Command<SaveSnapshotFailure>(failure => { 
       // handle snapshot save failure... 
       ServiceEventSource.Current.Message($"Snapshot failure"); 
      }); 

     } 

     public override string PersistenceId 
     { 
      get 
      { 
       return "counter"; 
      } 
     } 
    } 

    internal class CounterState 
    { 
     private long count = 0L; 

     public long Count 
     { 
      get { return count; } 
      set { count = value; } 
     } 

     public CounterState(long count) 
     { 
      this.Count = count; 
     } 

     public CounterState() : this(0) 
     { 
     } 

     public CounterState Copy() 
     { 
      return new CounterState(count); 
     } 

     public void Update(Evt evt) 
     { 
      ++Count; 
     } 
    } 

    public class Evt 
    { 
     public Evt(string data) 
     { 
      Data = data; 
     } 

     public string Data { get; } 

    } 

    public class Cmd 
    { 
     public Cmd(string data) 
     { 
      Data = data; 
     } 

     public string Data { get; } 
    } 

} 

答えて

0

私が間違っていた物事のカップルがありました私の永続化メカニズムの実装の詳細です。 2)文字列を保存することから翻訳して、ジャーナルの一部としてオブジェクトを保存しようとする単純なミス。 3)最後にもう1つ問題がありました。これは根本的な問題です。つまり、シリアライズが子オブジェクトで失敗していました。このコードでは、子オブジェクトの種類を含める必要はありませんでしたので、ジャーナルと既存のSnapshotSerializerのカスタムシリアライザ(the Wire serializer)を追加しました。現在は動作しています。

関連する問題