2017-11-15 16 views
0

Spark Streamingの新機能、 Spark StreamingからKafkaUtilsを使用してKafkaへの直接ストリームを作成しました。ここでスカラマニフェストが見つからないSpark Streaming

Exception in thread "main" java.lang.NoClassDefFoundError: scala/reflect/ClassManifest at kafka.utils.Log4jController$.(Log4jController.scala:29) at kafka.utils.Log4jController$.(Log4jController.scala) at kafka.utils.Logging$class.$init$(Logging.scala:29) at kafka.api.TopicMetadataRequest$.(TopicMetadataRequest.scala:28) at kafka.api.TopicMetadataRequest$.(TopicMetadataRequest.scala) at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:130) at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:119) at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211) at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484) at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607) at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala) at consumer.spark.KafkaConsumer.consumeMessages(KafkaConsumer.java:103) at consumer.spark.KafkaConsumer.executeStream(KafkaConsumer.java:142) at consumer.spark.KafkaConsumerService.main(KafkaConsumerService.java:27) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ClassNotFoundException: scala.reflect.ClassManifest at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 23 more

は私のpom.xmlは

<!-- Scala version --> 
<dependency> 
    <groupId>org.scala-lang</groupId> 
    <artifactId>scala-library</artifactId> 
    <version>2.10.3</version> 
</dependency> 

<!--Spark Core --> 
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 --> 
<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-core_2.11</artifactId> 
    <version>2.1.1</version> 
</dependency> 

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11 --> 
<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId> 
    <version>2.1.1</version> 
</dependency> 

<!--Spark Streaming --> 
<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-streaming_2.11</artifactId> 
    <version>2.1.1</version> 
</dependency> 

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive_2.10 --> 
<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-hive_2.11</artifactId> 
    <version>2.1.1</version> 
</dependency> 

<!--Kafka --> 
<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.11</artifactId> 
    <version>0.8.2.1</version> 
</dependency> 

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 --> 
<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-sql_2.11</artifactId> 
    <version>2.1.1</version> 
</dependency> 
です:

jssc = new JavaStreamingContext(sparkConf, Durations.seconds(KafkaConfig.getInstance().getBatchDuration())); 

HashMap<String, String> kafkaParams = new HashMap<>(); 

kafkaParams.put(KafkaConstants.ZOOKEEPER_CONNECTION_STRING, zookeeperHost + ":" + zookeeperPort); 
kafkaParams.put(KafkaConstants.METADATA_BROKER_LIST_STRING, bootstrapHost + ":" + bootstrapPort); 
kafkaParams.put(KafkaConstants.GROUP_ID_STRING, groupId); 

HashSet<String> topicSet = new HashSet<String>(); 
topicSet.add(topic); 

JavaPairInputDStream<String, String> topicStream = KafkaUtils.createDirectStream(jssc, String.class, 
     String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicSet); 

JavaDStream<String> topicMessages = topicStream.map(Tuple2::_2); 
topicMessages.print() 

以下のように私はそれがフルスタックがここにある

java.lang.NoClassDefFoundError: scala/reflect/ClassManifest

エラーの下に投げているジョブを実行しようとしたとき

私に知らせる方法を教えてくださいそれをどうぞ。

答えて

0

Scala version -> 2.10.3を定義し、artifactID with 2.11と書いています。依存関係ごとにartifactIDを2.10に変更する必要があります。

0

pom.xmlでscalaのバージョンを2.11に変更すると動作します

関連する問題