Docker

2017-11-15 9 views
0

にデプロイすると、GolangパニックにApache Kafkaのコンシューマーが実装されました。ここでは、kafkaサーバーからメッセージを読み込んでHTTP経由で送信する単純なマイクロサービスを実装しようとしています。それは私が、端末からそれを実行すると正常に動作しますが、Docker

panic: runtime error: invalid memory address or nil pointer dereference 
[signal SIGSEGV: segmentation violation code=0x1 addr=0x40 pc=0x7b6345] 

goroutine 12 [running]: 
main.kafkaRoutine.func1(0xc420174060, 0x0, 0x0) 
    /go/src/github.com/deathcore666/ProperConsumerServiceYo/kafka.go:36 +0x95 
created by main.kafkaRoutine 
    /go/src/github.com/deathcore666/ProperConsumerServiceYo/kafka.go:32 +0x1ad 

kafka.go線32と36はgo func(pc sarama.PartitionConsumer)機能が座っているものですし、それはパニックドッカーする上で展開したとき。 私はプログラミングが比較的新しいので、どんな助けにも感謝します。ありがとうございました!

main.go:

func main() { 
var (
    listen = flag.String("listen", ":8080", "HTTP listen address") 
    proxy = flag.String("proxy", "", "Optional comma-separated list of URLs to proxy uppercase requests") 
) 
flag.Parse() 

logger := log.NewLogfmtLogger(os.Stderr) 

var svc KafkaService 
svc = kafkaService{} 
svc = proxyingMiddleware(context.Background(), *proxy, logger)(svc) 
svc = loggingMiddleware(logger)(svc) 


consumehandler := httptransport.NewServer(
    makeConsumeEndpoint(svc), 
    decodeConsumeRequest, 
    encodeResponse, 
) 

http.Handle("/consume", consumehandler) 

logger.Log("msg", "HTTP", "addr", *listen) 
logger.Log("err", http.ListenAndServe(*listen, nil))} 

service.go:

package main 

import (
    "context" 
    "errors" 
    "time" 
) 

//KafkaService yolo 
type KafkaService interface { 
    Consume(context.Context, string) (string, error) 
} 

//ErrEmpty yolo 
var ErrEmpty = errors.New("No topic provided") 

type kafkaService struct{} 

//Consumer logic implemented here 
func (kafkaService) Consume(_ context.Context, topic string) (string, error) { 
    if topic == "" { 
     return "", ErrEmpty 
    } 

    var inChan = make(chan string) 
    var readyChan = make(chan struct{}) 
    var result string 
    var brokers = []string{"192.168.88.208:9092"} 
    //var brokersLocal = []string{"localhost:9092"} 
    go kafkaRoutine(inChan, topic, brokers) 
    go func() { 
     for { 
      select { 
      case msg := <-inChan: 
       result = result + msg + "\n" 
      case <-time.After(time.Second * 1): 
       readyChan <- struct{}{} 
      } 

     } 
    }() 

    <-readyChan 
    close(inChan) 
    return result, nil 
} 

//ServiceMiddleware is a chainable thing for the service 
type ServiceMiddleware func(KafkaService) KafkaService 

kafka.go:kafka.goの27行で

package main 

import (
    "fmt" 
    "time" 

    "github.com/Shopify/sarama" 
) 

func kafkaRoutine(inChan chan string, topic string, brokers []string) { 
    config := sarama.NewConfig() 
    config.Consumer.Return.Errors = true 
    consumer, err := sarama.NewConsumer(brokers, config) 
    if err != nil { 
     panic(err) 
    } 

    topics, _ := consumer.Topics() 
    if !(containsTopic(topics, topic)) { 
     inChan <- "There is no such a topic" 
     fmt.Println("kafkaroutine exited") 
     return 
    } 

    partitionList, err := consumer.Partitions(topic) 
    for _, partition := range partitionList { 
     pc, _ := consumer.ConsumePartition(topic, partition, sarama.OffsetOldest) 
     go func(pc sarama.PartitionConsumer) { 
     loop: 
      for { 
       select { 
       case msg := <-pc.Messages(): 
        inChan <- string(msg.Value) 
       case <-time.After(time.Second * 1): 
        break loop 
       } 
      } 
     }(pc) 
    } 
    fmt.Println("Kafka GoRoutine exited") 
} 

func containsTopic(topics []string, topic string) bool { 
    for _, v := range topics { 
     if topic == v { 
      return true 
     } 
    } 
    return false 
} 

答えて

0

、あなたはエラーを無視していますConsumePartition()から返されました。有効なパーティションコンシューマではなくエラーが返されている可能性がありますが、パーティションコンシューマを使用しようとすると無視するので、クラッシュします。

+0

DNSの問題です。あなたのコンテナはお使いのコンピュータと同じDNSサーバーを使用していません。さらなる助けが必要な場合は、新しい質問をすることをお勧めします。 – Robo

+0

yess !!!ありがとうございました! kafkaサーバーの設定ファイルが正しく設定されていません –

関連する問題