私はpysparkを使ってSparkで機械学習アルゴリズムのデータを準備しようとしています。私は、時代、ID、クラス(0または1)のエポックを含むRDDを持っています。次のようになります。PysparkのRDDからの配列の抽出
rdd = sc.parallelize([Row(my_date=1465488788,id=4,my_class=1), Row(my_date=1465488790,id=5,my_class=0), Row(my_date=1465488801,id=23,my_class=1), Row(my_date=1465488805,id=23,my_class=1), Row(my_date=1465488809,id=5,my_class=0), Row(my_date=1465488810,id=32,my_class=0),Row(my_date=1465488826,id=38,my_class=1)])
このデータは日付順に並べられています。私がしたいのは、このデータからシーケンスを抽出することです。このRDDの同じ時間ウィンドウ内の連続したエントリは、同じシーケンスに入る必要があります。例えば、20秒の時間間隔内のすべてのデータは、同じ順序でなければならない。シーケンスの時間ウィンドウは、上記のデータ・セットの20秒であるならば、私は3つの配列に続く作成する必要があります。
[Row(my_seq=[4,5,23,23], my_class=1),Row(my_seq=[5,23,23,5,32], my_class=0),Row(my_seq=[5,32,38], my_class=1)]
クラス番号は、シーケンス内の最新の要素のクラスでなければなりません。私のデータセットはかなり大きくなるので、私はこれを並行して行う必要があります。私は、日付フィールドにデータをグループ化することによって、これを実行しようとしましたが、それはうまくいきませんでした:
def createSequences(in_arr):
in_arr = list(in_arr)
seq_arr = []
sorted_arr = []
for e in in_arr:
seq_arr.append(e["id"])
sorted_arr.append(int(str(e["my_date"]) + str(e["my_class"])))
sorted_arr.sort()
if str(sorted_arr[-1])[-1] == "1":
return Row(is_class = 1, seq = seq_arr, sorted_arr = sorted_arr)
else:
return Row(is_class = 0, seq = seq_arr, sorted_arr = sorted_arr)
offset = 1465488788
time_window = 20
grouped = clustered_logs.map(lambda row: (int((row["my_date"] - offset)/time_window), row)) \
.groupByKey() \
.map(lambda l: createSequences(l[1]))
それと同じ順序にちょうどグループ同じウィンドウサイズ内のすべてのエントリを、私は、最新のを使用することはできませんそれらが次の第1シーケンスの時間ウィンドウ内にある場合には、シーケンス内のエントリSparkでこれを達成する方法があれば、これで私を助けてもらえますか? おかげで...
感謝。しかし、この方法では、2つの時間枠の間に重複があると、(実際には多くの)シーケンスを失うことになります。たとえば、私のデータがABBCDBで、ABBCが最初の時間枠に属している場合、それは2番目のDB部分から始まる次のシーケンスをグループ化します。しかし、私はそれがABBC、BBCD、BCDBなどであることを望んでいます。私はそれが不可能だと思います、それですか?ありがとう... –