私はスパークストリーミングダイレクト・ストリームとカフカのトピックから読み取るしようとしているが、私は次のエラーが表示されますはカフカへのストリーミングスパークから接続できません:org.apache.spark.SparkExceptionを:java.net.SocketTimeoutException
INFO consumer.SimpleConsumer: Reconnect due to socket error: java.net.SocketTimeoutException
ERROR yarn.ApplicationMaster: User class threw exception: org.apache.spark.SparkException: java.net.SocketTimeoutException
java.net.SocketTimeoutException
org.apache.spark.SparkException: java.net.SocketTimeoutException
java.net.SocketTimeoutException
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at scala.util.Either.fold(Either.scala:97)
at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
私はカフカ0.7.1とスパーク1.5.2を持っています。
私は、次のコードを使用しています:
val ssc : StreamingContext = new StreamingContext(sparkContext, Seconds(60))
val topicsSet = Set("myTopic")
val kafkaParams = Map[String, String]
("metadata.broker.list" -> "mybrokerhostname1:9092,mybrokerhostname2:9092")
val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicsSet)
私は他のアプリケーションが正しくそれから読んでいるので、話題がすでに存在していることを確信しています。
はあなたがからカフカにアクセスしようとしている場所からのネットワーク通信が正しく設定されていますか? –