2017-05-26 15 views
0

私は今、shapefileのパーサーをsparkで書くことに取り組んでいます。 NewAPIHadoopFileを利用して、元の.shpファイルからバイナリレコードを1つずつ抽出します。問題は、プログラムがローカルディスクからファイルを取得するときに動作することです。しかし、hdfsからファイルを読み込むと、DataInputStreamから取得したバイトフローはもはや元のファイルと統合されなくなりました。例外は次のとおりです。 RecordReaderでhdfsからバイナリファイルを読み込むには?

java.lang.NegativeArraySizeException 
    at ShapeFileParse.ShapeParseUtil.parseRecordPrimitiveContent(ShapeParseUtil.java:53) 
    at spatial.ShapeFileReader.nextKeyValue(ShapeFileReader.java:54) 
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:182) 
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) 
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1702) 
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134) 
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1916) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1916) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
17/05/25 17:19:15 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NegativeArraySizeException 
    at ShapeFileParse.ShapeParseUtil.parseRecordPrimitiveContent(ShapeParseUtil.java:53) 
    at spatial.ShapeFileReader.nextKeyValue(ShapeFileReader.java:54) 
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:182) 
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) 
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1702) 
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134) 
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1916) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1916) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

マイコードは以下の通りです:

public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { 
     ShapeParseUtil.initializeGeometryFactory(); 
     FileSplit fileSplit = (FileSplit)split; 
     long start = fileSplit.getStart(); 
     long end = start + fileSplit.getLength(); 
     int len = (int)fileSplit.getLength(); 
     Path filePath = fileSplit.getPath(); 
     FileSystem fileSys = filePath.getFileSystem(context.getConfiguration()); 
     FSDataInputStream inputStreamFS = fileSys.open(filePath); 
     //byte[] wholeStream = new byte[len]; 
     inputStream = new DataInputStream(inputStreamFS); 
     //IOUtils.readFully(inputStream, wholeStream, 0, len); 
     //inputStream = new DataInputStream(new ByteArrayInputStream(wholeStream)); 
     ShapeParseUtil.parseShapeFileHead(inputStream); 
    } 

そして、ここでは、私がテストしIOUtils.readFullyを使用する場合、私は

public class ShapeInputFormat extends FileInputFormat<ShapeKey, BytesWritable> { 
    public RecordReader<ShapeKey, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { 
     return new ShapeFileReader(split, context); 
    } 

    @Override 
    protected boolean isSplitable(JobContext context, Path filename) { 
     return false; 
    } 
} 

私FileInputFormatを設定する方法で、全体のバイト配列私は良いです。しかし、DataInputStreamを使用すると、フローが正しく処理されません。だから私はhdfs上のファイルがまだ統合されていると信じています。私のファイルは560kbしかないので、現在hdfsには1つのファイルしかありません。複数のブロックに割り当てることはできないと思います。

私はスパークするのが初めてです。これはちょっとした問題です。本当に教えてくれて本当にありがとう。

答えて

0

各レコードのバイト配列を読み込むときにread()をreadFully()に変更することで、自分で考え出しました。私は利用可能な()私はちょうど分割のサイズに等しいので、まだ少し混乱しています。しかし、inputStream.read()を呼び出すと、割り当てられた長さまで読み込めませんでした。

関連する問題