2016-11-02 13 views
2

スパークは、rdd.saveAsObjectFile("file")のファイルにrddを保存することができます。 私はこのファイルをSpark外で読む必要があります。 docによれば、デフォルトのsparkシリアライザを使用すると、このファイルは、標準のJavaシリアル化でシリアル化されたオブジェクトのシーケンスに過ぎません。しかし、私は、ファイルのヘッダーとオブジェクト間の区切りを持っていると思います。私はこのファイルを読んで、jdeserializeを使って(私はクラス定義がないので)各Java/Scalaオブジェクトを逆シリアル化する必要があります。spark rdd.saveAsObjectFileのファイル形式に関するドキュメント

rdd.saveAsObjectFile("file")(Kryoシリアライザではない標準のシリアライザを使用)で作成されたファイル形式に関するドキュメントはどこにありますか?


更新 VladoDemcakの回答に基づいて作業する例:

import org.apache.hadoop.io._ 
import org.apache.hadoop.conf._ 
import org.apache.hadoop.fs._ 
import org.apache.hadoop.io._ 

def deserialize(data: Array[Byte]) = 
    new ObjectInputStream(new ByteArrayInputStream(data)).readObject() 

val path = new Path("/tmp/part-00000") 
val config = new Configuration() 
val reader = new SequenceFile.Reader(FileSystem.get(new Configuration()), path, config) 
val key = NullWritable.get 
val value = new BytesWritable 

while (reader.next(key, value)) { 
    println("key: {} and value: {}.", key, value.getBytes) 
    println(deserialize(value.getBytes())) 
} 
reader.close() 
+1

https://gist.github.com/dportabella/dd8886ebb8d5f0eddd1196e1c30e34f6 –

答えて

1

それはので、私は、私はこのスタッフについて知っているかを説明しようとする非常に興味深い質問です。あなたは、私がsaveAsObjectFileSequenceFileを生成知っているように、私はそう

is API javadocいくつかの詳細について
/** 
    * Save this RDD as a SequenceFile of serialized objects. 
    */ 
    def saveAsObjectFile(path: String): Unit = withScope { 
    this.mapPartitions(iter => iter.grouped(10).map(_.toArray)) 
     .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x)))) 
     .saveAsSequenceFile(path) 
    } 

を見saveAsObjectFileのみマニュアルを確認することができます。

非圧縮のキー/値の記録:そして、それはversionとヘッダたsequenceFileのドキュメント、classnamemetadata ...

に基づいて3種類のSequenceFile形式があります。圧縮されたキー/値レコードを記録する - ここでは「値」のみが圧縮されます。ブロック圧縮されたキー/値レコード - キーと値の両方が「ブロック」で別々に収集され、圧縮されます。 「ブロック」のサイズは設定可能です。

上記のフォーマットはすべて、共通のヘッダー(適切なキーと値のペアを返すために SequenceFile.Readerによって使用されます)を共有します。

シーケンスファイルを読むために、hadoop SequenceFile.Readerの実装を使用できます。

Path path = new Path("/hdfs/file/path/seqfile"); 
SequenceFile.Reader reader = new SequenceFile.Reader(FileSystem.get(new Configuration()), path, config); 
WritableComparable key = (WritableComparable) reader.getKeyClass().newInstance(); 
Writable value = (Writable) reader.getValueClass().newInstance(); 

while (reader.next(key, value)){ 
    logger.info("key: {} and value: {}.", key, value.getBytes()); 
    // (MyObject) deserialize(value.getBytes()); 
} 

reader.close(); 

私はこれをテストしていますが、あなたの質問に気づいdocリンクをもとにしていない:

デフォルトでは、スパークは、あなたがループの中でとてもJavaのObjectOutputStreamの 枠組み

を使用してオブジェクトをシリアライズ値のバイトを取得してデシリアライズすることができます

public static Object deserialize(byte[] data){ 
    return new ObjectInputStream(new ByteArrayInputStream(data)).readObject(); 
} 

あなたのケースでは、あなたがデシリアライズ方法であなたのライブラリー(jdeserialize)を使用する必要がある - 私はrun(InputStream is, boolean shouldConnect)を推測するなど

+0

偉大な、私はこの明日テストします朝。どうも! –

+0

'reader.getKeyClass()を呼び出すと、ランタイムエラーが発生します。newInstance() '、更新された私の質問を参照してください。何か案が? –

+1

それは動作します。私はコードで質問を更新しました。ありがとう! –

関連する問題