2016-06-28 6 views
1

私は、時系列データを格納するためにinfluxDBを使用しています。golangクライアントを使用して継続的にinfluxdbに書き込む方法

time.logというファイルから行を読み込むための簡単なゴランアプリケーションを作成しました。 https://github.com/influxdata/influxdb/blob/master/client/README.md#inserting-data

ドキュメントは言う:ポイント別名

データの挿入

時系列データは、バッチ挿入を使用してデータベースに書き込まれます。このメカニズムは、1つまたは複数のポイントを作成してから、別のバッチポイントを作成し、それらを特定のデータベースおよびシリーズに書き込むことです。シリーズとは、測定(時間/値)とタグのセットの組み合わせです。

このサンプルでは、​​1,000ポイントのバッチを作成します。各点には時間と単一の値だけでなく、形と色を示す2つのタグがあります。これらの点は、shapesという名前の測定値を使ってsquare_holesというデータベースに書き出します。

注:RetentionPolicyをバッチポイントの一部として指定できます。指定されていない場合、InfluxDBはデータベースのデフォルト保存ポリシーを使用します。

func writePoints(clnt client.Client) { 
    sampleSize := 1000 
    rand.Seed(42) 

    bp, _ := client.NewBatchPoints(client.BatchPointsConfig{ 
     Database: "systemstats", 
     Precision: "us", 
    }) 

    for i := 0; i < sampleSize; i++ { 
     regions := []string{"us-west1", "us-west2", "us-west3", "us-east1"} 
     tags := map[string]string{ 
      "cpu": "cpu-total", 
      "host": fmt.Sprintf("host%d", rand.Intn(1000)), 
      "region": regions[rand.Intn(len(regions))], 
     } 

     idle := rand.Float64() * 100.0 
     fields := map[string]interface{}{ 
      "idle": idle, 
      "busy": 100.0 - idle, 
     } 

     bp.AddPoint(client.NewPoint(
      "cpu_usage", 
      tags, 
      fields, 
      time.Now(), 
     )) 
    } 

    err := clnt.Write(bp) 
    if err != nil { 
     log.Fatal(err) 
    } 
} 

しかし、私は継続的にログからデータを読んでいるので。私はログを読んだことはありません。だから私は流入サーバーにポイントを書くための最善の方法は何ですか?ここで

は私の現在のコードです:

cmdBP := client.NewBatchPoints(...) 
for line := range logFile.Lines { 
    pt := parseLine(line.Text) 
    cmdBP.AddPoint(pt) 
} 

influxClient.Write(cmdBP) 

は、基本的にlogFile.Linesの範囲で、それがチャネルに基づいているため、終了することはありません。

+0

処理されるN行のログごとに 'client.Write(cmdBP)'を呼び出すのはどうでしょうか? – sberry

+0

だから私は範囲のループ内のカウンタを使用する必要がありますか? –

答えて

1

(これはゴルーチンとして実行)バッチポイントとタイムアウトの組み合わせを使用します。

func (h *InfluxDBHook) loop() { 
    var coll []*client.Point 
    tick := time.NewTicker(h._batchInterval) 

    for { 
     timeout := false 

     select { 
     case pt := <-h._points: 
      coll = append(coll, pt) 
     case <-tick.C: 
      timeout = true 
     } 

     if (timeout || len(coll) >= h._batchSize) && len(coll) > 0 { 
      bp, err := client.NewBatchPoints(h._batchPointsConfig) 
      if err != nil { 
       //TODO: 
      } 
      bp.AddPoints(coll) 
      err = h._client.Write(bp) 
      if err != nil { 
       //TODO: 
      } else { 
       coll = nil 
      } 
     } 
    } 
} 

ところであなたはInfluxDBにログを送信するために、logrus loggingパッケージでフックを使用することができます(サンプルコードがありますlogrusInfluxDB hookから)。

関連する問題