2017-08-03 5 views
0

私はPySparkを使用していますが、RDDに長い文字列が含まれていると、2つのRDDをマージするのがなぜ失敗するのか、特に長い文字列を持つ2つのRDDをマージすると、値の順番が混ざります。

、Iは、構造を有する2つのRDDS

RDD1 =([KEY1、[string11、string12]、[KEY2、[string21、string22]]、...)

を有します

RDD2 =([キー1、[string13、string14、string15]]、[KEY2、[string23、string24、string25]]、...)

文字列は、彼らがしている、すなわち(かなり長くなることができます

数MBの価値があります)。私の最終目標は、 "合併し、平坦化" コンテンツ

RDD3 =([キー1、string11、string12、string13、string14、string15]、[KEY2、string21、string22、string23、string24とRDD新しいを取得することです、string25]、...)私はPythonコマンド

rdd3 = sparkContext.union([rdd1, rdd2]).groupByKey() \ 
     .mapValues(lambda x: list(x)).map(lambda x: [x[0]] + list(x[1][0]) + list(x[1][1])) 

それは簡単な作業のように思えると、文字列が小さい場合は確かに、このコマンドがうまく機能を使用し、この目的のために

。しかし、非常に長い文字列を得RDDの順序は突然

RDD3 =([KEY1、string14、string15、string12、string13、string11]、[KEY2、string21、等一見ランダムに混ざっていますstring22、string24、string25、string23]、...)

unionは順序を保持するように思われるが、混合はどこかgroupByKeymapの間に発生しなければなりません。私は平坦化が問題ではないと思う。しかしgroupByKeyResultIterableを返すので、詳細を把握するのは難しいです。要約すると、実際に何が起こっているのか分かりません。誰も私にヒントを与えることができますか?私は現在、いくつかのワーカーを持つローカルテストクライアントでSparkを実行しています。

答えて

1

ここで起こっていることはシャッフルです。シャッフル中の操作の順序は非決定的です。場合によっては順序を保持することもできますが、保証されていないため、ローカルモードなどの単純なケースに限定されています。

各シャッフル後に追加の注文情報とリゾート値を保管しない限り(非常に高価)、回避策はありません。

関連する問題