
ArrayIndexOutOfBounds exception in Spark Structured Streaming when sending Row.empty

I have a Kafka topic named test to which I am sending string messages, and I am filtering those messages on some conditions via Spark Structured Streaming, like this:

scala> val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("startingOffsets", "earliest").option("subscribe", "test") 
df: org.apache.spark.sql.streaming.DataStreamReader = org.apache.spark.sql.streaming.DataStreamReader@...

scala> import org.apache.spark.sql.types._ 
import org.apache.spark.sql.types._ 

scala> val schema = StructType(StructField("message", StringType) :: Nil) 
schema: org.apache.spark.sql.types.StructType = StructType(StructField(message,StringType,true)) 

scala> val data = df.load().select(from_json(col("value").cast("string"), schema).as("value")) 
data: org.apache.spark.sql.DataFrame = [value: struct<message: string>] 

scala> import org.apache.spark.sql.catalyst.encoders.RowEncoder 
import org.apache.spark.sql.catalyst.encoders.RowEncoder 

scala> implicit val encoder = RowEncoder(schema) 
encoder: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[org.apache.spark.sql.Row] = class[message[0]: string] 

scala> import org.apache.spark.sql.Row 
import org.apache.spark.sql.Row 

scala> val q = data.select("value.*").map(row => if(row.getString(0) == "hello") row else Row.empty) 
q: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [message: string] 

scala> q.writeStream.outputMode("append").format("console").start() 
res0: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@...ef3b7ac

But as soon as I send a message that fails the condition, e.g. {"message":"he"}, it gives me the error below. Why am I getting an ArrayIndexOutOfBounds exception here?

scala> ------------------------------------------- 
Batch: 0 
------------------------------------------- 
17/06/10 12:07:10 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) 
java.lang.ArrayIndexOutOfBoundsException: 0 
    at org.apache.spark.sql.catalyst.expressions.GenericRow.get(rows.scala:173) 
    at org.apache.spark.sql.Row$class.isNullAt(Row.scala:191) 
    at org.apache.spark.sql.catalyst.expressions.GenericRow.isNullAt(rows.scala:165) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:108) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:320) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:748) 
17/06/10 12:07:10 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.ArrayIndexOutOfBoundsException: 0 
    at org.apache.spark.sql.catalyst.expressions.GenericRow.get(rows.scala:173) 
    at org.apache.spark.sql.Row$class.isNullAt(Row.scala:191) 
    at org.apache.spark.sql.catalyst.expressions.GenericRow.isNullAt(rows.scala:165) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:108) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:320) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:748) 

17/06/10 12:07:10 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job 
17/06/10 12:07:10 ERROR StreamExecution: Query [id = c6d64268-e8a4-4171-abb5-84ea7696833c, runId = 809b5544-7d91-4528-8ea0-05239d2690f7] terminated with error 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.ArrayIndexOutOfBoundsException: 0 
    at org.apache.spark.sql.catalyst.expressions.GenericRow.get(rows.scala:173) 
    at org.apache.spark.sql.Row$class.isNullAt(Row.scala:191) 
    at org.apache.spark.sql.catalyst.expressions.GenericRow.isNullAt(rows.scala:165) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:108) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:320) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:748) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1504) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1492) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1491) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1491) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:819) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:819) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:819) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1719) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1674) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1663) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:635) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2019) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2040) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2059) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2084) 
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
    at org.apache.spark.rdd.RDD.collect(RDD.scala:935) 
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:275) 
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2820) 
    at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2371) 
    at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2371) 
    at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2804) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65) 
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2803) 
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:2371) 
    at org.apache.spark.sql.execution.streaming.ConsoleSink.addBatch(console.scala:49) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:650) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:650) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:650) 
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:278) 
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:57) 
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:649) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(StreamExecution.scala:299) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:288) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:288) 
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:278) 
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:57) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:288) 
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) 
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:284) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:202) 
Caused by: java.lang.ArrayIndexOutOfBoundsException: 0 
    at org.apache.spark.sql.catalyst.expressions.GenericRow.get(rows.scala:173) 
    at org.apache.spark.sql.Row$class.isNullAt(Row.scala:191) 
    at org.apache.spark.sql.catalyst.expressions.GenericRow.isNullAt(rows.scala:165) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:108) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:320) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:748) 

This I am not able to understand: if the data is empty, I should get an empty DataFrame/Dataset, not an exception.

Is this normal behavior?

Answer


The problem here is that Row.empty means a row with no columns at all. Since your code told Spark that the row schema is StructType(StructField("message", StringType) :: Nil), you should not return Row.empty; return Row(null) instead, which is a row with a single null column.
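For reference, a minimal sketch of the corrected map, reusing the schema and implicit RowEncoder defined above (the name q2 is just illustrative; the echoed type matches what the shell printed for the original query):

scala> val q2 = data.select("value.*").map(row => if (row.getString(0) == "hello") row else Row(null))
q2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [message: string]

Row(null) is backed by a one-element array, so the isNullAt(0) call in the generated code succeeds (and returns true), whereas Row.empty is backed by an empty array, which is exactly where the ArrayIndexOutOfBoundsException: 0 in your stack trace comes from. Alternatively, if non-matching messages should be dropped rather than emitted as null rows, a plain filter such as data.select("value.*").filter($"message" === "hello") works in the spark-shell (where spark.implicits._ is already imported) and needs no custom encoder at all.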


Thanks for the reply! That solved my problem. – himanshuIIITian
