私は、次のカフカの設定を使用します。スパークストリーミング(バージョン2.1.0)は、それらのホスト名(ない彼らのIP addrの)とカフカ(v.0.10.0)ブローカーを指し
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "10.30.3.41:9092,10.30.3.42:9092,10.30.3.43:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "123",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
すべてカフカブローカーが定義されています(上記のように)対応するIPアドレスを使用します。私はストリーミングコンテキストを起動したときに
はしかし、私は次のエラーを取得する:
16/12/31 01:46:06 DEBUG NetworkClient: Error connecting to node 1 at broker1:9092:
java.io.IOException: Can't resolve address: broker1:9092
at org.apache.kafka.common.network.Selector.connect(Selector.java:171)
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:498)
at org.apache.kafka.clients.NetworkClient.access$400(NetworkClient.java:48)
...
Caused by: java.nio.channels.UnresolvedAddressException
at sun.nio.ch.Net.checkAddress(Net.java:101)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
at org.apache.kafka.common.network.Selector.connect(Selector.java:168)
... 31 more
broker1は私のブローカーのホスト名です。クラスタにDNSを設定していないので、この名前はすべてのノードから解決できません。すべてのノードですべてのブローカーホスト名を/etc/hosts
に正しく追加することで、この問題を解決できます。残念ながら、私は本当に/etc/hosts
を管理したくないので、私は実際にスパークがIPアドレスを使ってブローカーに接続しているのを理解したいと思っています。具体的にはbootstrap.servers
です。