2017-03-06 10 views
0

2つのDstreamを実行しようとしています。最初の1つでdfをtmpビューとして登録し、別のDstreamで次のように使用してください。spark streaming - あるストリームでtmpビューを作成し、別のストリームで使用する

dstream1.foreachRDD { rdd => 
    import org.apache.spark.sql._ 
    val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate 
    import spark.implicits._ 
    import spark.sql 

    val records = rdd.toDF("record") 
    records.createOrReplaceTempView("records") 
} 
dstream2.foreachRDD { rdd => 
    import org.apache.spark.sql._ 
    val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate 
    import spark.implicits._ 
    import spark.sql 

    val records2 = rdd.toDF("record2") 
    val oldRecord = spark.table("records") 
    records2.join(oldRecod).write.json(...) 
} 
streamingContext.remember(Seconds(60)) 
    streamingContext.start() 
    streamingContext.awaitTermination() 

私はつねに何か正しいことをしていないので、明らかに私はを受け取ります。

これを行う方法はありますか?

ありがとうございます!

答えて

0

実際には、 という問題がありました。ローカルでテストしたときに、計算のために余分なコアを残して、ストリームからデータを取り出す必要があるという問題がありました。

私はmaster = local [2]を使用しました。したがって、各コアは各ストリームを処理するために使用され、残りは何も実行されません。 一度私はmaster = local [4]に変更しました。それは正常に機能しました

関連する問題