2016-05-06 2 views
0

こちらにご負担ください。私は3つのRDD(Hadoopから来ている)を持っています。 3つすべてには、一致/結合できるippaddressやboxnumberなどの固有のキーがあります。ここには、すべてのテーブルのサンプルデータがあります。テーブル列のボックス番号は、一致する前にnumberに変換する必要があります。 JavaでJavaを使ってsparkで3つのRDDテーブルを結合するには?

Table A: 
ipaddress|boxnumber|cardnumber 
94.254.57.16|59774DEa1|0D1EDF40 
94.154.57.176|5F7377Ga9|0D3F796D 

Table B: 
cardno,boxnumber 
1500914,2000096 
1500413,2211469 

Table C: 
ipaddress|kanal|bitrate|kanaltimespent|date|country 
94.254.57.16|sky|2023|003DF6A.ts|12-02-2016|chile 
94.154.57.176|ITV|3425|003DF6A.ts|23-04-2014|egypt 

私の最初の試み:

//TABLE A 
    JavaSparkContext sc = SetupSparkContext("SparkSample");  
    JavaRDD<ExtractTable_A> ta_RDD= ExtractTable_A.getRDD(sc); 
    JavaPairRDD<String, ExtractTable_A> A_PairRDD = ta_RDD.mapToPair(new PairFunction<extractTable_A, String, ExtractTable_A>() 
    { 
    @Override 
    public Tuple2<String, ExtractTable_A> call(ExtractTable_A extractTable_A) throws Exception 
    { 
     String [] A= extractTable_A.toString().split("|") ; 
     return new Tuple2<>(A[0],extractTable_A); 
     } 
    }); 
    //TABLE B 
    JavaRDD<ExtractOttPdl> tb_RDD = ExtractTableB.getRDD(sc);  
    JavaPairRDD<String, ExtractTable_B> BPairRDD = tb_RDD.mapToPair(new PairFunction<extractTable_B, String, ExtractTable_B>() 
    { 
    @Override 
    public Tuple2<String, ExtractTable_B> call(ExtractTable_B extractTable_B) throws Exception 
    { 
     String [] B= extractTable_B.toString().split(",") ; 
     return new Tuple2<>(B[1],extractTable_B); 
     } 
    }); 

    //TABE C 
    JavaRDD<ExtractTable_C> tc_RDD = ExtractTableC.getRDD(sc);  
    JavaPairRDD<String, ExtractTable_C> CPairRDD = tb_RDD.mapToPair(new PairFunction<extractTable_C, String, ExtractTable_C>() 
    { 
    @Override 
    public Tuple2<String, ExtractTableC> call(ExtractTableC extractTable_C) throws Exception 
    { 
     String [] C= extractTable_A.toString().split("|") ; 
     return new Tuple2<>(C[0],extractTable_A); 
     } 
    }); 

    //At this point i need to join and create an .txt output file 

最終的な結果は、私が参加するために管理しているこれらのヘッダーを持つファイル

KANAL|BITRATE|TIMESPENT|DATE|COUNTRY 

=== ===更新 ことshoudテーブルAとテーブルBは今、私はテーブルAにテーブルCに参加する方法に固執していますか?

 //Joined table A and B 
    JavaPairRDD<String, Tuple2<ExtractTableA, ExtractTableB>> join_1 = A_PairRDD.join(B_PairRDD); 
    . . . 
    //Joined table A and C 
    JavaPairRDD<String, Tuple2<ExtractTableA, ExtractTableC>> Join_2 = A_PairRDD.join(B_PairRDD); 


    // Output results from TableA and TableB 
    join_1.map(in -> { 
    return new ResultStringBuilder("|") 
       .append(Long.parseLong((in._2()._1().getCardno().trim()),16)) 
       .append(Long.parseLong((in._2()._1().getBoxno().trim()),16)) 
       .append(in._2()._2().getBoxno()) 
       *** HERE I NEED TO ALSO APPEND THE COLUMN FROM TableC 
       .toString(); 
    }) 
      .saveAsTextFile("c:\outfile"); 

答えて

0

あなたはスパークAPIで作業しているとき、あなたは常にRDDは不変であるので、あなたはRDDのsturctureで何かを変更する場合、新しいRDDを作成することを忘れないでください。 3つの方法は、この場合には参加行うために

、 新しいキーと値のペアでPairRDDを持ちたいので、表のためのためのユニークなキーあなたが最初の2つのテーブルを結合した後、 を新しいJavaPairRDDを作成する必要がありますA、B、Cは異なる。 あなたがテーブルを結合することができ 方法は、このようなものです(最初の最初またはAC ABへの参加のいずれか)これには2つの方法が考えられます:

表A - 表B(キーとPairRDD:多分boxnumberやカード番号、またはその両方)

あなたは表Aと表Bに参加した後、あなたは私達がのキーに一意の列を移動した後、表C.

// joinedAB is RDD resulting from join operation of Table A and C 
JavaPairRDD joinedABForC = joinedAB.map(l -> new Tuple2(l[0], l)); 
// now joinedABForC has ipaddress as the RDD's key 
// join rdd joinedABForC with Table C 

に参加したいので、あなたがキーIPアドレスを持つ新しいPairRDDを作成する必要がありますpairRddを使用すると、これをテーブルCに参加させることができ、3方向ジョインが行われます。

参加したテーブルAB - テーブルC(PairRddキー:ipaddress)

+0

フィードバックいただきありがとうございます。私はあなたが示唆したことをしたが、依然として立ち往生している。私は私の質問を更新しました。あなたは一見を持てますか? –

+0

ありがとう、私は今問題を解決しました。 –

関連する問題