こちらにご負担ください。私は3つのRDD(Hadoopから来ている)を持っています。 3つすべてには、一致/結合できるippaddressやboxnumberなどの固有のキーがあります。ここには、すべてのテーブルのサンプルデータがあります。テーブル列のボックス番号は、一致する前にnumberに変換する必要があります。 JavaでJavaを使ってsparkで3つのRDDテーブルを結合するには?
Table A:
ipaddress|boxnumber|cardnumber
94.254.57.16|59774DEa1|0D1EDF40
94.154.57.176|5F7377Ga9|0D3F796D
Table B:
cardno,boxnumber
1500914,2000096
1500413,2211469
Table C:
ipaddress|kanal|bitrate|kanaltimespent|date|country
94.254.57.16|sky|2023|003DF6A.ts|12-02-2016|chile
94.154.57.176|ITV|3425|003DF6A.ts|23-04-2014|egypt
私の最初の試み:
//TABLE A
JavaSparkContext sc = SetupSparkContext("SparkSample");
JavaRDD<ExtractTable_A> ta_RDD= ExtractTable_A.getRDD(sc);
JavaPairRDD<String, ExtractTable_A> A_PairRDD = ta_RDD.mapToPair(new PairFunction<extractTable_A, String, ExtractTable_A>()
{
@Override
public Tuple2<String, ExtractTable_A> call(ExtractTable_A extractTable_A) throws Exception
{
String [] A= extractTable_A.toString().split("|") ;
return new Tuple2<>(A[0],extractTable_A);
}
});
//TABLE B
JavaRDD<ExtractOttPdl> tb_RDD = ExtractTableB.getRDD(sc);
JavaPairRDD<String, ExtractTable_B> BPairRDD = tb_RDD.mapToPair(new PairFunction<extractTable_B, String, ExtractTable_B>()
{
@Override
public Tuple2<String, ExtractTable_B> call(ExtractTable_B extractTable_B) throws Exception
{
String [] B= extractTable_B.toString().split(",") ;
return new Tuple2<>(B[1],extractTable_B);
}
});
//TABE C
JavaRDD<ExtractTable_C> tc_RDD = ExtractTableC.getRDD(sc);
JavaPairRDD<String, ExtractTable_C> CPairRDD = tb_RDD.mapToPair(new PairFunction<extractTable_C, String, ExtractTable_C>()
{
@Override
public Tuple2<String, ExtractTableC> call(ExtractTableC extractTable_C) throws Exception
{
String [] C= extractTable_A.toString().split("|") ;
return new Tuple2<>(C[0],extractTable_A);
}
});
//At this point i need to join and create an .txt output file
最終的な結果は、私が参加するために管理しているこれらのヘッダーを持つファイル
KANAL|BITRATE|TIMESPENT|DATE|COUNTRY
=== ===更新 ことshoudテーブルAとテーブルBは今、私はテーブルAにテーブルCに参加する方法に固執していますか?
//Joined table A and B
JavaPairRDD<String, Tuple2<ExtractTableA, ExtractTableB>> join_1 = A_PairRDD.join(B_PairRDD);
. . .
//Joined table A and C
JavaPairRDD<String, Tuple2<ExtractTableA, ExtractTableC>> Join_2 = A_PairRDD.join(B_PairRDD);
// Output results from TableA and TableB
join_1.map(in -> {
return new ResultStringBuilder("|")
.append(Long.parseLong((in._2()._1().getCardno().trim()),16))
.append(Long.parseLong((in._2()._1().getBoxno().trim()),16))
.append(in._2()._2().getBoxno())
*** HERE I NEED TO ALSO APPEND THE COLUMN FROM TableC
.toString();
})
.saveAsTextFile("c:\outfile");
フィードバックいただきありがとうございます。私はあなたが示唆したことをしたが、依然として立ち往生している。私は私の質問を更新しました。あなたは一見を持てますか? –
ありがとう、私は今問題を解決しました。 –