2016-03-31 6 views
2

私のアプリケーションでチェックポイントを使用していて、アプリケーションが失敗で起動すると、SQLContextNullPointerExceptionが表示されます。
シリアライゼーション/デシリアライズの問題により、アプリケーションがSQLContextを回復できないと仮定します。 SQLContextはシリアル化できませんか?ここでチェックポイントSqlContext nullpointerExceptionの問題

は、Spark SQLが内部キュータ側にSQLContextを使用する必要があるため

//DriverClass 
    final JavaSparkContext javaSparkCtx = new JavaSparkContext(conf); 
    final SQLContext sqlContext = new SQLContext(javaSparkCtx); 

    JavaStreamingContextFactory javaStreamingContextFactory = new JavaStreamingContextFactory() { 
     @Override 
     public JavaStreamingContext create() { //only first time executed 
      // TODO Auto-generated method stub 

      JavaStreamingContext jssc = new JavaStreamingContext(javaSparkCtx, Durations.minutes(1)); 
      jssc.checkpoint(CHECKPOINT_DIRECTORY); 

      HashMap < String, String > kafkaParams = new HashMap < String, String >(); 
      kafkaParams.put("metadata.broker.list", 
          "abc.xyz.localdomain:6667"); 
      //.... 
      JavaDStream <String> fullMsg = messages 
              .map(new MapFunction()); 

      fullMsg.foreachRDD(new SomeClass(sqlContext)); 
      return jssc; 
     } 
    }; 
} 

//Closure Class 
public class SomeClass implements Serializable, Function < JavaRDD <String> , Void > { 
    SQLContext sqlContext; 
    public SomeClass(SQLContext sqlContext) { 
     // TODO Auto-generated constructor stub 
     this.sqlContext = sqlContext; 
    } 
    public void doSomething() { 
     this.sqlContext.createDataFrame();**// here is the nullpointerException** 
    } 
    //....... 
} 

答えて

4

SQLContext以下の私のコードは直列化可能です。ただし、Streamingチェックポイントにシリアル化しないでください。代わりに、あなたはより多くの詳細については、このSQLContext sqlContext = SQLContext.getOrCreate(rdd.context());

参照ストリーミングDocsのようなRDDからそれを取得する必要があります:http://spark.apache.org/docs/1.6.1/streaming-programming-guide.html#dataframe-and-sql-operations

+0

素晴らしい、おかげでzsxwing。それは魅力のように働いた。 – Bill

+1

SQLContextではなくHiveContextを使用している場合はどうなりますか? – Dazzler

+0

既にHiveContextを作成している場合、SQLContext.getOrCreateはそれを返します。 – zsxwing