2016-12-31 3 views
1

私は、次のカフカの設定を使用します。スパークストリーミング(バージョン2.1.0)は、それらのホスト名(ない彼らのIP addrの)とカフカ(v.0.10.0)ブローカーを指し

val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "10.30.3.41:9092,10.30.3.42:9092,10.30.3.43:9092", 
    "key.deserializer" -> classOf[StringDeserializer], 
    "value.deserializer" -> classOf[StringDeserializer], 
    "group.id" -> "123", 
    "auto.offset.reset" -> "latest", 
    "enable.auto.commit" -> (false: java.lang.Boolean) 
) 

すべてカフカブローカーが定義されています(上記のように)対応するIPアドレスを使用します。私はストリーミングコンテキストを起動したときに

はしかし、私は次のエラーを取得する:

16/12/31 01:46:06 DEBUG NetworkClient: Error connecting to node 1 at broker1:9092: 
java.io.IOException: Can't resolve address: broker1:9092 
    at org.apache.kafka.common.network.Selector.connect(Selector.java:171) 
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:498) 
    at org.apache.kafka.clients.NetworkClient.access$400(NetworkClient.java:48) 
... 
Caused by: java.nio.channels.UnresolvedAddressException 
    at sun.nio.ch.Net.checkAddress(Net.java:101) 
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622) 
    at org.apache.kafka.common.network.Selector.connect(Selector.java:168) 
    ... 31 more 

broker1は私のブローカーのホスト名です。クラスタにDNSを設定していないので、この名前はすべてのノードから解決できません。すべてのノードですべてのブローカーホスト名を/etc/hostsに正しく追加することで、この問題を解決できます。残念ながら、私は本当に/etc/hostsを管理したくないので、私は実際にスパークがIPアドレスを使ってブローカーに接続しているのを理解したいと思っています。具体的にはbootstrap.serversです。

答えて

1

私はあなたのカフカ構成がスパークよりも問題であると信じています。おそらくlistenersadvertised.listenersは設定されていないか、ホスト名を使用するように設定されています。実際にそうである場合、これらの値は消費者に広告され、観察された挙動をもたらす。これらのプロパティのIPアドレスを使用するようにブローカーを構成する

は、問題を解決する必要があります。

# Adjust security protocol according to your requirements 
# and replace public_host_ip with desired IP 
listeners=PLAINTEXT://public_host_ip:9092 # or 0.0.0.0:9092 
advertised.listeners=PLAINTEXT://public_host_ip:9092 
関連する問題