2016-10-13 2 views
1

複数のJavaRDDをマージしようとしましたが、2つしかマージされません。私はしばらくの間これで苦労してきましたが、全体的に私は複数のコレクションを取得し、sqlContextを使用してグループを作成し、すべての結果を印刷できるようにしたいと考えています。複数のJavaRDDをマージする

ここに私のコードのコレクション SQLContext sqlContext =新しいorg.apache.spark.sql.SQLContext(SC)を印刷

JavaRDD<AppLog> logs = mapCollection(sc, "mongodb://hadoopUser:[email protected]:27017/hbdata.ppa_logs").union(
           mapCollection(sc, "mongodb://hadoopUser:[email protected]:27017/hbdata.fav_logs").union(
           mapCollection(sc, "mongodb://hadoopUser:[email protected]:27017/hbdata.pps_logs").union(
            mapCollection(sc, "mongodb://hadoopUser:[email protected]:27017/hbdata.dd_logs").union(
            mapCollection(sc, "mongodb://hadoopUser:[email protected]:27017/hbdata.ppt_logs") 
           ) 
           ) 
          ) 
         ); 


public JavaRDD<AppLog> mapCollection(JavaSparkContext sc ,String uri){ 

    Configuration mongodbConfig = new Configuration(); 
    mongodbConfig.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat"); 
    mongodbConfig.set("mongo.input.uri", uri); 

    JavaPairRDD<Object, BSONObject> documents = sc.newAPIHadoopRDD(
     mongodbConfig,   // Configuration 
     MongoInputFormat.class, // InputFormat: read from a live cluster. 
     Object.class,    // Key class 
     BSONObject.class   // Value class 
    ); 

    return documents.map(

     new Function<Tuple2<Object, BSONObject>, AppLog>() { 

      public AppLog call(final Tuple2<Object, BSONObject> tuple) { 
       AppLog log = new AppLog(); 
       BSONObject header = 
       (BSONObject) tuple._2(); 

       log.setTarget((String) header.get("target")); 
       log.setAction((String) header.get("action")); 

       return log; 
      } 
     } 
    ); 
} 

//;あなたが複数のJavaRDDsをマージしたい場合は

DataFrame logsSchema = sqlContext.createDataFrame(logs, AppLog.class); 
    logsSchema.registerTempTable("logs"); 

    DataFrame groupedMessages = sqlContext.sql(
     "select * from logs"); 
     // "select target, action, Count(*) from logs group by target, action"); 

     // "SELECT to, body FROM messages WHERE to = \"[email protected]\""); 



    groupedMessages.show(); 

    logsSchema.printSchema(); 
+0

をチェック! 2番目の質問:再帰的な方法でユニオンを呼び出すのはなぜですか(再帰的に実行されないことはわかっています)。私はrdd1.union(rdd2).union(rdd3)をそういう意味で使っています。共用体の戻り値の型はrddでなければなりません。あなたの文章スタイルで - > mapCollection(sth1、sth2).union(mapCollection(sth1、sth2))など – hasan

+0

こんにちは、私はrddを印刷するためにsqlcontextを使用し、上記の質問を更新して、データの印刷方法を確認できます。私はiveが多くの方法を試したとして再帰的なスタイルのコードを使用しましたが、これは最も近いiveが来たこれを働かせませんでした。どんな勧告? –

+0

それは私にとって正しいようです。私を混乱させる唯一の事は再帰的な文体です。おそらく、あなたはそれぞれのログrddをロードして後で結合を行い、カウントまたは各rddの最初の要素を出力して、それらが空でないかどうかを調べることができます。機能的な文章スタイルは、より良い可読性のために導かれます。ちょうど助言のために – hasan

答えて

2

、単に代わりにrdd1.union(rdd2).union(rdd3)sc.union(rdd1,rdd2,..)を使用しています。

また、あなたはちょうど2つのRDDSが団結していることを認識しないか、このRDD.union vs SparkContex.union

+0

おかげさまです。私はまだ最初の2つのrddを取得したように思えます。あまりにも確かでない理由 –

+0

sc.uniom(rdd1、rdd2、...)を使用した後でも –

+0

誰かが私を叩いてください。私は、「トップ20の行しか表示していません」という私の値を印刷する場所の底に気づいた。私はこれを過去6〜7時間働いています –

関連する問題