同じCassandraテーブルに書き込む複数のデータフロー(DStreams)を持つSpark Streamingアプリケーションがあります。大量のランダムデータでアプリケーションをテストすると、デバッグに役立つ情報がほとんどないSpark Cassandra Connectorのエラーが表示されます。エラーは次のようになります。Spark Cassandra Connectorでの不正なクエリのエラー処理
java.util.concurrent.ExecutionException: com.datastax.driver.core.exceptions.InvalidQueryException: Key may not be empty
at com.baynote.shaded.com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
at com.baynote.shaded.com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
at com.baynote.shaded.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
at com.datastax.spark.connector.rdd.CassandraJoinRDD$$anonfun$fetchIterator$1.apply(CassandraJoinRDD.scala:268)
at com.datastax.spark.connector.rdd.CassandraJoinRDD$$anonfun$fetchIterator$1.apply(CassandraJoinRDD.scala:268)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: Key may not be empty
at com.datastax.driver.core.Responses$Error.asException(Responses.java:136)
at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:179)
at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:184)
at com.datastax.driver.core.RequestHandler.access$2500(RequestHandler.java:43)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:798)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:617)
at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1005)
at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:928)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:831)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:346)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:254)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
... 1 more
問題は、どのDStreamとそのデータが原因であるかを知ることができないことです。 Cassandraに書き込むすべてのDStreamをチェックしたり、自分自身のデータバリデーターを書くことができますが、より汎用的なソリューションを探しています。
もう1つの問題は、エラーがジョブを無視するのではなくジョブ全体を強制終了し、他のデータの書き込みを続行することです。基本的に単純な非スパーク書き込みの場合、私は例外をキャッチしてログに記録し、残りのデータを書き続けます。 Spark Cassandra Connectorにそういうことをする方法はありますか?
私はこれら2つの問題について何かできることはありますか?
どのようにデータを保存していますか?それはrddまたはデータフレーム、ケースクラスまたは通常の方法を使用していますか? – Kaushal
タプルのRDDです。 – Soid
の代わりにデフォルト値を試してみてください。入力データを確認するための検証を書くこともできます。 – Kaushal