2017-05-29 4 views
1

私は、次のネストされたJSONオブジェクト(cellsDF)記述ポリゴン(したがって、スカラ座やスパークを使用してそれらの多くを)持っている:配列でネストされたJSON構造体を受け入れるユーザ定義関数の入力パラメータの型は何ですか?

root 
|-- geometry: struct (nullable = true) 
| |-- coordinates: array (nullable = true) 
| | |-- element: array (containsNull = true) 
| | | |-- element: array (containsNull = true) 
| | | | |-- element: string (containsNull = true) 
| |-- type: string (nullable = true) 
|-- properties: struct (nullable = true) 
| |-- Cell: string (nullable = true) 
| |-- SignalStyrka: long (nullable = true) 
|-- type: string (nullable = true) 

例ラインは、私はそれらを見つけるしたい

{ "type": "Feature", "properties": { "SignalStyrka": -82, "Cell": " 112" }, "geometry": { "type": "Polygon", "coordinates": [ [ [ 1292600.0, 6246350.0 ], [ 1292600.0, 6246400.0 ], [ 1292550.0, 6246400.0 ], [ 1292550.0, 6246350.0 ], [ 1292600.0, 6246350.0 ] ] ] } } 

です与えられた点を含む多角形。私はこれを見つけるためにスカラーでUDFを書いてきましたが、SparkはこのネストされたJSONでUDFを実行する方法が気に入らないようです。 inPolygon UDFはまだ書かれていないことに注意してください。概念全体がUDFで動作するかどうかをテストしたいだけです。 candidateCellsは事前に定義されており、コードのその部分が機能しています。 私はSOからいくつかの提案を試みました(例えば、爆発)。私はそれをPythonで動作させることができましたが、パフォーマンスには満足できませんでした。どんなサポートも高く評価されます。

val cellsDF = spark.read.json("s3n://coverage-vectors/20170509/*.json.gz") 
cellsDF: org.apache.spark.sql.DataFrame = [geometry: struct<coordinates: array<array<array<string>>>, type: string>, properties: struct<Cell: string, SignalStyrka: bigint> ... 1 more field] 

def isCandidate(cell: String): Boolean = { 
    candidateCells contains cell 
} 

def inPolygon(coordinates: Array[Array[Array[String]]]): Boolean = { 
    coordinates.isEmpty 

} 

import org.apache.spark.sql.functions.udf 
val udfCandidate = udf(isCandidate _) 
val udfInPolygon = udf(inPolygon _) 

cellsDF.filter(udfCandidate($"properties.Cell")).filter(udfInPolygon($"geometry.coordinates")).count() 

isCandidate: (cell: String)Boolean 
inPolygon: (coordinates: Array[Array[Array[String]]])Boolean 
import org.apache.spark.sql.functions.udf 
udfCandidate: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,BooleanType,Some(List(StringType))) 
udfInPolygon: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,BooleanType,Some(List(ArrayType(ArrayType(ArrayType(StringType,true),true),true)))) 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 69.0 failed 4 times, most recent failure: Lost task 0.3 in stage 69.0 (TID 179, ip-172-31-12-172.eu-west-1.compute.internal, executor 3): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array<array<array<string>>>) => boolean) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [[[Ljava.lang.String; 
    at $anonfun$1.apply(<console>:36) 
    ... 13 more 
+0

'[Array [Array [String]]]'を 'Array [Array [collection.mutable.WrappedArray [String]]]' –

+0

YESに変更してみてください! Txたくさん!すべての配列は 'collection.mutable.WrappedArray'に変更する必要があります。 – user2180171

+0

それは助けて嬉しい:) –

答えて

1

変更inPolygonの署名がSeq[Seq[Seq[String]]]を受け入れることとすれば完了です。

scala> in.printSchema 
root 
|-- id: integer (nullable = false) 
|-- coordinates: array (nullable = true) 
| |-- element: array (containsNull = true) 
| | |-- element: array (containsNull = true) 
| | | |-- element: string (containsNull = true) 

val myUDF = udf { coordinates: Seq[Seq[Seq[String]]] => 1 } 
scala> in.select(myUDF($"coordinates")).show 
+----------------+ 
|UDF(coordinates)| 
+----------------+ 
|    1| 
+----------------+ 

あなたは非常に低レベルCatalystTypeConverters.getConverterForTypeで明らかにArrayConverterを使用して変換を見ることができます。

関連する問題