2017-04-10 4 views
0

私はスパークデータセットとして寄木細工ファイルをロードしています。クエリから新しいデータセットをクエリして作成できます。今度は、データセット(「ハッシュキー」)に新しい列を追加して値(例:md5sum(nameValue))を生成したいと思います。どうすればこれを達成できますか?スパークデータセットに列を追加してデータを変換する

public static void main(String[] args) { 

    SparkConf sparkConf = new SparkConf(); 

    sparkConf.setAppName("Hello Spark"); 
    sparkConf.setMaster("local"); 

    SparkSession spark = SparkSession.builder().appName("Java Spark SQL basic example") 
      .config("spark.master", "local").config("spark.sql.warehouse.dir", "file:///C:\\spark_warehouse") 
      .getOrCreate(); 

    Dataset<org.apache.spark.sql.Row> df = spark.read().parquet("meetup.parquet"); 
    df.show(); 

    df.createOrReplaceTempView("tmpview"); 

    Dataset<Row> namesDF = spark.sql("SELECT * FROM tmpview where name like 'Spark-%'"); 

    namesDF.show(); 

} 

出力は次のようになります。

+-------------+-----------+-----+---------+--------------------+ 
|   name|meetup_date|going|organizer|    topics| 
+-------------+-----------+-----+---------+--------------------+ 
| Spark-H20| 2016-01-01| 50|airisdata|[h2o, repeated sh...| 
| Spark-Avro| 2016-01-02| 60|airisdata| [avro, usecases]| 
|Spark-Parquet| 2016-01-03| 70|airisdata| [parquet, usecases]| 
+-------------+-----------+-----+---------+--------------------+ 

答えて

1

ちょうどあなたのクエリにMD5用スパークSQL関数を追加します。

Dataset<Row> namesDF = spark.sql("SELECT *, md5(name) as modified_name FROM tmpview where name like 'Spark-%'"); 
0
Dataset<Row> ds = sqlContext.read() 
    .format("com.databricks.spark.csv") 
    .option("inferSchema", "true") 
    .option("header", "true") 
    .option("delimiter","|") 
    .load("/home/cloudera/Desktop/data.csv"); 
ds.printSchema(); 

これを印刷する。

root 
|-- ReferenceValueSet_Id: integer (nullable = true) 
|-- ReferenceValueSet_Name: string (nullable = true) 
|-- Code_Description: string (nullable = true) 
|-- Code_Type: string (nullable = true) 
|-- Code: string (nullable = true) 
|-- CURR_FLAG: string (nullable = true) 
|-- REC_CREATE_DATE: timestamp (nullable = true) 
|-- REC_UPDATE_DATE: timestamp (nullable = true) 

Dataset<Row> df1 = ds.withColumn("Key", functions.lit(1)); 
     df1.printSchema(); 

コード上に追加した後、それは一定の値を持つ一つの列を追加します。

root 
|-- ReferenceValueSet_Id: integer (nullable = true) 
|-- ReferenceValueSet_Name: string (nullable = true) 
|-- Code_Description: string (nullable = true) 
|-- Code_Type: string (nullable = true) 
|-- Code: string (nullable = true) 
|-- CURR_FLAG: string (nullable = true) 
|-- REC_CREATE_DATE: timestamp (nullable = true) 
|-- REC_UPDATE_DATE: timestamp (nullable = true) 
|-- Key: integer (nullable = true) 

名前付きの列が表示されます。データセットにキーが追加されています。

constunt値の一部に列を追加する場合は、次のコードを使用して追加できます。

Dataset<Row> df1 = ds.withColumn("Key", functions.lit(ds.col("Code"))); 
     df1.printSchema(); 
     df1.show(); 

ここでは値が何であれCODE列に表示されます。 Keyという名前の列に追加します。

関連する問題