2017-05-10 7 views
2

Apache Spark Streamingを使用して、1週間のウィンドウサイズでスライド平均を計算したいと考えています。結果は、タイムリーに、すなわち分針待ち時間で配信されるべきである。ストリーミングデータの1週間分の保存は私の場合は実用的ではありませんが、正確な結果を計算する必要があります(私が間違っていれば修正してください)。Apache Spark Streamingでの長期的なスライド平均の概算

このように、私はある種の近似を目指しています。私の考えは、1時間前の平均結果のストリームを生成するために1時間の回転ウィンドウを使用することでした。s_1その後、1週間のスライド平均をs_1で使用して、1週間前の平均結果を計算しますs_2s_1s_2以外にも、1時間以上の平均値のスライドを含む別のストリームs_3があります。ストリームs_2s_3に参加し、参加タプル(t_2, t_3)には(t_2 + t_3)/2を出します。すべての平均タプルに、含めるタプルの最小タイムスタンプと最大タイムスタンプを添付します。これらのタイムスタンプを使用して、s_2とs_3のタプルが重複しないようにします。たとえば、次のように

s_2 tumbling window size 2 (tuples) 
s_3 sliding window size 2, interval 1 (tuples) 
stream 3 4 9 8 7  

time s_2 2_3 out 
1  -  3  3 
2  -  3.5 3.5 
3  3.5 6.5 3.5 the s_3 tuple 6.5 is ignored because min_timestamp(6.5) <= max_timestamp(3.5) 
4  3.5 8.5  6 (compute (3.5 + 8.5)/2 
5  6  7.5  6 the s_3 tuple 7.5 is ignored because min_timestamp(7.5) <= max_timestamp(6) 

私は、これは参加S_2とS_3が違いによるスライド間隔にスパークでは許可されていないので、私はスパークでそれを行う方法を見つけ出すことができませんでしたApacheの嵐で動作するように取得することができましたが。

質問1:これをSpark Streamingでどのように実装できますか?

質問2:ストリーム処理システム内で効率的に長時間スライディング平均を計算する方法はありますか?

答えて

2

かなり正確にストリーム上で近似解を生成するための戦略がいくつかあります。私たちが使用する戦略の1つは、火花の無限の流れでstratified samplingです。 Apache Sparkへの層別サンプリングをSnappyDataというオープンソースプロジェクトに導入しました。まあ、スパークで行方不明のいくつかのものの中に。 SnappyDataは、ストリーム上で一様なランダムサンプルを維持できますが、開発者がストリームで重要な列/ディメンションを選択できるようにすることで、高い精度を保証します。例えば、あなたの例では、毎分または毎時間に十分なサンプルがキャプチャされるようにします。サンプルはSpark Dataframe/Columnテーブルとして表示され、照会可能です。 avg/sum/count/etcなどの集約クエリが実行されると、リソースの一部と時間を使用して一連のアルゴリズムを使用して回答が計算されます。

これは、Snappydataでどのように見えるかを示す擬似コードです。

Create sample table MyInfiniteStream on <Stream> options (qcs 'min(timestamp), fraction '0.01') 
    // Of course, you can use the Dataframe api to do this instead of SQL too. 
    // your DStream <Stream> is registered with SnappyData 
    // min(timestamp) tells which columns to use for stratification 
    // fraction indicates what percentage of the input data to retain in the sample. 

次に、エラー制約の有無にかかわらずSpark SQLクエリを直接実行することができます。良いことは、あなたの時間間隔がきめ細かくか粗いことです。

select avg(myMeasureColumn), dimension d from MyInfiniteStream group by d with Error 0.1 
// this would ensure the result is always at least 90% accurate. 
select avg(myMeasureColumn), dimension d from MyInfiniteStream where timestamp >x and timestamp < y group by d with Error 0.1 

あなたはより良いアイデアを得ることができますhere。 SnappyDataはSparkと完全に互換性があります。

また、オンラインサンプリング(ストリームで直接)や、信頼区間のエラーを計算するための組み込みアルゴリズムはありませんが、Sparkで直接実装することもできます。データセットの 'サンプル'メソッドをチェックアウトします。

+0

上記に加えて、(現在のセットがフラッシュされた)サンプリングのウィンドウは、「サンプルテーブルを作成する」のtimeIntervalオプションで制御することができます。オプション(qcs '...'、小数点 '0.01'、timeInterval '120s')。timeIntervalsにはウォールクロックの代わりに使用できるタイムスタンプ列も明示的にサポートされています。これはタイムスタンプまたは日付タイプでなければならない "サンプルテーブルを作成する" "timeSeriesColumn ''"オプションで設定できます。次に、 "qcs"をグループ化/フィルタリングしてより正確な精度を得るための列のセットにすることができます。 – Sumedh