Spark Streamingの新機能、 Spark StreamingからKafkaUtilsを使用してKafkaへの直接ストリームを作成しました。ここでスカラマニフェストが見つからないSpark Streaming
Exception in thread "main" java.lang.NoClassDefFoundError: scala/reflect/ClassManifest at kafka.utils.Log4jController$.(Log4jController.scala:29) at kafka.utils.Log4jController$.(Log4jController.scala) at kafka.utils.Logging$class.$init$(Logging.scala:29) at kafka.api.TopicMetadataRequest$.(TopicMetadataRequest.scala:28) at kafka.api.TopicMetadataRequest$.(TopicMetadataRequest.scala) at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:130) at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:119) at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211) at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484) at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607) at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala) at consumer.spark.KafkaConsumer.consumeMessages(KafkaConsumer.java:103) at consumer.spark.KafkaConsumer.executeStream(KafkaConsumer.java:142) at consumer.spark.KafkaConsumerService.main(KafkaConsumerService.java:27) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ClassNotFoundException: scala.reflect.ClassManifest at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 23 more
は私のpom.xmlは
<!-- Scala version -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.3</version>
</dependency>
<!--Spark Core -->
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<!--Spark Streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive_2.10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<!--Kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.8.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.1</version>
</dependency>
です:
jssc = new JavaStreamingContext(sparkConf, Durations.seconds(KafkaConfig.getInstance().getBatchDuration()));
HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put(KafkaConstants.ZOOKEEPER_CONNECTION_STRING, zookeeperHost + ":" + zookeeperPort);
kafkaParams.put(KafkaConstants.METADATA_BROKER_LIST_STRING, bootstrapHost + ":" + bootstrapPort);
kafkaParams.put(KafkaConstants.GROUP_ID_STRING, groupId);
HashSet<String> topicSet = new HashSet<String>();
topicSet.add(topic);
JavaPairInputDStream<String, String> topicStream = KafkaUtils.createDirectStream(jssc, String.class,
String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicSet);
JavaDStream<String> topicMessages = topicStream.map(Tuple2::_2);
topicMessages.print()
以下のように私はそれがフルスタックがここにある
java.lang.NoClassDefFoundError: scala/reflect/ClassManifest
エラーの下に投げているジョブを実行しようとしたとき
私に知らせる方法を教えてくださいそれをどうぞ。