2017-01-02 1 views
1

ScalaとSparkを使用してリモートKafkaキューのトピックからメッセージを消費する必要があります。デフォルトでは、リモートマシン上のカフカのポートは7072であり、9092ではありません。また、リモートマシンにインストールされ、次のバージョンがあります。ScalaからKafkaにブローカリストパラメータを渡せません:プロパティbootstrap.serversが無効です

  1. カフカ0.10.1.0
  2. Scalaの2.11

は、それは私がScalaのから(ポート7072付き)ブローカーのリストを渡す必要があることを意味しそれ以外の場合はデフォルトのポートを使用しようとするため、リモートのKafkaに送信します。

ログによれば、パラメータbootstrap.serversはリモートマシンで認識できないという問題があります。また、このパラメータの名前をmetadata.broker.list,broker.listlistenersに変更しようとしましたが、ログProperty bootstrap.servers is not validに同じエラーが表示され、次にデフォルトでポート9092が使用されます(メッセージは明らかに消費されません)。 POMファイルで

私はカフカとスパークのために、次の依存関係を使用します。

<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-streaming_2.10</artifactId> 
    <version>1.6.2</version> 
</dependency> 

<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-streaming-kafka_2.10</artifactId> 
    <version>1.6.2</version> 
</dependency> 

だから、私はScalaの2.10、2.11ないし使用しています。

これは、(私は自分のカフカは私がEMRマシン(そこに私はカフカのために使用するポート9092を持っている)持っているアマゾンのクラウドにインストール使用している場合、それは絶対に正常に動作します)私のScalaのコードです:

val testTopicMap = testTopic.split(",").map((_, kafkaNumThreads.toInt)).toMap 

    val kafkaParams = Map[String, String](
     "broker.list" -> "XXX.XX.XXX.XX:7072", 
     "zookeeper.connect" -> "XXX.XX.XXX.XX:2181", 
     "group.id" -> "test", 
     "zookeeper.connection.timeout.ms" -> "10000", 
     "auto.offset.reset" -> "smallest") 

    val testEvents: DStream[String] = 
     KafkaUtils 
     .createStream[String, String, StringDecoder, StringDecoder](
     ssc, 
     kafkaParams, 
     testTopicMap, 
     StorageLevel.MEMORY_AND_DISK_SER_2 
    ).map(_._2) 

を私はthis Documentationを読んでいたが、私がやったことはすべて正しいと思われる。他のKafkaクライアントAPI(その他のMaven依存関係)を使用する必要がありますか?

UPDATE#1:

私はまた、(飼育係なし)ダイレクト・ストリームを試してみましたが、それは誤りに私を実行します:

val testTopicMap = testTopic.split(",").toSet 
val kafkaParams = Map[String, String]("metadata.broker.list" -> "XXX.XX.XXX.XX:7072,XXX.XX.XXX.XX:7072,XXX.XX.XXX.XX:7072","bootstrap.servers" -> "XXX.XX.XXX.XX:7072,XXX.XX.XXX.XX:7072,XXX.XX.XXX.XX:7072", 
             "auto.offset.reset" -> "smallest") 
val testEvents = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, testTopicMap).map(_._2) 

testEvents.print() 

17/01/02 12:23:15 ERROR ApplicationMaster: User class threw exception: org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. 
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. 
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. 

UPDATE#2:

私はこの関連するトピックを見つけました。提案された解決策にはFixed it by setting the property 'advertised.host.name' as instructed by the comments in the kafka configuration (config/server.properties)と記載されています。 Kafkaがインストールされているリモートマシンでconfig/server.propertiesを変更する必要があることを正しく理解していますか

Kafka : How to connect kafka-console-consumer to fetch remote broker topic content?

答えて

0

私は(てEOFException)最近、同じ問題に遭遇し、その理由は、カフカのバージョンの不一致だったと思います。

もし私がここを見ればhttps://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.10/1.6.2カフカストリーミングバージョンのコンパイル時間依存性は0.8ですが、0.10を使用します。

私が知っている限り、0.9は既に0.8と互換性がありません。ローカルの0.8または0.9ブローカーをセットアップして接続しようとすることはできますか?

+0

テストを行うために、Kafka 0.10.1.0-2.11をローカルにインストールした後、config/serverの 'listers'パラメータでポートを変更しました。プロパティ "から" 7072 "に変更し、最後に質問に記載されたPOMでコードを実行しました。問題なくメッセージを受け取ることができました。だから、私はバージョンの互換性の問題を破棄しました。私がうまく自分自身を説明したことを確かめるために、私はAWS EMRクラスターでコンシューマーコードを実行しますが、カフカクラスターはAWS以外の別のマシンです。 – Dinosaurius

+0

私は 'security.protocol'と何か関係があると考えていました。例えば、リモートKafkaクラスタが 'SSL'プロトコルを使用する場合、明らかに私の接続は失敗します。このケースで私が誤解しているのは、 'curl'とKafka-Rest-API(http://docs.confluent.io/2.0.0/kafka-rest/docs/api)を使ってメッセージを取得できる理由です。 html#consumers)を端末から(Scalaなし)。おそらく、リモートクラスタ内のConfluent APIの設定(SSLを使用しない)と関係があるかもしれません。 – Dinosaurius

関連する問題