2017-08-22 8 views
0

私はspark SQL操作を実行しているときにエラーが発生しています。 私はスパーク1.6.3のバージョンを実行しており、すべてのjarファイルが同じバージョンのpom.xmlに属していることを確認しており、参照用にコードが貼り付けられています。"streaming-job-executor-0"スレッドの例外java.lang.NoClassDefFoundError

私はSparkConsumerでストリーミングされているjsonデータを解析するために多くの方法を試しました。私は以下の問題で立ち往生しています。助けてください

注:すべてのスパーク、ストリーミング、SQLは1.6.3に属しているjarsのバージョンの不一致はありません。ブログによれば、言及されたエラースタックはJARSバージョンの不一致によるものです。

*package datapipeline; 
    import java.util.Arrays; 
    import java.util.HashMap; 
    import java.util.Map; 
    import java.util.regex.Pattern; 
    import org.apache.log4j.Level; 
    import org.apache.log4j.Logger; 
    import org.apache.spark.SparkConf; 
    import org.apache.spark.SparkContext; 
    import org.apache.spark.api.java.JavaSparkContext; 
    import org.apache.spark.api.java.function.Function; 
    import org.apache.spark.sql.DataFrame; 
    import org.apache.spark.sql.SQLContext; 
    import org.apache.spark.storage.StorageLevel; 
    import org.apache.spark.streaming.Duration; 
    import org.apache.spark.streaming.api.java.JavaDStream; 
    import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; 
    import org.apache.spark.streaming.api.java.JavaStreamingContext; 
    import org.apache.spark.streaming.kafka.KafkaUtils; 
    import org.json.simple.JSONObject; 
    import org.json.simple.parser.JSONParser; 
    import org.onosproject.net.Device; 
    import scala.Tuple2; 
    public final class SparkConsumer { 
     //private static SparkContext sc = new SparkContext(); 
     private static final Pattern SPACE = Pattern.compile(" "); 
     private static void setLogLevels() { 
      boolean log4jInitialized = Logger.getRootLogger().getAllAppenders().hasMoreElements(); 
      if (!log4jInitialized) { 
       // We first log something to initialize Spark's default logging, then we override the 
       // logging level. 
       Logger.getLogger(SparkConsumer.class).info("Setting log level to [WARN] for streaming example." + 
         " To override add a custom log4j.properties to the classpath."); 
       Logger.getRootLogger().setLevel(Level.WARN); 
      } 
     } 
     public static void main(String[] args) throws Exception{ 
      String jars[]={"C:\\DeviceStreaming-1.0.0.jar"}; 
      setLogLevels() 
      SparkConf sparkConf = new SparkConf().setAppName("CustomerKafkaConsumerThread") 
        .set("spark.local.ip","16.214.240.4:9092") 
        .setMaster("local[*]").setJars(jars); 
      JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(3000)); 
      JavaSparkContext ctx = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate(sparkConf)); 
      SQLContext sqlContext = new SQLContext(ctx); 
      Map<String, Integer> topicMap = new HashMap<>(); 
      topicMap.put("iot", 10); 
      JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc,"16.214.240.4:2181","SparkConsumer", topicMap,StorageLevel.MEMORY_ONLY()); 
      messages.print(); 

      JavaDStream<String> json = messages.map(
        new Function<Tuple2<String, String>, String>() { 
         public String call(Tuple2<String, String> message) { 

          return message._2(); 
         } 
        } 
       ); 

      json.foreachRDD(rdd -> { 

       //DataFrame df = sqlContext.read().json(rdd); 
       DataFrame df=sqlContext.createDataFrame(rdd, Device.class); 
       df.show(); 
       df.printSchema(); 
      }); 

      jssc.start(); 
      jssc.awaitTermination(); 


     } 
    }* 

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 
    <groupId>com.iot.app.kafka</groupId> 
    <artifactId>DeviceStreaming</artifactId> 
    <packaging>jar</packaging> 
    <version>1.0.0</version> 
    <name>DeviceStreaming</name> 
    <dependencies> 
     <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind --> 
     <dependency> 
      <groupId>com.fasterxml.jackson.core</groupId> 
      <artifactId>jackson-databind</artifactId> 
      <version>2.7.1-1</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.kafka</groupId> 
      <artifactId>kafka-clients</artifactId> 
      <version>0.9.0.0</version> 
     </dependency> 
     <!-- <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-network-common_2.11</artifactId> 
      <version>2.1.0</version> </dependency> --> 
     <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-network-common_2.11 --> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-network-common_2.11</artifactId> 
      <version>1.6.3</version> 
     </dependency> 
     <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.11 --> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-streaming-kafka_2.11</artifactId> 
      <version>1.6.3</version> 
     </dependency> 
     <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.11 --> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-streaming_2.11</artifactId> 
      <version>1.6.3</version> 
     </dependency> 

     <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 --> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-sql_2.11</artifactId> 
      <version>1.6.3</version> 
     </dependency> 

     <!-- https://mvnrepository.com/artifact/org.glassfish/javax.json --> 
     <dependency> 
      <groupId>org.glassfish</groupId> 
      <artifactId>javax.json</artifactId> 
      <version>1.0.4</version> 
     </dependency> 
     <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client --> 
     <dependency> 
      <groupId>org.apache.hadoop</groupId> 
      <artifactId>hadoop-client</artifactId> 
      <version>2.2.0</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-core_2.11</artifactId> 
      <version>1.6.3</version> 
     </dependency> 
     <dependency> 
      <groupId>org.spark-project.spark</groupId> 
      <artifactId>unused</artifactId> 
      <version>1.0.0</version> 
      <scope>provided</scope> 
     </dependency> 
     <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-annotations --> 
     <dependency> 
      <groupId>com.fasterxml.jackson.core</groupId> 
      <artifactId>jackson-annotations</artifactId> 
      <version>2.4.0</version> 
     </dependency> 

     <!-- https://mvnrepository.com/artifact/javax.json/javax.json-api --> 
     <dependency> 
      <groupId>javax.json</groupId> 
      <artifactId>javax.json-api</artifactId> 
      <version>1.1</version> 
     </dependency> 

     <dependency> 
      <groupId>com.fasterxml.jackson.core</groupId> 
      <artifactId>jackson-databind</artifactId> 
      <version>2.4.4</version> 
     </dependency> 
     <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core --> 
     <dependency> 
      <groupId>com.fasterxml.jackson.core</groupId> 
      <artifactId>jackson-core</artifactId> 
      <version>2.4.4</version> 
     </dependency> 
     <dependency> 
      <groupId>com.fasterxml.jackson.module</groupId> 
      <artifactId>jackson-module-scala_2.10</artifactId> 
      <version>2.8.2</version> 
     </dependency> 
     <dependency> 
      <groupId>log4j</groupId> 
      <artifactId>log4j</artifactId> 
      <version>1.2.17</version> 
     </dependency> 
     <dependency> 
      <groupId>junit</groupId> 
      <artifactId>junit</artifactId> 
      <version>4.12</version> 
     </dependency> 
     <!-- https://mvnrepository.com/artifact/com.google.guava/guava --> 
     <dependency> 
      <groupId>com.google.guava</groupId> 
      <artifactId>guava</artifactId> 
      <version>14.0.1</version> 
     </dependency> 
     <!-- https://mvnrepository.com/artifact/org.onosproject/onos-api --> 
     <dependency> 
      <groupId>org.onosproject</groupId> 
      <artifactId>onos-api</artifactId> 
      <version>1.6.0</version> 
     </dependency> 
     <dependency> 
      <groupId>org.scala-lang</groupId> 
      <artifactId>scala-library</artifactId> 
      <version>2.11.0</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-streaming-kafka-0-8-assembly_2.11</artifactId> 
      <version>2.0.0</version> 
     </dependency> 

     <dependency> 
      <groupId>org.apache.avro</groupId> 
      <artifactId>avro</artifactId> 
      <version>1.8.0</version> 
     </dependency> 

     <dependency> 
      <groupId>com.twitter</groupId> 
      <artifactId>bijection-avro_2.10</artifactId> 
      <version>0.9.2</version> 
     </dependency> 

     <!-- https://mvnrepository.com/artifact/com.googlecode.json-simple/json-simple --> 
<dependency> 
    <groupId>com.googlecode.json-simple</groupId> 
    <artifactId>json-simple</artifactId> 
    <version>1.1</version> 
</dependency> 
    </dependencies> 
    <properties> 
     <maven.compiler.source>1.8</maven.compiler.source> 
     <maven.compiler.target>1.8</maven.compiler.target> 
     <scala.version>2.10.4</scala.version> 
    </properties> 
</project> 

    *Exception in thread "streaming-job-executor-0" java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class 
     at scala.reflect.internal.util.WeakHashSet.<init>(WeakHashSet.scala:19) 
     at scala.reflect.internal.util.WeakHashSet$.apply(WeakHashSet.scala:429) 
     at scala.reflect.internal.SymbolTable$perRunCaches$.<init>(SymbolTable.scala:310) 
     at scala.reflect.internal.SymbolTable.perRunCaches$lzycompute(SymbolTable.scala:304) 
     at scala.reflect.internal.SymbolTable.perRunCaches(SymbolTable.scala:304) 
     at scala.reflect.internal.Symbols$class.$init$(Symbols.scala:71) 
     at scala.reflect.internal.SymbolTable.<init>(SymbolTable.scala:13) 
     at scala.reflect.runtime.JavaUniverse.<init>(JavaUniverse.scala:12) 
     at scala.reflect.runtime.package$.universe$lzycompute(package.scala:16) 
     at scala.reflect.runtime.package$.universe(package.scala:16) 
     at org.apache.spark.sql.types.AtomicType.<init>(AbstractDataType.scala:134) 
     at org.apache.spark.sql.types.NumericType.<init>(AbstractDataType.scala:144) 
     at org.apache.spark.sql.types.FractionalType.<init>(AbstractDataType.scala:207* 
+0

あなたは 'jackson-module-scala_2.10 'に依存していますが、残りのScalaのものは2.11を使用しています。そしてあなたのScalaのバージョンプロパティは2.10.4と言います。すべての場合に2.10または2.11のいずれかを使用する必要があります。 – Thilo

+0

@Thilo問題を指摘してくれてありがとう。それは2.11にアップグレードした後に機能しました。 – user3837415

答えて

0

ここにはScalaのバージョンが混在しています。

<properties> 
    <maven.compiler.source>1.8</maven.compiler.source> 
    <maven.compiler.target>1.8</maven.compiler.target> 
    <scala.version>2.10.4</scala.version> 
</properties> 

これは2.10と言いますが、ほとんどの(すべてではありません)依存関係は2.11バイナリを引き込みます。

Scalaはバージョン間で互換性がないため、一貫性を保つ必要があります(2.11が望ましい)。

関連する問題