(後に入社読者のために再プレイするすべてのことができるようにする)このライターの性質によって、どのようにあなたがそれを使用し、メモリ内のすべてのものを維持することは非常に危険であると多くのメモリ、または原因を要求するかもしれませんメモリ不足のためにアプリがクラッシュする可能性があります。
「トラフィックが少ない」ロガーでメモリ内のすべての情報を保持することは、おそらく問題ありませんが、たとえば、一部のオーディオやビデオをストリーミングすることはほとんどありません。
以下の読者の実装では、バッファに書き込まれたすべてのデータを読み込むと、Read()
メソッドはio.EOF
と正しく報告されます。 io.EOF
に遭遇した場合、いくつかの構造体(bufio.Scanner
など)でさらにデータを読み取れない可能性があります(ただし、これは実装の欠陥ではありません)。
バッファーにデータがない場合はバッファーを待機させ、io.EOF
を返す代わりに新しいデータが書き込まれるまで待つ場合は、返されたリーダーをここに示された「テールリーダー」にラップすることができます:Go: "tail -f"-like generator。ここで
「メモリー・安全な」ファイル実装
は非常にシンプルかつエレガントなソリューションです。これは、ファイルへの書き込みに使用し、読み込みにもファイルを使用します。同期は基本的にオペレーティングシステムによって提供されます。これは、データがディスクに格納されているため、メモリ不足のエラーになることはありません。あなたの作家の性質によって、これは十分かもしれません。
Close()
がファイルの場合に重要であるため、次のインターフェイスを使用します。
type MyBuf interface {
io.WriteCloser
NewReader() (io.ReadCloser, error)
}
と実装は非常に簡単です:
type mybuf struct {
*os.File
}
func (mb *mybuf) NewReader() (io.ReadCloser, error) {
f, err := os.Open(mb.Name())
if err != nil {
return nil, err
}
return f, nil
}
func NewMyBuf(name string) (MyBuf, error) {
f, err := os.Create(name)
if err != nil {
return nil, err
}
return &mybuf{File: f}, nil
}
当社mybuf
タイプは*os.File
を埋め込むので、私たちは "自由" のためのWrite()
とClose()
メソッドを取得します。
NewReader()
は、既存のバッキングファイルを読み取り専用(読み取り専用モード)で開き、それを再度使用して、io.ReadCloser
を実装します。
新しいMyBuf
値を作成すると、NewMyBuf()
関数に実装されます。この関数は、ファイルの作成に失敗するとerror
を返すこともあります。
注:
注mybuf
が*os.File
を埋め込むために、それは彼らがMyBuf
インタフェースの一部ではないにもかかわらず、os.File
の他のエクスポート方法を「到達」するtype assertionで可能であること。私はこの問題を考慮していませんが、これを拒否するには、mybuf
の実装をos.File
に埋め込むのではなく、名前付きフィールドとして使用する必要があります(ただし、Write()
とClose()
のメソッドを自分で追加する必要があります。 os.File
フィールドに正しく転送してください)。
インメモリ実装
ファイル実装が十分でない場合は、ここでは、メモリ内の実装が付属しています。
我々だけで、メモリ今だので、私たちは次のインタフェースを使用します。
type MyBuf interface {
io.Writer
NewReader() io.Reader
}
アイデアは、これまで私たちのバッファに渡されるすべてのバイトのスライスを格納することです。読者は、Read()
が呼び出されると、格納されたスライスを提供します。各リーダーは、格納されているスライスの数を、Read()
メソッドで処理したものを追跡します。同期は対処しなければなりません。単純なsync.RWMutex
を使用します。さらに騒ぎがなければ、ここでの実装
されています:
type mybuf struct {
data [][]byte
sync.RWMutex
}
func (mb *mybuf) Write(p []byte) (n int, err error) {
if len(p) == 0 {
return 0, nil
}
// Cannot retain p, so we must copy it:
p2 := make([]byte, len(p))
copy(p2, p)
mb.Lock()
mb.data = append(mb.data, p2)
mb.Unlock()
return len(p), nil
}
type mybufReader struct {
mb *mybuf // buffer we read from
i int // next slice index
data []byte // current data slice to serve
}
func (mbr *mybufReader) Read(p []byte) (n int, err error) {
if len(p) == 0 {
return 0, nil
}
// Do we have data to send?
if len(mbr.data) == 0 {
mb := mbr.mb
mb.RLock()
if mbr.i < len(mb.data) {
mbr.data = mb.data[mbr.i]
mbr.i++
}
mb.RUnlock()
}
if len(mbr.data) == 0 {
return 0, io.EOF
}
n = copy(p, mbr.data)
mbr.data = mbr.data[n:]
return n, nil
}
func (mb *mybuf) NewReader() io.Reader {
return &mybufReader{mb: mb}
}
func NewMyBuf() MyBuf {
return &mybuf{}
}
注Writer.Write()
の一般的な契約は実装が渡されたスライスを保持していなければならないことを含んでいるので、私たちは、記憶」の前にそれのコピーを作成する必要があること" それ。
リーダのRead()
は、最小限の時間だけロックしようとします。つまり、バッファから新しいデータスライスが必要な場合にのみロックし、読み取りロックを行います。つまり、リーダーに部分データスライスがある場合は、バッファをロックしてタッチすることなくRead()
に送ります。
https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should -know-about-real-time-datas-unifying https://kafka.apache.org/intro – dm03514
標準ライブラリの何もこれを行いません。ただし、チャンネルを中心に構築されたカスタム構造を使用することもできます。各リーダーは、チャンネルに読み込んだ内容をエコーして、他の読者が読むことができます。問題は限界がどこであるかを定義しています。あなたは、遅い読者のために古いデータを再生できるようにしたいが、それは新しい読者がいつ参加するかを決して知らないので、すべてのデータを永久に(つまり、プログラムの寿命の間)保持することを意味する。これは大きなメモリリークのリスクです。 – Kaedys
古いデータを保持して再生することの重要性が低い場合、これはシングルブロードキャスタとマルチレシーバのシステムを確実に実装します。https://rogpeppe.wordpress.com/2009/12/01/concurrent-idioms-1-broadcasting関連チャンネル連動チャンネル/ – Kaedys