2017-11-08 14 views
1

SparkデータフレームをRDD [行]に変換するので、最終的なスキーマにマップしてHive Orcテーブルに書き込むことができます。私は入力内の任意のスペースを実際のnullに変換したいので、ハイブテーブルは空の文字列の代わりに実際にnullを保存することができます。Dataframe to RDD [行]空白をヌルに置き換える

入力データフレーム(パイプ区切りの値を持つ単一の列):

col1 
1|2|3||5|6|7|||...| 

マイコード:

inputDF.rdd. 
    map { x: Row => x.get(0).asInstanceOf[String].split("\\|", -1)}. 
    map { x => Row (nullConverter(x(0)),nullConverter(x(1)),nullConverter(x(2)).... nullConverter(x(200)))} 


def nullConverter(input: String): String = { 
    if (input.trim.length > 0) input.trim 
    else null 
} 

はそれをやってというよりもnullConverter機能200回の呼び出しのいずれかのきれいな方法はあります。単一の列に基づいて

答えて

2

更新:あなたのnullConverterまたは任意の他のロジックのUDFを作成し

​​

はあなたのアプローチで行く、私は次のように行います

import org.apache.spark.sql.functions._ 
val nullConverter = udf((input: String) => { 
    if (input.trim.length > 0) input.trim 
    else null 
}) 

dfでudfを使用し、すべての列に適用:

val convertedDf = inputDf.select(inputDf.columns.map(c => nullConverter(col(c)).alias(c)):_*) 

これで、RDDロジックを実行できます。

+0

をRDDするデータフレームを変換する際に、私は1つの列のみを有し、それはパイプ区切り形式の値を有しても意味がありません。空白を値の一部としてNULLに変換したい。 – Nats

+0

@Nats - 更新 – manojlds

+0

ありがとうございました!それは助ける – Nats

1

これは、RDDに変換する前にDataFrame APIを使用する方が簡単です。 、配列の各要素のために、今すぐ

val numCols = df2.first.getAs[Seq[String]](0).length 

nullConverter UDF使用し、それ自身の列に割り当てます。

val df = Seq(("1|2|3||5|6|7|8||")).toDF("col0")  // Example dataframe 
val df2 = df.withColumn("col0", split($"col0", "\\|")) // Split on "|" 

は、配列の長さを見つける:まず、データを分割。

val nullConverter = udf((input: String) => { 
    if (input.trim.length > 0) input.trim 
    else null 
}) 

val df3 = df2.select((0 until numCols).map(i => nullConverter($"col0".getItem(i)).as("col" + i)): _*) 

例のデータフレームを使用して結果:

+----+----+----+----+----+----+----+----+----+----+ 
|col0|col1|col2|col3|col4|col5|col6|col7|col8|col9| 
+----+----+----+----+----+----+----+----+----+----+ 
| 1| 2| 3|null| 5| 6| 7| 8|null|null| 
+----+----+----+----+----+----+----+----+----+----+ 

今RDDに変換したり、ニーズに応じて、データフレームとしてデータを使用し続けます。

+0

ありがとう!助けになる – Nats

0

import org.apache.spark.sql.functions._ 

df = sc.parallelize([ 
    (1, "foo bar"), (2, "foobar "), (3, " ") 
]).toDF(["k", "v"]) 

df.select(regexp_replace(col("*"), " ", "NULL")) 
関連する問題