私は、ドメイン固有の言語用のカスタムインタープリタを開発しています。 Apache Zeppelinのドキュメント(https://zeppelin.incubator.apache.org/docs/latest/development/writingzeppelininterpreter.html)に示されている例に基づいて、インタプリタはうまく動作します。今私はいくつかの結果を新しいDataFrameに保存したいと思います。Zeppelin:カスタムインタプリタの中からDataFrameを作成するには?
DataFrames(http://spark.apache.org/docs/latest/sql-programming-guide.html)を作成するコードが見つかりましたが、カスタムインタープリタ内から有効なランタイムSparkContext(多くの場合「sc」と呼ばれる)にアクセスする方法が見つからないため、インタープリタでこれを使用できません。
私は(静的な)SparkContext.getOrCreate()を試しましたが、ClassNotFoundExceptionが発生しました。次に、zeppelin-spark-dependencies ... jarをインタープリタフォルダに追加しました。これはクラスロードの問題を解決しましたが、今はSparkExceptionを取得しています(「マスターURLを設定する必要があります...」)。
カスタムインタープリタの中からノートブックのSparkContextにどうやってアクセスできるか考えてみましょうか?どうもありがとう!以下Kangrokリーさんのコメントに
UPDATE
おかげで、次のように、私のコードは今になります。下記を参照してください。それは実行され、DataFrameを作成するようです(少なくともそれは任意の例外をスローしません)。だから、おそらくそこにある
%opl
1 2 3
> 1
> 2
> 3
%sql
select * from result
> Table not found: result; line 1 pos 14
:しかし、私は、後続のSQLパラグラフ(下記のような第1段落は「結果」のデータフレームを作成するべきである、私の「%OPL」インタプリタを使用しています)で作成したデータフレームを消費することはできませんSparkContextを扱う私のやり方にまだ何か間違っています。何か案は?どうもありがとう!
package opl;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class OplInterpreter2 extends Interpreter {
static {
Interpreter.register("opl","opl",OplInterpreter2.class.getName(),
new InterpreterPropertyBuilder()
.add("spark.master", "local[4]", "spark.master")
.add("spark.app.name", "Opl Interpreter", "spark.app.name")
.add("spark.serializer", "org.apache.spark.serializer.KryoSerializer", "spark.serializer")
.build());
}
private Logger logger = LoggerFactory.getLogger(OplInterpreter2.class);
private void log(Object o) {
if (logger != null)
logger.warn("OplInterpreter2 "+o);
}
public OplInterpreter2(Properties properties) {
super(properties);
log("CONSTRUCTOR");
}
@Override
public void open() {
log("open()");
}
@Override
public void cancel(InterpreterContext arg0) {
log("cancel()");
}
@Override
public void close() {
log("close()");
}
@Override
public List<String> completion(String arg0, int arg1) {
log("completion()");
return new ArrayList<String>();
}
@Override
public FormType getFormType() {
log("getFormType()");
return FormType.SIMPLE;
}
@Override
public int getProgress(InterpreterContext arg0) {
log("getProgress()");
return 100;
}
@Override
public InterpreterResult interpret(String string, InterpreterContext context) {
log("interpret() "+string);
PrintStream oldSys = System.out;
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
System.setOut(ps);
execute(string);
System.out.flush();
System.setOut(oldSys);
return new InterpreterResult(
InterpreterResult.Code.SUCCESS,
InterpreterResult.Type.TEXT,
baos.toString());
} catch (Exception ex) {
System.out.flush();
System.setOut(oldSys);
return new InterpreterResult(
InterpreterResult.Code.ERROR,
InterpreterResult.Type.TEXT,
ex.toString());
}
}
private void execute(String code) throws Exception {
SparkContext sc = SparkContext.getOrCreate();
SQLContext sqlc = SQLContext.getOrCreate(sc);
StructType structType = new StructType().add("value",DataTypes.IntegerType);
ArrayList<Row> list = new ArrayList<Row>();
for (String s : code.trim().split("\\s+")) {
int value = Integer.parseInt(s);
System.out.println(value);
list.add(RowFactory.create(value));
}
DataFrame df = sqlc.createDataFrame(list,structType);
df.registerTempTable("result");
}
}