2016-10-31 5 views
0

スパークデータフレームをパラケートファイルとして保存しようとしていますが、以下の例外により達成できません。何か不足している場合は教えてください。データフレームはカフカストリームrdds。スパークでデータフレームをパラケットファイルとして保存しない

dataframe.write.paraquet("/user/space") 

スタック例外:

Exception in thread "streaming-job-executor-0" java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/FileFormat 
at java.lang.ClassLoader.defineClass1(Native Method) 
at java.lang.ClassLoader.defineClass(ClassLoader.java:763) 
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) 
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) 
at java.net.URLClassLoader.access$100(URLClassLoader.java:73) 
at java.net.URLClassLoader$1.run(URLClassLoader.java:368) 
at java.net.URLClassLoader$1.run(URLClassLoader.java:362) 
at java.security.AccessController.doPrivileged(Native Method) 
at java.net.URLClassLoader.findClass(URLClassLoader.java:361) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
at java.lang.Class.forName0(Native Method) 
at java.lang.Class.forName(Class.java:348) 
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370) 
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404) 
at java.util.ServiceLoader$1.next(ServiceLoader.java:480) 
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:42) 
at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
at scala.collection.TraversableLike$class.filter(TraversableLike.scala:263) 
at scala.collection.AbstractTraversable.filter(Traversable.scala:105) 
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:59) 
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:219) 
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148) 
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139) 
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:334) 
at KafkaHbaseWrite$$anonfun$main$1.apply(KafkaHbaseWrite.scala:309) 
at KafkaHbaseWrite$$anonfun$main$1.apply(KafkaHbaseWrite.scala:280) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
at scala.util.Try$.apply(Try.scala:161) 
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) 
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.execution.datasources.FileFormat 
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
... 50 more 

のpom.xmlのsnaphot使用:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 

    <groupId>Paymentprocessor</groupId> 
    <artifactId>research</artifactId> 
    <version>0.0.1-SNAPSHOT</version> 
    <packaging>jar</packaging> 


    <name>research</name> 

    <properties> 
     <maven.compiler.source>1.8</maven.compiler.source> 
     <maven.compiler.target>1.8</maven.compiler.target> 
     <encoding>UTF-8</encoding> 
     <scala.tools.version>2.10</scala.tools.version> 
     <scala.version>2.10.6</scala.version> 
     <spark.version>1.6.1</spark.version> 
     <scalaCompatVersion>2.10</scalaCompatVersion> 
     <maven-scala-plugin.version>2.15.2</maven-scala-plugin.version> 

    </properties> 

    <repositories> 
     <repository> 
      <id>central</id> 

      <name>Maven Repository</name> 
      <url>https://repo1.maven.org/maven2</url> 
      <releases> 
       <enabled>true</enabled> 
      </releases> 
      <snapshots> 
       <enabled>false</enabled> 
      </snapshots> 
     </repository> 
     <repository> 
      <id>scala-tools.org</id> 
      <name>Scala-tools Maven2 Repository</name> 
      <url>http://scala-tools.org/repo-releases</url> 
     </repository> 
     <repository> 
      <id>mapr-releases</id> 
      <url>http://repository.mapr.com/maven/</url> 
      <snapshots> 
       <enabled>false</enabled> 
      </snapshots> 
      <releases> 
       <enabled>true</enabled> 
      </releases> 
     </repository> 

    </repositories> 
    <dependencies> 
    <dependency> 
      <groupId>org.scala-lang</groupId> 
      <artifactId>scala-compiler</artifactId> 
      <version>${scala.version}</version> 
     </dependency> 
     <dependency> 
      <groupId>org.scala-lang</groupId> 
      <artifactId>scala-library</artifactId> 
      <version>${scala.version}</version> 
     </dependency> 

     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-core_${scala.tools.version}</artifactId> 
      <version>${spark.version}</version> 
      <scope>provided</scope> 
     </dependency> 

     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-sql_${scala.tools.version}</artifactId> 
      <version>${spark.version}</version> 
      <scope>provided</scope> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-streaming_${scala.tools.version}</artifactId> 
      <version>${spark.version}</version> 
      <scope>provided</scope> 
     </dependency> 

     <!-- <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-compiler</artifactId> 
      <version>2.8.0</version> </dependency> --> 

     <dependency> 
      <groupId>org.scalatest</groupId> 
      <artifactId>scalatest</artifactId> 
      <version>1.2</version> 
      <scope>test</scope> 
     </dependency> 


     <dependency> 
      <groupId>log4j</groupId> 
      <artifactId>log4j</artifactId> 
      <version>1.2.15</version> 
      <exclusions> 
       <exclusion> 
        <groupId>com.sun.jmx</groupId> 
        <artifactId>jmxri</artifactId> 
       </exclusion> 
       <exclusion> 
        <groupId>com.sun.jdmk</groupId> 
        <artifactId>jmxtools</artifactId> 
       </exclusion> 
       <exclusion> 
        <groupId>javax.jms</groupId> 
        <artifactId>jms</artifactId> 
       </exclusion> 
      </exclusions> 
     </dependency> 
     <dependency> 
      <groupId>net.sf.jopt-simple</groupId> 
      <artifactId>jopt-simple</artifactId> 
      <version>3.2</version> 
     </dependency> 
     <dependency> 
      <groupId>org.slf4j</groupId> 
      <artifactId>slf4j-simple</artifactId> 
      <version>1.6.4</version> 
     </dependency> 


     <dependency> 
      <groupId>com.yammer.metrics</groupId> 
      <artifactId>metrics-core</artifactId> 
      <version>2.2.0</version> 
     </dependency> 
     <dependency> 
      <groupId>com.yammer.metrics</groupId> 
      <artifactId>metrics-annotation</artifactId> 
      <version>2.2.0</version> 
     </dependency> 
     <dependency> 
      <groupId>org.easymock</groupId> 
      <artifactId>easymock</artifactId> 
      <version>3.0</version> 
      <scope>test</scope> 
     </dependency> 




     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-mllib_${scala.tools.version}</artifactId> 
      <version>${spark.version}</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.commons</groupId> 
      <artifactId>commons-csv</artifactId> 
      <version>1.1</version> 
     </dependency> 
     <dependency> 
      <groupId>com.jsuereth</groupId> 
      <artifactId>scala-arm_2.10</artifactId> 
      <version>1.4</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.hbase</groupId> 
      <artifactId>hbase-server</artifactId> 
      <version>1.2.3</version> 
     </dependency> 
     <dependency> 
      <groupId>com.101tec</groupId> 
      <artifactId>zkclient</artifactId> 
      <version>0.7</version> 
     </dependency> 

     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-streaming-kafka-producer_2.10</artifactId> 
      <version>1.6.1</version> 
     </dependency> 
     <!-- <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-v09_2.10</artifactId> 
      <version>1.6.1-mapr-1607</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> 
      <artifactId>spark-streaming-kafka-producer_2.10</artifactId> <version>1.6.1-mapr-1607</version> 
      </dependency> --> 
     <dependency> 
      <groupId>org.apache.kafka</groupId> 
      <artifactId>kafka_2.10</artifactId> 
      <version>0.8.2.2</version> 
     </dependency> 

     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-streaming-kafka_2.10</artifactId> 
      <version>1.6.1</version> 
     </dependency> 


     <dependency> 
      <groupId>org.apache.kafka</groupId> 
      <artifactId>kafka-clients</artifactId> 
      <version>0.10.0.0</version> 
     </dependency> 

     <dependency> 
      <groupId>org.apache.hbase</groupId> 
      <artifactId>hbase-client</artifactId> 
      <version>1.2.3</version> 
     </dependency> 

     <dependency> 
      <groupId>org.apache.parquet</groupId> 
      <artifactId>parquet-hadoop</artifactId> 
      <version>1.9.0</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-hivecontext-compatibility_2.10</artifactId> 
      <version>2.0.0-preview</version> 
     </dependency> 

    </dependencies> 


    <build> 
     <sourceDirectory>src/main/scala</sourceDirectory> 
     <testSourceDirectory>src/test/scala</testSourceDirectory> 
     <plugins> 
      <plugin> 
       <groupId>org.scala-tools</groupId> 
       <artifactId>maven-scala-plugin</artifactId> 
       <version>2.15.2</version> 
       <executions> 
        <execution> 
         <goals> 
          <goal>compile</goal> 
         </goals> 
        </execution> 
       </executions> 
      </plugin> 

      <plugin> 
       <groupId>org.apache.maven.plugins</groupId> 
       <artifactId>maven-eclipse-plugin</artifactId> 
       <version>2.8</version> 
      </plugin> 


     </plugins> 
    </build> 
</project> 

コードスニペット:

val messagesDStream: InputDStream[(String, String)] = { 

      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)  
     } 



     val valuesDStream: DStream[String] = messagesDStream.map(_._2) 

     /*Construct RDD from Kafka*/ 

     println("Count value"+valuesDStream.count()) 

     /*Construct RDD from Kafka*/  
      valuesDStream.foreachRDD { rdd => 
      // There exists at least one element in RDD 
      if (!rdd.isEmpty) { 
      val count = rdd.count 
      println("count received " + count) 
      val sqlContext = SQLContext.getOrCreate(rdd.sparkContext) 


      import sqlContext.implicits._ 
      import org.apache.spark.sql.functions._ 


      val cdrDF = rdd.map(CallCreditCardRecord.parseCallCreditCardRecord).toDF() 
      val cardRDD = cdrDF.cache() 
      println("PRinting") 

      cdrDF.registerTempTable("Card") 
      cdrDF.printSchema() 
      cdrDF.show() 

      cdrDF.write.format("parquet").save("/usr/local/Cellar/hadoop/hdfs/tmp/nm-local-dir/CreditCardRecord.parquet") 

      } 
     } 

    ssc.start() 
    //ssc.awaitTermination() 

    ssc.stop(stopSparkContext = true, stopGracefully = true) 
+0

RDDからDFを作成した方法に関するコードを投稿できますか? – Shankar

+0

コードスニペットが追加されました。 –

答えて

2

あなたは異なるスパークを混合しているように見えますバージョン - おそらく、あなたのクラスタ(mas ter/workers)は、あなたのドライバーアプリケーションが別のアプリケーションを実行している間に1つのSparkバージョンを実行するので、これらのバージョンのいずれかにのみ存在するクラスに対してClassNotFoundExceptionが得られます。

具体的には、クラスorg.apache.spark.sql.execution.datasources.FileFormatは〜2週間前に作成されました(thisコミットによって)、公式のSparkリリースの一部ではありません:あなたのコンポーネントの1つでSparkの「最新マスター」バージョンを使用していますか?もしそうなら - すべてのコンポーネントで(しかし、いくつかのバグと粗いエッジを見て準備する)、またはすべてのコードがコンパイルされ、1つの公式バージョンで実行されていることを確認してください。

EDIT(POMファイルの後に掲載):あなたのPOMファイルは、2つの異なるスパークバージョン含まれています - ほとんどの依存関係のため1.6.1、そして最後の一つを2.0.0-previewを:

<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-hivecontext-compatibility_2.10</artifactId> 
    <version>2.0.0-preview</version> 
</dependency> 

あなたはこの依存関係を削除する必要があります(それはありません1.6.1で必要)。

+1

1.6.1にはこのクラスは含まれていません(https://github.com/apache/spark/tree/v1.6.1/sql/core/src/main/scala/org/apache/spark/sql/execution/を参照)。私は1.6.1を使ってパーケットファイルを正常に保存していますので、1.6.1以外のSparkクラスが何とかあなたのクラスパスにうんざりしていることを確信しています。 –

+1

Pom.xmlを添付しています。私はトラックにいますか? –

+0

更新された回答を参照 –

関連する問題