2016-11-08 7 views
2

私はSparkマスターサーバーに対してSpark SQLクエリを実行するSpring起動アプリケーションを作成しようとしています。ここでは、クエリを実行する必要がありますコードは次のとおりです。AvroでSpring起動アプリケーションからのSpark SQLを使用

public class SparkJob { 

    public void run() { 
     SparkConf conf = new SparkConf() 
      .setMaster("spark://10.20.30.50:7077") 
      .setAppName("show_avro_data"); 
     JavaSparkContext sc = new JavaSparkContext(conf); 
     SQLContext sqlContext = new SQLContext(sc); 

     DataFrame df = sqlContext.read() 
      .format("com.databricks.spark.avro") 
      .load("hdfs://localhost:8020/test.avro"); 

     df.show(); 
    } 

} 

run()方法は、春MVCコントローラ内から呼び出されます。 Sparkクエリを「オンライン」に実行する必要があります。つまり、Sparkにアプリケーションをアップロードする必要はありません。​​である。私が使用し

2016-11-08 17:38:18.805 ERROR 43109 --- [nio-9003-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet]: 
    Servlet.service() for servlet [dispatcherServlet] in context with path [] 
    threw exception [Request processing failed; nested exception is 
    org.apache.spark.SparkException: Job aborted due to stage failure: 
    Task 0 in stage 0.0 failed 4 times, most recent failure: 
    Lost task 0.3 in stage 0.0 (TID 3, 10.20.30.50): 
    java.lang.ClassNotFoundException: 
    com.databricks.spark.avro.AvroRelation$$anonfun$buildScan$1$$anonfun$3 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    at java.lang.Class.forName0(Native Method) 
    at java.lang.Class.forName(Class.java:348) 
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67) 
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620) 
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) 
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362) 
    at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1714) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) 
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362) 
    at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) 
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362) 
    at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) 
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) 
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72) 
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

私はスパークバージョン1.5.2とScalaのバージョン2.10.4を使用していますので:

私は今、アプリケーションを起動し、メソッドを実行すると、私は、次のスタックトレースを取得します次のMavenの依存関係according to the spark-avro documentation: - これとすべてを行う方法を、私は知りません

<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-core_2.10</artifactId> 
    <version>1.5.2</version> 
</dependency> 

<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-sql_2.10</artifactId> 
    <version>1.5.2</version> 
</dependency> 

<dependency> 
    <groupId>com.databricks</groupId> 
    <artifactId>spark-avro_2.10</artifactId> 
    <version>2.0.1</version> 
</dependency> 

私が推測するには、私は何とかスパークマスターにアブロライブラリを提供する必要があります私がインターネット上で見つけた例は、この問題の解決策を提示していませんでした。

編集

アプリケーションは、あなたの春のアプリケーションから火花を呼び出しているので、私は(あなたが​​を使用していない)

DataFrame df = sqlContext.read().json("hdfs://..."); 

答えて

1

とJSONの代わりに、アブロを使用するときにしようとしている、正常に動作しますMVCを「ドライバ」として使用し、スタンドアロンのスパークでスパークジョブを実行する

したがって、MVCをドライバとして使用し、clientモードで使用するとします。右?

従属性はあなたのMVC spring project(スパーク・ドライバclient側)にあるので、このライブラリを追加してスタンドアロン(ワーカー)を起動する必要があります。今火花がspark-avro_2.10.jarを認識している

JavaSparkContext sc = new JavaSparkContext(conf); 
sc.addJar("pathToAvroOnHDFS/spark-avro_2.10.jar") 

<dependency> 
    <groupId>com.databricks</groupId> 
    <artifactId>spark-avro_2.10</artifactId> 
    <version>2.0.1</version> 
</dependency> 

あなたはHDFSにaddJarspark-avro_2.10.jarなどSparkContextものにも追加することができます。

別の方法として、このjarをsparkクラスパスに追加することもできます。あなたは、次のプロパティ(これらもspark 1.5.2でサポートされなければならない)を確認することができます

spark.driver.extraClassPath /avroPath.jar 
spark.executor.extraClassPath /avroPath.jar 
+0

をおかげでたくさん - 私の質問に答えてくれ、実際に作品スパークどのようにさらなる洞察を与えた:) –

関連する問題