2017-11-02 7 views
1

私はローカルのKafkaサーバーからメッセージを読み込んでチャットに出力する簡単なテレグラムボットを作っています。 zookeeperとkafkaサーバーの設定ファイルはどちらもデフォルトになっています。コンソールのコンシューマー作品。 Golang Saramaパッケージを使用してコードからメッセージを消費しようとすると、この問題が発生します。私はこれらの行を追加する前に:Golang Saramaパッケージを使用して、ローカルに実行中のKafkaサーバーからのメッセージを使用できません。

case err := <-pc.Errors(): log.Panic(err)

をプログラムは、それが失速そのあと、一度メッセージを印刷しました。 は、今では、ログにこれをprinitngパニック: kafka: error while consuming test1/0: kafka: broker not connected

は、ここでは、コードです:

type kafkaResponse struct { 
     telega *tgbotapi.Message 
     message []byte 
    } 

    type kafkaRequest struct { 
     telega *tgbotapi.Message 
     topic string 
    }  
    var kafkaBrokers = []string{"localhost:9092"} 
    func main() { 
       //channels for request response 
       var reqChan = make(chan kafkaRequest) 
       var respChan = make(chan kafkaResponse) 

       //starting kafka client routine to listen to topic channnel 
       go consumer(reqChan, respChan, kafkaBrokers) 

       //bot thingy here 
       bot, err := tgbotapi.NewBotAPI(token) 
       if err != nil { 
        log.Panic(err) 
       } 
       bot.Debug = true 
       log.Printf("Authorized on account %s", bot.Self.UserName) 
       u := tgbotapi.NewUpdate(0) 
       u.Timeout = 60 
       updates, err := bot.GetUpdatesChan(u) 
       for { 
        select { 
        case update := <-updates: 
         if update.Message == nil { 
          continue 
         } 
         switch update.Message.Text { 

         case "Topic: test1": 
          topic := "test1" 
          reqChan <- kafkaRequest{update.Message, topic} 
         } 
        case response := <-respChan: 
         bot.Send(tgbotapi.NewMessage(response.telega.Chat.ID, string(response.message))) 
        } 

       } 

はここconsumer.goです:

func consumer(reqChan chan kafkaRequest, respChan chan kafkaResponse, brokers []string) { 
      config := sarama.NewConfig() 
      config.Consumer.Return.Errors = true 

      // Create new consumer 
      consumer, err := sarama.NewConsumer(brokers, config) 
      if err != nil { 
       panic(err) 
      } 
      defer func() { 
       if err := consumer.Close(); err != nil { 
        panic(err) 
       } 
      }() 

      select { 
      case request := <-reqChan: 
       //get all partitions on the given topic 
       partitionList, err := consumer.Partitions(request.topic) 
       if err != nil { 
        fmt.Println("Error retrieving partitionList ", err) 
       } 

       initialOffset := sarama.OffsetOldest 
       for _, partition := range partitionList { 
        pc, _ := consumer.ConsumePartition(request.topic, partition, initialOffset) 

        go func(pc sarama.PartitionConsumer) { 
         for { 
          select { 
          case message := <-pc.Messages(): 
           respChan <- kafkaResponse{request.telega, message.Value} 
          case err := <-pc.Errors(): 
           log.Panic(err) 
          } 
         } 
        }(pc) 
       } 
      } 
     } 
+0

sync.WaitGroupを追加推薦

// Close shuts down the consumer. It must be called after all child // PartitionConsumers have already been closed. Close() error 

(それはだ場所) –

+0

事がということですクライアントはメッセージを取得することができますが、私は延期機能を削除したので(問題があるかどうかを確認するだけですが)、プログラムはまだ停止していますので、一度だけ –

答えて

0

あなたはすべての設定後に、あなたの消費者を閉じていますPartitionConsumerコード内

defer func() { 
      if err := consumer.Close(); err != nil { 
       panic(err) 
      } 
     }() 
ただし、すべてのPartitionConsumersが閉じられた後でのみ、コンシューマを閉じる必要があることをドキュメントで指定しています。

私はあなたがエラーメッセージがかなり明確である、あなたのブローカー(カフカサーバ)がクライアントからアクセスできない機能にgo func(pc sarama.PartitionConsumer) {

+0

が問題ではないようです。 –

+0

質問に記載されているように、「kafka:test1/0:kafka:broker not connected」というエラーメッセージが表示されますか? –

+0

それは、ありがとう! –

関連する問題