2016-10-10 12 views
1

このコードを実行すると、次のエラーが発生します。私は他の答えをチェックしたが、それは私のためにはうまくいかなかった。kafka.cluster.BrokerEndPointをkafka.cluster.Brokerにキャストできないのはなぜですか?

誰かがこれをどのようにスローするか分かりませんか?私は依存関係を調べた。

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.api.java.JavaPairInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.kafka.KafkaUtils; 

import java.util.*; 


/** 
* Created by jonas on 10/10/16. 
*/ 
public class SparkStream { 

    public static void main(String[] args){ 

     SparkConf conf = new SparkConf() 
       .setAppName("kafka-sandbox") 
       .setMaster("local[*]"); 
     JavaSparkContext sc = new JavaSparkContext(conf); 
     JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000)); 

     Map<String, String> kafkaParams = new HashMap<>(); 
     kafkaParams.put("metadata.broker.list", "localhost:9092"); 
     Set<String> topics = Collections.singleton("Test"); 

     JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc, String.class 
     , String.class, kafka.serializer.StringDecoder.class, kafka.serializer.StringDecoder.class, kafkaParams, topics); 

     directKafkaStream.foreachRDD(rdd -> { 
      System.out.println("--- New RDD with " + rdd.partitions().size() 
        + " partitions and " + rdd.count() + " records"); 
      rdd.foreach(record -> System.out.println(record._2)); 
     }); 

     // TODO: processing pipeline 

     ssc.start(); 


    } 


} 

私は以前ポート9092.でポート2181での飼育係とカフカサーバー0.9.0.0を始めた。しかし、私は、Sparkドライバで次のエラーを取得する:

Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker 
     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:97) 
     at scala.Option.map(Option.scala:146) 
     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:97) 
     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:94) 
     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252) 
     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252) 
     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
     at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35) 
     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252) 
     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) 
     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:94) 
     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:93) 
     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252) 
     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252) 
     at scala.collection.immutable.Set$Set1.foreach(Set.scala:79) 
     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252) 
     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) 
     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:93) 
     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:92) 
     at scala.util.Either$RightProjection.flatMap(Either.scala:522) 
     at org.apache.spark.streaming.kafka.KafkaCluster.findLeaders(KafkaCluster.scala:92) 
     at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:186) 
     at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:168) 
     at org.apache.spark.streaming.kafka.KafkaCluster.getLatestLeaderOffsets(KafkaCluster.scala:157) 
     at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:215) 
     at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:211) 
     at scala.util.Either$RightProjection.flatMap(Either.scala:522) 
     at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211) 
     at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484) 
     at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607) 
     at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala) 
     at SparkStream.main(SparkStream.java:28) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:498) 
     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) 
+0

あなたはbootstrap.servers' 'にあなたの' metadata.broker.list'を変更しようとしましたか?この仕事は私のためです。 –

+0

はい私はこれを試みた。 –

答えて

1

依存関係が互いに互換性があることを確認してください。 はここで一緒に働くものもあります:

<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-streaming_2.10</artifactId> 
    <version>1.6.2</version> 
</dependency> 

<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-core_2.10</artifactId> 
    <version>1.6.2</version> 
</dependency> 

<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-streaming-kafka_2.10</artifactId> 
    <version>1.6.2</version> 
</dependency> 
1

これは私のライブラリの問題のように思えますデバッグしています。 私はバージョン0.10.0.0とスカラバージョン2.11のkafkaサーバを使用しています 私のスパークコア/ストリーミングバージョンは2.11です:2.0.1 スパークストリーミングカフカlibは0-8_2.11です:2.0.1 カフカクライアントとストリームは0.10.0.1 私はkafka 2.11:0.10.0.1 libを使用するとこのエラーが発生しますが、私はkafka 2.10:0.10.0.1を使用すると問題なく動作します。

関連する問題