2016-06-30

How to use scala.tools.nsc.interpreter.IMain in Spark the way eval is called in Perl

I am trying to dynamically interpret code given to Spark as a String, just like calling eval in Perl, but I ran into a problem when running the program. I would really appreciate your help.

Requirements:

The requirement is to make the Spark processing chain configurable. For example, a customer can define the processing steps in a configuration file as follows (a sketch of assembling these steps into a single expression is shown after the list). Steps:

1) textFile("file:///<file_full_path>") 
2) flatMap(line => line.split(" ")) 
3) map(word => (word, 1)) 
4) reduceByKey(_ + _) 
5) foreach(println) 
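
As a sketch of how such a configuration could be turned into runnable code: assuming the steps arrive as an ordered list of strings from whatever config parser is used (the Seq literal below is a stand-in for that), they can simply be joined into one chained expression:

// hypothetical assembly of the configured steps into a single expression string
val steps = Seq(
  """textFile("file:///input.txt")""",
  """flatMap(line => line.split(" "))""",
  """map(word => (word, 1))""",
  """reduceByKey(_ + _)""",
  """foreach(println)"""
)
val processFlow = steps.mkString("sc.", ".", "")
// => sc.textFile("file:///input.txt").flatMap(...).map(...).reduceByKey(_ + _).foreach(println)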

All of the steps above are defined in the configuration file. The Spark driver then loads the configuration file as a string and carries out the processing steps:

val processFlow = 
""" 
sc.textFile("file:///input.txt").flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).foreach(println) 
""" 

Spark then executes the piece of code defined in the variable processFlow above. This example comes from the word-count sample, but the RDD method calls are built from a plain string and invoked through the interpreter.

Here is my Spark source code.
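
(The listing itself did not survive in this copy of the page. As a placeholder, here is a minimal reconstruction based on the stack trace below, which shows TestMain.exec calling IMain.interpret; the object layout, app name, and file path are assumptions, and this sketch reproduces the failure rather than fixing it.)

import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter.IMain
import org.apache.spark.{SparkConf, SparkContext}

object TestMain {
  def exec(sc: SparkContext, code: String): Unit = {
    val settings = new Settings
    settings.usejavacp.value = true          // reuse the driver's classpath in the interpreter

    val intp = new IMain(settings)
    // make the driver's SparkContext visible to the interpreted code as "sc"
    intp.bind("sc", "org.apache.spark.SparkContext", sc)
    val resultFlag = intp.interpret(code)
    println(s"resultFlag = $resultFlag")
    intp.close()
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("eval-test"))
    val processFlow =
      """sc.textFile("file:///input.txt").flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).foreach(println)"""
    exec(sc, processFlow)
    sc.stop()
  }
}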


Problem: when running the Spark code (master = local), I get an error; the logs look like this:

sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@... 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ClassNotFoundException: $anonfun$1 
     at java.net.URLClassLoader$1.run(URLClassLoader.java:366) 
     at java.net.URLClassLoader$1.run(URLClassLoader.java:355) 
     at java.security.AccessController.doPrivileged(Native Method) 
     at java.net.URLClassLoader.findClass(URLClassLoader.java:354) 
     at java.lang.ClassLoader.loadClass(ClassLoader.java:425) 
     at java.lang.ClassLoader.loadClass(ClassLoader.java:358) 
     at java.lang.Class.forName0(Native Method) 
     at java.lang.Class.forName(Class.java:270) 
     at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68) 
     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612) 
     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) 
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) 
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) 
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) 
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) 
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) 
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) 
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) 
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) 
     at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76) 
     at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 
     at org.apache.spark.scheduler.Task.run(Task.scala:89) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) 
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
     at scala.Option.foreach(Option.scala:236) 
     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) 
     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) 
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1843) 
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856) 
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1869) 
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1940) 
     at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
     at org.apache.spark.rdd.RDD.collect(RDD.scala:926) 
     at .<init>(<console>:12) 
     at .<clinit>(<console>) 
     at .<init>(<console>:7) 
     at .<clinit>(<console>) 
     at $print(<console>) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734) 
     at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983) 
     at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573) 
     at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604) 
     at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568) 
     at com.tr.ecp.test.TestMain.exec(TestMain.scala:44) 
     at com.tr.ecp.test.TestMain$.main(TestMain.scala:57) 
     at com.tr.ecp.test.TestMain.main(TestMain.scala) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) 
     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) 
     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) 
     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 
     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.lang.ClassNotFoundException: $anonfun$1 
     at java.net.URLClassLoader$1.run(URLClassLoader.java:366) 
     at java.net.URLClassLoader$1.run(URLClassLoader.java:355) 
     at java.security.AccessController.doPrivileged(Native Method) 
     at java.net.URLClassLoader.findClass(URLClassLoader.java:354) 
     at java.lang.ClassLoader.loadClass(ClassLoader.java:425) 
     at java.lang.ClassLoader.loadClass(ClassLoader.java:358) 
     at java.lang.Class.forName0(Native Method) 
     at java.lang.Class.forName(Class.java:270) 
     at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68) 
     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612) 
     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) 
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) 
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) 
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) 
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) 
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) 
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) 
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) 
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) 
     at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76) 
     at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 
     at org.apache.spark.scheduler.Task.run(Task.scala:89) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     at java.lang.Thread.run(Thread.java:745) 

resultFlag = Error, returnObj = None 

Apache Zeppelin does this: you write code on a web page and it executes it. Maybe you can look at Zeppelin and see how it solves this problem. –


Hi Rockie, thanks a lot for the information. I will take a look at Apache Zeppelin. –


Hi, I need to implement a similar requirement. Did you manage to solve it? Could you tell me which versions of Scala and Spark you used? When I try the same thing, I cannot resolve the dependencies. – NehaM
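
(On the version question: the exact versions are not stated anywhere in the post, but the line numbers in the stack trace above, e.g. Task.scala:89 and SparkSubmit.scala:731, appear consistent with the Spark 1.6.x sources, which were built against Scala 2.10. A minimal build.sbt sketch under that assumption:

// build.sbt: versions are assumptions inferred from the stack-trace era, not stated in the post
scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"     % "1.6.1",
  "org.apache.spark" %% "spark-repl"     % "1.6.1",            // SparkILoop/SparkIMain, used in the answer below
  "org.scala-lang"   %  "scala-compiler" % scalaVersion.value  // provides scala.tools.nsc.interpreter.IMain
))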

Answers


I checked how Apache Zeppelin implements this. Here is the code snippet it uses to interpret the input.

Basically it uses org.apache.spark.repl.SparkILoop, although the snippet depends on some surrounding code.

public InterpreterResult interpretInput(String[] lines, InterpreterContext context) {
  SparkEnv.set(env);

  // add print("") to make sure not finishing with comment
  // see https://github.com/NFLabs/zeppelin/issues/151
  String[] linesToRun = new String[lines.length + 1];
  for (int i = 0; i < lines.length; i++) {
    linesToRun[i] = lines[i];
  }
  linesToRun[lines.length] = "print(\"\")";

  Console.setOut(context.out);
  out.setInterpreterOutput(context.out);
  context.out.clear();
  Code r = null;
  String incomplete = "";
  boolean inComment = false;

  for (int l = 0; l < linesToRun.length; l++) {
    String s = linesToRun[l];
    // check if next line starts with "." (but not ".." or "./") it is treated as an invocation
    if (l + 1 < linesToRun.length) {
      String nextLine = linesToRun[l + 1].trim();
      boolean continuation = false;
      if (nextLine.isEmpty()
          || nextLine.startsWith("//")         // skip empty line or comment
          || nextLine.startsWith("}")
          || nextLine.startsWith("object")) {  // include "} object" for Scala companion object
        continuation = true;
      } else if (!inComment && nextLine.startsWith("/*")) {
        inComment = true;
        continuation = true;
      } else if (inComment && nextLine.lastIndexOf("*/") >= 0) {
        inComment = false;
        continuation = true;
      } else if (nextLine.length() > 1
          && nextLine.charAt(0) == '.'
          && nextLine.charAt(1) != '.'    // ".."
          && nextLine.charAt(1) != '/') { // "./"
        continuation = true;
      } else if (inComment) {
        continuation = true;
      }
      if (continuation) {
        incomplete += s + "\n";
        continue;
      }
    }

    scala.tools.nsc.interpreter.Results.Result res = null;
    try {
      res = intp.interpret(incomplete + s);
    } catch (Exception e) {
      sc.clearJobGroup();
      out.setInterpreterOutput(null);
      logger.info("Interpreter exception", e);
      return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
    }

    r = getResultCode(res);

    if (r == Code.ERROR) {
      sc.clearJobGroup();
      out.setInterpreterOutput(null);
      return new InterpreterResult(r, "");
    } else if (r == Code.INCOMPLETE) {
      incomplete += s + "\n";
    } else {
      incomplete = "";
    }
  }

  if (r == Code.INCOMPLETE) {
    sc.clearJobGroup();
    out.setInterpreterOutput(null);
    return new InterpreterResult(r, "Incomplete expression");
  } else {
    sc.clearJobGroup();
    out.setInterpreterOutput(null);
    return new InterpreterResult(Code.SUCCESS);
  }
}
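
Note that the interpret call alone does not explain the fix for the asker's ClassNotFoundException: $anonfun$1. That error occurs because a plain scala.tools.nsc.interpreter.IMain compiles the lambdas into an in-memory virtual directory that only the driver's interpreter classloader can see, so executors fail when they deserialize the task. spark-shell and Zeppelin avoid this by using the repl module's interpreter, which serves the generated classes to executors. A hedged sketch of that approach, assuming the Scala 2.10 build of Spark 1.x (SparkIMain, classServerUri, and spark.repl.class.uri all changed in Spark 2.x):

import scala.tools.nsc.Settings
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.repl.SparkIMain

val settings = new Settings
settings.usejavacp.value = true

// SparkIMain (from the spark-repl artifact) writes generated classes to a real
// directory and serves them over an HTTP class server, unlike the plain IMain
val intp = new SparkIMain(settings)
intp.initializeSynchronous()

val conf = new SparkConf()
  .setMaster("local")
  .setAppName("dynamic-eval")
  // executors add this URI to their classloader, so $anonfun$1 can be resolved
  .set("spark.repl.class.uri", intp.classServerUri)
val sc = new SparkContext(conf)

intp.bind("sc", "org.apache.spark.SparkContext", sc)
intp.interpret("""sc.textFile("file:///input.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).foreach(println)""")

The ordering matters here: the interpreter (and with it the class server) has to be created before the SparkContext, so that its URI can be passed in the configuration.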

Yes, got it. Looking at the source code you posted, it runs the code with "res = intp.interpret(incomplete + s);". –
