私の目的は、共通フォーマットを共有する1つまたは複数のcsvファイルを読み込み、csvデータのパーティション列に基づいて別々のファイルに書き込むことです。最後の列がパーティションで、そのデータがソートされておらず、特定のパーティションが複数のファイルに存在することを許可してください。一つのファイルの例:複数のcsvファイルを同時に書き込む、Golangのパーティション列に分割する
fsdio,abc,def,2017,11,06,01
1sdf9,abc,def,2017,11,06,04
22df9,abc,def,2017,11,06,03
1d243,abc,def,2017,11,06,02
このアプローチは恐ろしいXY問題のようなにおいがした場合、私は調整するうれしいです。データセットに
- 読み取りをし、パーティションが(これは含まれています新しいワーカールーチンをスピンオフ、 見られている場合は、各ライン
- を反復処理:私がこれまで試したどのような
a file/csv ライター)。行を
chan []string
に送信します。 - 各作業者はファイルライターであるため、入力チャネル上の1つのパーティションに対してのみ行を受け取る必要があります。
これは、特定の行に表示されたパーティション値に基づいて正しいワーカーに行を送信する方法がわからないため、明らかに機能しません(まだ)。
私は、各ワーカーに各パーティションの値についてid string
を与えられたが、私は、各労働者のために別々のchan []string
を作成する必要がある場合、に送信すること労働者を選択し、select
と、そのチャネルに送信するかどうかは知りませんしましたおそらく構造体がプールとルーティング機能のある種の作業者を保持する必要があるかどうかを判断します。
TLDR;一意の数は任意である可能性がありますが、おそらく24の一意のパーティションの値を超えない、いくつかのカテゴリのstring
値に基づいて、条件付きでデータを与えられたルーチンまたはチャネルに送信する方法については迷っています。
私はこのような質問に気付いたことに気付くでしょう。これは下位投票をするのに十分に建設的であると思われる場合は、理由を添えてコメントしてください。犯行
ご協力いただきありがとうございます。
スニペット:
package main
import (
"encoding/csv"
"fmt"
"log"
"strings"
"time"
)
func main() {
// CSV
r := csv.NewReader(csvFile1)
lines, err := r.ReadAll()
if err != nil {
log.Fatalf("error reading all lines: %v", err)
}
// CHANNELS
lineChan := make(chan []string)
// TRACKER
var seenPartitions []string
for _, line := range lines {
hour := line[6]
if !stringInSlice(hour, seenPartitions) {
seenPartitions = append(seenPartitions, hour)
go worker(hour, lineChan)
}
// How to send to the correct worker/channel?
lineChan <- line
}
close(lineChan)
}
func worker(id string, lineChan <-chan []string) {
for j := range lineChan {
fmt.Println("worker", id, "started job", j)
// Write to a new file here and wait for input over the channel
time.Sleep(time.Second)
fmt.Println("worker", id, "finished job", j)
}
}
func stringInSlice(str string, list []string) bool {
for _, v := range list {
if v == str {
return true
}
}
return false
}
// DUMMY
var csvFile1 = strings.NewReader(`
12fy3,abc,def,2017,11,06,04
fsdio,abc,def,2017,11,06,01
11213,abc,def,2017,11,06,02
1sdf9,abc,def,2017,11,06,01
2123r,abc,def,2017,11,06,03
1v2t3,abc,def,2017,11,06,01
1r2r3,abc,def,2017,11,06,02
g1253,abc,def,2017,11,06,02
d1e23,abc,def,2017,11,06,02
a1d23,abc,def,2017,11,06,02
12jj3,abc,def,2017,11,06,03
t1r23,abc,def,2017,11,06,03
22123,abc,def,2017,11,06,03
14d23,abc,def,2017,11,06,04
1d243,abc,def,2017,11,06,01
1da23,abc,def,2017,11,06,04
a1523,abc,def,2017,11,06,01
12453,abc,def,2017,11,06,04`)
'map [string](chan []文字列)'を作成します。パーティションキーが与えられたら、その行を対応するチャネルに送信することができます。 – zerkms
@/zerkmsに同意します。あるいは、シンプルなものを保ったまま柔軟性を高めるために、各ワーカーをIDを保持する 'struct'型のインスタンス、行を送信するチャネル、停止/フラッシュ/閉じるのタイミングを教える終了チャネル、そして必要なもの'map [string] worker'を保持し、それを使って正しい行を正しいワーカーに送ります。 – Adrian
ありがとうございました! 私はこれらの提案された構造を作成しましたが、あまりよく分かりません: - デッドロックを防止する方法 - どのようにマップに追加する[文字列]作業者はトラックを維持することを示唆(私はこのworkerPoolと呼ばれる) - すでに作成した作業者の中から選択してください 現在の進捗状況: https://play.golang.org/p/j56r_QvSJs – gpanda