現在、1分あたりのリクエストで機械学習用のデータを豊かにしようとしています。データはKafkaのトピックに保存され、アプリケーションの開始時にトピックの内容全体が読み込まれて処理されるため、すべてのデータが同時に到着するので、自分の知る限りでスパークストリーミングのウィンドウ操作を使用することはできません。マッピング中のRDDのタイムスタンプからの要求数を計算する
私のアプローチは、次のことを試してみた:
val kMeansFeatureRdd = kMeansInformationRdd.map(x => {
val begin = x._2 //Long - unix timestamp millis
val duration = x._3 //Long
val rpm = kMeansInformationRdd.filter(y => (x._2 - 60000 <= y._2 && x._2 >= y._2)).count()
(duration, rpm)
})
しかし、このアプローチに私は次の例外を取得:
org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases:
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.
は私がやりたいことを達成する方法はありますか?
これ以上の情報が必要な場合は、コメントを削除して、必要なものを更新します。
編集:
RDDのブロードキャストは機能しません。収集したアレイをブロードキャストしても、許容されるパフォーマンスにはなりません。
val collected = kMeansInformationRdd.collect()
val kMeansFeatureRdd = kMeansInformationRdd.map(x => {
val begin = x._2 //Long - unix timestamp millis
val duration = x._3 //Long
val rpm = collected.filter(y => (x._2 - 60000 <= y._2 && x._2 >= y._2)).size
(duration, rpm)
})
UPDATE: - しかし限り
このコードは、ジョブが途中速く成し遂げることが、少なくとも可能な実行が、本当にため、オプションではありません恐ろしく遅いされ、どのような
私はそれがまだ遅くなるのは、フィルタリングされた配列が成長するにつれ、1分あたりの要求が高くなるということです。誰かが問題を - または一般的に改善する可能性のあるパフォーマンスの問題 - 私が知っていれば私は幸せになると思う。
kMeansInformationRdd = kMeansInformationRdd.cache()
kMeansInformationRdd.sortBy(_._2, true)
var kMeansFeatureArray: Array[(String, Long, Long)] = Array()
var buffer: collection.mutable.Map[String, Array[Long]] = collection.mutable.Map()
var counter = 0
kMeansInformationRdd.collect.foreach(x => {
val ts = x._2
val identifier = x._1 //make sure the identifier represents actually the entity that receives the traffic -e.g. machine (IP?) not only endpoint
var bufferInstance = buffer.get(identifier).getOrElse(Array[Long]())
bufferInstance = bufferInstance ++ Array(ts)
bufferInstance = bufferInstance.filter(p => p > ts-1000)
buffer.put(identifier, bufferInstance)
val rpm = bufferInstance.size.toLong
kMeansFeatureArray = kMeansFeatureArray ++ Array((identifier, x._3, rpm)) //identifier, duration, rpm
counter = counter +1
if(counter % 10000==0){
println(counter)
println((identifier, x._3, rpm))
println((instanceSizeBefore, instanceSizeAfter))
}
})
val kMeansFeatureRdd = sc.parallelize(kMeansFeatureArray)
正確に意図は何ですか?各要素について、開始から1分以内にすべての要素を見つけ、それらを数えますか?時間範囲は各イベントの具体的な開始に基づいていることが重要ですか?または我々はイベントの連続体の上にウィンドウを作成し、そこに数えることができますか? – maasg