
I am currently writing Spark unit tests in IntelliJ with DataFrameSuiteBase and SharedSparkContext, and one of the tests fails with an error.

I work on Windows, and everything is fine as long as no Spark operation uses the org.apache.spark.sql.expressions.Window object. For example:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

val accu = anosGroupees.select(
  $"col1",
  $"avg(col2)",
  sum($"avg(col2)")
    .over(Window.orderBy("col3").rowsBetween(Long.MinValue, -1))
    .as("mycolumn"))

The error is:

Task not serializable
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:707)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:706)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706)
    at org.apache.spark.sql.execution.Window.doExecute(Window.scala:245)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.Coalesce.doExecute(basicOperators.scala:250)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoin$$anonfun$broadcastFuture$1$$anonfun$apply$1.apply(BroadcastHashJoin.scala:82)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoin$$anonfun$broadcastFuture$1$$anonfun$apply$1.apply(BroadcastHashJoin.scala:79)
    at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoin$$anonfun$broadcastFuture$1.apply(BroadcastHashJoin.scala:79)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoin$$anonfun$broadcastFuture$1.apply(BroadcastHashJoin.scala:79)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: org.apache.hive.com.esotericsoftware.kryo.Kryo cannot be cast to com.esotericsoftware.kryo.Kryo
    at org.apache.spark.sql.hive.HiveShim$HiveFunctionWrapper.serializePlan(HiveShim.scala:155)
    at org.apache.spark.sql.hive.HiveShim$HiveFunctionWrapper.writeExternal(HiveShim.scala:168)
    at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 39 more

What do you think? Could it be a classpath problem? When I run this code on a YARN cluster there is no problem; it only happens on Windows, in a Maven project with the following dependencies:

<dependencies> 
    <dependency> 
     <groupId>ai.h2o</groupId> 
     <artifactId>sparkling-water-core_2.10</artifactId> 
     <version>1.5.10</version> 
    </dependency> 
    <dependency> 
     <groupId>ai.h2o</groupId> 
     <artifactId>h2o-scala_2.10</artifactId> 
     <version>3.8.0.3</version> 
    </dependency> 
    <dependency> 
     <groupId>ai.h2o</groupId> 
     <artifactId>h2o-app</artifactId> 
     <version>3.8.0.3</version> 
    </dependency> 
    <dependency> 
     <groupId>com.google.guava</groupId> 
     <artifactId>guava</artifactId> 
     <version>15.0</version> 
    </dependency> 
    <dependency> 
     <groupId>net.alchim31.maven</groupId> 
     <artifactId>scala-maven-plugin</artifactId> 
     <version>3.2.2</version> 
    </dependency> 
    <dependency> 
     <groupId>org.scalatest</groupId> 
     <artifactId>scalatest_2.10</artifactId> 
     <version>2.2.6</version> 
     <scope>test</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.slf4j</groupId> 
     <artifactId>slf4j-simple</artifactId> 
     <version>1.7.14</version> 
    </dependency> 
    <dependency> 
     <groupId>com.holdenkarau</groupId> 
     <artifactId>spark-testing-base_2.10</artifactId> 
     <version>1.5.2_0.3.1</version> 
     <scope>test</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_2.10</artifactId> 
     <version>1.6.0</version> 
    </dependency> 
    <dependency> 
     <groupId>org.elasticsearch</groupId> 
     <artifactId>elasticsearch-hadoop</artifactId> 
     <version>2.1.2</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-mllib_2.10</artifactId> 
     <version>1.6.0</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-sql_2.10</artifactId> 
     <version>1.6.0</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-hive_2.10</artifactId> 
     <version>1.6.0</version> 
    </dependency> 
    <dependency> 
     <groupId>com.databricks</groupId> 
     <artifactId>spark-csv_2.10</artifactId> 
     <version>1.3.0</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.commons</groupId> 
     <artifactId>commons-csv</artifactId> 
     <version>1.2</version> 
    </dependency> 
    <dependency> 
     <groupId>joda-time</groupId> 
     <artifactId>joda-time</artifactId> 
     <version>2.4</version> 
    </dependency> 
    <dependency> 
     <groupId>org.elasticsearch</groupId> 
     <artifactId>elasticsearch-spark_2.10</artifactId> 
     <version>2.1.2</version> 
    </dependency> 
</dependencies> 

Regards, Brice

Answers


The root cause is the ClassCastException: org.apache.hive.com.esotericsoftware.kryo.Kryo cannot be cast to com.esotericsoftware.kryo.Kryo. It comes from the dependencies in your POM.
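
Before applying the workaround below, it can help to confirm which jar actually provides each class on your test classpath. A minimal diagnostic sketch (the jarOf helper is just for illustration):

def jarOf(className: String): Unit = {
  // Print the jar a class was loaded from, to spot classpath conflicts.
  val location = Class.forName(className).getProtectionDomain.getCodeSource.getLocation
  println(s"$className -> $location")
}

jarOf("com.esotericsoftware.kryo.Kryo")                 // the unshaded Kryo
jarOf("org.apache.hive.com.esotericsoftware.kryo.Kryo") // Hive's shaded copy
jarOf("org.apache.hadoop.hive.ql.exec.Utilities")       // which hive-exec was picked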

I ran into the same problem in Java.

The hive-exec jar is packaged with its own (shaded) Kryo implementation, while the HiveShim class in the spark-hive jar links against the unshaded Kryo class. A workaround for unit tests is to reassign Utilities.runtimeSerializationKryo:

import com.esotericsoftware.kryo.Kryo; // the unshaded Kryo that HiveShim expects
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.junit.BeforeClass;

// For JUnit 4 you can use @BeforeClass.
@BeforeClass
public static void otherKryoImpl() {
    // HiveShim (spark-hive_2.10) expects com.esotericsoftware.kryo.Kryo,
    // not the ORG.APACHE.HIVE shaded version bundled in hive-exec.
    // The raw (untyped) ThreadLocal relies on erasure so the unshaded Kryo
    // can be stored in Utilities' field without a compile error.
    Utilities.runtimeSerializationKryo = new ThreadLocal() {
        @Override
        protected synchronized Kryo initialValue() {
            Kryo kryo = new Kryo();
            // copy the setup from Hive's original implementation here...
            return kryo;
        }
    };
}
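
Since the question itself uses ScalaTest, a rough Scala equivalent might look like this (an untested sketch; Scala has no raw types, so an explicit erased cast to the shaded type stands in for the raw-ThreadLocal trick, and the trait name is purely illustrative):

import com.esotericsoftware.kryo.Kryo
import org.apache.hadoop.hive.ql.exec.Utilities
import org.scalatest.{BeforeAndAfterAll, Suite}

trait UnshadedKryoWorkaround extends BeforeAndAfterAll { this: Suite =>
  override def beforeAll(): Unit = {
    // Build a ThreadLocal holding the unshaded Kryo...
    val unshaded = new ThreadLocal[Kryo] {
      override def initialValue(): Kryo = new Kryo() // copy Hive's original setup here...
    }
    // ...and store it in Hive's field; the cast only exists at compile time,
    // since generics are erased at runtime.
    Utilities.runtimeSerializationKryo =
      unshaded.asInstanceOf[ThreadLocal[org.apache.hive.com.esotericsoftware.kryo.Kryo]]
    super.beforeAll()
  }
}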

I had the same problem,

and I fixed it :-)

It seems to be a classpath issue: put the Spark libs first on your extra driver (and executor) classpath.
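
In Spark configuration terms that means something like the following sketch (spark.driver.extraClassPath, spark.executor.extraClassPath and the userClassPathFirst flags are real Spark settings; the /path/to/spark/lib/* location is a placeholder for your install):

import org.apache.spark.SparkConf

// Sketch: make Spark's own jars win over conflicting test dependencies.
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("unit-tests")
  // Prepend Spark's lib directory to the driver/executor classpaths (placeholder path).
  .set("spark.driver.extraClassPath", "/path/to/spark/lib/*")
  .set("spark.executor.extraClassPath", "/path/to/spark/lib/*")
  // Keep Spark's classpath ahead of user jars (this is the default, shown explicitly).
  .set("spark.driver.userClassPathFirst", "false")
  .set("spark.executor.userClassPathFirst", "false")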
