2017-11-06 16 views
0

私はkafka2.11-0.11.0.1、scala 2.11、およびspark 2.2.0を使用しています。私はのJavaのビルド・パスをEclipseに以下のjarファイルを追加しました:kafka.cluster.BrokerEndPointをkafka.cluster.Brokerにキャストできません。

kafka-streams-0.11.0.1, 
kafka-tools-0.11.0.1, 
spark-streaming_2.11-2.2.0, 
spark-streaming-kafka_2.11-1.6.3, 
spark-streaming-kafka-0-10_2.11-2.2.0, 
kafka_2.11-0.11.0.1. 

そして、私のコードは以下の通りです:

import kafka.serializer.StringDecoder 
import kafka.api._ 
import kafka.api.ApiUtils._ 
import org.apache.spark.SparkConf 
import org.apache.spark._ 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.dstream._ 
import org.apache.spark.streaming.kafka 
import org.apache.spark.streaming.kafka._ 
import org.apache.spark.streaming.kafka.KafkaUtils 
import org.apache.spark.streaming.{Seconds, StreamingContext} 
import org.apache.spark.storage.StorageLevel 
import org.apache.spark.SparkContext._ 


object KafkaExample { 

    def main(args: Array[String]) { 

    val ssc = new StreamingContext("local[*]", "KafkaExample", Seconds(1)) 

    val kafkaParams = Map("bootstrap.servers" -> "kafkaIP:9092") 

    val topics = List("logstash_log").toSet 

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,kafkaParams,topics).map(_._2) 

    stream.print() 

    ssc.checkpoint("C:/checkpoint/") 
    ssc.start() 
    ssc.awaitTermination() 
    } 
} 

それはちょうど火花やカフカを接続するための非常に簡単なコードです。しかし、私はこのエラーを取得する:私が間違っているの

Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker 
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90) 
    at scala.Option.map(Option.scala:146) 
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90) 
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35) 
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) 
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) 
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:87) 
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:86) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:94) 
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) 
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) 
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:86) 
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:85) 
    at scala.util.Either$RightProjection.flatMap(Either.scala:522) 
    at org.apache.spark.streaming.kafka.KafkaCluster.findLeaders(KafkaCluster.scala:85) 
    at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:179) 
    at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:161) 
    at org.apache.spark.streaming.kafka.KafkaCluster.getLatestLeaderOffsets(KafkaCluster.scala:150) 
    at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:215) 
    at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:211) 
    at scala.util.Either$RightProjection.flatMap(Either.scala:522) 
    at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211) 
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484) 
    at com.defne.KafkaExample$.main(KafkaExample.scala:28) 
    at com.defne.KafkaExample.main(KafkaExample.scala) 

注: "bootstrap.server"の代わりに "metadata.broker.list"を試みましたが、変更はありませんでした。

答えて

0

あなたの問題は、Kafkaの依存関係があまりにも多くロードされていることです。実行時に選択されたものは、Sparkが期待するバージョンとバイナリ互換ではありません。

実際のの問題は、PartitionMetadataクラスです。 0.8.2では、それは(あなたがspark-streaming-kafka_2.11-1.6.3から得るものである)次のようになります。

case class PartitionMetadata(partitionId: Int, 
          val leader: Option[Broker], 
          replicas: Seq[Broker], 
          isr: Seq[Broker] = Seq.empty, 
          errorCode: Short = ErrorMapping.NoError) extends Logging 

そして、このような> 0.10.0.0中:

case class PartitionMetadata(partitionId: Int, 
          leader: Option[BrokerEndPoint], 
          replicas: Seq[BrokerEndPoint], 
          isr: Seq[BrokerEndPoint] = Seq.empty, 
          errorCode: Short = Errors.NONE.code) extends Logging 

leaderOption[BrokerEndPoint]Option[Broker]からどのように変化するかを参照してください?それがスパークが叫んでいるものです。 (あなたがスパーク2.2を使用している場合)

あなたの依存関係をクリーンアップする必要があり、あなたが必要とするすべては、次のとおりです。迅速な対応のYuvalため

spark-streaming_2.11-2.2.0, 
spark-streaming-kafka-0-10_2.11-2.2.0 
+0

Thxを。 spark-streaming-kafka_2.11-1.6.3(import ..... KafkaUtils)を削除するとエラーが発生します。 kafka_2.11-0.11.0.1トップ3のインポートを削除するとエラーが発生します。だから、私はメインのメソッドを使用することはできません。私はトップ2瓶を取り外した、それは大丈夫です。しかし、この5つのjarファイルでは、次のようになります。スレッド "main"の例外java.lang.NoClassDefFoundError:org/apache/kafka/common/network/Send。私は全く混乱している。私はそれらを取り除くことはできません、私が含まれていれば、まだdoesnt仕事。 : –

+0

'spark-streaming-kafka-0-10_2.11-2.2.0'と' spark-streaming_2.11-2.2.0 'のみを使用すると、エラーが発生しますか?それは起こりそうもありません。 –

+0

私のコードは間違っていますか?私はkafkaautilsが必要だと思うし、それはkafka_2.11-0.11.0.1に入っているので、私はstringdecoderが必要で、それはカフカストリームにある.2つのコード部分が必要ないなら、それらを削除することができる。あなたが推測するように、私は初心者であり、カフカとスパークを接続するためのクイックガイドが必要です。 –

関連する問題