私はApache Sparkには新しく、米国の州によってデータフレームを再分割しようとしています。私はそれ自身のRDDに各パーティションに分割し、特定の場所に保存する:Spark Dataframeの再パーティション化を保証する方法
schema = types.StructType([
types.StructField("details", types.StructType([
types.StructField("state", types.StringType(), True)
]), True)
])
raw_rdd = spark_context.parallelize([
'{"details": {"state": "AL"}}',
'{"details": {"state": "AK"}}',
'{"details": {"state": "AZ"}}',
'{"details": {"state": "AR"}}',
'{"details": {"state": "CA"}}',
'{"details": {"state": "CO"}}',
'{"details": {"state": "CT"}}',
'{"details": {"state": "DE"}}',
'{"details": {"state": "FL"}}',
'{"details": {"state": "GA"}}'
]).map(
lambda row: json.loads(row)
)
rdd = sql_context.createDataFrame(raw_rdd).repartition(10, "details.state").rdd
for index in range(0, rdd.getNumPartitions()):
partition = rdd.mapPartitionsWithIndex(
lambda partition_index, partition: partition if partition_index == index else []
).coalesce(1)
if partition.count() > 0:
df = sql_context.createDataFrame(partition, schema=schema)
for event in df.collect():
print "Partition {0}: {1}".format(index, str(event))
else:
print "Partition {0}: No rows".format(index)
テストするために、私は50行(この例では10)、異なる各持つS3からファイルをロードします状態をdetails.state
列に入力します。動作を模倣するために、上記の例でデータを並列化しましたが、動作は同じです。私は尋ねた50のパーティションを取得しますが、いくつかは使用されておらず、複数のパーティションには複数の状態のエントリがあります。ここでは10のサンプルセットのための出力です:
Partition 0: Row(details=Row(state=u'AK'))
Partition 1: Row(details=Row(state=u'AL'))
Partition 1: Row(details=Row(state=u'CT'))
Partition 2: Row(details=Row(state=u'CA'))
Partition 3: No rows
Partition 4: No rows
Partition 5: Row(details=Row(state=u'AZ'))
Partition 6: Row(details=Row(state=u'CO'))
Partition 6: Row(details=Row(state=u'FL'))
Partition 6: Row(details=Row(state=u'GA'))
Partition 7: Row(details=Row(state=u'AR'))
Partition 7: Row(details=Row(state=u'DE'))
Partition 8: No rows
Partition 9: No rows
私の質問は:再パーティション戦略がスパークするだけの提案であるか、私のコードとは根本的に間違って何かがあるのでしょうか?