2016-08-04 5 views
0

ScalaのSparkデータフレームの列のすべての要素に関数を適用しようとしています。入力は、「{カウント:10}」のように見える文字列である、と私は唯一のIntの一部を返却したいのですが - この例10に私はおもちゃの例でこれを行うことができます。Sparkデータフレームの列のすべての要素にマップ関数を適用する

val x = List("{\"count\": 107}", "{\"count\": 9}", "{\"count\": 456}")  
val _list = x.map(x => x.substring(10,x.length-1).toInt) 

しかし、私は私のデータフレームにUDFを適用しようとすると、私はエラーを取得:

val getCounts: String => Int = _.substring(10,x.length-1).toInt 
import org.apache.spark.sql.functions.udf 
val myUDF = udf(getCounts) 

df.withColumn("post_shares_int", myUDF('post_shares)).show 

エラー出力:

org.apache.spark.SparkException: Task not serializable 

at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2060) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:707) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:706) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706) 
    at org.apache.spark.sql.execution.ConvertToSafe.doExecute(rowFormatConverters.scala:56) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) 
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:187) 
    at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165) 
    at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174) 
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499) 
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) 
.... 

これを行う方法上の任意のヘルプは非常にいただければ幸いです。

答えて

1

カスタムUDFを忘れて、以下のコメントに続いhere

df.withColumn(
    "post_shares_int", 
    regexp_extract(df("post_shares"), '^{\\w+:(\\d+)}$', 1) 
).show 

を文書化され、そのタスクのために利用できる機能、 すなわち regexp_extractはすでに存在し、それが get_json_objectどれを使用するのが最適ですjsonの文字列を解析します

df.withColumn(
    "post_shares_int", 
    get_json_object(df("post_shares"), '$.count') 
).show 
+0

あなたは正規表現を抽出していますJSONの文字列...あなたはJSONを解析するべきではありませんか? –

+0

@ cricket_007あなたは絶対に正しいです、私はそのような機能を気づいていませんでした。 – cheseaux

+0

@ Feynman27のソリューションが編集されました – cheseaux

関連する問題