2017-07-26 19 views

答えて

1

私はあなたの他の重複した質問を見て始めました(それは重複しているとフラグされています)ので、私はこれに答えます。

はい、partition byコマンドを使用するクエリと同様に、ウィンドウ機能を使用できます。 How to find longest sequence of consecutive dates?

同様のアプローチを使用して、データフレーム変換を使用すると、同じことを達成できます。

from pyspark.sql import Window 
from pyspark.sql.functions import row_number 

lst=[[1,1],[2,1],[3,1],[4,1],[5,0],[6,0],[7,0],[8,1],[9,1],[10,1]] 
df=spark.createDataFrame(lst,['ID','Sensor']) 

#define the window specification 
w=Window.partitionBy(df['Sensor']).orderBy(df['ID']) 

group_df=df.select('*',(df['ID']-row_number().over(w)).alias('grp')).orderBy('ID') 
count_df=group_df.groupBy('grp').count() 

#get result by joining sequence counts df back to df containing original columns 
group_df\ 
    .join(count_df,count_df['grp']==group_df['grp'])\ 
    .select('ID','Sensor','count')\ 
    .filter('Sensor=1')\ 
    .orderBy('ID')\ 
    .show() 

は、所望の配列の長さが得られます。答えを

+---+------+-----+                
| ID|Sensor|count| 
+---+------+-----+ 
| 1|  1| 4| 
| 2|  1| 4| 
| 3|  1| 4| 
| 4|  1| 4| 
| 8|  1| 3| 
| 9|  1| 3| 
| 10|  1| 3| 
+---+------+-----+ 
+0

答え、あなたが共有したリンクをありがとうございました。 –

0

解決方法(ノードで収集)はRDD.mapPartitionsで実現できます。これにより、パーティション全体にマップされた関数を提供することができます。つまり、データのサブセット内の連続するサンプル全体を反復処理できます。パーティションを開始または終了するタイミングを1で識別し、パーティション間でシーケンスを結合する必要があります。それは少し醜いかもしれないが、可能でなければならない。

まだデータがない場合は、データをsortにする必要があります。

+0

感謝を。データを最初にソートする必要があるのはなぜですか? また、spark構造化ストリーミングを使用してこの作業を行うためのウィンドウ操作を行うことができると思いますか? –