2017-10-12 3 views
0

私は以下の構造を有するRDDを持っているの作成方法:
((ByteArray, Idx), ((srcIdx,srcAdress), (destIdx,destAddress)))RDD(スカラ)のネストされたforloop

このビットコインのblockchainのエッジ(トランザクション)の表現。 (ByteArray, Idx)は識別子として見ることができ、残りはエッジです。私の最終的な目標は、ノードをブロックチェーンのグラフ表現に集約することです。私がこれを行うために必要な構造の最初の変更は、同じビットコイントランザクションにあるソースを1つのエッジに(そして最終的には1つのノードに)配置することです。このようにして、私は同じユーザーに属する公開鍵を「クラスタ化」します。
((ByteArray, Idx), (List((srcIdx, srcAddress)), (destIdx, destAddress)))
その他の形式でも同じ機能(たとえばScalaではこれが不可能または論理的でない場合)があります。

私の現在の思考プロセスは以下の通りです。 Javaでは、RDD内のアイテムに対してネストされたforループを実行し、それぞれのループは同じキー((ByteArray, Idx))を持つアイテムのリストを作成します。この後、重複を削除します。 しかし、私はRDDとScalaを扱っているので、これは不可能です。次に、.collect()を実行して、自分のRDD上で別の.map()機能を使用しようとしました。そのコレクションをマップ機能内でループさせました。しかし、Sparkはこれを好まなかった。明らかにコレクションは直列化できないからだ。 次に、私は次のように「ネスト」マップ関数を作成しようとした:フィルタ(またはマップ)関数は().MAPで利用できないので、これは、許可されていない

val aggregatedTransactions = joinedTransactions.map(f => { 
    var list = List[Any](f._2._1) 

    val filtered = joinedTransactions.filter(t => f._1 == t._1) 

    for(i <- filtered){ 
    list ::= i._2._1 
    } 

    (f._1, list, f._2._2) 
}) 

。代替案は何ですか?

私はスカラにとって新しく、有用な背景情報は非常に高く評価されています。

+0

私は、あなたの問題の性質を考えると、誤解を避けるために、入力+出力の例を提供するために有用であろうと思います – dk14

答えて

5

最終的な目標は、ノードをブロックチェーンのグラフ表現に集約することです。私がこれを行うために必要な構造の最初の変更は、同じビットコイントランザクションにあるソースを1つのエッジに(そして最終的には1つのノードに)配置することです。

だから、基本的にあなたがgroupByKeyにしたい:

joinedTransactions.groupByKey().map { 
    // process data to get desired shape 
} 
-1

ネストされたRDDSは、しかし、RDD内のコレクションが可能 であることは不可能です。 forループネスト

を使用できcartesian

DEFデカルト[U](その他:RDD [U])(暗黙arg0に:ClassTag [U]):RDD [(T、 U) ] PermalinkこのRDDのデカルト積と、もう1つの を返します。つまり、aがすべて にあり、bがotherにあるすべての要素(a、b)のペアのRDDです。あなたもそれを達成することができスパークSQLを使用して

val nestedForRDD = rdd1.cartesian(rdd2) 

nestedForRDD.map((rdd1TypeVal, rdd2TypeVal) => { 
    //Do your inner-nested evaluation code here 
}) 

http://bigdatums.net/2016/02/12/how-to-extract-nested-json-data-in-spark/

関連する問題