2017-08-28 12 views
0

私はSparkを初めて使用しています。私はローカルモード(ウィンドウ)でスパークのJavaを使用して寄木細工のCSVファイルを保存しようとしています。私はこのエラーがある。spark javaにパーケットファイルとして保存

はによって引き起こさ:org.apache.spark.SparkException:書き込み行は

私は他のスレッドと無効スパーク憶測を呼びながら、タスクが失敗した

セット( "spark.speculation"、 "偽")

まだエラーが発生します。私はcsvのテスト用の2つのカラムだけを使用していますが、まだこの問題に陥っています。

入力:

portfolio_id;portfolio_code 
1000042;CHNTESTPF04 
1000042;CHNTESTPF04 
1000042;CHNTESTPF04 
1000042;CHNTESTPF04 
1000042;CHNTESTPF04 
1000042;CHNTESTPF04 
1000042;CHNTESTPF04 

マイコード:

JavaPairRDD<Integer, String> rowJavaRDD = pairRDD.mapToPair(new PairFunction<String, Integer, String>() { 
    private Double[] splitStringtoDoubles(String s){ 
     String[] splitVals = s.split(";"); 
     Double[] vals = new Double[splitVals.length]; 
     for(int i= 0; i < splitVals.length; i++){ 
      vals[i] = Double.parseDouble(splitVals[i]); 
     } 
     return vals; 
    } 

    @Override 
    public Tuple2<Integer, String> call(String arg0) throws Exception { 
     // TODO Auto-generated method stub 
     return null; 
    } 
}); 


SQLContext SQLContext; 
SQLContext = new org.apache.spark.sql.SQLContext(sc); 

Dataset<Row> fundDF = SQLContext.createDataFrame(rowJavaRDD.values(), funds.class); 
fundDF.printSchema(); 

fundDF.write().parquet("C:/test"); 

親切に私はここに欠けているものを助けます。

+0

は、あなたの質問に完全なエラーとスタックトレースをしてください置く:この

を掲示が遅れて申し訳ありません私はこのようなTuple2における機能の分割を()追加することで解決エラーが発生しました。 – Tim

+0

私はこのようなTuple2()関数の分割を追加することで解決エラー得ます。public void実行(文字列トン、文字列をU){ \t公共Tuple2 <文字列、文字列>コール(文字列のREC){ \t文字列[]トークン= rec.split( ";"); \t String [] vals = new String [tokens.length];\t for(int i = 0; i (トークン[0]、トークン[1]); \t}}); – Ans8

+0

@ Ans8答えに解答を入れて受け入れてください。答えが返ってこないので、解答しません。 –

答えて

0

1)ロードCSVのための私のJava /スパークコードはインドDataSetが

import org.apache.spark.sql.Dataset; 
import org.apache.spark.sql.SparkSession; 

SparkSession spark = SparkSession 
        .builder() 
        .appName("csv2parquet") 
        .config("spark.sql.warehouse.dir", "/file:/tmp") 
        .master("local") 
        .getOrCreate(); 

final String dir = "test.csv"; 


Dataset<Row> ds = spark.read().option("header", true).option("inferSchema", true).csv(dir); 

final String parquetFile = "test.parquet"; 
final String codec = "parquet"; 

ds.write().format(codec).save(parquetFile); 

spark.stop(); 

がここに答えるあなたのポンポン

<dependency> 
       <groupId>org.apache.hadoop</groupId> 
       <artifactId>hadoop-mapreduce-client-core</artifactId> 
       <version>2.8.1</version> 
</dependency> 
1

にこれを追加寄木細工して保存)2データセット スパーク見つけてください、そうそれは@Glennie Helles Sindholtの言うように未回答のセクションから外れます。

public void run(String t, String u) 

    { 

    public Tuple2<String,String> call(String rec){ 
     String[] tokens = rec.split(";"); 
     String[] vals = new String[tokens.length]; 
     for(int i= 0; i < tokens.length; i++) 
     { 
      vals[i] =tokens[i]; 
     } 

     return new Tuple2<String, String>(tokens[0], tokens[1]); 

    } 
    }); 
関連する問題