複数のJavaRDDをマージしようとしましたが、2つしかマージされません。私はしばらくの間これで苦労してきましたが、全体的に私は複数のコレクションを取得し、sqlContextを使用してグループを作成し、すべての結果を印刷できるようにしたいと考えています。複数のJavaRDDをマージする
ここに私のコードのコレクション SQLContext sqlContext =新しいorg.apache.spark.sql.SQLContext(SC)を印刷
JavaRDD<AppLog> logs = mapCollection(sc, "mongodb://hadoopUser:[email protected]:27017/hbdata.ppa_logs").union(
mapCollection(sc, "mongodb://hadoopUser:[email protected]:27017/hbdata.fav_logs").union(
mapCollection(sc, "mongodb://hadoopUser:[email protected]:27017/hbdata.pps_logs").union(
mapCollection(sc, "mongodb://hadoopUser:[email protected]:27017/hbdata.dd_logs").union(
mapCollection(sc, "mongodb://hadoopUser:[email protected]:27017/hbdata.ppt_logs")
)
)
)
);
public JavaRDD<AppLog> mapCollection(JavaSparkContext sc ,String uri){
Configuration mongodbConfig = new Configuration();
mongodbConfig.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");
mongodbConfig.set("mongo.input.uri", uri);
JavaPairRDD<Object, BSONObject> documents = sc.newAPIHadoopRDD(
mongodbConfig, // Configuration
MongoInputFormat.class, // InputFormat: read from a live cluster.
Object.class, // Key class
BSONObject.class // Value class
);
return documents.map(
new Function<Tuple2<Object, BSONObject>, AppLog>() {
public AppLog call(final Tuple2<Object, BSONObject> tuple) {
AppLog log = new AppLog();
BSONObject header =
(BSONObject) tuple._2();
log.setTarget((String) header.get("target"));
log.setAction((String) header.get("action"));
return log;
}
}
);
}
//;あなたが複数のJavaRDDs
をマージしたい場合は
DataFrame logsSchema = sqlContext.createDataFrame(logs, AppLog.class);
logsSchema.registerTempTable("logs");
DataFrame groupedMessages = sqlContext.sql(
"select * from logs");
// "select target, action, Count(*) from logs group by target, action");
// "SELECT to, body FROM messages WHERE to = \"[email protected]\"");
groupedMessages.show();
logsSchema.printSchema();
をチェック! 2番目の質問:再帰的な方法でユニオンを呼び出すのはなぜですか(再帰的に実行されないことはわかっています)。私はrdd1.union(rdd2).union(rdd3)をそういう意味で使っています。共用体の戻り値の型はrddでなければなりません。あなたの文章スタイルで - > mapCollection(sth1、sth2).union(mapCollection(sth1、sth2))など – hasan
こんにちは、私はrddを印刷するためにsqlcontextを使用し、上記の質問を更新して、データの印刷方法を確認できます。私はiveが多くの方法を試したとして再帰的なスタイルのコードを使用しましたが、これは最も近いiveが来たこれを働かせませんでした。どんな勧告? –
それは私にとって正しいようです。私を混乱させる唯一の事は再帰的な文体です。おそらく、あなたはそれぞれのログrddをロードして後で結合を行い、カウントまたは各rddの最初の要素を出力して、それらが空でないかどうかを調べることができます。機能的な文章スタイルは、より良い可読性のために導かれます。ちょうど助言のために – hasan