2016-07-25 2 views
0

私のpysparkプロセスの出力部分は、サイズが不揃いですが、予想通りn ** 2パターン(0,1,2,4,8,16など)です。再分割して、pysparkは、倍精度で増加する部分で負荷を不均一に分散させます。

[(0, u'{"group_id":"1","pertubations":"Current Affairs,Sport,Technology"}'), 
(67, u'{"group_id":"2","pertubations":"Current Affairs,Sport,Celeb Gossip"}')] 

いくつかの些細な処理:

私はこのようにGoogleのBigQueryのからデータをロードします:出力のようになります

dConf = { 
    "mapred.bq.project.id": project_id, 
    "mapred.bq.gcs.bucket": bucket, 
    "mapred.bq.input.project.id": project_id, 
    "mapred.bq.input.dataset.id":dataset_id, 
    "mapred.bq.input.table.id": table_id 
} 

rdd_dataset_raw = sc.newAPIHadoopRDD(
    "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat", 
    "org.apache.hadoop.io.LongWritable", 
    "com.google.gson.JsonObject", 
    conf=dConf 
) 

を(rdd_dataset_raw.take(2)を)これは私のプロセスです

rdd_dataset = (
    rdd_dataset_raw 
    .repartition(nr_partitions) 
    .map(lambda t, json=json: json.loads(t[1])) 
) 

このようになります。

[{u'group_id': u'1', u'pertubations': u'Current Affairs,Sport,Technology'}, 
{u'group_id': u'2', u'pertubations': u'Current Affairs,Sport,Celeb Gossip'}] 

私は、GoogleのストレージにRDDを保存する:これはnr_partitions一部のファイルを作成します

rdd_dataset_raw.saveAsTextFile("gs://bucket/directory") 

ただし、これらの部分ファイルのサイズは均等ではありません。これらは、n**2で増加します.nはパーツファイル番号です。換言すれば、

part-00000は0行
part-00001が含ま1つのライン
part-00002 2行
part-00003が含ま含ま4行
part-00004が含ま等8行
が含ま

これらのほとんどは、実際に終了直ちに、後の部分のメモリが不足します。

何が起こっているのですか?パーティションを均等にロードさせるにはどうしたらいいですか?

答えて

0

それはpartitionByrepartitionを交換するのと同じくらい簡単だった:

rdd_dataset = (
    rdd_dataset_raw 
    .partitionBy(nr_partitions) 
    .map(lambda t, json=json: json.loads(t[1])) 
) 

注これは可能な限り早期に行われるために必要な。パーティション化されていないrddを渡してから、後でパーティションを分割しました。

Docs

関連する問題