2016-11-26 7 views
0

同じキーを共有していなくても、1つのRDDの要素を別のRDDに基づいてフィルタリングする方法はありますか?別のRDDに基づいて1つのRDDをプルーン

私は2つのRDDS持っている - ABCとXYZ

abc.collectは、()(この

[[a,b,c],[a,c,g,e],[a,e,b,x],[b,c]] 

xyz.collectのように見えます)この

[a,b,c] 

は、今私がしたいように見えますxyzに存在しないRDD abcのすべての要素を除外します。

言っ操作した後、RDD ABCが次のようになります。

[[a,b,c],[a,c],[a,b],[b,c]] 

私はこのようになり、コード書きました:

​​

をしかし、それは私に、このエラーを与える:

Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation 

私は以来、無駄にマップの代わりにフィルタ、ルックアップを試しました。私は同じエラーが発生し続けます。

xyzで収集操作を実行してこのエラーを解決できることはわかっていますが、大規模なデータセットでこれを実行していて、あまりにも多くのメモリを超えているために.collect()がAWSサーバーを強制終了します。したがって、私は.collect()などの高価な操作を使わずにこれを行う必要があります。

答えて

2

次のことが可能です。

# Add index 
abc.zipWithIndex() \ 
    # Flatten values 
    .flatMap(lambda x: [(k, x[1]) for k in x[0]]) \ 
    # Join with xyz (works as filter) 
    .join(xyz.map(lambda x: (x, None))) \ 
    # Group back by index 
    .map(lambda x: (x[1][0], x[0])) \ 
    .groupByKey() \ 
    .map(lambda x: list(x[1])) 

か、xyzにブルームフィルタを作成し、abcをマッピングするためにそれを使用することができます。

+0

私はこれを試しましたが、ハッシュできないタイプのリストエラーで終わった – Piyush

+0

もう一度試してみてください。このステップをステップバイステップで実行しました。なぜ初めて動作しなかったのか分かりません。ありがとうLostinOverflow – Piyush

関連する問題