2016-12-02 9 views
3

私はスパーク1.5を使用します。DataFrame.explodeをカスタムUDFで使用して文字列を部分文字列に分割する方法?

次のように私は、データフレームA_DFを持っている:

interactionsStringある
+--------------------+--------------------+ 
|     id|  interactions| 
+--------------------+--------------------+ 
|  id1   |30439831,30447866...| 
|  id2   |37597858,34499875...| 
|  id3   |30447866,32896718...| 
|  id4   |33029476,31988037...| 
|  id5   |37663606,37627579...| 
|  id6   |37663606,37627579...| 
|  id7   |36922232,37675077...| 
|  id8   |37359529,37668820...| 
|  id9   |37675077,37707778...| 
+--------------------+--------------------+ 

を。

val splitArr = udf { (s: String) => s.split(",").map(_.trim) } 

val B_DF = A_DF.explode(splitArr($"interactions")) 

が、私は次のエラーを取得しています:私はこれは私が次のようにやろうコンマによって分割ストリングのセットに最初の分割によって interactions文字列をを爆発 にしたい

error: missing arguments for method explode in class DataFrame; 
follow this method with `_' if you want to treat it as a partially applied function A_DF.explode(splitArr($"interactions")) 

私は分かりません。

Expression of Type Array[String] does not conform to expected type TraversableOnce[A_] 

任意のアイデア:私は読み込み検査の警告を、取得していたに

val B_DF = A_DF.explode($"interactions") { case (Row(interactions: String) => 
     interactions.split(",").map(_.trim)) 
    } 

:だから私はさらに複雑なものを試してみましたか?

+1

UDFを取っていない爆発、それだけで正常な機能です。 'A_DF.explode(" interactions "、" interaction "){(s:String)=> s.split("、 ")} map(_。trim)}' – lpiepiora

答えて

2

Dataset.explodeは、Spark 2.0.0以降廃止予定です。理由がある場合を除いて、それから離れてください。あなたは警告されています。

あなたがDataFrame.explodeを使用する理由を持っている場合は、署名を参照してください。いずれの場合も

explode[A, B](inputColumn: String, outputColumn: String)(f: (A) ⇒ TraversableOnce[B])(implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[B]): DataFrame 

explode[A <: Product](input: Column*)(f: (Row) ⇒ TraversableOnce[A])(implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[A]): DataFrame 

explodeは、2つのパラメータグループ、したがって最初のエラーを使用しています。

(これは2.1.0-SNAPSHOTスパークです)

scala> spark.version 
res1: String = 2.1.0-SNAPSHOT 

scala> val A_DF = Seq(("id1", "30439831,30447866")).toDF("id", "interactions") 
A_DF: org.apache.spark.sql.DataFrame = [id: string, interactions: string] 

scala> A_DF.explode(split($"interactions", ",")) 
<console>:26: error: missing argument list for method explode in class Dataset 
Unapplied methods are only converted to functions when a function type is expected. 
You can make this conversion explicit by writing `explode _` or `explode(_)(_)(_)` instead of `explode`. 
     A_DF.explode(split($"interactions", ",")) 
       ^

次のように私は2.1を使用するようexplodeの廃止についてです警告を注意してください(それを行うことができます。0-SNAPSHOT):

scala> A_DF.explode[String, String]("interactions", "parts")(_.split(",")).show 
warning: there was one deprecation warning; re-run with -deprecation for details 
+---+-----------------+--------+ 
| id|  interactions| parts| 
+---+-----------------+--------+ 
|id1|30439831,30447866|30439831| 
|id1|30439831,30447866|30447866| 
+---+-----------------+--------+ 

次のようにして、他のexplodeを使用することができます。代わりに

scala> import org.apache.spark.sql.Row 
import org.apache.spark.sql.Row 

scala> case class Interaction(id: String, part: String) 
defined class Interaction 

scala> A_DF.explode[Interaction]($"id", $"interactions") { case Row(id: String, ins: String) => ins.split(",").map { it => Interaction(id, it) } }.show 
warning: there was one deprecation warning; re-run with -deprecation for details 
+---+-----------------+---+--------+ 
| id|  interactions| id| part| 
+---+-----------------+---+--------+ 
|id1|30439831,30447866|id1|30439831| 
|id1|30439831,30447866|id1|30447866| 
+---+-----------------+---+--------+ 

使用explode functionとscaladoc(以下に引用する)に記載されるように、あなたは問題ないはずです。


代わりに、functions.explode()

を使用して列を分解することができます。次のように、その後、 explode機能を使用することができ

ds.flatMap(_.words.split(" ")) 

ds.select(explode(split('words, " ")).as("word")) 

またはflatMap()

A_DF.select($"id", explode(split('interactions, ",") as "part")) 
関連する問題