2017-06-12 19 views
0

S3のHiveデータから読み取り、HFilesを生成するSparkジョブを作成しました。大きなデータセットに対してSparkジョブを実行できません

このジョブは、1つのORCファイル(約190 MB)を読み取ってもうまく動作しますが、S3ディレクトリ全体、約400個のORCファイルを読み込んだときに約400 * 190 MB = 76 GBのデータこの次のエラー/スタックトレースを投げ続ける:

17/06/12 01:48:03 ERROR server.TransportRequestHandler: Error sending result StreamResponse{streamId=/jars/importer-all.jar, byteCount=194727686, body=FileSegmentManagedBuffer{file=/tmp/importer-all.jar, offset=0, length=194727686}} to /10.211.XX.XX:39149; closing connection 
java.nio.channels.ClosedChannelException 
    at io.netty.channel.AbstractChannel$AbstractUnsafe.close(...)(Unknown Source) 
17/06/12 01:48:03 WARN scheduler.TaskSetManager: Lost task 6.0 in stage 0.0 (TID 6, ip-10-211-127-63.ap-northeast-2.compute.internal, executor 9): java.nio.channels.ClosedChannelException 
    at org.apache.spark.network.client.StreamInterceptor.channelInactive(StreamInterceptor.java:60) 
    at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:179) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:230) 
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1289) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237) 
    at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:893) 
    at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:691) 
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:408) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:455) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140) 
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) 
    at java.lang.Thread.run(Thread.java:745) 

17/06/12 01:48:03 INFO scheduler.TaskSetManager: Starting task 6.1 in stage 0.0 (TID 541, ip-10-211-126-250.ap-northeast-2.compute.internal, executor 72, partition 6, PROCESS_LOCAL, 6680 bytes) 
17/06/12 01:48:03 ERROR server.TransportRequestHandler: Error sending result StreamResponse{streamId=/jars/importer-all.jar, byteCount=194727686, body=FileSegmentManagedBuffer{file=/tmp/importer-all.jar, offset=0, length=194727686}} to /10.211.XX.XX:39151; closing connection 
java.nio.channels.ClosedChannelException 
    at io.netty.channel.AbstractChannel$AbstractUnsafe.close(...)(Unknown Source) 

私クラスタはそれを処理するのに十分な大きさです。(これはすでに確認された)

それは40個のノードがあり、800 GB以上のメモリが利用可能、320 VCores。

そして、ここに私のJavaコードです:

protected void sparkGenerateHFiles(JavaRDD<Row> rdd) throws IOException { 
     System.out.println("In sparkGenerateHFiles...."); 
     JavaPairRDD<ImmutableBytesWritable, KeyValue> javaPairRDD = rdd.mapToPair(
      new PairFunction<Row, ImmutableBytesWritable, KeyValue>() { 
      public Tuple2<ImmutableBytesWritable, KeyValue> call(Row row) throws Exception { 
       System.out.println("running call now ...."); 
       String key = (String) row.get(0); 
       String value = (String) row.get(1); 

       ImmutableBytesWritable rowKey = new ImmutableBytesWritable(); 
       byte[] rowKeyBytes = Bytes.toBytes(key); 
       rowKey.set(rowKeyBytes); 

       KeyValue keyValue = new KeyValue(rowKeyBytes, 
        Bytes.toBytes("fam"), 
        Bytes.toBytes("qualifier"), 
        ProductJoin.newBuilder() 
         .setId(key) 
         .setSolrJson(value) 
         .build().toByteArray()); 

       return new Tuple2<ImmutableBytesWritable, KeyValue>(rowKey, keyValue); 
      } 
     }); 
     Partitioner partitioner = new IntPartitioner(2); 
     // repartition and sort the data - HFiles want sorted data 
     JavaPairRDD<ImmutableBytesWritable, KeyValue> repartitionedRDD = 
      javaPairRDD.repartitionAndSortWithinPartitions(partitioner); 


     Configuration baseConf = HBaseConfiguration.create(); 
     Configuration conf = new Configuration(); 
     conf.set(HBASE_ZOOKEEPER_QUORUM, importerParams.zkQuorum); 
     Job job = new Job(baseConf, "map data"); 
     HTable table = new HTable(conf, importerParams.hbaseTargetTable); 
     System.out.println("gpt table: " + table.getName()); 
     HFileOutputFormat2.configureIncrementalLoad(job, table); 
     System.out.println("Done configuring incremental load...."); 

     Configuration config = job.getConfiguration(); 


     repartitionedRDD.saveAsNewAPIHadoopFile(
      "HFILE_OUTPUT_PATH", 
      ImmutableBytesWritable.class, 
      KeyValue.class, 
      HFileOutputFormat2.class, 
      config 
      ); 
     System.out.println("Saved to HFILE_OUTPUT_PATH: " + HFILE_OUTPUT_PATH); 
    } 

protected JavaRDD<Row> readJsonTable() { 
     System.out.println("In readJsonTable....."); 
     SparkSession.Builder builder = SparkSession.builder().appName("Importer"); 
     String hiveTable = ""; 
     if (importerParams.local) { 
      builder.master("local"); 
      hiveTable = HIVE_TABLE_S3A_DEV_SAMPLE; 
     } else { 
      hiveTable = importerParams.hiveSourceTable; 
     } 
     SparkSession spark = builder.getOrCreate(); 

     SparkContext sparkContext = spark.sparkContext(); 

     // this is important. need to set the endpoint to ap-northeast-2 
     sparkContext.hadoopConfiguration() 
      .set("fs.s3a.endpoint", "s3.ap-northeast-2.amazonaws.com"); 

     Dataset<Row> rows = null; 
     if (importerParams.local) { 
      rows = spark.read().format("orc").load(hiveTable); 
     } else { 
      rows = spark.read().format("orc").load(hiveTable);//use this one temporarily 
//   rows = spark.read().format("orc").load(HIVE_TABLE_S3A_PREFIX 
      // + importerParams.latestDateHour); 
     } 
     System.out.println("Finished loading hive table from S3, rows.count() = " 
      + (rows != null ? rows.count() : 0)); 

     return rows.toJavaRDD(); 
    } 

メインプログラム:

私はStackOverflowの上で最も近いものpostを見た:私が試した何

 long startTime = System.currentTimeMillis(); 
     JavaRDD<Row> rdd = readJsonTable(); 

     sparkGenerateHFiles(rdd); 
     System.out.println("it took " + (System.currentTimeMillis() - startTime)/1000 + " seconds to generate HFiles...\n\n\n\n"); 

が。 それから私はこれを設定しました builder.config("spark.shuffle.blockTransferService", "nio"); まだ運がありません。

ご協力いただきありがとうございます。

+1

データが歪んで例外が発生した可能性があります。 – Wang

答えて

0

@Wangが指摘したように、それは実際に私のデータが歪んでいる問題のためです。この問題を解決するための

、私がやったことです:私は私のHBaseのテーブルを再作成

が、今回、私がSPLITSを使用し、80個の領域に私のHBaseのテーブルを分割します。 私のSparkコードでは、カスタマイズされたPartitionerを書いて、そのキーに基づいて各エントリを再分割しました。HOTSPOTTINGは発行されません。つまり、1つのリージョンサーバーが過負荷になっていて、

他のいくつかのトリックがあるように正しいことを行うようにしてください、デフォルトでは、HBaseのテーブルを作成するために、SPLITSを使用した場合、第1の領域と最後地域のendkeystartkeyが空の文字列""で、道に沿って学びましたあまりにも怒らないでください。

私のpartitionerの実例があります。

ありがとうございます!

+0

これはビッグデータの非常に一般的な問題です。ロングテールの分布を避けるために良いキーを見つけることができます。 – tk421

関連する問題