RDD抽象化の前提のため、Sparkアプリケーションでは保存順序が非常に難しいです。あなたが取ることができる最善のアプローチは、ここでやったように、スパークAPIを使ってパンダのロジックを翻訳することです。残念ながら、私はあなたがすべての列に同じフィルター基準を適用できるとは思わないので、手動で複数の列の操作にマスクを翻訳しなければなりませんでした。このDatabricks blog postは、PandasからSparkへの移行に役立ちます。
import pandas as pd
import numpy as np
np.random.seed(1000)
df1 = pd.DataFrame(np.random.randn(10, 4), columns=['a', 'b', 'c', 'd'])
mask = df1.applymap(lambda x: x <-0.7)
df2 = df1[-mask.any(axis=1)]
私たちが望む結果は次のとおりです。
a b c d
1 -0.300797 0.389475 -0.107437 -0.479983
5 -0.334835 -0.099482 0.407192 0.919388
6 0.312118 1.533161 -0.550174 -0.383147
8 -0.326925 -0.045797 -0.304460 1.923010
だからスパークでは、我々は、パンダのデータフレームを使用してデータフレームを作成し、正しい結果セットを取得するためにfilter
を使用します。
df1_spark = sqlContext.createDataFrame(df1).repartition(10)
df2_spark = df1_spark.filter(\
(df1_spark.a > -0.7)\
& (df1_spark.b > -0.7)\
& (df1_spark.c > -0.7)\
& (df1_spark.d > -0.7)\
)
正しい結果が得られます(注文が保存されていないことに注意してください)。
df2_spark.show()
+-------------------+--------------------+--------------------+-------------------+
| a| b| c| d|
+-------------------+--------------------+--------------------+-------------------+
|-0.3348354532115408| -0.0994816980097769| 0.40719210034152314| 0.919387539204449|
| 0.3121180100663634| 1.5331610653579348| -0.5501738650283003|-0.3831474108842978|
|-0.3007966727870205| 0.3894745542873072|-0.10743730169089667|-0.4799830753607686|
| -0.326924675176391|-0.04579718800728687| -0.3044600616968845| 1.923010130400007|
+-------------------+--------------------+--------------------+-------------------+
あなたは絶対にパンダを使用してマスクを作成するためにを必要とする場合は、オリジナルのパンダのデータフレームのインデックスを維持し、インデックス列に基づいて放送変数とフィルタリングを作成することにより、スパークから個々のレコードを削除する必要があります。ここにYMMVの例があります。
インデックスを追加します。
df1['index_col'] = df1.index
df1
a b c d index_col
0 -0.804458 0.320932 -0.025483 0.644324 0
1 -0.300797 0.389475 -0.107437 -0.479983 1
2 0.595036 -0.464668 0.667281 -0.806116 2
3 -1.196070 -0.405960 -0.182377 0.103193 3
4 -0.138422 0.705692 1.271795 -0.986747 4
5 -0.334835 -0.099482 0.407192 0.919388 5
6 0.312118 1.533161 -0.550174 -0.383147 6
7 -0.822941 1.600083 -0.069281 0.083209 7
8 -0.326925 -0.045797 -0.304460 1.923010 8
9 -0.078659 -0.582066 -1.617982 0.867261 9
は、Spark放送変数にマスクを変換します
myIdx = sc.broadcast(df2.index.tolist())
スパークAPIを使用してデータフレームを作成し、変更します。
df1_spark.rdd.filter(lambda row: row and row['index_col'] not in myIdx.value).collect()
df2_spark = df1_spark.rdd.filter(lambda row: row and row['index_col'] in myIdx.value).toDF()
df2_spark.show()
+-------------------+--------------------+--------------------+-------------------+---------+
| a| b| c| d|index_col|
+-------------------+--------------------+--------------------+-------------------+---------+
|-0.3007966727870205| 0.3894745542873072|-0.10743730169089667|-0.4799830753607686| 1|
|-0.3348354532115408| -0.0994816980097769| 0.40719210034152314| 0.919387539204449| 5|
| 0.3121180100663634| 1.5331610653579348| -0.5501738650283003|-0.3831474108842978| 6|
| -0.326924675176391|-0.04579718800728687| -0.3044600616968845| 1.923010130400007| 8|
+-------------------+--------------------+--------------------+-------------------+---------+
良いのフィルタの
else
一部を分割しては怒鳴る言及したと仮定すると。私はちょうどこのパズルを解決するためのいくつかのより良い例で試しました。ただ見ているだけです。 –