-1

バッチを使用してCassandraにデータを挿入しています。私はジョブを実行すると例外よりも下になります。CassandraバッチInvalidQueryException - バッチが大きすぎます

caused by: com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large 
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:136) 
    at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:179)  
    at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:184)  
    at com.datastax.driver.core.RequestHandler.access$2500(RequestHandler.java:43)  
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:798) 
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:617)  
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1005) 
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:928) 
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)  
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)  
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)  
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) 

私はこの問題に関して多くのブログを読んでいます。しかしそれは役に立たない。私は初期化中に "spark.cassandra.output.batch.size.bytes"をspark confに設定しようとしました。それでも私の問題は解決していません。同じエラーが発生しています。私のバッチには約1000個の挿入文があります。

以下のコードを見てください。

CassandraConnector connector = CassandraConnector.apply(javaSparkContext.getConf()); 
pairRDD.mapToPair(earnCalculatorKeyIterableTuple2 -> { 
      if (condition) { 
       do something...... 
      } 
      else { 
       Session session = connector.openSession(); 
       BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);   batch.setConsistencyLevel(ConsistencyLevel.valueOf(LOCAL_QUOROM)); 
       PreparedStatement statement = session.prepare('my insert query'); 
       for (condition) { 
        if (!condition) { 
         break; 
        } 
        Tuple2._2.forEach(s -> { 
         if (!condition) { 
          LOG.info(message); 
         } 
         else { 
          BoundStatement boundStatement = statement.bind("bind variables"); 
          batch.add(boundStatement); 
         } 
        }); 
        session.execute(batch); 
        batch.clear(); 
       } 
       session.close(); 
      } 
      return Tuple2; 
     }); 
     return s; 
    } 

ありがとうございました。

+0

実際にスパークを使用していますか?あなたのトレースにSpark Cassandra Connectorレベルがないようで、batch.size.bytesを変更すると、挿入された文の数が変わるためです。 – RussS

+0

はい私はspark-cassandraコネクタを使用しています。私はbatch.size.bytes = autoを与えてみました。まだそれで修正されていません。 – sandy

+2

実際にコードサンプルを提供できますか? – RussS

答えて

1

バッチを手動で作成していて、バッチが大きすぎます。各バッチに少ない行を追加します。これを手動で行うにはたくさんの方法がありますが、最も簡単なのは、Xステートメントが追加されるたびにバッチを送信するカウンタを追加することです。

変更するパラメータは、saveToCassandraによって行われる自動バッチ処理にのみ関連しています。

関連する問題