2017-03-22 7 views
1

私はspark 2.0コードを持っています。これは.gz(テキスト)ファイルを読み込み、HIVEテーブルに書き込みます。Sparkの最初の数行をスキップ

私の知ることができますすべてのファイルの最初の2行を無視しますか?最初の2行をスキップします。

SparkSession spark = SparkSession 
      .builder() 
      .master("local") 
       .appName("SparkSessionFiles") 
       .config("spark.some.config.option", "some-value") 
       .enableHiveSupport() 
       .getOrCreate(); 

    JavaRDD<mySchema> peopleRDD = spark.read() 
     .textFile("file:///app/home/emm/zipfiles/myzips/") 
     .javaRDD() 
     .map(new Function<String, mySchema>() 
     { 
      @Override 
      public mySchema call(String line) throws Exception 
       { 

        String[] parts = line.split(";"); 
        mySchema mySchema = new mySchema(); 

        mySchema.setCFIELD1  (parts[0]); 

        mySchema.setCFIELD2  (parts[1]); 
        mySchema.setCFIELD3  (parts[2]); 
        mySchema.setCFIELD4  (parts[3]); 
        mySchema.setCFIELD5  (parts[4]); 
       return mySchema; 

        } 
     }); 

// Apply a schema to an RDD of JavaBeans to get a DataFrame 
    Dataset<Row> myDF = spark.createDataFrame(peopleRDD, mySchema.class); 

    myDF.createOrReplaceTempView("myView"); 

    spark.sql("INSERT INTO myHIVEtable SELECT * from myView"); 

UPDATE:修正コード

ラムダは私の日食に取り組んでいません。したがって、通常のJava構文を使用しました。私は今例外を取得しています。

..... 
    Function2 removeHeader= new Function2<Integer, Iterator<String>, Iterator<String>>(){ 
     public Iterator<String> call(Integer ind, Iterator<String> iterator) throws Exception { 
      System.out.println("ind="+ind); 
      if((ind==0) && iterator.hasNext()){ 
       iterator.next(); 
       iterator.next(); 
       return iterator; 
      }else 
       return iterator; 
     } 
    }; 

JavaRDD<mySchema> peopleRDD = spark.read() 
    .textFile(path) //file:///app/home/emm/zipfiles/myzips/ 
    .javaRDD() 
    .mapPartitionsWithIndex(removeHeader,false) 
    .map(new Function<String, mySchema>() 
    { 
    ........ 


Java.util.NoSuchElementException 
     at java.util.LinkedList.removeFirst(LinkedList.java:268) 
     at java.util.LinkedList.remove(LinkedList.java:683) 
     at org.apache.spark.sql.execution.BufferedRowIterator.next(BufferedRowIterator.java:49) 
     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.next(WholeStageCodegenExec.scala:374) 
     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.next(WholeStageCodegenExec.scala:368) 
     at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
     at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
     at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31) 
     at com.comcast.emm.vodip.SparkSessionFiles.SparkSessionFiles$1.call(SparkSessionFiles.java:2480) 
     at com.comcast.emm.vodip.SparkSessionFiles.SparkSessionFiles$1.call(SparkSessionFiles.java:2476) 

答えて

1

あなたはそのようなこと行うことができます:Scalaで

JavaRDD<mySchema> peopleRDD = spark.read() 
    .textFile("file:///app/home/emm/zipfiles/myzips/") 
    .javaRDD() 
    .mapPartitionsWithIndex((index, iter) -> { 
       if (index == 0 && iter.hasNext()) { 
        iter.next(); 
        if (iter.hasNext()) { 
         iter.next(); 
        } 
       } 
    return iter; 
    }, true); 
    ... 

を、それは構文が単純です。たとえば、次のように

rdd.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(2) else iter } 

EDIT:

私は例外を避けるために、コードを変更しました。

このコードでは、すべてのファイルではなく、RDDの最初の2行だけが削除されます。

すべてのファイルの最初の2行を削除する場合は、ファイルごとにRDDを実行し、各RDDに.mapPartitionWithIndex(...)を適用してから、各RDDのunionを実行することをおすすめします。

+0

私の更新されたコードを見てください。 index == 0これはrddパーティションまたは各ファイルを参照していますか?実際には、各ファイルから最初の2行を削除したいと思います。 – AKC

+0

あなたのコメントの後に自分の答えを更新しました。私はそれが助けて欲しい –

+0

これはどちらが良いでしょうか?1)フィルタ変換を適用します(最初の2行にはフィルタ条件に使用できる独自の値があります)か、またはmapPartitionWithIndexを使用してRDDに結合しますか? – AKC

関連する問題