-1
バッチを使用してCassandraにデータを挿入しています。私はジョブを実行すると例外よりも下になります。CassandraバッチInvalidQueryException - バッチが大きすぎます
caused by: com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large
at com.datastax.driver.core.Responses$Error.asException(Responses.java:136)
at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:179)
at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:184)
at com.datastax.driver.core.RequestHandler.access$2500(RequestHandler.java:43)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:798)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:617)
at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1005)
at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:928)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
私はこの問題に関して多くのブログを読んでいます。しかしそれは役に立たない。私は初期化中に "spark.cassandra.output.batch.size.bytes"をspark confに設定しようとしました。それでも私の問題は解決していません。同じエラーが発生しています。私のバッチには約1000個の挿入文があります。
以下のコードを見てください。
CassandraConnector connector = CassandraConnector.apply(javaSparkContext.getConf());
pairRDD.mapToPair(earnCalculatorKeyIterableTuple2 -> {
if (condition) {
do something......
}
else {
Session session = connector.openSession();
BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED); batch.setConsistencyLevel(ConsistencyLevel.valueOf(LOCAL_QUOROM));
PreparedStatement statement = session.prepare('my insert query');
for (condition) {
if (!condition) {
break;
}
Tuple2._2.forEach(s -> {
if (!condition) {
LOG.info(message);
}
else {
BoundStatement boundStatement = statement.bind("bind variables");
batch.add(boundStatement);
}
});
session.execute(batch);
batch.clear();
}
session.close();
}
return Tuple2;
});
return s;
}
ありがとうございました。
実際にスパークを使用していますか?あなたのトレースにSpark Cassandra Connectorレベルがないようで、batch.size.bytesを変更すると、挿入された文の数が変わるためです。 – RussS
はい私はspark-cassandraコネクタを使用しています。私はbatch.size.bytes = autoを与えてみました。まだそれで修正されていません。 – sandy
実際にコードサンプルを提供できますか? – RussS