2016-05-08 7 views
2

私は、ドメイン固有の言語用のカスタムインタープリタを開発しています。 Apache Zeppelinのドキュメント(https://zeppelin.incubator.apache.org/docs/latest/development/writingzeppelininterpreter.html)に示されている例に基づいて、インタプリタはうまく動作します。今私はいくつかの結果を新しいDataFrameに保存したいと思います。Zeppelin:カスタムインタプリタの中からDataFrameを作成するには?

DataFrames(http://spark.apache.org/docs/latest/sql-programming-guide.html)を作成するコードが見つかりましたが、カスタムインタープリタ内から有効なランタイムSparkContext(多くの場合「sc」と呼ばれる)にアクセスする方法が見つからないため、インタープリタでこれを使用できません。

私は(静的な)SparkContext.getOrCreate()を試しましたが、ClassNotFoundExceptionが発生しました。次に、zeppelin-spark-dependencies ... jarをインタープリタフォルダに追加しました。これはクラスロードの問題を解決しましたが、今はSparkExceptionを取得しています(「マスターURLを設定する必要があります...」)。

カスタムインタープリタの中からノートブックのSparkContextにどうやってアクセスできるか考えてみましょうか?どうもありがとう!以下Kangrokリーさんのコメントに

UPDATE

おかげで、次のように、私のコードは今になります。下記を参照してください。それは実行され、DataFrameを作成するようです(少なくともそれは任意の例外をスローしません)。だから、おそらくそこにある

%opl 
1 2 3 
> 1 
> 2 
> 3 

%sql 
select * from result 
> Table not found: result; line 1 pos 14 

:しかし、私は、後続のSQLパラグラフ(下記のような第1段落は「結果」のデータフレームを作成するべきである、私の「%OPL」インタプリタを使用しています)で作成したデータフレームを消費することはできませんSparkContextを扱う私のやり方にまだ何か間違っています。何か案は?どうもありがとう!

package opl; 

import java.io.ByteArrayOutputStream; 
import java.io.PrintStream; 
import java.util.ArrayList; 
import java.util.List; 
import java.util.Properties; 

import org.apache.spark.SparkContext; 
import org.apache.spark.sql.DataFrame; 
import org.apache.spark.sql.Row; 
import org.apache.spark.sql.RowFactory; 
import org.apache.spark.sql.SQLContext; 
import org.apache.spark.sql.types.DataTypes; 
import org.apache.spark.sql.types.StructType; 
import org.apache.zeppelin.interpreter.Interpreter; 
import org.apache.zeppelin.interpreter.InterpreterContext; 
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; 
import org.apache.zeppelin.interpreter.InterpreterResult; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

public class OplInterpreter2 extends Interpreter { 

static { 
    Interpreter.register("opl","opl",OplInterpreter2.class.getName(), 
     new InterpreterPropertyBuilder() 
     .add("spark.master", "local[4]", "spark.master") 
     .add("spark.app.name", "Opl Interpreter", "spark.app.name") 
     .add("spark.serializer", "org.apache.spark.serializer.KryoSerializer", "spark.serializer") 
     .build()); 
} 

private Logger logger = LoggerFactory.getLogger(OplInterpreter2.class); 

private void log(Object o) { 
    if (logger != null) 
     logger.warn("OplInterpreter2 "+o); 
} 

public OplInterpreter2(Properties properties) { 
    super(properties); 
    log("CONSTRUCTOR"); 
} 

@Override 
public void open() { 
    log("open()"); 
} 

@Override 
public void cancel(InterpreterContext arg0) { 
    log("cancel()"); 
} 

@Override 
public void close() { 
    log("close()"); 
} 

@Override 
public List<String> completion(String arg0, int arg1) { 
    log("completion()"); 
    return new ArrayList<String>(); 
} 

@Override 
public FormType getFormType() { 
    log("getFormType()"); 
    return FormType.SIMPLE; 
} 

@Override 
public int getProgress(InterpreterContext arg0) { 
    log("getProgress()"); 
    return 100; 
} 

@Override 
public InterpreterResult interpret(String string, InterpreterContext context) { 
    log("interpret() "+string); 
    PrintStream oldSys = System.out; 
    try { 
     ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
     PrintStream ps = new PrintStream(baos); 
     System.setOut(ps); 
     execute(string); 
     System.out.flush(); 
     System.setOut(oldSys); 
     return new InterpreterResult(
       InterpreterResult.Code.SUCCESS, 
       InterpreterResult.Type.TEXT, 
       baos.toString()); 
    } catch (Exception ex) { 
     System.out.flush(); 
     System.setOut(oldSys); 
     return new InterpreterResult(
       InterpreterResult.Code.ERROR, 
       InterpreterResult.Type.TEXT, 
       ex.toString()); 
    } 
} 

private void execute(String code) throws Exception { 
    SparkContext sc = SparkContext.getOrCreate(); 
    SQLContext sqlc = SQLContext.getOrCreate(sc); 
    StructType structType = new StructType().add("value",DataTypes.IntegerType); 
    ArrayList<Row> list = new ArrayList<Row>(); 
    for (String s : code.trim().split("\\s+")) { 
     int value = Integer.parseInt(s); 
     System.out.println(value); 
     list.add(RowFactory.create(value)); 
    } 
    DataFrame df = sqlc.createDataFrame(list,structType); 
    df.registerTempTable("result"); 
} 
} 

答えて

0

次のようなスパーククラスターを設定する必要があると思います。 SparkContext.getOrCreateを(使用

spark.master = "local[4]"

spark.app.name = "My Spark App"

spark.serializer = "org.apache.spark.serializer.KryoSerializer"

)私にはよさそうです。私は、これは非常に素晴らしいものだとは思わないが、

おかげで、 Kangrokリー

0

は最後に、私は解決策を見つけました。以下のコードでは、org.apache.zeppelin.spark.PySparkInterpreter.javaにあるgetSparkInterpreter()関数を使用しています。

これは、独自のインタープリタフォルダではなく、私が推奨する方法(https://zeppelin.incubator.apache.org/docs/latest/development/writingzeppelininterpreter.htmlによる)ではなく、Sparkインタプリタフォルダにパッケージ化されたコード(jar)を入れる必要があります。また、私の通訳はZeppelinのインタプリタ設定ページに、それ自身の通訳としては表示されません。それにもかかわらず、それはツェッペリンの段落で使用することができます。

And:コードでは、私はDataFrameを作成することができます。これは私の段落の外でも消耗しています。これは達成したいことです。

package opl; 

import java.io.ByteArrayOutputStream; 
import java.io.PrintStream; 
import java.util.ArrayList; 
import java.util.List; 
import java.util.Properties; 

import org.apache.spark.sql.DataFrame; 
import org.apache.spark.sql.Row; 
import org.apache.spark.sql.RowFactory; 
import org.apache.spark.sql.SQLContext; 
import org.apache.spark.sql.types.DataTypes; 
import org.apache.spark.sql.types.StructType; 
import org.apache.zeppelin.interpreter.Interpreter; 
import org.apache.zeppelin.interpreter.InterpreterContext; 
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; 
import org.apache.zeppelin.interpreter.InterpreterResult; 
import org.apache.zeppelin.interpreter.LazyOpenInterpreter; 
import org.apache.zeppelin.interpreter.WrappedInterpreter; 
import org.apache.zeppelin.spark.SparkInterpreter; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

public class OplInterpreter2 extends Interpreter { 

    static { 
     Interpreter.register(
       "opl", 
       "spark",//"opl", 
       OplInterpreter2.class.getName(), 
       new InterpreterPropertyBuilder() 
        .add("sth", "defaultSth", "some thing") 
        .build()); 
    } 

    private Logger logger = LoggerFactory.getLogger(OplInterpreter2.class); 

    private void log(Object o) { 
     if (logger != null) 
      logger.warn("OplInterpreter2 "+o); 
    } 

    public OplInterpreter2(Properties properties) { 
     super(properties); 
     log("CONSTRUCTOR"); 
    } 

    @Override 
    public void open() { 
     log("open()"); 
    } 

    @Override 
    public void cancel(InterpreterContext arg0) { 
     log("cancel()"); 
    } 

    @Override 
    public void close() { 
     log("close()"); 
    } 

    @Override 
    public List<String> completion(String arg0, int arg1) { 
     log("completion()"); 
     return new ArrayList<String>(); 
    } 

    @Override 
    public FormType getFormType() { 
     log("getFormType()"); 
     return FormType.SIMPLE; 
    } 

    @Override 
    public int getProgress(InterpreterContext arg0) { 
     log("getProgress()"); 
     return 100; 
    } 

    @Override 
    public InterpreterResult interpret(String string, InterpreterContext context) { 
     log("interpret() "+string); 
     PrintStream oldSys = System.out; 
     try { 
      ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
      PrintStream ps = new PrintStream(baos); 
      System.setOut(ps); 
      execute(string); 
      System.out.flush(); 
      System.setOut(oldSys); 
      return new InterpreterResult(
        InterpreterResult.Code.SUCCESS, 
        InterpreterResult.Type.TEXT, 
        baos.toString()); 
     } catch (Exception ex) { 
      System.out.flush(); 
      System.setOut(oldSys); 
      return new InterpreterResult(
        InterpreterResult.Code.ERROR, 
        InterpreterResult.Type.TEXT, 
        ex.toString()); 
     } 
    } 

    private void execute(String code) throws Exception { 
     SparkInterpreter sintp = getSparkInterpreter(); 
     SQLContext sqlc = sintp.getSQLContext(); 
     StructType structType = new StructType().add("value",DataTypes.IntegerType); 
     ArrayList<Row> list = new ArrayList<Row>(); 
     for (String s : code.trim().split("\\s+")) { 
      int value = Integer.parseInt(s); 
      System.out.println(value); 
      list.add(RowFactory.create(value)); 
     } 
     DataFrame df = sqlc.createDataFrame(list,structType); 
     df.registerTempTable("result"); 
    } 

    private SparkInterpreter getSparkInterpreter() { 
     LazyOpenInterpreter lazy = null; 
     SparkInterpreter spark = null; 
     Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName()); 
     while (p instanceof WrappedInterpreter) { 
      if (p instanceof LazyOpenInterpreter) { 
       lazy = (LazyOpenInterpreter) p; 
      } 
      p = ((WrappedInterpreter) p).getInnerInterpreter(); 
     } 
     spark = (SparkInterpreter) p; 
     if (lazy != null) { 
      lazy.open(); 
     } 
     return spark; 
    } 
} 
関連する問題