2017-06-01 5 views
1

複数の同時読者と1人のライターをサポートするGoでバッファを構築したいとします。バッファーに書き込まれるものは、すべての読者が読む必要があります。新しい読者はいつでも辞任することができます。これは、既に書かれたデータが、読者のために再生できる必要があることを意味します。同時読者があるゴラン・バッファ

バッファは、次のインタフェースを満たす必要があります。

type MyBuffer interface { 
    Write(p []byte) (n int, err error) 
    NextReader() io.Reader 
} 

あなたが好ましく種類に建て使用して、このような実装のための任意の提案を持っていますか?

+0

https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should -know-about-real-time-datas-unifying https://kafka.apache.org/intro – dm03514

+2

標準ライブラリの何もこれを行いません。ただし、チャンネルを中心に構築されたカスタム構造を使用することもできます。各リーダーは、チャンネルに読み込んだ内容をエコーし​​て、他の読者が読むことができます。問題は限界がどこであるかを定義しています。あなたは、遅い読者のために古いデータを再生できるようにしたいが、それは新しい読者がいつ参加するかを決して知らないので、すべてのデータを永久に(つまり、プログラムの寿命の間)保持することを意味する。これは大きなメモリリークのリスクです。 – Kaedys

+0

古いデータを保持して再生することの重要性が低い場合、これはシングルブロードキャスタとマルチレシーバのシステムを確実に実装します。https://rogpeppe.wordpress.com/2009/12/01/concurrent-idioms-1-broadcasting関連チャンネル連動チャンネル/ – Kaedys

答えて

3

(後に入社読者のために再プレイするすべてのことができるようにする)このライターの性質によって、どのようにあなたがそれを使用し、メモリ内のすべてのものを維持することは非常に危険であると多くのメモリ、または原因を要求するかもしれませんメモリ不足のためにアプリがクラッシュする可能性があります。

「トラフィックが少ない」ロガーでメモリ内のすべての情報を保持することは、おそらく問題ありませんが、たとえば、一部のオーディオやビデオをストリーミングすることはほとんどありません。

以下の読者の実装では、バッファに書き込まれたすべてのデータを読み込むと、Read()メソッドはio.EOFと正しく報告されます。 io.EOFに遭遇した場合、いくつかの構造体(bufio.Scannerなど)でさらにデータを読み取れない可能性があります(ただし、これは実装の欠陥ではありません)。

バッファーにデータがない場合はバッファーを待機させ、io.EOFを返す代わりに新しいデータが書き込まれるまで待つ場合は、返されたリーダーをここに示された「テールリーダー」にラップすることができます:Go: "tail -f"-like generator。ここで

「メモリー・安全な」ファイル実装

は非常にシンプルかつエレガントなソリューションです。これは、ファイルへの書き込みに使用し、読み込みにもファイルを使用します。同期は基本的にオペレーティングシステムによって提供されます。これは、データがディスクに格納されているため、メモリ不足のエラーになることはありません。あなたの作家の性質によって、これは十分かもしれません。

Close()がファイルの場合に重要であるため、次のインターフェイスを使用します。

type MyBuf interface { 
    io.WriteCloser 
    NewReader() (io.ReadCloser, error) 
} 

と実装は非常に簡単です:

type mybuf struct { 
    *os.File 
} 

func (mb *mybuf) NewReader() (io.ReadCloser, error) { 
    f, err := os.Open(mb.Name()) 
    if err != nil { 
     return nil, err 
    } 
    return f, nil 
} 

func NewMyBuf(name string) (MyBuf, error) { 
    f, err := os.Create(name) 
    if err != nil { 
     return nil, err 
    } 
    return &mybuf{File: f}, nil 
} 

当社mybufタイプは*os.Fileを埋め込むので、私たちは "自由" のためのWrite()Close()メソッドを取得します。

NewReader()は、既存のバッキングファイルを読み取り専用(読み取り専用モード)で開き、それを再度使用して、io.ReadCloserを実装します。

新しいMyBuf値を作成すると、NewMyBuf()関数に実装されます。この関数は、ファイルの作成に失敗するとerrorを返すこともあります。

注:

mybuf*os.Fileを埋め込むために、それは彼らがMyBufインタフェースの一部ではないにもかかわらず、os.Fileの他のエクスポート方法を「到達」するtype assertionで可能であること。私はこの問題を考慮していませんが、これを拒否するには、mybufの実装をos.Fileに埋め込むのではなく、名前付きフィールドとして使用する必要があります(ただし、Write()Close()のメソッドを自分で追加する必要があります。 os.Fileフィールドに正しく転送してください)。

インメモリ実装

ファイル実装が十分でない場合は、ここでは、メモリ内の実装が付属しています。

我々だけで、メモリ今だので、私たちは次のインタフェースを使用します。

type MyBuf interface { 
    io.Writer 
    NewReader() io.Reader 
} 

アイデアは、これまで私たちのバッファに渡されるすべてのバイトのスライスを格納することです。読者は、Read()が呼び出されると、格納されたスライスを提供します。各リーダーは、格納されているスライスの数を、Read()メソッドで処理したものを追跡します。同期は対処しなければなりません。単純なsync.RWMutexを使用します。さらに騒ぎがなければ、ここでの実装

されています:

type mybuf struct { 
    data [][]byte 
    sync.RWMutex 
} 

func (mb *mybuf) Write(p []byte) (n int, err error) { 
    if len(p) == 0 { 
     return 0, nil 
    } 
    // Cannot retain p, so we must copy it: 
    p2 := make([]byte, len(p)) 
    copy(p2, p) 
    mb.Lock() 
    mb.data = append(mb.data, p2) 
    mb.Unlock() 
    return len(p), nil 
} 

type mybufReader struct { 
    mb *mybuf // buffer we read from 
    i int // next slice index 
    data []byte // current data slice to serve 
} 

func (mbr *mybufReader) Read(p []byte) (n int, err error) { 
    if len(p) == 0 { 
     return 0, nil 
    } 
    // Do we have data to send? 
    if len(mbr.data) == 0 { 
     mb := mbr.mb 
     mb.RLock() 
     if mbr.i < len(mb.data) { 
      mbr.data = mb.data[mbr.i] 
      mbr.i++ 
     } 
     mb.RUnlock() 
    } 
    if len(mbr.data) == 0 { 
     return 0, io.EOF 
    } 

    n = copy(p, mbr.data) 
    mbr.data = mbr.data[n:] 
    return n, nil 
} 

func (mb *mybuf) NewReader() io.Reader { 
    return &mybufReader{mb: mb} 
} 

func NewMyBuf() MyBuf { 
    return &mybuf{} 
} 

Writer.Write()の一般的な契約は実装が渡されたスライスを保持していなければならないことを含んでいるので、私たちは、記憶」の前にそれのコピーを作成する必要があること" それ。

リーダのRead()は、最小限の時間だけロックしようとします。つまり、バッファから新しいデータスライスが必要な場合にのみロックし、読み取りロックを行います。つまり、リーダーに部分データスライスがある場合は、バッファをロックしてタッチすることなくRead()に送ります。

+0

ありがとうございました。エレガントなソリューション。 'io.WriteCloser'を使用したあなたの提案は良い考えです。理想的には、読者の 'Read()'メソッドは、ライターが閉じられ、その時点で 'io.EOF'を受け取るまで、より多くのデータを待つでしょう。メモリ消費に関しては、バッファをメモリ内で開始し、一定のサイズを超えたらディスクにデータをダンプすることができます。私はそれに向かって私の道を進めるためにあなたの提案を使用します。もう一度ありがとうございます。 – Tympanix

1

あなたの要件に非常によく似ているので、私は追加ログだけをコミットログにリンクしました。私は分散システムとコミットログにはかなり新しいので、いくつかのコンセプトを打ち破っているかもしれませんが、カフカの紹介ではすてきなチャートですべてを明確に説明しています。

Goがまた私にはかなり新しいですので、私はそれを行うには良い方法があります確信している:

しかし、おそらくあなたは、スライスとして、あなたのバッファをモデル化することができ、私は例のカップルを考える:

  • バッファがない読者を持っていない、新たなデータがバッファに書き込まれ、バッファ長が成長
  • バッファ1 /多くのリーダー(複数可)を有する:

    • リーダsubscを>
  • すべてのクライアントチャンネルをループし、それにパブリッシュ(パブサブ) -
  • バッファをバッファリングするリベスは
  • バッファは、クライアントチャネルのリスト
  • 書き込みが発生を維持するクライアントへのチャネルを作成して返します

これは、メッセージがファンアウトされるpubsubリアルタイムコンシューマストリームに対処しますが、バックフィルには対処しません。

カフカは埋め戻しを可能にし、それはこれがオフセット:)

を行うことができますどのように彼らのintro illustratesは、消費者によって制御されます。通常、消費者は 実際には、それがレコードを読み込むように、その直線的にオフセット推進しますが、 の位置は消費者によって制御されているので、好きな順序で のレコードを消費することができます。たとえば、消費者は、過去のデータを再処理するために、古い オフセットにリセットすることができます。または、最も最近のレコード にスキップし、「今」から消費を開始することができます。

このような機能の組み合わせにより、Kafkaのコンシューマは安価で であることがわかります。クラスタに大きな影響を与えることなく、または他のコンシューマの に安価に出入りすることができます。たとえば、コマンドラインツールを使用すると、 は、既存のコンシューマが消費するものを変更せずに、トピックの内容を「テール」することができます。

0

私は実験の一環として、類似した何かをしなければならなかったので、共有:

type MultiReaderBuffer struct { 
    mu sync.RWMutex 
    buf []byte 
} 

func (b *MultiReaderBuffer) Write(p []byte) (n int, err error) { 
    if len(p) == 0 { 
     return 0, nil 
    } 
    b.mu.Lock() 
    b.buf = append(b.buf, p...) 
    b.mu.Unlock() 
    return len(p), nil 
} 

func (b *MultiReaderBuffer) NewReader() io.Reader { 
    return &mrbReader{mrb: b} 
} 

type mrbReader struct { 
    mrb *MultiReaderBuffer 
    off int 
} 

func (r *mrbReader) Read(p []byte) (n int, err error) { 
    if len(p) == 0 { 
     return 0, nil 
    } 
    r.mrb.mu.RLock() 
    n = copy(p, r.mrb.buf[r.off:]) 
    r.mrb.mu.RUnlock() 
    if n == 0 { 
     return 0, io.EOF 
    } 
    r.off += n 
    return n, nil 
} 
関連する問題