2017-02-09 3 views
0

これは私の存在の悩みです。 getEC2Metricsはチャンネルに公開されているもののよう複数のプロデューサ(ゴルーチン付き)でセマフォを実装する

type ec2Params struct { 
    sess *session.Session 
    region string 
} 

type cloudwatchParams struct { 
    cl cloudwatch.CloudWatch 
    id string 
    metric string 
    region string 
} 

type request struct { 
    ec2Params 
    cloudwatchParams 
} 

// Control concurrency and sync 
var maxRoutines = 128 
var sem chan bool 
var req chan request 

func main() { 
    sem := make(chan bool, maxRoutines) 
    for i := 0; i < maxRoutines; i++ { 
     sem <- true 
    } 
    req := make(chan request) 
    go func() { // This is my the producer 
     for _, arn := range arns { 
      arnCreds := startSession(arn) 
      for _, region := range regions { 
       sess, err := session.NewSession(
        &aws.Config{****}) 
       if err != nil { 
        failOnError(err, "Can't assume role") 
       } 
       req <- request{ec2Params: ec2Params{ **** }} 
      } 
     } 
    }() 
    for f := range(req) { 
     <- sem 
     if (ec2Params{}) != f.ec2Params { 
      go getEC2Metrics(****) 
     } else { 
      // I should be excercising this line of code too, 
      // but I'm not :(
      go getMetricFromCloudwatch(****) 
     } 
     sem <- true 
    } 
} 

getEC2MetricsgetCloudwatchMetricsはasynchronically reqにデータを公開すべきである

func getMetricFromCloudwatch(cl cloudwatch.CloudWatch, id, metric, region string) { 
    // Magic 
} 

func getEC2Metrics(sess *session.Session, region string) { 
    ec := ec2.New(sess) 
    var ids []string 
    l, err := ec.DescribeInstances(&ec2.DescribeInstancesInput{}) 
    if err != nil { 
     fmt.Println(err.Error()) 
    } else { 
     for _, rsv := range l.Reservations { 
      for _, inst := range rsv.Instances { 
       ids = append(ids, *inst.InstanceId) 
      } 
     } 
     metrics := cfg.AWSMetric.Metric 
     if len(ids) >= 0 { 
      cl := cloudwatch.New(sess) 
      for _, id := range ids{ 
       for _, metric := range metrics { 
        // For what I can tell, execution get stuck here 
        req <- request{ cloudwatchParams: ***** }} 
       } 
      } 
     } 
    } 
} 

両方mainで匿名プロデューサーとgetEC2Metricsを実行するためにゴルーチンですが、今のところそれはそうです処理されません。 goroutineの中から私が出版を止める何かがあるように見えますが、何も見つかりませんでした。私はこれについてどうやって行かなければならないのか知りたいと思います(これは実際に動作するセマフォーです)。

実装のベースは、ここで見つけることができます:必死https://burke.libbey.me/conserving-file-descriptors-in-go/

+1

セマフォを使用していないので、トークンを取り出してゴルーチンを取り出し、トークンを元に戻すだけです。ブロックしないでください。あなたはセマフォを "埋める"必要はなく、セマフォを取るためにトークンを送信し、それを解放するトークンを受け取ることができます) – JimB

+0

あなたは完全に正しいです!私はそれを更新し、私はgetEC2Metrics関数へのチャネル参照を渡す必要があるという別の問題を発見した。何らかの理由でグローバル定義を使用することはできません。 –

答えて

0

イムを、JimBのコメントはホイールスピンをしたし、今私はこれを解決してきました!セマフォがロックされなかった

  1. (私がチェックアウトしたので、それはあると思うゴルーチン内のトークンで、その競合状態はおそらくあった):
    // Control concurrency and sync 
    var maxRoutines = 128 
    var sem chan bool 
    var req chan request // Not reachable inside getEC2Metrics 
    
    func getEC2Metrics(sess *session.Session, region string, req chan <- request) { 
        .... 
        .... 
          for _, id := range ids{ 
           for _, metric := range metrics { 
            req <- request{ **** }} // When using the global req, 
                  // this would block 
           } 
          } 
        .... 
        .... 
    } 
    
    func main() { 
        sem := make(chan bool, maxRoutines) 
        for i := 0; i < maxRoutines; i++ { 
         sem <- true  
        } 
        req := make(chan request) 
        go func() { 
         // Producing tasks 
        }() 
        for f := range(req) { 
         <- sem // checking out tickets outside the goroutine does block 
           //outside of the goroutine 
         go func() { 
          defer func() { sem <- true }() 
          if (ec2Params{}) != f.ec2Params { 
           getEC2Metrics(****, req) // somehow sending the channel makes 
                 // possible to publish to it 
          } else { 
           getMetricFromCloudwatch(****) 
          } 
         }() 
        } 
    } 
    

    は二つの問題がありました。
  2. グローバルチャネルのreqはgetEC2Metricsで正しくアドレス指定されていなかったため、スコープ内にあったチャネルに公開しようとしているうちにゴルーチンが停止してしまいましたが、理由はまだわかりません)。

私は正直言って2番目のアイテムに恵まれましたが、これまでのところ私はこのクルックに関するドキュメントを見つけられませんでしたが、最後にはうまくいきました。

+0

チャネルからの送受信に「競合状態」はありません。これらは同時使用のために作られた同期プリミティブです。メインのグローバルな 'req'チャンネルをシャドーイングしています。 – JimB

関連する問題