2017-09-19 32 views
0

JSONからORCへの変換プロセスを自動化する必要があります。 JsonReaderがマップタイプとthrows an exceptionを処理しない点を除いて、私はApacheのORC-toolsパッケージを使用してほぼそこに到達することができました。したがって、以下は動作しますが、マップタイプは処理されません。Java:ファイルからJSONを読み込み、ORCに変換してファイルに書き込む

Path hadoopInputPath = new Path(input); 
    try (RecordReader recordReader = new JsonReader(hadoopInputPath, schema, hadoopConf)) { // throws when schema contains Map type 
     try (Writer writer = OrcFile.createWriter(new Path(output), OrcFile.writerOptions(hadoopConf).setSchema(schema))) { 
      VectorizedRowBatch batch = schema.createRowBatch(); 
      while (recordReader.nextBatch(batch)) { 
       writer.addRowBatch(batch); 
      } 
     } 
    } 

だから、私は将来的に、私はマイナーコードの変更と、このようなAVROなどの他の形式に変換することができます追加の利点を持っているJSONツーORC変換用のハイブクラスを使用してに探し始めました。しかし、私はHiveクラスを使ってこれを行う最良の方法が何であるか分かりません。具体的には、以下に示すようにHCatRecordをファイルに書き込む方法が明確ではありません。

HCatRecordSerDe hCatRecordSerDe = new HCatRecordSerDe(); 
    SerDeUtils.initializeSerDe(hCatRecordSerDe, conf, tblProps, null); 

    OrcSerde orcSerde = new OrcSerde(); 
    SerDeUtils.initializeSerDe(orcSerde, conf, tblProps, null); 

    Writable orcOut = orcSerde.serialize(hCatRecord, hCatRecordSerDe.getObjectInspector()); 
    assertNotNull(orcOut); 

    InputStream input = getClass().getClassLoader().getResourceAsStream("test.json.snappy"); 
    SnappyCodec compressionCodec = new SnappyCodec(); 
    try (CompressionInputStream inputStream = compressionCodec.createInputStream(input)) { 
     LineReader lineReader = new LineReader(new InputStreamReader(inputStream, Charsets.UTF_8)); 
     String jsonLine = null; 
     while ((jsonLine = lineReader.readLine()) != null) { 
      Writable jsonWritable = new Text(jsonLine); 
      DefaultHCatRecord hCatRecord = (DefaultHCatRecord) jsonSerDe.deserialize(jsonWritable); 
      // TODO: Write ORC to file???? 
     } 
    } 

上記のコードを完成させる方法や、JSON-ORCを行う簡単な方法については、大変感謝しています。ここで

+0

正直なところ、Spark/Pig /実際のHiveQLを使用してこれを行います –

+0

マップは通常のJSONオブジェクトと似ていませんか?したがって、構造を誇示するのか? –

+0

cricket_007、このJSONからORCへの変換は、すでにJSONデータを受け取っている他のもの(アーカイブなど)を行うWebサービスの一部として行う必要があります。したがって、Spark/Hiveジョブでこの変換を行うことは、JSONデータをこれらのジョブに再送信する必要があるため、私たちの選択肢ではありません(他の場所でこの変換に使用しましたが)。 – alecswan

答えて

0

は、私がcricket_007提案あたりのスパークライブラリを使用してやってしまったものです:(幸せのmaven-重複ファインダー・プラグインを保つために、いくつかの除外を用いて)

Mavenの依存関係:

<properties> 
     <dep.jackson.version>2.7.9</dep.jackson.version> 
     <spark.version>2.2.0</spark.version> 
     <scala.binary.version>2.11</scala.binary.version> 
    </properties> 

    <dependency> 
     <groupId>com.fasterxml.jackson.module</groupId> 
     <artifactId>jackson-module-scala_${scala.binary.version}</artifactId> 
     <version>${dep.jackson.version}</version> 
     <exclusions> 
      <exclusion> 
       <groupId>com.google.guava</groupId> 
       <artifactId>guava</artifactId> 
      </exclusion> 
     </exclusions> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-hive_${scala.binary.version}</artifactId> 
     <version>${spark.version}</version> 
     <exclusions> 
      <exclusion> 
       <groupId>log4j</groupId> 
       <artifactId>apache-log4j-extras</artifactId> 
      </exclusion> 
      <exclusion> 
       <groupId>org.apache.hadoop</groupId> 
       <artifactId>hadoop-client</artifactId> 
      </exclusion> 
      <exclusion> 
       <groupId>net.java.dev.jets3t</groupId> 
       <artifactId>jets3t</artifactId> 
      </exclusion> 
      <exclusion> 
       <groupId>com.google.code.findbugs</groupId> 
       <artifactId>jsr305</artifactId> 
      </exclusion> 
      <exclusion> 
       <groupId>stax</groupId> 
       <artifactId>stax-api</artifactId> 
      </exclusion> 
      <exclusion> 
       <groupId>org.objenesis</groupId> 
       <artifactId>objenesis</artifactId> 
      </exclusion> 
     </exclusions> 
    </dependency> 

Javaコードの概要:

SparkConf sparkConf = new SparkConf() 
    .setAppName("Converter Service") 
    .setMaster("local[*]"); 

SparkSession sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate(); 

// read input data 
Dataset<Row> events = sparkSession.read() 
    .format("json") 
    .schema(inputConfig.getSchema()) // StructType describing input schema 
    .load(inputFile.getPath()); 

// write data out 
DataFrameWriter<Row> frameWriter = events 
    .selectExpr(
     // useful if you want to change the schema before writing it to ORC, e.g. ["`col1` as `FirstName`", "`col2` as `LastName`"] 
     JavaConversions.asScalaBuffer(outputSchema.getColumns())) 
    .write() 
    .options(ImmutableMap.of("compression", "zlib")) 
    .format("orc") 
    .save(outputUri.getPath()); 

これは誰かが開始するのに役立ちます。

関連する問題