2016-09-08 5 views
0

私は、ストリーミングがDStream内のRDDを使用して2つのDStreamを結合する方法を観察しようとしていますが、奇妙な結果が混乱しています。スパークストリーミング参加wierdの結果

私のコードでは、ソケットストリームからデータを収集し、いくつかのロジックで2つのPairedDStreamに分割しています。参加のためにいくつかのバッチを収集するために、最後の3つのバッチを収集するためのウィンドウを作成しました。ただし、結合の結果は無意味です。私の理解を助けてください。

object Join extends App { 

    val conf = new SparkConf().setMaster("local[4]").setAppName("KBN Streaming") 
    val sc = new SparkContext(conf) 
    sc.setLogLevel("ERROR") 

    val BATCH_INTERVAL_SEC = 10 

    val ssc = new StreamingContext(sc, Seconds(BATCH_INTERVAL_SEC)) 
    val lines = ssc.socketTextStream("localhost", 8091) 

    //println(s"lines.slideDuration : ${lines.slideDuration}") 

    //lines.print() 
    val ds = lines.map(x => x) 

    import scala.util.Random 
    val randNums = List(1, 2, 3, 4, 5, 6) 

    val less = ds.filter(x => x.length <= 2) 
    val lessPairs = less.map(x => (Random.nextInt(randNums.size), x)) 
    lessPairs.print 

    val greater = ds.filter(x => x.length > 2) 
    val greaterPairs = greater.map(x => (Random.nextInt(randNums.size), x)) 
    greaterPairs.print 

    val join = lessPairs.join(greaterPairs).window(Seconds(30), Seconds(30)) 
    join.print 

    ssc.start 
    ssc.awaitTermination 
} 

試験結果:

---------------------------------- ---------時間:1473344240000 ms ------------------------------------ -------(1、b)(4、s)

----------------------------- --------------時刻:1473344240000 ms ------------------------------- ------------(5,333)

------------------------------- ------------ Ti私:1473344250000 ms -------------------------------------------(2 、x)

-------------------------------------------時間:1473344250000 ms -------------------------------------------(4 、)

-------------------------------------------時間:1473344260000 ms -------------------------------------------(2 、a)(0、b)

-------------------------------------- -----時間:1473344260000 ms ---------------------------------------- ---(2,10)(1,1)(3,2)

-------------------------------------------時刻:1473344260000 ms -------------------------------------------(4、(b、 2つの))

答えて

0

結合が呼び出されると、2つのRDDが再計算されるため、印刷時に表示される値とは異なる値が設定されます。したがって、両方のRDDが初めて計算されたときにキャッシュする必要があります。したがって、両方のRDDをもう一度再計算する代わりに、後で結合が呼び出されたときに同じ値が使用されます。私は複数の例でこれを試して、それは正常に動作します。私はSparkの基本的なコアコンセプトを欠いていました。著書「スパーク学ぶ」から

0

抜粋:先に述べたようにRDDSスパーク、

永続性(キャッシュ)
は遅延評価され、そして時には我々は同じRDDを複数回使用することをお勧めします。私たちがこれを素朴に行うと、RDD上でアクションを呼び出すたびに、SparkはRDDとそのすべての依存関係を再計算します。

関連する問題