2017-03-23 3 views
0

私が使用しているスパークバージョンは2.0+ です。これは、パイプ(|)で区切られた値ファイルをDataframeに読み込んだ後、SQLのようなクエリを実行することだけです。私はカンマで区切られたファイルも試しました。 私はspark-shellを使ってsparkと対話しています spark-csv jarをダウンロードし、spark-shellを--packagesオプションで実行してセッションにインポートしました。それは正常にインポートされました。spark psvファイルからデータフレームへの変換エラー

import spark.implicits._ 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.sql._ 
val session = 
SparkSession.builder().appName("test").master("local").getOrCreate() 
    val df = session.read.format("com.databricks.spark.csv").option("header", "true").option("mode", "DROPMALFORMED").load("testdata.txt"); 

WARN Hive: Failed to access metastore. This class should not accessed in runtime. 
apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hi 
at org.apache.hadoop.hive.ql.metadata.Hive.getAllDatabases(Hive.java:1236) 
at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:174) 
at org.apache.hadoop.hive.ql.metadata.Hive.<clinit>(Hive.java:166) 
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:503) 
at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:171) 
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 
at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) 
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) 
at java.lang.reflect.Constructor.newInstance(Unknown Source) 
at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:258) 
at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:359) 
at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:263) 
at org.apache.spark.sql.hive.HiveSharedState.metadataHive$lzycompute(HiveSharedState.scala:39) 

答えて

0

psvファイルをRDDに直接ロードし、必要に応じて分割してスキーマを適用することができます。これはJavaの例です。

import org.apache.spark.sql.SparkSession; 
import org.apache.spark.sql.types.DataTypes; 
import org.apache.spark.sql.types.StructField; 
import org.apache.spark.sql.types.StructType; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.sql.Dataset; 
import org.apache.spark.sql.Row; 
import org.apache.spark.sql.RowFactory; 

public class RDDtoDF_Update { 
    public static void main(final String[] args) throws Exception { 

     SparkSession spark = SparkSession 
       .builder() 
       .appName("RDDtoDF_Updated") 
       .master("local[2]") 
       .config("spark.some.config.option", "some-value") 
       .getOrCreate(); 

     StructType schema = DataTypes 
       .createStructType(new StructField[] { 
         DataTypes.createStructField("eid", DataTypes.IntegerType, false), 
         DataTypes.createStructField("eName", DataTypes.StringType, false), 
         DataTypes.createStructField("eAge", DataTypes.IntegerType, true), 
         DataTypes.createStructField("eDept", DataTypes.IntegerType, true), 
         DataTypes.createStructField("eSal", DataTypes.IntegerType, true), 
         DataTypes.createStructField("eGen", DataTypes.StringType,true)}); 


     String filepath = "F:/Hadoop/Data/EMPData.txt"; 
     JavaRDD<Row> empRDD = spark.read() 
       .textFile(filepath) 
       .javaRDD() 
       .map(line -> line.split("\t")) 
       .map(r -> RowFactory.create(Integer.parseInt(r[0]), r[1].trim(),Integer.parseInt(r[2]), 
         Integer.parseInt(r[3]),Integer.parseInt(r[4]),r[5].trim())); 


     Dataset<Row> empDF = spark.createDataFrame(empRDD, schema); 
     empDF.groupBy("edept").max("esal").show(); 

ありがとうございます。

+0

psvファイルをDataframeに直接ロードするというアイデアは、SQL上でクエリを実行できるようにするためです。私はRDDとして読み込み、解析してからデータフレームに変換することができますが、データフレームに直接インポートしたいのですが、なぜそうではないのでしょうか?必要な前処理がなく、データがパイプで区切られている場合。 – jane

関連する問題