2017-10-20 3 views
2

クライアントがネットワークエラーによって切断される場合、サーバーは私のケースではpub/sub接続を閉じる必要があります。私はctx.Done()関数について知っていますが、私の場合は正しく使用する方法がわかりません。誰か説明していただけますか?クライアントが切断されている場合、ctx.Done()を正しく使用する方法は?

grpc-行く:1.7.0

行くのバージョンgo1.8.4

func (a *API) Notifications(in *empty.Empty, stream pb.Service_NotificationsServer) error { 
    ctx := stream.Context() 
    _, ok := user.FromContext(ctx) 
    if !ok { 
     return grpc.Errorf(codes.Unauthenticated, "user not found") 
    } 

    pubsub := a.redisClient.Subscribe("notifications") 
    defer pubsub.Close() 

    for { 
     msg, err := pubsub.ReceiveMessage() 
     if err != nil { 
      grpclog.Warningf("Notifications: pubsub error: %v", err) 
      return grpc.Errorf(codes.Internal, "pubsub error %v", err) 
     } 

     notification := &pb.Notification{} 
     err = json.Unmarshal([]byte(msg.Payload), notification) 
     if err != nil { 
      grpclog.Warningf("Notifications: parse error: %v", err) 
      continue 
     } 
     if err := stream.Send(notification); err != nil { 
      grpclog.Warningf("Notifications: %v", err) 
      return err 
     } 
     grpclog.Infof("Notifications: send msg %v", notification) 
    } 
} 

答えて

0

あなたは、呼び出し元の関数(またはどこコンテキストにアクセスすることができる)からコンテキストをキャンセルして、上の適切な処置を行う必要がありますDone()selectステートメントにチェックインしてください。完了

完了

は、このコンテキストのために実行された処理をキャンセルする必要があるときに閉じていますチャンネルを返すSELECT文で使用するために提供されています。このコンテキストを決してキャンセルできない場合、Doneはnilを返すことがあります。 Doneを連続して呼び出すと、同じ値が返されます。

そして

WithCancelは新しい完了チャネルと親のコピーを返します。返されたコンテキストのDoneチャネルは、返されたキャンセル関数が呼び出されたとき、または親コンテキストのDoneチャネルが閉じられたときのどちらか早い方で発生したときに閉じられます。

このコンテキストをキャンセルすると、関連付けられているリソースが解放されるため、このコンテキストで実行されている操作が完了するとコードはcancelを呼び出す必要があります。

go func() { 
    for { 
     select { 
     case <-ctx.Done(): 
      return // returning not to leak the goroutine 
     case dst <- n: 
      n++ 
     } 
    } 
}() 
1

あなたはselectを使用することができます。通常は関数からデータを取得する代わりに、チャネルを使用してデータを取得し、実行ルーチンを使用してそれを処理します。 このようないくつかの事:

func (a *API) Notifications(in *empty.Empty, stream 
    pb.Service_NotificationsServer) error { 
    ctx := stream.Context() 
    _, ok := user.FromContext(ctx) 
    if !ok { 
     return grpc.Errorf(codes.Unauthenticated, "user not found") 
    } 

    pubsub := a.redisClient.Subscribe("notifications") 
    defer pubsub.Close() 

    // I can not build the code, so I assume the msg in your code Message struct 
    c := make(chan Message) 
    go func() { 
     for { 
      msg, err := pubsub.ReceiveMessage() 
      if err != nil { 
       grpclog.Warningf("Notifications: pubsub error: %v", err) 
       close(c) 
       return grpc.Errorf(codes.Internal, "pubsub error %v", err) 
      } 
      c<- msg 
     } 
    }() 

    for { 
     select { 
      case msg, ok := <-c: 
       if !ok { 
        // channel is closed handle it 
       } 
       notification := &pb.Notification{} 
       err = json.Unmarshal([]byte(msg.Payload), notification) 
       if err != nil { 
        grpclog.Warningf("Notifications: parse error: %v", err) 
        continue 
       } 
       if err := stream.Send(notification); err != nil { 
        grpclog.Warningf("Notifications: %v", err) 
        return err 
       } 
       grpclog.Infof("Notifications: send msg %v", notification) 
      case <- ctx.Done(): 
       // do exit logic. some how close the pubsub, so next 
       // ReceiveMessage() return an error 
       // if forget to do that the go routine runs for ever 
       // until the end of main(), which I think its not what you wanted 
       pubsub.Close() // Its just pseudo code 
       return 
     } 
    } 
} 

チャネルから(私はタイプがMessageであると仮定)のメッセージを読んで、そしてselectの電源を使用しています。このシナリオでは

2つのその他の関連する事柄:

  1. は、この機能を確定した後、ルーチンの端を行くことを確認してください。私はコードについて知らないので、私は推測することはできませんが、pubsubを閉じるためのClose()メソッドがあると仮定しますので、次のReceiveMessageはエラーを返します。 (私が望む仕事をする延期人を参照してください)

  2. ReceiveMessageの前にctx.Doneの前にエラーがある場合は、チャンネルを閉じてループを解除することができます。

関連する問題