2016-07-12 18 views
0

私はpysparkを使ってSparkで機械学習アルゴリズムのデータを準備しようとしています。私は、時代、ID、クラス(0または1)のエポックを含むRDDを持っています。次のようになります。PysparkのRDDからの配列の抽出

rdd = sc.parallelize([Row(my_date=1465488788,id=4,my_class=1), Row(my_date=1465488790,id=5,my_class=0), Row(my_date=1465488801,id=23,my_class=1), Row(my_date=1465488805,id=23,my_class=1), Row(my_date=1465488809,id=5,my_class=0), Row(my_date=1465488810,id=32,my_class=0),Row(my_date=1465488826,id=38,my_class=1)]) 

このデータは日付順に並べられています。私がしたいのは、このデータからシーケンスを抽出することです。このRDDの同じ時間ウィンドウ内の連続したエントリは、同じシーケンスに入る必要があります。例えば、20秒の時間間隔内のすべてのデータは、同じ順序でなければならない。シーケンスの時間ウィンドウは、上記のデータ・セットの20秒であるならば、私は3つの配列に続く作成する必要があります。

[Row(my_seq=[4,5,23,23], my_class=1),Row(my_seq=[5,23,23,5,32], my_class=0),Row(my_seq=[5,32,38], my_class=1)] 

クラス番号は、シーケンス内の最新の要素のクラスでなければなりません。私のデータセットはかなり大きくなるので、私はこれを並行して行う必要があります。私は、日付フィールドにデータをグループ化することによって、これを実行しようとしましたが、それはうまくいきませんでした:

def createSequences(in_arr): 
    in_arr = list(in_arr) 
    seq_arr = [] 
    sorted_arr = [] 
    for e in in_arr: 
     seq_arr.append(e["id"]) 
     sorted_arr.append(int(str(e["my_date"]) + str(e["my_class"]))) 
    sorted_arr.sort() 
    if str(sorted_arr[-1])[-1] == "1": 
     return Row(is_class = 1, seq = seq_arr, sorted_arr = sorted_arr) 
    else: 
     return Row(is_class = 0, seq = seq_arr, sorted_arr = sorted_arr) 
offset = 1465488788 
time_window = 20 
grouped = clustered_logs.map(lambda row: (int((row["my_date"] - offset)/time_window), row)) \ 
         .groupByKey() \ 
         .map(lambda l: createSequences(l[1])) 

それと同じ順序にちょうどグループ同じウィンドウサイズ内のすべてのエントリを、私は、最新のを使用することはできませんそれらが次の第1シーケンスの時間ウィンドウ内にある場合には、シーケンス内のエントリSparkでこれを達成する方法があれば、これで私を助けてもらえますか? おかげで...

答えて

0
time_window = 20 
clustered_logs.groupBy(lambda x: (int(x /time_window), x.my_class))\ 
    .map(lambda x: Row(my_class=x[0][1], my_seq=[ t.id for t in x[1]])) 
  • は窓やクラスによってint(x /time_window)
  • グループでウィンドウを識別します。グループ化キーはタプル(window, class)
  • mapですが、xはタプル((73274439, 1) , itrable<Row>)です。タプル内の2番目の値は、リストを作成するためにforの理解度を使用して反復されます。

参考:答えをScala doc RDD

+0

感謝。しかし、この方法では、2つの時間枠の間に重複があると、(実際には多くの)シーケンスを失うことになります。たとえば、私のデータがABBCDBで、ABBCが最初の時間枠に属している場合、それは2番目のDB部分から始まる次のシーケンスをグループ化します。しかし、私はそれがABBC、BBCD、BCDBなどであることを望んでいます。私はそれが不可能だと思います、それですか?ありがとう... –

関連する問題