2017-11-21 6 views
3

私はアプリのパフォーマンスを向上させようとしています。 コードの一部は、ファイルをサーバーにチャンクでアップロードします。作業員をスライスして作業する人数を制限します

元のバージョンでは、これが順次ループで行われます。しかし、それは遅く、シーケンス中に各チャンクをアップロードする前に別のサーバと話をする必要があります。

チャンクのアップロードは、単純にゴルーチンに配置することができます。それはうまくいきますが、ソースファイルが非常に大きい場合、大量のメモリを使用するため、良い解決策ではありません。

したがって、バッファリングされたチャネルを使用してアクティブなゴルーチンの数を制限しようとします。ここに私の試みを示すいくつかのコードがあります。私はコンセプトを示すためにそれを取り除き、それを実行して自分のためにテストすることができます。

package main 

import (
    "fmt" 
    "io" 
    "os" 
    "time" 
) 

const defaultChunkSize = 1 * 1024 * 1024 

// Lets have 4 workers 
var c = make(chan int, 4) 

func UploadFile(f *os.File) error { 
    fi, err := f.Stat() 
    if err != nil { 
     return fmt.Errorf("err: %s", err) 
    } 
    size := fi.Size() 

    total := (int)(size/defaultChunkSize + 1) 
    // Upload parts 
    buf := make([]byte, defaultChunkSize) 
    for partno := 1; partno <= total; partno++ { 
     readChunk := func(offset int, buf []byte) (int, error) { 
      fmt.Println("readChunk", partno, offset) 
      n, err := f.ReadAt(buf, int64(offset)) 
      if err != nil { 
       return n, err 
      } 

      return n, nil 
     } 

     // This will block if there are not enough worker slots available 
     c <- partno 

     // The actual worker. 
     go func() { 
      offset := (partno - 1) * defaultChunkSize 
      n, err := readChunk(offset, buf) 
      if err != nil && err != io.EOF { 
       return 
      } 

      err = uploadPart(partno, buf[:n]) 
      if err != nil { 
       fmt.Println("Uploadpart failed:", err) 
      } 
      <-c 
     }() 
    } 

    return nil 
} 

func uploadPart(partno int, buf []byte) error { 
    fmt.Printf("Uploading partno: %d, buflen=%d\n", partno, len(buf)) 
    // Actually upload the part. Lets test it by instead writing each 
    // buffer to another file. We can then use diff to compare the 
    // source and dest files. 

    // Open file. Seek to (partno - 1) * defaultChunkSize, write buffer 
    f, err := os.OpenFile("/home/matthewh/Downloads/out.tar.gz", os.O_CREATE|os.O_WRONLY, 0755) 
    if err != nil { 
     fmt.Printf("err: %s\n", err) 
    } 

    n, err := f.WriteAt(buf, int64((partno-1)*defaultChunkSize)) 
    if err != nil { 
     fmt.Printf("err=%s\n", err) 
    } 
    fmt.Printf("%d bytes written\n", n) 
    defer f.Close() 
    return nil 
} 

func main() { 
    filename := "/home/matthewh/Downloads/largefile.tar.gz" 
    fmt.Printf("Opening file: %s\n", filename) 

    f, err := os.Open(filename) 
    if err != nil { 
     panic(err) 
    } 

    UploadFile(f) 
} 

ほとんど動作します。しかし、いくつかの問題があります。 1)最後のpartno 22が3回発生しています。正しい長さは実際にはファイルの長さが1MBの倍数でないため612545です。

// Sample output 
... 
readChunk 21 20971520 
readChunk 22 22020096 
Uploading partno: 22, buflen=1048576 
Uploading partno: 22, buflen=612545 
Uploading partno: 22, buflen=1048576 

もう一つの問題は、アップロードが失敗する可能性があり、私は行くとどのように最高のゴルーチンの不具合を解決するとともに、十分慣れていませんよ。

最後に、正常に終了すると、uploadPartからデータを戻したいと思います。具体的には、文字列(HTTP ETagヘッダー値)になります。これらのエタグ値は主機能によって収集される必要があります。

このコードをこのインスタンスで構造化するにはどうすればよいでしょうか?私はここで私のニーズを正しく満たす良いゴランのデザインパターンをまだ見つけていません。

+1

コンパイルエラー: 'undefined:UploadChunk'。 – peterSO

答えて

0

このコードをどのように構造化するのが良いのかという問題をスキップすると、コードに問題が発生している可能性があるバグがあります。 goroutineで実行している関数は、ループの繰り返しごとに変化する変数partnoを使用するため、ゴルーチンを起動したときにゴルーチンに必ずpartnoという値が表示されるわけではありません。この固定の一般的な方法は、ループ内でその変数のローカルコピーを作成することである。

for partno := 1; partno <= total; partno++ { 
    partno := partno 
    // ... 
} 
+0

または、パラメータとして無名関数に渡してください。 – Adrian

+0

これはまた動作しますが、ループの始めに定義された関数については注意が必要ですが、 'partno'(デバッグ出力のためだけですが)も使用され、goroutineから呼び出されます。潜在的に間違った値が表示されることもあります。 –

+0

複数のゴルーチンが安全にf.ReadAtを呼び出すことができますか? – Matt

0

データレース#1

複数goroutines同時に同じ緩衝液を使用しています。 1つのgoroutingがそれを新しいチャンクで埋めている間に別のチャンクがそれから古いチャンクを読み込んでいる可能性があることに注意してください。代わりに、各goroutineはそれ自身のバッファを持つ必要があります。

データレース#2

アンディSchweigが指摘しているように、その反復で作成したgoroutineがそれを読むための機会を得る前に、partnoの値がループで更新されます。これが最終的なpartno 22が複数回現れる理由です。これを修正するには、partnoを引数として無名関数に渡すことができます。それは各goroutineがそれ自身の部品番号を持っていることを保証します。

また、チャンネルを使用してワーカーの結果を渡すこともできます。おそらく、部品番号とエラーを持つ構造体の型です。こうすることで、進行状況を確認して失敗したアップロードを再試行できます。

良いパターンの例については、exampleをGOPLの本からチェックしてください。アンディSchweig partnoによって示されるよう

0

提案された変更はdev.bmax bufによって示されるよう

は、外出先ルーチンに移動機能をアノンするPARAMであり、また、アップロードが完了した前出たUploadFileためWaitGroupを加えました。またdefer f.Close()ファイル、良い習慣。

package main 

import (
    "fmt" 
    "io" 
    "os" 
    "sync" 
    "time" 
) 

const defaultChunkSize = 1 * 1024 * 1024 

// wg for uploads to complete 
var wg sync.WaitGroup 

// Lets have 4 workers 
var c = make(chan int, 4) 

func UploadFile(f *os.File) error { 
    // wait for all the uploads to complete before function exit 
    defer wg.Wait() 

    fi, err := f.Stat() 
    if err != nil { 
     return fmt.Errorf("err: %s", err) 
    } 
    size := fi.Size() 
    fmt.Printf("file size: %v\n", size) 

    total := int(size/defaultChunkSize + 1) 
    // Upload parts 
    for partno := 1; partno <= total; partno++ { 

     readChunk := func(offset int, buf []byte, partno int) (int, error) { 
      fmt.Println("readChunk", partno, offset) 
      n, err := f.ReadAt(buf, int64(offset)) 
      if err != nil { 
       return n, err 
      } 

      return n, nil 
     } 

     // This will block if there are not enough worker slots available 
     c <- partno 

     // The actual worker. 
     go func(partno int) { 
      // wait for me to be done 
      wg.Add(1) 
      defer wg.Done() 

      buf := make([]byte, defaultChunkSize) 

      offset := (partno - 1) * defaultChunkSize 
      n, err := readChunk(offset, buf, partno) 
      if err != nil && err != io.EOF { 
       return 
      } 

      err = uploadPart(partno, buf[:n]) 
      if err != nil { 
       fmt.Println("Uploadpart failed:", err) 
      } 
      <-c 
     }(partno) 
    } 

    return nil 
} 

func uploadPart(partno int, buf []byte) error { 
    fmt.Printf("Uploading partno: %d, buflen=%d\n", partno, len(buf)) 

    // Actually do the upload. Simulate long running task with a sleep 
    time.Sleep(time.Second) 
    return nil 
} 

func main() { 
    filename := "/home/matthewh/Downloads/largefile.tar.gz" 
    fmt.Printf("Opening file: %s\n", filename) 

    f, err := os.Open(filename) 
    if err != nil { 
     panic(err) 
    } 
    defer f.Close() 

    UploadFile(f) 
} 

は、私はあなたがbuf状況に少し賢く対処することができます確信しています。私はちょうどゴミを処理しようとしています。あなたは特定の番号4にあなたの労働者を制限しているので、あなたは本当に唯一の4 x defaultChunkSizeバッファが必要です。あなたが何かシンプルで分け合っていたら分けてください。

楽しくお楽しみください!

+0

ありがとうございます。また、複数のgoroutineがf.ReadAtを安全に呼び出すことができますか? – Matt

+0

私の理解は読み取り専用です(このケースは安全です)。 – biosckon

関連する問題