2017-12-18 18 views
0

私は耐障害性を含むSpark Streaming with Kafkaアプリケーションを実装しようとしています。アプリケーションを再起動すると、再起動前に既に読み込まれたメッセージが読み込まれ、計算が間違っています。この問題を解決するのを手伝ってください。Sparkストリーミングチェックポイントの読み込み後のエラー

ここはJavaで書かれたコードです。

public static JavaStreamingContext createContextFunc() { 

    SummaryOfTransactionsWithCheckpoints app = new SummaryOfTransactionsWithCheckpoints(); 

    ApplicationConf conf = new ApplicationConf(); 
    String checkpointDir = conf.getCheckpointDirectory(); 

    JavaStreamingContext streamingContext = app.getStreamingContext(checkpointDir); 

    JavaDStream<String> kafkaInputStream = app.getKafkaInputStream(streamingContext); 

    return streamingContext; 
} 


public static void main(String[] args) throws InterruptedException { 

    String checkpointDir = conf.getCheckpointDirectory(); 

    Function0<JavaStreamingContext> createContextFunc =() -> createContextFunc(); 
    JavaStreamingContext streamingContext = JavaStreamingContext.getOrCreate(checkpointDir, createContextFunc); 

    streamingContext.start(); 
    streamingContext.awaitTermination(); 

} 

public JavaStreamingContext getStreamingContext(String checkpointDir) { 

    ApplicationConf conf = new ApplicationConf(); 
    String appName = conf.getAppName(); 
    String master = conf.getMaster(); 
    int duration = conf.getDuration(); 

    SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster(master); 
    sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true"); 

    JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, new Duration(duration)); 
    streamingContext.checkpoint(checkpointDir); 

    return streamingContext; 
} 

public SparkSession getSession() { 

    ApplicationConf conf = new ApplicationConf(); 
    String appName = conf.getAppName(); 
    String hiveConf = conf.getHiveConf(); 
    String thriftConf = conf.getThriftConf(); 
    int shufflePartitions = conf.getShuffle(); 

    SparkSession spark = SparkSession 
      .builder() 
      .appName(appName) 
      .config("spark.sql.warehouse.dir", hiveConf) 
      .config("hive.metastore.uris", thriftConf) 
      .enableHiveSupport() 
      .getOrCreate(); 

    spark.conf().set("spark.sql.shuffle.partitions", shufflePartitions); 
    return spark; 

} 


public JavaDStream<String> getKafkaInputStream(JavaStreamingContext streamingContext) { 

    KafkaConfig kafkaConfig = new KafkaConfig(); 
    Set<String> topicsSet = kafkaConfig.getTopicSet(); 
    Map<String, Object> kafkaParams = kafkaConfig.getKafkaParams(); 

    // Create direct kafka stream with brokers and topics 
    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
      streamingContext, 
      LocationStrategies.PreferConsistent(), 
      ConsumerStrategies.Subscribe(topicsSet, kafkaParams)); 

    JavaDStream<String> logdata = messages.map(ConsumerRecord::value); 

    return logdata; 
} 

ここにgithubプロジェクトへのリンクがあります。 https://github.com/ThisaST/Spark-Fault-Tolerance

答えて

0

コードで次の設定を追加することで問題を解決しました。

sparkConf.set(“spark.streaming.stopGracefullyOnShutdown","true") 
関連する問題