2017-10-16 6 views
0

hdfsからmongodbにデータを移動しようとしています。私は以下のようなコマンドラインでこれを達成することができます。hdfsからmongodbへのデータのエクスポート

hadoop fs -text "/user/name.txt" | mongoimport --host 127.0.0.1:27018 -d cds -c hello --type tsv --headerline 

私はこれにスカラコードを書く必要があります。私はファイルシステムに複数のファイルを持っています。私はmongo-hadoopコネクタをチェックしましたが、これとは逆のことが必要です。 hdfsからファイルを読み込み、scalaのmongodbにダンプします。

+0

Apacheのスパークは、データソースに応じて、とにかく... Mongoのコネクタがあり、ここでスパークすることによりMongoImport、おそらくHDFSは必要ありません –

+0

また、あなたはそのライブラリを誤解しましたか? * MongoDB(またはデータ形式のバックアップファイル、BSON)を入力ソース、**または出力先***として使用できるようにします –

答えて

0

述べたように、あなたも、ちょうどこのcomplete note here

public class LogReducer extends Reducer<Text, IntWritable, NullWritable, MongoUpdateWritable> { 

    @Override 
    public void reduce(final Text pKey, 
         final Iterable<IntWritable> pValues, 
         final Context pContext) 
      throws IOException, InterruptedException{ 

     int count = 0; 
     for(IntWritable val : pValues){ 
      count += val.get(); 
     } 

     BasicBSONObject query = new BasicBSONObject("devices", new ObjectId(pKey.toString())); 
     BasicBSONObject update = new BasicBSONObject("$inc", new BasicBSONObject("logs_count", count)); 
     pContext.write(null, new MongoUpdateWritable(query, update, true, false)); 
    } 

} 

またはハイブ表によるような単純なMRを導入することで、分散同じことを実行することができ、シングルスレッドのアプリケーションとして上記動作するので同じことを行うための複数の方法がありますが、 here多くのコードを書く必要はありません。 それとも、やっているすべてのテキストファイルをダウンロードし、それをストリーミングする場合に

package com.mongodb.spark_examples; 

import com.mongodb.spark.MongoSpark; 
import com.mongodb.spark.config.WriteConfig; 

import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.sql.SparkSession; 

import org.bson.Document; 

import static java.util.Arrays.asList; 

import java.util.HashMap; 
import java.util.Map; 


public final class WriteToMongoDBWriteConfig { 

    public static void main(final String[] args) throws InterruptedException { 

    SparkSession spark = SparkSession.builder() 
     .master("local") 
     .appName("MongoSparkConnectorIntro") 
     .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection") 
     .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection") 
     .getOrCreate(); 

    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext()); 

    // Create a custom WriteConfig 
    Map<String, String> writeOverrides = new HashMap<String, String>(); 
    writeOverrides.put("collection", "spark"); 
    writeOverrides.put("writeConcern.w", "majority"); 
    WriteConfig writeConfig = WriteConfig.create(jsc).withOptions(writeOverrides); 

    // Create a RDD of 10 documents 
    JavaRDD<Document> sparkDocuments = jsc.parallelize(asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).map 
     (new Function<Integer, Document>() { 
     public Document call(final Integer i) throws Exception { 
       return Document.parse("{spark: " + i + "}"); 
      } 
     }); 

    /*Start Example: Save data from RDD to MongoDB*****************/ 
    MongoSpark.save(sparkDocuments, writeConfig); 
    /*End Example**************************************************/ 

    jsc.close(); 

    } 

} 

Scalaの

def main(args: Array[String]): Unit = { 
     import org.apache.spark.sql.SparkSession 

    val spark = SparkSession.builder() 
     .master("local") 
     .appName("MongoSparkConnectorIntro") 
     .config("spark.mongodb.input.uri", "mongodb://192.168.2.13:28111,192.168.2.14:28112,192.168.2.15:28113/test.myCollection") 
     .config("spark.mongodb.output.uri", "mongodb://192.168.2.13:28111,192.168.2.14:28112,192.168.2.15:28113/test.myCollection") 
     .getOrCreate() 
    import org.apache.spark.sql.SparkSession 

    val writeConfig = WriteConfig(Map("collection" -> "spark", "writeConcern.w" -> "majority"), Some(WriteConfig(spark.sparkContext))) 
    val sparkDocuments = spark.sparkContext.parallelize((1 to 10).map(i => Document.parse(s"{spark: $i}"))) 

    MongoSpark.save(sparkDocuments, writeConfig) 

    } 
関連する問題