私は最近Goと遊んでいて、ログファイルを解析して弾性検索に挿入する小さなスクリプトを作りました。私は弾性検索に送信する機能を持っている私のprocessFileの内部並行ファイルの解析とElastic Searchへの挿入
var wg := sync.WaitGroup{}
wg.Add(len(files))
for _, file := range files {
go func(f os.FileInfo){
defer wg.Done()
ProcessFile(f.Name(), config.OriginFilePath, config.WorkingFilePath, config.ArchiveFilePath,fmt.Sprintf("http://%v:%v", config.ElasticSearch.Host, config.ElasticSearch.Port),config.ProviderIndex, config.NetworkData)
}(file)
}
wg.Wait()
:各ファイルについて、私はこのようなゴルーチンを生み出し
func BulkInsert(lines []string, ES *elastic.Client) (*elastic.Response, error){
r, err := ES.PerformRequest("POST", "/_bulk", url.Values{}, strings.Join(lines, "\n")+"\n")
if err != nil {
return nil, err
}
return r, nil
}
問題は仕事をゴルーチンどのように私は完全には理解していないということです。私の理解は、弾性検索に送信することは、私のgoroutinesの実行をブロックすることです。私の関数の戻り値の前に
WaitGroup
、go func(){defer wg.Done(); BulkInsert(elems, ES);}()
とwg.Wait()
:私は同じアプローチで一括挿入を有する弾性検索のための別のゴルーチンを産卵しようとしました。しかし、私は最終的にすべてのイベントが弾性検索に終わるわけではないことを発見しました。これはゴルーチンがバルクリクエストを送信/待機することなく戻ってきたことによるものだと思います。
私の質問は、この問題に対する私のアプローチは正しいですか?より良いパフォーマンスを達成できますか?
感謝を理解することがあります。私が理解したように、私は最初の例が必要です:必要なだけ多くのゴルーチンを生成します(ファイルのリスト長の長さにもよる)。同じアプローチ、 'wg.Add(1)'と 'wg.Wait()'を使います。 しかし、私はまだ理解していません、なぜ弾性検索バルク挿入機能の内側の 'wg'は外側のゴルーチンをブロックしないのですか? 'BulkInsert'関数が返るのを待つために' wg'を参照渡しする必要がありますか? – Disciples
'wg'はコピーしてはいけません。渡す必要がある場合はポインタを使用してください。 wgはチャネルではなく、同時実行性を構成せず、すべてが完了したときだけ通知し、Wait()呼び出しでのみブロックします。あなたの特定のバルクコードについては、読んでいないと本当に言ってもらえませんか? –