TL; DRpivot
集合であります2.2.0までのSpark Structured Streamingではサポートされていません(まだ2.3.0-SNAPSHOTではサポートされていないようです)。
私は今日マスターから構築されたSpark 2.3.0-SNAPSHOTを使用します。
scala> spark.version
res0: String = 2.3.0-SNAPSHOT
UnsupportedOperationCheckerに(あなたがスタックトレースに見つけることができる)ストリーミングクエリのみがサポートされている操作を使用しています(論理的計画)かどうかをチェックします。
pivot
を実行するときには、pivot
を利用できる唯一のインターフェイスとして、groupBy
を最初に指定する必要がありました。
pivot
2つの問題があります。
pivot
は、の値を生成するために、どのように多くの列を知りたがっているので、ストリーミングデータセットでは不可能であるcollect
を行います。
pivot
ストリーミングストラクチャードそのスパークが
がの定義に旋回するようにカラムのない問題1を見てみましょうサポートしていません(実際groupBy
横に)別の集合体です。
val sq = spark
.readStream
.format("rate")
.load
.groupBy("value")
.pivot("timestamp") // <-- pivot with no values
.count
.writeStream
.format("console")
scala> sq.start
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
rate
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:351)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:37)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:35)
at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:64)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:75)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:85)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:81)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:90)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:90)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3189)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:2665)
at org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:327)
... 49 elided
最後の2行は問題、すなわちpivot
doesカバー下collect
ひいては問題を示しています。
他の問題は、あなたがあなたに旋回するように列の値を指定したいにもかかわらず、その後に起因multiple aggregationsに他の問題を取得したい(とあなたが起こっているとして、それは実際にstreamingないbatchのチェックだと見ることができるということです最初のケースで)。
val sq = spark
.readStream
.format("rate")
.load
.groupBy("value")
.pivot("timestamp", Seq(1)) // <-- pivot with explicit values
.count
.writeStream
.format("console")
scala> sq.start
org.apache.spark.sql.AnalysisException: Multiple streaming aggregations are not supported with streaming DataFrames/Datasets;;
Project [value#128L, __pivot_count(1) AS `count` AS `count(1) AS ``count```#141[0] AS 1#142L]
+- Aggregate [value#128L], [value#128L, pivotfirst(timestamp#127, count(1) AS `count`#137L, 1000000, 0, 0) AS __pivot_count(1) AS `count` AS `count(1) AS ``count```#141]
+- Aggregate [value#128L, timestamp#127], [value#128L, timestamp#127, count(1) AS count(1) AS `count`#137L]
+- StreamingRelation DataSource([email protected],rate,List(),None,List(),None,Map(),None), rate, [timestamp#127, value#128L]
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:351)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:92)
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:278)
... 49 elided