私は、ストリーミングがDStream内のRDDを使用して2つのDStreamを結合する方法を観察しようとしていますが、奇妙な結果が混乱しています。スパークストリーミング参加wierdの結果
私のコードでは、ソケットストリームからデータを収集し、いくつかのロジックで2つのPairedDStreamに分割しています。参加のためにいくつかのバッチを収集するために、最後の3つのバッチを収集するためのウィンドウを作成しました。ただし、結合の結果は無意味です。私の理解を助けてください。
object Join extends App {
val conf = new SparkConf().setMaster("local[4]").setAppName("KBN Streaming")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
val BATCH_INTERVAL_SEC = 10
val ssc = new StreamingContext(sc, Seconds(BATCH_INTERVAL_SEC))
val lines = ssc.socketTextStream("localhost", 8091)
//println(s"lines.slideDuration : ${lines.slideDuration}")
//lines.print()
val ds = lines.map(x => x)
import scala.util.Random
val randNums = List(1, 2, 3, 4, 5, 6)
val less = ds.filter(x => x.length <= 2)
val lessPairs = less.map(x => (Random.nextInt(randNums.size), x))
lessPairs.print
val greater = ds.filter(x => x.length > 2)
val greaterPairs = greater.map(x => (Random.nextInt(randNums.size), x))
greaterPairs.print
val join = lessPairs.join(greaterPairs).window(Seconds(30), Seconds(30))
join.print
ssc.start
ssc.awaitTermination
}
試験結果:
---------------------------------- ---------時間:1473344240000 ms ------------------------------------ -------(1、b)(4、s)
----------------------------- --------------時刻:1473344240000 ms ------------------------------- ------------(5,333)
------------------------------- ------------ Ti私:1473344250000 ms -------------------------------------------(2 、x)
-------------------------------------------時間:1473344250000 ms -------------------------------------------(4 、)
-------------------------------------------時間:1473344260000 ms -------------------------------------------(2 、a)(0、b)
-------------------------------------- -----時間:1473344260000 ms ---------------------------------------- ---(2,10)(1,1)(3,2)
-------------------------------------------時刻:1473344260000 ms -------------------------------------------(4、(b、 2つの))