2017-09-13 4 views
0

私は最近Goと遊んでいて、ログファイルを解析して弾性検索に挿入する小さなスクリプトを作りました。私は弾性検索に送信する機能を持っている私のprocessFileの内部並行ファイルの解析とElastic Searchへの挿入

var wg := sync.WaitGroup{} 
wg.Add(len(files)) 
for _, file := range files { 
    go func(f os.FileInfo){ 
     defer wg.Done() 
     ProcessFile(f.Name(), config.OriginFilePath, config.WorkingFilePath, config.ArchiveFilePath,fmt.Sprintf("http://%v:%v", config.ElasticSearch.Host, config.ElasticSearch.Port),config.ProviderIndex, config.NetworkData) 
    }(file) 
} 
wg.Wait() 

:各ファイルについて、私はこのようなゴルーチンを生み出し

func BulkInsert(lines []string, ES *elastic.Client) (*elastic.Response, error){ 
    r, err := ES.PerformRequest("POST", "/_bulk", url.Values{}, strings.Join(lines, "\n")+"\n") 
    if err != nil { 
     return nil, err 
    } 
    return r, nil 
} 

問題は仕事をゴルーチンどのように私は完全には理解していないということです。私の理解は、弾性検索に送信することは、私のgoroutinesの実行をブロックすることです。私の関数の戻り値の前に

WaitGroupgo func(){defer wg.Done(); BulkInsert(elems, ES);}()wg.Wait():私は同じアプローチで一括挿入を有する弾性検索のための別のゴルーチンを産卵しようとしました。しかし、私は最終的にすべてのイベントが弾性検索に終わるわけではないことを発見しました。これはゴルーチンがバルクリクエストを送信/待機することなく戻ってきたことによるものだと思います。

私の質問は、この問題に対する私のアプローチは正しいですか?より良いパフォーマンスを達成できますか?

答えて

1

パフォーマンスを向上できますか?

不明な点は、受信者と送信者の能力によって異なります。

私の質問は、この問題に対する私のアプローチは正しいですか?

このヘルプは、あなたは、より良い、ルーチンを行くたとえば

package main 

import (
    "fmt" 
    "log" 
    "net/http" 
    "sync" 
    "time" 
) 

func main() { 

    addr := "127.0.0.1:2074" 

    srv := http.Server{ 
     Addr: addr, 
     Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 
      log.Println("hit ", r.URL.String()) 
      <-time.After(time.Second) 
      log.Println("done ", r.URL.String()) 
     }), 
    } 
    fail(unblock(srv.ListenAndServe)) 

    jobs := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} 

    // case 1 
    // it creates 10 goroutines, 
    // that triggers 10 // concurrent get queries 
    { 
     wg := sync.WaitGroup{} 
     wg.Add(len(jobs)) 
     log.Printf("starting %v jobs\n", len(jobs)) 
     for _, job := range jobs { 
      go func(job int) { 
       defer wg.Done() 
       http.Get(fmt.Sprintf("http://%v/job/%v", addr, job)) 
      }(job) 
     } 
     wg.Wait() 
     log.Printf("done %v jobs\n", len(jobs)) 
    } 

    log.Println() 
    log.Println("=================") 
    log.Println() 

    // case 2 
    // it creates 3 goroutines, 
    // that triggers 3 // concurrent get queries 
    { 
     wg := sync.WaitGroup{} 
     wg.Add(len(jobs)) 
     in := make(chan string) 
     limit := make(chan bool, 3) 
     log.Printf("starting %v jobs\n", len(jobs)) 
     go func() { 
      for url := range in { 
       limit <- true 
       go func(url string) { 
        defer wg.Done() 
        http.Get(url) 
        <-limit 
       }(url) 
      } 
     }() 
     for _, job := range jobs { 
      in <- fmt.Sprintf("http://%v/job/%v", addr, job) 
     } 
     wg.Wait() 
     log.Printf("done %v jobs\n", len(jobs)) 
    } 

    log.Println() 
    log.Println("=================") 
    log.Println() 

    // case 2: rewrite 
    // it creates 6 goroutines, 
    // that triggers 6 // concurrent get queries 
    { 
     wait, add := parallel(6) 
     log.Printf("starting %v jobs\n", len(jobs)) 
     for _, job := range jobs { 
      url := fmt.Sprintf("http://%v/job/%v", addr, job) 
      add(func() { 
       http.Get(url) 
      }) 
     } 
     wait() 
     log.Printf("done %v jobs\n", len(jobs)) 
    } 
} 

func parallel(c int) (func(), func(block func())) { 
    wg := sync.WaitGroup{} 
    in := make(chan func()) 
    limit := make(chan bool, c) 
    go func() { 
     for block := range in { 
      limit <- true 
      go func(block func()) { 
       defer wg.Done() 
       block() 
       <-limit 
      }(block) 
     } 
    }() 
    return wg.Wait, func(block func()) { 
     wg.Add(1) 
     in <- block 
    } 
} 

func unblock(block func() error) error { 
    w := make(chan error) 
    go func() { w <- block() }() 
    select { 
    case err := <-w: 
     return err 
    case <-time.After(time.Millisecond): 
    } 
    return nil 
} 

func fail(err error) { 
    if err != nil { 
     panic(err) 
    } 
} 

出力

$ go run main.go 
2017/09/14 01:30:50 starting 10 jobs 
2017/09/14 01:30:50 hit /job/0 
2017/09/14 01:30:50 hit /job/4 
2017/09/14 01:30:50 hit /job/5 
2017/09/14 01:30:50 hit /job/2 
2017/09/14 01:30:50 hit /job/9 
2017/09/14 01:30:50 hit /job/1 
2017/09/14 01:30:50 hit /job/3 
2017/09/14 01:30:50 hit /job/7 
2017/09/14 01:30:50 hit /job/8 
2017/09/14 01:30:50 hit /job/6 
2017/09/14 01:30:51 done /job/5 
2017/09/14 01:30:51 done /job/4 
2017/09/14 01:30:51 done /job/2 
2017/09/14 01:30:51 done /job/0 
2017/09/14 01:30:51 done /job/6 
2017/09/14 01:30:51 done /job/9 
2017/09/14 01:30:51 done /job/1 
2017/09/14 01:30:51 done /job/3 
2017/09/14 01:30:51 done /job/7 
2017/09/14 01:30:51 done /job/8 
2017/09/14 01:30:51 done 10 jobs 
2017/09/14 01:30:51 
2017/09/14 01:30:51 ================= 
2017/09/14 01:30:51 
2017/09/14 01:30:51 starting 10 jobs 
2017/09/14 01:30:51 hit /job/0 
2017/09/14 01:30:51 hit /job/2 
2017/09/14 01:30:51 hit /job/1 
2017/09/14 01:30:52 done /job/2 
2017/09/14 01:30:52 done /job/0 
2017/09/14 01:30:52 done /job/1 
2017/09/14 01:30:52 hit /job/3 
2017/09/14 01:30:52 hit /job/4 
2017/09/14 01:30:52 hit /job/5 
2017/09/14 01:30:53 done /job/3 
2017/09/14 01:30:53 done /job/4 
2017/09/14 01:30:53 done /job/5 
2017/09/14 01:30:53 hit /job/6 
2017/09/14 01:30:53 hit /job/7 
2017/09/14 01:30:53 hit /job/8 
2017/09/14 01:30:54 done /job/6 
2017/09/14 01:30:54 done /job/7 
2017/09/14 01:30:54 done /job/8 
2017/09/14 01:30:54 hit /job/9 
2017/09/14 01:30:55 done /job/9 
2017/09/14 01:30:55 done 10 jobs 
2017/09/14 01:30:55 
2017/09/14 01:30:55 ================= 
2017/09/14 01:30:55 
2017/09/14 01:30:55 starting 10 jobs 
2017/09/14 01:30:55 hit /job/0 
2017/09/14 01:30:55 hit /job/1 
2017/09/14 01:30:55 hit /job/4 
2017/09/14 01:30:55 hit /job/2 
2017/09/14 01:30:55 hit /job/3 
2017/09/14 01:30:55 hit /job/5 
2017/09/14 01:30:56 done /job/0 
2017/09/14 01:30:56 hit /job/6 
2017/09/14 01:30:56 done /job/1 
2017/09/14 01:30:56 done /job/2 
2017/09/14 01:30:56 done /job/4 
2017/09/14 01:30:56 hit /job/7 
2017/09/14 01:30:56 done /job/3 
2017/09/14 01:30:56 hit /job/9 
2017/09/14 01:30:56 hit /job/8 
2017/09/14 01:30:56 done /job/5 
2017/09/14 01:30:57 done /job/6 
2017/09/14 01:30:57 done /job/7 
2017/09/14 01:30:57 done /job/9 
2017/09/14 01:30:57 done /job/8 
2017/09/14 01:30:57 done 10 jobs 
+0

感謝を理解することがあります。私が理解したように、私は最初の例が必要です:必要なだけ多くのゴルーチンを生成します(ファイルのリスト長の長さにもよる)。同じアプローチ、 'wg.Add(1)'と 'wg.Wait()'を使います。 しかし、私はまだ理解していません、なぜ弾性検索バルク挿入機能の内側の 'wg'は外側のゴルーチンをブロックしないのですか? 'BulkInsert'関数が返るのを待つために' wg'を参照渡しする必要がありますか? – Disciples

+0

'wg'はコピーしてはいけません。渡す必要がある場合はポインタを使用してください。 wgはチャネルではなく、同時実行性を構成せず、すべてが完了したときだけ通知し、Wait()呼び出しでのみブロックします。あなたの特定のバルクコードについては、読んでいないと本当に言ってもらえませんか? –

関連する問題