2016-04-22 8 views
1

私はスパークストリーミングダイレクト・ストリームとカフカのトピックから読み取るしようとしているが、私は次のエラーが表示されますはカフカへのストリーミングスパークから接続できません:org.apache.spark.SparkExceptionを:java.net.SocketTimeoutException

INFO consumer.SimpleConsumer: Reconnect due to socket error: java.net.SocketTimeoutException 
ERROR yarn.ApplicationMaster: User class threw exception: org.apache.spark.SparkException: java.net.SocketTimeoutException 
java.net.SocketTimeoutException 
org.apache.spark.SparkException: java.net.SocketTimeoutException 
java.net.SocketTimeoutException 
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) 
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) 
    at scala.util.Either.fold(Either.scala:97) 
    at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365) 
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422) 

私はカフカ0.7.1とスパーク1.5.2を持っています。

私は、次のコードを使用しています:

val ssc : StreamingContext = new StreamingContext(sparkContext, Seconds(60)) 
    val topicsSet = Set("myTopic") 
    val kafkaParams = Map[String, String] 
      ("metadata.broker.list" -> "mybrokerhostname1:9092,mybrokerhostname2:9092") 

    val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicsSet) 

私は他のアプリケーションが正しくそれから読んでいるので、話題がすでに存在していることを確信しています。

+0

はあなたがからカフカにアクセスしようとしている場所からのネットワーク通信が正しく設定されていますか? –

答えて

0

古いバージョンのkafkaを使用しないようにしてください。あなたのケースでは(0.7.1)です。 0.7.1を使用する理由があれば教えてください。 例外を見ると、アプリケーションがkafkaブローカーに接続できないように見えます。

私は、この直接ストリームAPIを使用して、kafka 0.8.2から読み取りました。 https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala

希望すると、これで問題は解決します。

おかげ&よろしく、 ヴィカスジート

+0

あなたは正しいです。私はカフカ0.8.2で試してみました。それはスムーズに動作します。残念ながら、現在のところ、私たちのアプリケーションはKafka 0.7.1上で動作しているので、誰かが回避策を知らない限り、0.8.2に移行する必要があります。ご協力いただきありがとうございます! – nicola

関連する問題