2017-11-14 8 views
1

私の目的は、共通フォーマットを共有する1つまたは複数のcsvファイルを読み込み、csvデータのパーティション列に基づいて別々のファイルに書き込むことです。最後の列がパーティションで、そのデータがソートされておらず、特定のパーティションが複数のファイルに存在することを許可してください。一つのファイルの例:複数のcsvファイルを同時に書き込む、Golangのパーティション列に分割する

fsdio,abc,def,2017,11,06,01 
1sdf9,abc,def,2017,11,06,04 
22df9,abc,def,2017,11,06,03 
1d243,abc,def,2017,11,06,02 

このアプローチは恐ろしいXY問題のようなにおいがした場合、私は調整するうれしいです。データセットに

  • 読み取りをし、パーティションが(これは含まれています新しいワーカールーチンをスピンオフ、 見られている場合は、各ライン
  • を反復処理:私がこれまで試したどのような

    a file/csv ライター)。行をchan []stringに送信します。

  • 各作業者はファイルライターであるため、入力チャネル上の1つのパーティションに対してのみ行を受け取る必要があります。

これは、特定の行に表示されたパーティション値に基づいて正しいワーカーに行を送信する方法がわからないため、明らかに機能しません(まだ)。

私は、各ワーカーに各パーティションの値についてid stringを与えられたが、私は、各労働者のために別々のchan []stringを作成する必要がある場合、に送信すること労働者を選択し、selectと、そのチャネルに送信するかどうかは知りませんしましたおそらく構造体がプールとルーティング機能のある種の作業者を保持する必要があるかどうかを判断します。

TLDR;一意の数は任意である可能性がありますが、おそらく24の一意のパーティションの値を超えない、いくつかのカテゴリのstring値に基づいて、条件付きでデータを与えられたルーチンまたはチャネルに送信する方法については迷っています。

私はこのような質問に気付いたことに気付くでしょう。これは下位投票をするのに十分に建設的であると思われる場合は、理由を添えてコメントしてください。犯行

ご協力いただきありがとうございます。

Playground

スニペット:

package main 

    import (
     "encoding/csv" 
     "fmt" 
     "log" 
     "strings" 
     "time" 
    ) 

    func main() { 

     // CSV 
     r := csv.NewReader(csvFile1) 
     lines, err := r.ReadAll() 
     if err != nil { 
      log.Fatalf("error reading all lines: %v", err) 
     } 

     // CHANNELS 
     lineChan := make(chan []string) 

     // TRACKER 
     var seenPartitions []string 

     for _, line := range lines { 

      hour := line[6] 
      if !stringInSlice(hour, seenPartitions) { 
       seenPartitions = append(seenPartitions, hour) 
       go worker(hour, lineChan) 
      } 
      // How to send to the correct worker/channel? 
      lineChan <- line 

     } 
     close(lineChan) 
    } 

    func worker(id string, lineChan <-chan []string) { 
     for j := range lineChan { 
      fmt.Println("worker", id, "started job", j) 
      // Write to a new file here and wait for input over the channel 
      time.Sleep(time.Second) 
      fmt.Println("worker", id, "finished job", j) 
     } 
    } 

    func stringInSlice(str string, list []string) bool { 
     for _, v := range list { 
      if v == str { 
       return true 
      } 
     } 
     return false 
    } 

    // DUMMY 
var csvFile1 = strings.NewReader(` 
12fy3,abc,def,2017,11,06,04 
fsdio,abc,def,2017,11,06,01 
11213,abc,def,2017,11,06,02 
1sdf9,abc,def,2017,11,06,01 
2123r,abc,def,2017,11,06,03 
1v2t3,abc,def,2017,11,06,01 
1r2r3,abc,def,2017,11,06,02 
g1253,abc,def,2017,11,06,02 
d1e23,abc,def,2017,11,06,02 
a1d23,abc,def,2017,11,06,02 
12jj3,abc,def,2017,11,06,03 
t1r23,abc,def,2017,11,06,03 
22123,abc,def,2017,11,06,03 
14d23,abc,def,2017,11,06,04 
1d243,abc,def,2017,11,06,01 
1da23,abc,def,2017,11,06,04 
a1523,abc,def,2017,11,06,01 
12453,abc,def,2017,11,06,04`) 
+1

'map [string](chan []文字列)'を作成します。パーティションキーが与えられたら、その行を対応するチャネルに送信することができます。 – zerkms

+1

@/zerkmsに同意します。あるいは、シンプルなものを保ったまま柔軟性を高めるために、各ワーカーをIDを保持する 'struct'型のインスタンス、行を送信するチャネル、停止/フラッシュ/閉じるのタイミングを教える終了チャネル、そして必要なもの'map [string] worker'を保持し、それを使って正しい行を正しいワーカーに送ります。 – Adrian

+0

ありがとうございました! 私はこれらの提案された構造を作成しましたが、あまりよく分かりません: - デッドロックを防止する方法 - どのようにマップに追加する[文字列]作業者はトラックを維持することを示唆(私はこのworkerPoolと呼ばれる) - すでに作成した作業者の中から選択してください 現在の進捗状況: https://play.golang.org/p/j56r_QvSJs – gpanda

答えて

1

同期バージョンはありません最初の同時魔法を行く(以下、並行バージョンを参照してください)。

package main 

import (
    "encoding/csv" 
    "fmt" 
    "io" 
    "log" 
    "strings" 
) 

func main() { 

    // CSV 
    r := csv.NewReader(csvFile1) 
    partitions := make(map[string][][]string) 

    for { 
     rec, err := r.Read() 
     if err != nil { 
      if err == io.EOF { 
       err = nil 

       save_partitions(partitions) 

       return 
      } 
      log.Fatal(err) 
     } 

     process(rec, partitions) 
    } 

} 

// prints only 
func save_partitions(partitions map[string][][]string) { 
    for part, recs := range partitions { 
     fmt.Println(part) 
     for _, rec := range recs { 
      fmt.Println(rec) 
     } 
    } 
} 

// this can also write/append directly to a file 
func process(rec []string, partitions map[string][][]string) { 
    l := len(rec) 
    part := rec[l-1] 
    if p, ok := partitions[part]; ok { 
     partitions[part] = append(p, rec) 
    } else { 
     partitions[part] = [][]string{rec} 
    } 
} 

// DUMMY 
var csvFile1 = strings.NewReader(` 
fsdio,abc,def,2017,11,06,01 
1sdf9,abc,def,2017,11,06,01 
1d243,abc,def,2017,11,06,01 
1v2t3,abc,def,2017,11,06,01 
a1523,abc,def,2017,11,06,01 
1r2r3,abc,def,2017,11,06,02 
11213,abc,def,2017,11,06,02 
g1253,abc,def,2017,11,06,02 
d1e23,abc,def,2017,11,06,02 
a1d23,abc,def,2017,11,06,02 
12jj3,abc,def,2017,11,06,03 
t1r23,abc,def,2017,11,06,03 
2123r,abc,def,2017,11,06,03 
22123,abc,def,2017,11,06,03 
14d23,abc,def,2017,11,06,04 
1da23,abc,def,2017,11,06,04 
12fy3,abc,def,2017,11,06,04 
12453,abc,def,2017,11,06,04`) 

https://play.golang.org/p/--iqZGzxCF

同時バージョン:

package main 

import (
    "encoding/csv" 
    "fmt" 
    "io" 
    "log" 
    "strings" 
    "sync" 
) 

var (
    // list of channels to communicate with workers 
    // workers accessed synchronousely no mutex required 
    workers = make(map[string]chan []string) 

    // wg is to make sure all workers done before exiting main 
    wg = sync.WaitGroup{} 

    // mu used only for sequential printing, not relevant for program logic 
    mu = sync.Mutex{} 
) 

func main() { 

    // wait for all workers to finish up before exit 
    defer wg.Wait() 

    r := csv.NewReader(csvFile1) 

    for { 
     rec, err := r.Read() 
     if err != nil { 
      if err == io.EOF { 
       savePartitions() 
       return 
      } 
      log.Fatal(err) // sorry for the panic 
     } 
     process(rec) 
    } 

} 

func process(rec []string) { 
    l := len(rec) 
    part := rec[l-1] 

    if c, ok := workers[part]; ok { 
     // send rec to worker 
     c <- rec 
    } else { 
     // if no worker for the partition 

     // make a chan 
     nc := make(chan []string) 
     workers[part] = nc 

     // start worker with this chan 
     go worker(nc) 

     // send rec to worker via chan 
     nc <- rec 
    } 
} 

func worker(c chan []string) { 

    // wg.Done signals to main worker completion 
    wg.Add(1) 
    defer wg.Done() 

    part := [][]string{} 
    for { 
     // wait for a rec or close(chan) 
     rec, ok := <-c 
     if ok { 
      // save the rec 
      // instead of accumulation in memory 
      // this can be saved to file directly 
      part = append(part, rec) 
     } else { 
      // channel closed on EOF 

      // dump partition 
      // locks ensures sequential printing 
      // not a required for independent files 
      mu.Lock() 
      for _, p := range part { 
       fmt.Printf("%+v\n", p) 
      } 
      mu.Unlock() 

      return 
     } 
    } 
} 

// simply signals to workers to stop 
func savePartitions() { 
    for _, c := range workers { 
     // signal to all workers to exit 
     close(c) 
    } 
} 

// DUMMY 
var csvFile1 = strings.NewReader(` 
fsdio,abc,def,2017,11,06,01 
1sdf9,abc,def,2017,11,06,01 
1d243,abc,def,2017,11,06,01 
1v2t3,abc,def,2017,11,06,01 
a1523,abc,def,2017,11,06,01 
1r2r3,abc,def,2017,11,06,02 
11213,abc,def,2017,11,06,02 
g1253,abc,def,2017,11,06,02 
d1e23,abc,def,2017,11,06,02 
a1d23,abc,def,2017,11,06,02 
12jj3,abc,def,2017,11,06,03 
t1r23,abc,def,2017,11,06,03 
2123r,abc,def,2017,11,06,03 
22123,abc,def,2017,11,06,03 
14d23,abc,def,2017,11,06,04 
1da23,abc,def,2017,11,06,04 
12fy3,abc,def,2017,11,06,04 
12453,abc,def,2017,11,06,04`) 

https://play.golang.org/p/oBTPosy0yT

お楽しみに!

+0

私の使用事例でこれがどのように機能しているかを見て、何か質問があれば返信します。ありがとう! – gpanda

+0

両方のソリューションは素晴らしいですが、2番目のソリューションは特にエレガントです。私は特に、worker()内のchanを閉じるためのok構文、および説明のコメントに感謝します。私はLock()が独立したファイルに必要でないというコメントについて興味があります。私はこれが私が各パーティションのために別々のcsvに書き込むために適応するところだと思います - 各パーティションをファイルに書き込むためにここでロックが必要ないのは確かですか? – gpanda

+1

@gpandaはい、ロックする必要はないと確信しています。これは、印刷するときに私たちがどこに印刷するかを示す単一の共有リソースを持つために必要です。 'mu.Lockとmu.Unlock'をコメントアウトしてみてください。線の順序がランダムになる以外は何も悪くはありません。このロックで、各作業員は「話している間に誰もが黙っているのを見てください」と言っています。あなたが作業者ごとに1つのファイルを持っている場合、あなたは何も共有していないので、あなたは 'part:=] [] string {}'をファイルとして考えることができます。私たちはそれを追加するためにそれをロックしていません。 – biosckon

関連する問題