2017-06-21 12 views
2

私の既存のプロジェクトでは、Hadoopのmap-reduceを使用してXML形式のカスタムキーと値を持つシーケンスファイルを生成しています。SparkのプレーンテキストファイルをHadoopシーケンスファイルに変換

XML値は、入力ソースから一度に1行を読み込むことによって生成され、RecordReaderは、XML形式の次の値をプレーンテキストから返すように実装されています。

入力ソースファイルは、3行(1行ヘッダと行の残りの実際のデータを有している)

id|name|value 
1|Vijay|1000 
2|Gaurav|2000 
3|Ashok|3000 

ポストマップ法は、シーケンスファイルは、以下のようにデータを持っていた:

FeedInstanceKey{feedInstanceId=1000, entity=bars} <?xml version='1.0' encoding='UTF-8'?><bars><id>1</id><name>Vijay</name><value>1000</value></bars> 
FeedInstanceKey{feedInstanceId=1000, entity=bars} <?xml version='1.0' encoding='UTF-8'?><bars><id>2</id><name>Gaurav</name><value>2000</value></bars> 
FeedInstanceKey{feedInstanceId=1000, entity=bars} <?xml version='1.0' encoding='UTF-8'?><bars><id>3</id><name>Ashok</name><value>3000</value></bars> 

質問:希望をSparkでこれを実装する。基本的には、入力ファイルを読み、上記のようにキー値のペアを生成します。

既存のInputFormat、したがって私のHadoopマッパークラスで使用されているRecordReaderを再利用する方法はありません。

RecordReaderはプレーンテキスト行をXMLに変換し、context.write()メソッドで書き込むためのHadoopマップメソッドに値として返すロジックを持っています。

お勧めです。

答えて

3

これは、External DatasetsセクションのSparkドキュメントに記載されています。あなたのための重要な部分は、次のとおりです。他のHadoop InputFormatsについて、あなたは任意のJobConf と入力フォーマットクラス、キークラスと値クラスを取る JavaSparkContext.hadoopRDD方法を、使用することができます

。これらを入力ソースでHadoopジョブと同じように設定してください( )。 「新しい」MapReduce API(org.apache.hadoop.mapreduce)に基づいて、InputFormatsにJavaSparkContext.newAPIHadoopRDDを使用することもできます。

ここでそれを使用する方法をdemostrating簡単な例です:

public final class ExampleSpark { 

    public static void main(String[] args) throws Exception { 
     JavaSparkContext spark = new JavaSparkContext(); 
     Configuration jobConf = new Configuration(); 

     JavaPairRDD<LongWritable, Text> inputRDD = spark.newAPIHadoopFile(args[0], TextInputFormat.class, LongWritable.class, Text.class, jobConf); 
     System.out.println(inputRDD.count()); 

     spark.stop(); 
     System.exit(0); 
    } 
} 

あなたはJavaSparkContext hereのJavadocを参照してくださいすることができます。

関連する問題