2017-08-05 6 views
0

現在、1分あたりのリクエストで機械学習用のデータを豊かにしようとしています。データはKafkaのトピックに保存され、アプリケーションの開始時にトピックの内容全体が読み込まれて処理されるため、すべてのデータが同時に到着するので、自分の知る限りでスパークストリーミングのウィンドウ操作を使用することはできません。マッピング中のRDDのタイムスタンプからの要求数を計算する

私のアプローチは、次のことを試してみた:

val kMeansFeatureRdd = kMeansInformationRdd.map(x => { 

    val begin = x._2 //Long - unix timestamp millis 
    val duration = x._3 //Long 
    val rpm = kMeansInformationRdd.filter(y => (x._2 - 60000 <= y._2 && x._2 >= y._2)).count() 

    (duration, rpm) 

}) 

しかし、このアプローチに私は次の例外を取得:

org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases: 
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063. 
(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758. 

は私がやりたいことを達成する方法はありますか?

これ以上の情報が必要な場合は、コメントを削除して、必要なものを更新します。

編集:

RDDのブロードキャストは機能しません。収集したアレイをブロードキャストしても、許容されるパフォーマンスにはなりません。

val collected = kMeansInformationRdd.collect() 


    val kMeansFeatureRdd = kMeansInformationRdd.map(x => { 
     val begin = x._2 //Long - unix timestamp millis 
     val duration = x._3 //Long 

     val rpm = collected.filter(y => (x._2 - 60000 <= y._2 && x._2 >= y._2)).size 

     (duration, rpm) 

    }) 

UPDATE: - しかし限り

このコードは、ジョブが途中速く成し遂げることが、少なくとも可能な実行が、本当にため、オプションではありません恐ろしく遅いされ、どのような

私はそれがまだ遅くなるのは、フィルタリングされた配列が成長するにつれ、1分あたりの要求が高くなるということです。誰かが問題を - または一般的に改善する可能性のあるパフォーマンスの問題 - 私が知っていれば私は幸せになると思う。

kMeansInformationRdd = kMeansInformationRdd.cache() 

    kMeansInformationRdd.sortBy(_._2, true) 

    var kMeansFeatureArray: Array[(String, Long, Long)] = Array() 
    var buffer: collection.mutable.Map[String, Array[Long]] = collection.mutable.Map() 
    var counter = 0 


    kMeansInformationRdd.collect.foreach(x => { 
     val ts = x._2 
     val identifier = x._1 //make sure the identifier represents actually the entity that receives the traffic -e.g. machine (IP?) not only endpoint 

     var bufferInstance = buffer.get(identifier).getOrElse(Array[Long]()) 

     bufferInstance = bufferInstance ++ Array(ts) 

     bufferInstance = bufferInstance.filter(p => p > ts-1000)   

     buffer.put(identifier, bufferInstance) 

     val rpm = bufferInstance.size.toLong 

     kMeansFeatureArray = kMeansFeatureArray ++ Array((identifier, x._3, rpm)) //identifier, duration, rpm 
     counter = counter +1 
     if(counter % 10000==0){ 
     println(counter) 
     println((identifier, x._3, rpm)) 
     println((instanceSizeBefore, instanceSizeAfter)) 
     } 

    }) 

    val kMeansFeatureRdd = sc.parallelize(kMeansFeatureArray) 
+0

正確に意図は何ですか?各要素について、開始から1分以内にすべての要素を見つけ、それらを数えますか?時間範囲は各イベントの具体的な開始に基づいていることが重要ですか?または我々はイベントの連続体の上にウィンドウを作成し、そこに数えることができますか? – maasg

答えて

0

EDITセクションで指定されているコードが正しくありません。変数がSparkでブロードキャストされる正しい方法ではありません。次のように正しい方法は次のとおりです。もちろん

val collected = sc.broadcast(kMeansInformationRdd.collect()) 


    val kMeansFeatureRdd = kMeansInformationRdd.map(x => { 
     val begin = x._2 //Long - unix timestamp millis 
     val duration = x._3 //Long 

     val rpm = collected.value.filter(y => (x._2 - 60000 <= y._2 && x._2 >= y._2)).size 

     (duration, rpm) 

    }) 

、あなたはsc.broadcastの代わりに、同様のグローバル変数を使用することができますが、それは推奨されません。どうして?

  1. 外部を使用する場合:

    理由で直接外部変数を使用しての違いは、と)(sc.broadcastを使用して変数を放送(私のいわゆる「グローバル変数」のような)であるということです変数を直接指定すると、sparkはシリアル化された変数のコピーを各タスクと一緒に送信します。 sc.broadcastによって、変数はEXECUTORごとに1つのコピーが送信されます。タスクの数は、通常、Executorの10倍です。したがって、変数(配列など)が十分に大きい場合(10〜20K以上)、前者の操作ではネットワーク変換に時間がかかり、頻繁なGCが発生し、スパークが遅くなります。したがって、大きな変数(> 10-20K)は明示的に放送されることが示唆されている。

  2. 外部変数を直接使用すると、変数は永続化されず、タスクで終了し、再利用できなくなります。 sc.broadcast()によって変数はエグゼキュータのメモリに自動永続化されていますが、明示的にそれをアンパサリするまで持続します。したがって、sc.broadcast変数はタスクとステージ間で利用できます。

変数が複数回使用されることが予想される場合は、sc.broadcast()が推奨されます。

+0

収集したアレイのブロードキャストを試みましたが、パフォーマンスの向上は見られませんでした。ジョブは永遠に実行され、終了しません。私は現在、比較的小さなテストセットしか実行していません。 – LST

+0

'collect'配列にはいくつの要素がありますか?現在 – himanshuIIITian

+0

150kです。私はアルゴリズムアプローチを改善する作業をしています。私は、データセット全体でfilter()がこの量のデータを処理する方法ではないと思います。 – LST

関連する問題