2017-12-01 5 views
2

Sparkストリーミングデータセット(構造化ストリーミング)をピボットしようとしていますが、AnalysisException(以下の抜粋)があります。ストリーミングデータセットをピボットする方法は?

誰かが、構造化されたストリーム(スパーク2.0)では実際にはピボットがサポートされていないことを確認できますか?スレッド「メイン」org.apache.spark.sql.AnalysisExceptionで

例外:ストリーミングソースとクエリはwriteStream.start(で実行されなければならない);; kafka at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker $ .org $ apache $ spark $ sql $ catalyst $ analysis $ UnsupportedOperationChecker $$ throwError(UnsupportedOperationChecker.scala:297) at org.apache.spark。 (UnsupportedOperationChecker.scala:36) at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker $$ anonfun $ checkForBatch $ 1.apply(UnsupportedOperationChecker.scala: 34)org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scalaで :127)

答えて

0

TL; DRpivot集合であります2.2.0までのSpark Structured Streamingではサポートされていません(まだ2.3.0-SNAPSHOTではサポートされていないようです)。

私は今日マスターから構築されたSpark 2.3.0-SNAPSHOTを使用します。

scala> spark.version 
res0: String = 2.3.0-SNAPSHOT 

UnsupportedOperationCheckerに(あなたがスタックトレースに見つけることができる)ストリーミングクエリのみがサポートされている操作を使用しています(論理的計画)かどうかをチェックします。

pivotを実行するときには、pivotを利用できる唯一のインターフェイスとして、groupByを最初に指定する必要がありました。

pivot 2つの問題があります。

  1. pivotは、の値を生成するために、どのように多くの列を知りたがっているので、ストリーミングデータセットでは不可能であるcollectを行います。

  2. pivotストリーミングストラクチャードそのスパークが

がの定義に旋回するようにカラムのない問題1を見てみましょうサポートしていません(実際groupBy横に)別の集合体です。

val sq = spark 
    .readStream 
    .format("rate") 
    .load 
    .groupBy("value") 
    .pivot("timestamp") // <-- pivot with no values 
    .count 
    .writeStream 
    .format("console") 
scala> sq.start 
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();; 
rate 
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:351) 
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:37) 
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126) 
    at scala.collection.immutable.List.foreach(List.scala:381) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126) 
    at scala.collection.immutable.List.foreach(List.scala:381) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126) 
    at scala.collection.immutable.List.foreach(List.scala:381) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126) 
    at scala.collection.immutable.List.foreach(List.scala:381) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126) 
    at scala.collection.immutable.List.foreach(List.scala:381) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126) 
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:35) 
    at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:64) 
    at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:75) 
    at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:73) 
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:79) 
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:79) 
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:85) 
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:81) 
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:90) 
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:90) 
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3189) 
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:2665) 
    at org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:327) 
    ... 49 elided 

最後の2行は問題、すなわちpivotdoesカバー下collectひいては問題を示しています。

他の問題は、あなたがあなたに旋回するように列の値を指定したいにもかかわらず、その後に起因multiple aggregationsに他の問題を取得したい(とあなたが起こっているとして、それは実際にstreamingないbatchのチェックだと見ることができるということです最初のケースで)。

val sq = spark 
    .readStream 
    .format("rate") 
    .load 
    .groupBy("value") 
    .pivot("timestamp", Seq(1)) // <-- pivot with explicit values 
    .count 
    .writeStream 
    .format("console") 
scala> sq.start 
org.apache.spark.sql.AnalysisException: Multiple streaming aggregations are not supported with streaming DataFrames/Datasets;; 
Project [value#128L, __pivot_count(1) AS `count` AS `count(1) AS ``count```#141[0] AS 1#142L] 
+- Aggregate [value#128L], [value#128L, pivotfirst(timestamp#127, count(1) AS `count`#137L, 1000000, 0, 0) AS __pivot_count(1) AS `count` AS `count(1) AS ``count```#141] 
    +- Aggregate [value#128L, timestamp#127], [value#128L, timestamp#127, count(1) AS count(1) AS `count`#137L] 
     +- StreamingRelation DataSource([email protected],rate,List(),None,List(),None,Map(),None), rate, [timestamp#127, value#128L] 

    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:351) 
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:92) 
    at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232) 
    at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278) 
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:278) 
    ... 49 elided 
関連する問題