Why is this error not shown at compile time? That would make it much easier to fix...
I am using the following dependencies:

<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.10</artifactId>
    <version>1.6.2</version>
</dependency>
<!--
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.10.0.1</version>
</dependency>
-->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.10</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-twitter_2.10</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>org.restlet.jee</groupId>
    <artifactId>org.restlet</artifactId>
    <version>2.0.10</version>
</dependency>
Here is my Java code:

// Imports used by the snippet below
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import scala.Tuple2;

SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount11").setMaster("local[*]");
sparkConf.set("spark.streaming.concurrentJobs", "3");

// Create the context with a 3-second batch size
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(3000));

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "x.xx.xxx.xxx:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");   // start from the newest offsets
kafkaParams.put("enable.auto.commit", true);      // let Kafka auto-commit offsets

Collection<String> topics = Arrays.asList("topicName");

final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

stream.mapToPair(new PairFunction<ConsumerRecord<String, String>, String, String>() {
    @Override
    public Tuple2<String, String> call(ConsumerRecord<String, String> record) throws Exception {
        System.out.println("file data");
        return new Tuple2<>(record.key(), record.value());
    }
});
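(As an aside: even once the classpath issue is resolved, the snippet as posted would not process anything, because the result of mapToPair is discarded and the context is never started. A minimal sketch of the missing tail end follows; it is a hypothetical continuation reusing the stream and jssc variables above, not part of the original post.)

// Hypothetical continuation: Spark only runs a streaming job once there is
// an output action and the context has been started.
// Requires: import org.apache.spark.streaming.api.java.JavaPairDStream;
JavaPairDStream<String, String> pairs = stream.mapToPair(
        record -> new Tuple2<>(record.key(), record.value()));
pairs.print();            // output action, so Spark actually schedules the job
jssc.start();             // start the streaming computation
jssc.awaitTermination();  // block main until the job stops or fails
                          // (on Spark 2.x, main may need "throws InterruptedException")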
When I run it, I get the error below:
Exception in thread "main" java.lang.NoClassDefFoundError: org.apache.spark.internal.Logging
at java.lang.ClassLoader.defineClassImpl(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:346)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:154)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:727)
at java.net.URLClassLoader.access$400(URLClassLoader.java:95)
at java.net.URLClassLoader$ClassFinder.run(URLClassLoader.java:1182)
at java.security.AccessController.doPrivileged(AccessController.java:686)
at java.net.URLClassLoader.findClass(URLClassLoader.java:602)
at java.lang.ClassLoader.loadClassHelper(ClassLoader.java:846)
at java.lang.ClassLoader.loadClass(ClassLoader.java:825)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:325)
at java.lang.ClassLoader.loadClass(ClassLoader.java:805)
at org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe(ConsumerStrategy.scala)
at spark.KafkaConsumerDirectStream.main(KafkaConsumerDirectStream.java:45)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.internal.Logging
at java.net.URLClassLoader.findClass(URLClassLoader.java:607)
at java.lang.ClassLoader.loadClassHelper(ClassLoader.java:846)
at java.lang.ClassLoader.loadClass(ClassLoader.java:825)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:325)
at java.lang.ClassLoader.loadClass(ClassLoader.java:805)
... 14 more
This problem usually occurs because the dependency is not on the classpath. There are a few ways to set the classpath; can you show us which method you are using? –
The above code is inside the main method. –
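For reference, the trace itself points at a version mismatch rather than a missing jar: org.apache.spark.internal.Logging only exists in Spark 2.x (in 1.x the trait was org.apache.spark.Logging), and spark-streaming-kafka-0-10_2.10:2.0.0 is built against Spark 2.0, while the POM pulls spark-core_2.10:1.6.2. That also answers the compile-time question: javac only resolves classes that your own source references, so a class referenced inside the connector's bytecode is looked up lazily by the class loader at runtime, which is when NoClassDefFoundError surfaces. Below is a minimal sketch of an aligned POM fragment; the 2.0.0/2.10 combination is one plausible choice, not verified against the asker's cluster, and the mllib, twitter, and older spark-streaming-kafka_2.10 artifacts would need the same alignment or removal.

<properties>
    <!-- assumed versions: keep every Spark artifact on one version and one Scala build -->
    <spark.version>2.0.0</spark.version>
    <scala.binary.version>2.10</scala.binary.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- keep exactly one Kafka connector; this is the 0.10 direct-stream one -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>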