1
私はspark 2.0コードを持っています。これは.gz(テキスト)ファイルを読み込み、HIVEテーブルに書き込みます。Sparkの最初の数行をスキップ
私の知ることができますすべてのファイルの最初の2行を無視しますか?最初の2行をスキップします。
SparkSession spark = SparkSession
.builder()
.master("local")
.appName("SparkSessionFiles")
.config("spark.some.config.option", "some-value")
.enableHiveSupport()
.getOrCreate();
JavaRDD<mySchema> peopleRDD = spark.read()
.textFile("file:///app/home/emm/zipfiles/myzips/")
.javaRDD()
.map(new Function<String, mySchema>()
{
@Override
public mySchema call(String line) throws Exception
{
String[] parts = line.split(";");
mySchema mySchema = new mySchema();
mySchema.setCFIELD1 (parts[0]);
mySchema.setCFIELD2 (parts[1]);
mySchema.setCFIELD3 (parts[2]);
mySchema.setCFIELD4 (parts[3]);
mySchema.setCFIELD5 (parts[4]);
return mySchema;
}
});
// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset<Row> myDF = spark.createDataFrame(peopleRDD, mySchema.class);
myDF.createOrReplaceTempView("myView");
spark.sql("INSERT INTO myHIVEtable SELECT * from myView");
UPDATE:修正コード
ラムダは私の日食に取り組んでいません。したがって、通常のJava構文を使用しました。私は今例外を取得しています。
.....
Function2 removeHeader= new Function2<Integer, Iterator<String>, Iterator<String>>(){
public Iterator<String> call(Integer ind, Iterator<String> iterator) throws Exception {
System.out.println("ind="+ind);
if((ind==0) && iterator.hasNext()){
iterator.next();
iterator.next();
return iterator;
}else
return iterator;
}
};
JavaRDD<mySchema> peopleRDD = spark.read()
.textFile(path) //file:///app/home/emm/zipfiles/myzips/
.javaRDD()
.mapPartitionsWithIndex(removeHeader,false)
.map(new Function<String, mySchema>()
{
........
Java.util.NoSuchElementException
at java.util.LinkedList.removeFirst(LinkedList.java:268)
at java.util.LinkedList.remove(LinkedList.java:683)
at org.apache.spark.sql.execution.BufferedRowIterator.next(BufferedRowIterator.java:49)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.next(WholeStageCodegenExec.scala:374)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.next(WholeStageCodegenExec.scala:368)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31)
at com.comcast.emm.vodip.SparkSessionFiles.SparkSessionFiles$1.call(SparkSessionFiles.java:2480)
at com.comcast.emm.vodip.SparkSessionFiles.SparkSessionFiles$1.call(SparkSessionFiles.java:2476)
私の更新されたコードを見てください。 index == 0これはrddパーティションまたは各ファイルを参照していますか?実際には、各ファイルから最初の2行を削除したいと思います。 – AKC
あなたのコメントの後に自分の答えを更新しました。私はそれが助けて欲しい –
これはどちらが良いでしょうか?1)フィルタ変換を適用します(最初の2行にはフィルタ条件に使用できる独自の値があります)か、またはmapPartitionWithIndexを使用してRDDに結合しますか? – AKC