私はkafka2.11-0.11.0.1、scala 2.11、およびspark 2.2.0を使用しています。私はのJavaのビルド・パスをEclipseに以下のjarファイルを追加しました:kafka.cluster.BrokerEndPointをkafka.cluster.Brokerにキャストできません。
kafka-streams-0.11.0.1,
kafka-tools-0.11.0.1,
spark-streaming_2.11-2.2.0,
spark-streaming-kafka_2.11-1.6.3,
spark-streaming-kafka-0-10_2.11-2.2.0,
kafka_2.11-0.11.0.1.
そして、私のコードは以下の通りです:
import kafka.serializer.StringDecoder
import kafka.api._
import kafka.api.ApiUtils._
import org.apache.spark.SparkConf
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.kafka
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.SparkContext._
object KafkaExample {
def main(args: Array[String]) {
val ssc = new StreamingContext("local[*]", "KafkaExample", Seconds(1))
val kafkaParams = Map("bootstrap.servers" -> "kafkaIP:9092")
val topics = List("logstash_log").toSet
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,kafkaParams,topics).map(_._2)
stream.print()
ssc.checkpoint("C:/checkpoint/")
ssc.start()
ssc.awaitTermination()
}
}
それはちょうど火花やカフカを接続するための非常に簡単なコードです。しかし、私はこのエラーを取得する:私が間違っているの
Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90)
at scala.Option.map(Option.scala:146)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:87)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:86)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:86)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:85)
at scala.util.Either$RightProjection.flatMap(Either.scala:522)
at org.apache.spark.streaming.kafka.KafkaCluster.findLeaders(KafkaCluster.scala:85)
at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:179)
at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:161)
at org.apache.spark.streaming.kafka.KafkaCluster.getLatestLeaderOffsets(KafkaCluster.scala:150)
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:215)
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:211)
at scala.util.Either$RightProjection.flatMap(Either.scala:522)
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
at com.defne.KafkaExample$.main(KafkaExample.scala:28)
at com.defne.KafkaExample.main(KafkaExample.scala)
?
注: "bootstrap.server"の代わりに "metadata.broker.list"を試みましたが、変更はありませんでした。
Thxを。 spark-streaming-kafka_2.11-1.6.3(import ..... KafkaUtils)を削除するとエラーが発生します。 kafka_2.11-0.11.0.1トップ3のインポートを削除するとエラーが発生します。だから、私はメインのメソッドを使用することはできません。私はトップ2瓶を取り外した、それは大丈夫です。しかし、この5つのjarファイルでは、次のようになります。スレッド "main"の例外java.lang.NoClassDefFoundError:org/apache/kafka/common/network/Send。私は全く混乱している。私はそれらを取り除くことはできません、私が含まれていれば、まだdoesnt仕事。 : –
'spark-streaming-kafka-0-10_2.11-2.2.0'と' spark-streaming_2.11-2.2.0 'のみを使用すると、エラーが発生しますか?それは起こりそうもありません。 –
私のコードは間違っていますか?私はkafkaautilsが必要だと思うし、それはkafka_2.11-0.11.0.1に入っているので、私はstringdecoderが必要で、それはカフカストリームにある.2つのコード部分が必要ないなら、それらを削除することができる。あなたが推測するように、私は初心者であり、カフカとスパークを接続するためのクイックガイドが必要です。 –