2017-01-19 8 views
1

多くのIDを効率的に検索したい。Apache Sparkでの効率的なデータフレーム参照

+-------+----------------+ 
| URI|  Links_lists| 
+-------+----------------+ 
| URI_1|[URI_8,URI_9,...| 
| URI_2|[URI_6,URI_7,...| 
| URI_3|[URI_4,URI_1,...| 
| URI_4|[URI_1,URI_5,...| 
| URI_5|[URI_3,URI_2,...| 
+-------+----------------+ 

私の最初のステップは、df_sourceの外にRDDを作るために、次のようになります:

rdd_source = df_source.rdd 
私は何を持っていることは、この dataframe df_sourceのように見えますが、10人の労働者に配布夫婦百万のレコードを持つこと dataframeです

out of rdd_source私はRDDを作成して、IDを持つURIだけを含んでいます。私はそのようにこれを行う:

rdd_index = rdd_source.map(lambda x: x[0]).zipWithUniqueId() 

今私も.flatMap()すべての関係が含まれていRDDまででrdd_source。これまではLinks_list列にのみ含まれていました。

rdd_relations = rdd_source.flatMap(lamda x: x) 

私が参加し、私は(私はこれで間違っているかもしれない)と思いますしたいので、今私は戻ってdataframesrdd_indexrdd_relationsの両方を変換するには、dataframesに参加する高速です。

schema_index = StructType([ 
    StructField("URI", StringType(), True), 
    StructField("ID", IntegerType(), True)) 

df_index = sqlContext.createDataFrame(rdd_index, schema=schema_index) 

schema_relation = StructType([ 
    StructField("URI", StringType(), True), 
    StructField("LINK", StringType(), True)) 

df_relations = sqlContext.createDataFrame(rdd_relations, schema=schema_relation) 

結果dataframesは、これら二つのようになります。私がどうなるdf_relationsに長い文字列のURIを置き換えるために、今

df_index: 
+-------+-------+ 
| URI|  ID| 
+-------+-------+ 
| URI_1|  1| 
| URI_2|  2| 
| URI_3|  3| 
| URI_4|  4| 
| URI_5|  5| 
+-------+-------+ 

df_relations: 
+-------+-------+ 
| URI| LINK| 
+-------+-------+ 
| URI_1| URI_5| 
| URI_1| URI_8| 
| URI_1| URI_9| 
| URI_2| URI_3| 
| URI_2| URI_4| 
+-------+-------+ 

df_indexに参加し、最初の参加:

df_relations =\ 
df_relations.join(df_index, df_relations.URI == df_index.URI,'inner')\ 
      .select(col(ID).alias(URI_ID),col('LINK')) 

これは私にこのように見てdataframe得なければならない:

df_relations: 
+-------+-------+ 
| URI_ID| LINK| 
+-------+-------+ 
|  1| URI_5| 
|  1| URI_8| 
|  1| URI_9| 
|  2| URI_3| 
|  2| URI_4| 
+-------+-------+ 

そして、第二の参加:

df_relations =\ 
df_relations.join(df_index, df_relations.LINK == df_index.URI,'inner')\ 
      .select(col(URI_ID),col('ID').alias(LINK_ID)) 

これが最後のdataframe私が必要とする1になるはずですが。このように見える

df_relations: 
+-------+-------+ 
| URI_ID|LINK_ID| 
+-------+-------+ 
|  1|  5| 
|  1|  8| 
|  1|  9| 
|  2|  3| 
|  2|  4| 
+-------+-------+ 

ここで、すべてのURIはdf_indexのIDで置き換えられます。

これは、リレーションテーブルの両方の列にあるすべてのURIのIDを調べる効率的な方法ですか、これを行うより効果的な方法はありますか?

私はあなたが説明した動作のためにRDDを使用する必要はありませんPythonの3.5

答えて

1

とApacheのスパーク2.1.0を使用しています。 RDDを使用すると非常にコストがかかることがあります。

import pyspark.sql.functions as f 
# add a unique id for each URI 
withID = df_source.withColumn("URI_ID", f.monotonically_increasing_id()) 
# create a single line from each element in the array 
exploded = withID.select("URI_ID", f.explode("Links_lists").alias("LINK") 
linkID = withID.withColumnRenamed("URI_ID", "LINK_ID").drop("Links_lists") 
joined= exploded.join(linkID, on=exploded.LINK==linkID.URI).drop("URI").drop("LINK") 

最後に、(基本的に置き換え列でdf_sourceである)のlinkIDが比較的小さい場合(つまり、完全に単一のワーカーに含まれていることができます。第二に、あなたは、2つのあなただけのいずれかを行うことができ、加入する必要はありません)あなたはそれを放送することができます。参加前に以下を追加してください:

linkID = f.broadcast(linkID) 
+0

私は明日お試しいただきありがとうございます! – Thagor

関連する問題