私は、次のネストされたJSONオブジェクト(cellsDF
)記述ポリゴン(したがって、スカラ座やスパークを使用してそれらの多くを)持っている:配列でネストされたJSON構造体を受け入れるユーザ定義関数の入力パラメータの型は何ですか?
root
|-- geometry: struct (nullable = true)
| |-- coordinates: array (nullable = true)
| | |-- element: array (containsNull = true)
| | | |-- element: array (containsNull = true)
| | | | |-- element: string (containsNull = true)
| |-- type: string (nullable = true)
|-- properties: struct (nullable = true)
| |-- Cell: string (nullable = true)
| |-- SignalStyrka: long (nullable = true)
|-- type: string (nullable = true)
例ラインは、私はそれらを見つけるしたい
{ "type": "Feature", "properties": { "SignalStyrka": -82, "Cell": " 112" }, "geometry": { "type": "Polygon", "coordinates": [ [ [ 1292600.0, 6246350.0 ], [ 1292600.0, 6246400.0 ], [ 1292550.0, 6246400.0 ], [ 1292550.0, 6246350.0 ], [ 1292600.0, 6246350.0 ] ] ] } }
です与えられた点を含む多角形。私はこれを見つけるためにスカラーでUDFを書いてきましたが、SparkはこのネストされたJSONでUDFを実行する方法が気に入らないようです。 inPolygon UDFはまだ書かれていないことに注意してください。概念全体がUDFで動作するかどうかをテストしたいだけです。 candidateCellsは事前に定義されており、コードのその部分が機能しています。 私はSOからいくつかの提案を試みました(例えば、爆発)。私はそれをPythonで動作させることができましたが、パフォーマンスには満足できませんでした。どんなサポートも高く評価されます。
val cellsDF = spark.read.json("s3n://coverage-vectors/20170509/*.json.gz")
cellsDF: org.apache.spark.sql.DataFrame = [geometry: struct<coordinates: array<array<array<string>>>, type: string>, properties: struct<Cell: string, SignalStyrka: bigint> ... 1 more field]
def isCandidate(cell: String): Boolean = {
candidateCells contains cell
}
def inPolygon(coordinates: Array[Array[Array[String]]]): Boolean = {
coordinates.isEmpty
}
import org.apache.spark.sql.functions.udf
val udfCandidate = udf(isCandidate _)
val udfInPolygon = udf(inPolygon _)
cellsDF.filter(udfCandidate($"properties.Cell")).filter(udfInPolygon($"geometry.coordinates")).count()
isCandidate: (cell: String)Boolean
inPolygon: (coordinates: Array[Array[Array[String]]])Boolean
import org.apache.spark.sql.functions.udf
udfCandidate: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,BooleanType,Some(List(StringType)))
udfInPolygon: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,BooleanType,Some(List(ArrayType(ArrayType(ArrayType(StringType,true),true),true))))
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 69.0 failed 4 times, most recent failure: Lost task 0.3 in stage 69.0 (TID 179, ip-172-31-12-172.eu-west-1.compute.internal, executor 3): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array<array<array<string>>>) => boolean)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [[[Ljava.lang.String;
at $anonfun$1.apply(<console>:36)
... 13 more
'[Array [Array [String]]]'を 'Array [Array [collection.mutable.WrappedArray [String]]]' –
YESに変更してみてください! Txたくさん!すべての配列は 'collection.mutable.WrappedArray'に変更する必要があります。 – user2180171
それは助けて嬉しい:) –