for (fordate <- 2 to 30) {
val dataRDD = sc.textFile("s3n://mypath" + fordate + "/*")
val a = 1
val c = fordate - 1
for (b <- a to c) {
val cumilativeRDD1 = sc.textFile("s3n://mypath/" + b + "/*")
val cumilativeRDD : org.apache.spark.rdd.RDD[String] = sc.union(cumilativeRDD1, cumilativeRDD)
if (b == c) {
val incrementalDEviceIDs = dataRDD.subtract(cumilativeRDD)
val countofIDs = incrementalDEviceIDs.distinct().count()
println(s"201611 $fordate $countofIDs")
}
}
}
私は毎日デバイスIDを取得するデータセットを持っています。私は一日あたりの増分数を把握する必要がありますが、私は自分自身にcumilativeRDD
に参加するときには、エラー、次のsaysthrows: Scala - それ自体にRDDを追加します。
は、私はこれをどのように克服することができます。forward reference extends over definition of value cumilativeRDD
。このプロセスの*意図*を記述できますか? – maasg
だから、私が正しく理解しているならば、*意図*は 'day [x]'を 'sum [1、x-1](day [i])'と比較することです。可変スコープの問題を解決するほかに、この実装はネットワークからのデータではn^2です。 'day 1' x' n 'times、 'day 2' x' n-1' times、...を読んでいます。私はあなたのプロセスを見直して、見たデータを集計し、1回のパスで比較することをお勧めします。 – maasg