NotLeaderForPartitionException when using multiple instances in a consumer group with Kafka + Spark Streaming

I was experimenting with multiple consumer instances in a Kafka consumer group, but it always fails with kafka.common.NotLeaderForPartitionException.

My Kafka cluster consists of 3 brokers, and the topic has PartitionCount: 2 and ReplicationFactor: 3.
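Since the failure is about partition leaders, here is a minimal sketch (using the Kafka 0.8 SimpleConsumer metadata API) of how I double-check which broker currently leads each partition of the topic; the broker host/port, topic name and class name are just my local values:

import java.util.Collections;
import java.util.List;

import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class LeaderCheck {
    public static void main(String[] args) {
        // Ask any live broker for metadata about the topic
        SimpleConsumer consumer =
            new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, "leaderLookup");
        try {
            TopicMetadataRequest request =
                new TopicMetadataRequest(Collections.singletonList("topic5"));
            TopicMetadataResponse response = consumer.send(request);

            List<TopicMetadata> metadata = response.topicsMetadata();
            for (TopicMetadata topic : metadata) {
                for (PartitionMetadata partition : topic.partitionsMetadata()) {
                    // leader() is null while no leader is currently assigned
                    if (partition.leader() == null) {
                        System.out.println("Partition " + partition.partitionId() + ": no leader!");
                    } else {
                        System.out.println("Partition " + partition.partitionId()
                            + " leader: " + partition.leader().host() + ":" + partition.leader().port());
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }
}

When this reports a leader for both partitions, the "Couldn't find leaders" errors below still occur.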

SparkConsumer.java

import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import kafka.serializer.StringDecoder;
import scala.Tuple2;

public class SparkConsumer { 

private static Function2<Integer, Integer, Integer> MyReducerFunc = (a, b) -> a + b; 

public static void main(String[] args) throws Exception { 
    if (args.length < 2) { 
     System.err.println("Usage: SparkConsumer <brokers> <topics>\n" + 
       " <brokers> is a list of one or more Kafka brokers\n" + 
       " <topics> is a list of one or more kafka topics to consume from\n\n"); 
      System.exit(1); 
     } 

     //StreamingExamples.setStreamingLogLevels(); 

     String brokers = args[0]; 
     String topics = args[1]; 

     SparkConf sparkConf = new SparkConf().setMaster("local[5]").setAppName("SparkConsumer").set("spark.driver.host", "localhost"); 

     JavaSparkContext sc = new JavaSparkContext(sparkConf); 

     // Create a StreamingContext with a 2 second batch size 
     JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(2)); 

     Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(","))); 

     Map<String, String> kafkaParams = new HashMap<>(); 
     kafkaParams.put("metadata.broker.list", brokers); 
     //kafkaParams.put("auto.offset.reset", "smallest"); 
     kafkaParams.put("group.id", "SparkConsumerGrp"); 
     kafkaParams.put("zookeeper.connect", "localhost:2181"); 

     // Create direct kafka stream with brokers and topics 
     JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
      jssc, 
      String.class, 
      String.class, 
      StringDecoder.class, 
      StringDecoder.class, 
      kafkaParams, 
      topicsSet 
     ); 
     //Aggregate data every 30 sec 
     JavaPairDStream<String, String> messages2 = 
       messages.window(Durations.seconds(30), Durations.seconds(30)); 


     messages2.foreachRDD(rdd -> { 

      long numHits = rdd.count(); 

      if(numHits == 0) 
       System.out.println("No new data fetched in last 30 sec"); 

      //Do Processing 
      else{ 
        System.out.println("\n\n----------------------------------Data fetched in the last 30 seconds: " + rdd.partitions().size() 
          + " partitions and " + numHits + " records------------------\n\n"); 

        //Convert to java log object  
        JavaRDD<ApacheAccessLog> logs = rdd.map(x-> x._2) 
                 .map(ApacheAccessLog::parseFromLogLine) 
                  .cache(); 

        //Find the bot ip addresses 
        JavaRDD<String> iprdd = logs.mapToPair(ip-> new Tuple2<>(ip.getIpAddress(),1)) 
          .reduceByKey(MyReducerFunc) 
           .filter(botip-> botip._2 > 50) 
            .keys(); 

        //If we find something, we store it in results dir on hdfs 

        if(iprdd.count() > 0) 
        { 
         sc.hadoopConfiguration().set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); 
         sc.hadoopConfiguration().set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); 

         String timeStamp = new SimpleDateFormat("yyyy.MM.dd.HH.mm.ss").format(new Date()); 
         //sc.hadoopConfiguration().set("mapreduce.output.basename", timeStamp); 
         iprdd.coalesce(1).saveAsTextFile("hdfs://quickstart.cloudera:8020/results/"+timeStamp); 
         JobConf jobConf=new JobConf(); 
           System.out.println("\n\n-------------Resuts successfully written to /results on hdfs-------------\n"); 
        } 

       }   

     }); 



     jssc.start(); 
     jssc.awaitTermination(); 
} 
} 

My understanding from the documentation was that we can start another consumer process with the same group ID to add that instance to an existing group. So I ran this code in two separate terminals.

The first instance works on its own. However, this is the error I always get on the second instance:

17/06/06 00:53:26 ERROR DirectKafkaInputDStream: ArrayBuffer(kafka.common.NotLeaderForPartitionException, org.apache.spark.SparkException: Couldn't find leader offsets for Set([topic5,1])) 
17/06/06 00:53:26 ERROR DirectKafkaInputDStream: ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for Set([topic5,0], [topic5,1])) 
17/06/06 00:53:27 ERROR JobScheduler: Error generating jobs for time 1496735582000 ms 
org.apache.spark.SparkException: ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for Set([topic5,0], [topic5,1])) 
    at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:133) 
    at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:158) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333) 
    at scala.Option.orElse(Option.scala:289) 
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$slice$2$$anonfun$apply$29.apply(DStream.scala:900) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$slice$2$$anonfun$apply$29.apply(DStream.scala:899) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) 
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$slice$2.apply(DStream.scala:899) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$slice$2.apply(DStream.scala:877) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:701) 
    at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:264) 
    at org.apache.spark.streaming.dstream.DStream.slice(DStream.scala:877) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$slice$1.apply(DStream.scala:871) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$slice$1.apply(DStream.scala:871) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:701) 
    at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:264) 
    at org.apache.spark.streaming.dstream.DStream.slice(DStream.scala:870) 
    at org.apache.spark.streaming.dstream.WindowedDStream.compute(WindowedDStream.scala:65) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333) 
    at scala.Option.orElse(Option.scala:289) 
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330) 
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48) 
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117) 
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) 
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) 
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247) 
    at scala.util.Try$.apply(Try.scala:192) 
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247) 
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
Exception in thread "main" org.apache.spark.SparkException: ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for Set([topic5,0], [topic5,1])) 
    at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:133) 
    at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:158) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333) 
    at scala.Option.orElse(Option.scala:289) 
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$slice$2$$anonfun$apply$29.apply(DStream.scala:900) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$slice$2$$anonfun$apply$29.apply(DStream.scala:899) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) 
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$slice$2.apply(DStream.scala:899) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$slice$2.apply(DStream.scala:877) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:701) 
    at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:264) 
    at org.apache.spark.streaming.dstream.DStream.slice(DStream.scala:877) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$slice$1.apply(DStream.scala:871) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$slice$1.apply(DStream.scala:871) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:701) 
    at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:264) 
    at org.apache.spark.streaming.dstream.DStream.slice(DStream.scala:870) 
    at org.apache.spark.streaming.dstream.WindowedDStream.compute(WindowedDStream.scala:65) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333) 
    at scala.Option.orElse(Option.scala:289) 
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330) 
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48) 
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117) 
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) 
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) 
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247) 
    at scala.util.Try$.apply(Try.scala:192) 
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247) 
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)  

And this is the crash I get on the first instance :( It seems to happen right when rdd.count() is called:

Exception in thread "streaming-job-executor-0" java.lang.Error: java.lang.InterruptedException 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1148) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:748) 
Caused by: java.lang.InterruptedException 
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) 
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) 
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202) 
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218) 
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:619) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965) 
    at org.apache.spark.rdd.RDD.count(RDD.scala:1158) 
    at org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:455) 
    at org.apache.spark.api.java.AbstractJavaRDDLike.count(JavaRDDLike.scala:45) 
    at hadoopTest.hadoopTest.SparkConsumer.lambda$1(SparkConsumer.java:85) 
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272) 
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 
    at scala.util.Try$.apply(Try.scala:192) 
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:256) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:256) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:256) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:255) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
... 2 more 

Here is my pom.xml:

<?xml version="1.0" encoding="UTF-8"?> 
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 
    <groupId>hadoopTest</groupId> 
    <artifactId>hadoopTest</artifactId> 
    <version>0.0.1-SNAPSHOT</version> 
    <packaging>jar</packaging> 
    <name>hadoopTest</name> 
    <url>http://maven.apache.org</url> 
    <properties> 
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
    </properties> 
    <dependencies> 
     <dependency> 
     <groupId>junit</groupId> 
     <artifactId>junit</artifactId> 
     <version>3.8.1</version> 
     <scope>test</scope> 
     </dependency> 
     <dependency> 
     <groupId>org.apache.hadoop</groupId> 
     <artifactId>hadoop-hdfs</artifactId> 
     <version>2.6.0</version> 
     </dependency> 
     <dependency> 
     <groupId>org.apache.hadoop</groupId> 
     <artifactId>hadoop-auth</artifactId> 
     <version>2.6.0</version> 
     </dependency> 
     <dependency> 
     <groupId>org.apache.hadoop</groupId> 
     <artifactId>hadoop-common</artifactId> 
     <version>2.6.0</version> 
     </dependency> 
     <dependency> 
     <groupId>org.apache.hadoop</groupId> 
     <artifactId>hadoop-mapreduce-client-core</artifactId> 
     <version>2.6.0</version> 
     </dependency> 
     <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka-clients</artifactId> 
     <version>0.8.2.1</version> 
     </dependency> 
     <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.11</artifactId> 
     <version>0.8.2.1</version> 
     <scope>compile</scope> 
     <exclusions> 
      <exclusion> 
       <artifactId>jmxri</artifactId> 
       <groupId>com.sun.jmx</groupId> 
      </exclusion> 
      <exclusion> 
       <artifactId>jms</artifactId> 
       <groupId>javax.jms</groupId> 
      </exclusion> 
      <exclusion> 
       <artifactId>jmxtools</artifactId> 
       <groupId>com.sun.jdmk</groupId> 
      </exclusion> 
     </exclusions> 
     </dependency> 
     <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_2.11</artifactId> 
     <version>2.1.1</version> 
     <exclusions> 
      <exclusion> 
       <groupId>javax.validation</groupId> 
       <artifactId>validation-api</artifactId> 
      </exclusion> 
     </exclusions> 
     </dependency> 
     <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-sql_2.11</artifactId> 
     <version>1.2.2</version> 
     </dependency> 
     <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming-kafka-0-8_2.11</artifactId> 
     <version>2.1.1</version> 
     </dependency> 
     <dependency> 
     <groupId>org.slf4j</groupId> 
     <artifactId>slf4j-log4j12</artifactId> 
     <version>1.7.5</version> 
     </dependency> 
     <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.11</artifactId> 
     <version>2.1.1</version> 
     </dependency> 
     <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-tags_2.11</artifactId> 
     <version>2.1.1</version> 
     </dependency> 
     <dependency> 
     <groupId>org.scala-lang</groupId> 
     <artifactId>scala-library</artifactId> 
     <version>2.11.8</version> 
     </dependency> 
     <dependency> 
     <groupId>org.scala-lang</groupId> 
     <artifactId>scala-reflect</artifactId> 
     <version>2.10.2</version> 
     </dependency> 
     <dependency> 
     <groupId>net.jpountz.lz4</groupId> 
     <artifactId>lz4</artifactId> 
     <version>1.3</version> 
     </dependency> 
    </dependencies> 
</project> 

I've been trying to debug this all day and I'm lost. It says it can't find the leaders even though all of my brokers are up. I'd appreciate any help. Thanks!

Answer


It was because I was using spark-streaming-kafka-0-8_2.11 instead of spark-streaming-kafka-0-10_2.11. The kafka-clients version also needs to be changed to 0.10.2.0.
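For reference, a minimal sketch of what the 0-10 direct stream setup might look like; it is illustrative only and assumes the same jssc, brokers, topicsSet and group id from the question:

// Sketch only: the 0-10 equivalent of the kafkaParams / createDirectStream block
// from the question. Requires spark-streaming-kafka-0-10_2.11 and kafka-clients 0.10.x.
// Needed classes:
//   org.apache.kafka.clients.consumer.ConsumerRecord
//   org.apache.kafka.common.serialization.StringDeserializer
//   org.apache.spark.streaming.api.java.JavaInputDStream
//   org.apache.spark.streaming.kafka010.KafkaUtils
//   org.apache.spark.streaming.kafka010.LocationStrategies
//   org.apache.spark.streaming.kafka010.ConsumerStrategies

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", brokers);            // replaces metadata.broker.list
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "SparkConsumerGrp");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);

JavaInputDStream<ConsumerRecord<String, String>> stream =
    KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams));

// Same (key, value) pair shape as before, so the window()/foreachRDD logic
// from the question can stay unchanged.
JavaPairDStream<String, String> messages =
    stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));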
