2017-03-08 2 views
-1

私はSparkの世界にはかなり新しく、この小さな問題については助けが必要です。Apache SparkのカスタムクラスのJava RDDにデータをロードする際にクラスが見つかりません

私は同じマシン上に1つのマスターと1つのスレーブで動作するローカルスパークを持っています。

Spring BootGradleを使用して、ジョブをSparkインスタンスに送信するJavaアプリケーションを作成しています。

私はJavaSparkContextを取得するサービスクラスを持っている:私は春データJPAからこの方法で、transactionRecordRepositoryを実行すると

public void loadTransactions(JavaSparkContext context) { 
    try { 
     List<TransactionRecord> transactionRecordList = new ArrayList<>(); 
     Iterable<TransactionRecord> all = trxRecordRepository.findAll(); 
     all.forEach(trx -> transactionRecordList.add(trx)); 
     System.out.println("Trx array list ready: "+ transactionRecordList.size()); 
     JavaRDD<TransactionRecord> trxRecordRDD = context.parallelize(transactionRecordList, 4); 
     System.out.println(trxRecordRDD.count()); 
     System.out.println("data frame loaded"); 
    }catch (Exception e) { 
     logger.error("Error while loading transactions", e.getCause()); 
    }finally { 
     context.close(); 
    } 
} 

、首尾リストに移入されます。 スパークジョブが実行を開始するが、その後、次のエラーで失敗します。私は、テキストファイルからの単純なデータをロードする場合

2017-03-08 10:28:44.888 WARN 9021 --- [result-getter-2] o.apache.spark.scheduler.TaskSetManager : Lost task 1.0 in stage 0.0 (TID 1, 10.20.12.216, executor 0): java.io.IOException: java.lang.ClassNotFoundException: learning.spark.models.TransactionRecord 
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276) 
    at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) 
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) 
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) 
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:258) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.ClassNotFoundException: learning.spark.models.TransactionRecord 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    at java.lang.Class.forName0(Native Method) 
    at java.lang.Class.forName(Class.java:348) 
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67) 
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1819) 
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1986) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) 
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1919) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231) 
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552) 
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:74) 
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70) 
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70) 
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269) 
    ... 20 more 

すべてが正常に動作します。

JavaRDD<String> movieData = context.textFile("/Users/panshul/Development/sparkDataDump/ratings.csv", 4); 
      count = movieData.count(); 

マイGradleのビルドファイル:

buildscript { 
    ext { 
     springBootVersion = '1.5.2.RELEASE' 
    } 
    repositories { 
     mavenCentral() 
    } 
    dependencies { 
     classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}") 
    } 
} 

apply plugin: 'java' 
apply plugin: 'eclipse' 
apply plugin: 'idea' 
apply plugin: 'org.springframework.boot' 

jar { 
    baseName = 'spark-example' 
    version = '0.0.1-SNAPSHOT' 
} 

sourceCompatibility = 1.8 
targetCompatibility = 1.8 

repositories { 
    mavenCentral() 
    mavenLocal() 
} 


dependencies { 
    compile('org.springframework.boot:spring-boot-starter-web') { 
     exclude module: "spring-boot-starter-tomcat" 
    } 
    compile("org.springframework.boot:spring-boot-starter-jetty") 
    compile("org.springframework.boot:spring-boot-starter-actuator") 
    compile("org.springframework.boot:spring-boot-starter-data-jpa") 
    compile("mysql:mysql-connector-java:6.0.5") 
    compile("org.codehaus.janino:janino:3.0.6") 
    compile("org.apache.spark:spark-core_2.11:2.1.0") 
      { 
       exclude group: "org.slf4j", module: "slf4j-log4j12" 
      } 
    compile("org.apache.spark:spark-sql_2.11:2.1.0") 
      { 
       exclude group: "org.slf4j", module: "slf4j-log4j12" 
      } 
    testCompile("org.springframework.boot:spring-boot-starter-test") 
    testCompile("junit:junit") 
} 

私は私がここで間違ってやっているかを把握助けてください。

Spark version 2.1.0 をSparkウェブサイトからダウンロードしてインストールします。 Mac OS X Sierraで動作します。

+0

あなたの 'build.gradle'ファイルを共有できますか? – semsorock

答えて

0

私が使っていたすべてのカスタム・クラスのjarファイルを作成して、私のApache-のjarsフォルダに入れていましたスパークのインストール。

これはスパークマスターが私のカスタムRDDタイプクラスを発見し、ワーカーに伝播しました。

1

あなたの問題は、あなたのクラスlearning.spark.models.TransactionRecordは、ジョブをサブミットするときにclass-pathに含まれていないと思います。

spark-submit -jarsパラメータにすべての依存jarを指定するか、すべての依存関係を持つ大きなjarを1つ作成する必要があります。

は、私が最も簡単な方法は、ちょうどこのように複数のjarファイルを提出していると思う:

$SPARK_HOME/bin/spark-submit --name yourApp --class yourMain.class --master yourMaster --jars dependencyA.jar, dependencyB.jar, job.jar 
+0

あなたのGradleを共有すると助けになる –

+0

私は、ジョブをSparkに手作業で提出するつもりはありません。 SparkInstanceを作成し、そのメソッドを呼び出して自動的にジョブをSparkインスタンスに送信するWebサービスを実行しています。 – Panshul

+0

でも、あなたの問題の原因は同じですが、learning.spark.models.TransactionRecordクラスが依存関係から抜けていると思います(別のプロジェクトにあり、あなたのメインスパークジョブアプリです)。スパークインスタンス –

関連する問題