0

cassandraの準備済みステートメントを使用して同期インサートを実行しているため、アプリケーション全体が故障します。 短時間で、約90kの異なるクラスタリングエントリを1つのパーティションに書き込みます。複合キーによる単一パーティションへの約90kのプリペアドステートメントでのcassandraでのエラー

List<Statement> statements = new ArrayList<>(); 
map.forEach((String location, Set<String> set) -> { 
    PreparedStatement updateStatement = preparedStatementSupplier.getUpdatePeriodByLocationStatement(); 
    BoundStatement boundStatement = updateStatement.bind(set, tradePartner, location); 
    statements.add(boundStatement); 
}); 

Iterator<Statement> iterator = statements.iterator(); 
while (iterator.hasNext()) { 
    Statement statement = iterator.next(); 
    try { 
     cassandraOperations.execute(statement); 
     iterator.remove(); 
    } catch (RuntimeException e) { 
     LOG.error("error on forecast data persistence, reason: {}", e.getMessage(), e); 
    } 
} 

public synchronized PreparedStatement getUpdatePeriodByLocationStatement() { 
    if (Objects.isNull(updatePeriodByLocation)) { 
     // CREATE TABLE period_by_location (tp text, loc text, pd set<text>, PRIMARY KEY ((tp), loc)); 
     updatePeriodByLocation = cassandraOperations.getSession().prepare("UPDATE period_by_location SET pd = pd + ? WHERE tp = ? AND loc = ?"); 
     updatePeriodByLocation.setIdempotent(true); 
    } 
    return updatePeriodByLocation; 
} 

サーバー側でタイムアウトが発生し、ドライバが動作を停止すると思われます。 Cassandraはデフォルト設定で多かれ少なかれ実行します。 cassandraノードのエラーは次のようになります。

ERROR [SharedPool-Worker-3] 2017-11-29 15:41:37,084 ErrorMessage.java:338 - Unexpected exception during request 
java.lang.NullPointerException: null 
    at org.apache.cassandra.serializers.UTF8Serializer$UTF8Validator.validate(UTF8Serializer.java:55) ~[apache-cassandra-2.2.8.jar:2.2.8] 
    at org.apache.cassandra.serializers.UTF8Serializer.validate(UTF8Serializer.java:34) ~[apache-cassandra-2.2.8.jar:2.2.8] 
    at org.apache.cassandra.serializers.SetSerializer.deserializeForNativeProtocol(SetSerializer.java:88) ~[apache-cassandra-2.2.8.jar:2.2.8] 
    at org.apache.cassandra.cql3.Sets$Value.fromSerialized(Sets.java:152) ~[apache-cassandra-2.2.8.jar:2.2.8] 
    at org.apache.cassandra.cql3.Sets$Marker.bind(Sets.java:251) ~[apache-cassandra-2.2.8.jar:2.2.8] 
    at org.apache.cassandra.cql3.Sets$Adder.execute(Sets.java:286) ~[apache-cassandra-2.2.8.jar:2.2.8] 
    at org.apache.cassandra.cql3.statements.UpdateStatement.addUpdateForKey(UpdateStatement.java:112) ~[apache-cassandra-2.2.8.jar:2.2.8] 
    at org.apache.cassandra.cql3.statements.UpdateStatement.addUpdateForKey(UpdateStatement.java:59) ~[apache-cassandra-2.2.8.jar:2.2.8] 
    at org.apache.cassandra.cql3.statements.ModificationStatement.getMutations(ModificationStatement.java:744) ~[apache-cassandra-2.2.8.jar:2.2.8] 
    at org.apache.cassandra.cql3.statements.ModificationStatement.executeWithoutCondition(ModificationStatement.java:531) ~[apache-cassandra-2.2.8.jar:2.2.8] 
    at org.apache.cassandra.cql3.statements.ModificationStatement.execute(ModificationStatement.java:519) ~[apache-cassandra-2.2.8.jar:2.2.8] 
    at org.apache.cassandra.cql3.QueryProcessor.processStatement(QueryProcessor.java:226) ~[apache-cassandra-2.2.8.jar:2.2.8] 
    at org.apache.cassandra.cql3.QueryProcessor.processPrepared(QueryProcessor.java:492) ~[apache-cassandra-2.2.8.jar:2.2.8] 
    at org.apache.cassandra.cql3.QueryProcessor.processPrepared(QueryProcessor.java:469) ~[apache-cassandra-2.2.8.jar:2.2.8] 
    at org.apache.cassandra.transport.messages.ExecuteMessage.execute(ExecuteMessage.java:142) ~[apache-cassandra-2.2.8.jar:2.2.8] 
    at org.apache.cassandra.transport.Message$Dispatcher.channelRead0(Message.java:507) [apache-cassandra-2.2.8.jar:2.2.8] 
    at org.apache.cassandra.transport.Message$Dispatcher.channelRead0(Message.java:401) [apache-cassandra-2.2.8.jar:2.2.8] 
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) [netty-all-4.0.23.Final.jar:4.0.23.Final] 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) [netty-all-4.0.23.Final.jar:4.0.23.Final] 
    at io.netty.channel.AbstractChannelHandlerContext.access$700(AbstractChannelHandlerContext.java:32) [netty-all-4.0.23.Final.jar:4.0.23.Final] 
    at io.netty.channel.AbstractChannelHandlerContext$8.run(AbstractChannelHandlerContext.java:324) [netty-all-4.0.23.Final.jar:4.0.23.Final] 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_121] 
    at org.apache.cassandra.concurrent.AbstractLocalAwareExecutorService$FutureTask.run(AbstractLocalAwareExecutorService.java:164) [apache-cassandra-2.2.8.jar:2.2.8] 
    at org.apache.cassandra.concurrent.SEPWorker.run(SEPWorker.java:105) [apache-cassandra-2.2.8.jar:2.2.8] 
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121] 

私がこれまでに読んだのは、cassandrOperations.execute()を実行する速度を遅くすることです。これは正しい方法なのでしょうか、それとももっと良い解決策がありますか? 私は何かのヒントを大いに感謝します。 ありがとうございます。

答えて

0

CassandrOperations.execute(…)が適切な方法です。あなたのコードから、私はCassandraOperationsをあなたのコードが大幅に最適化されているように見せて、なぜならCassandraOperationsが例外翻訳以外に少しのオーバーヘッドを加えているのだろうと思っています。

+0

私はCassandraOperationsを使用してプロジェクトを開始し、依然としてそれを使用しています。しかし、春からのオーバーヘッドがあっても、その声明は速く発射されます。私はセマフォについてこの[post](https://stackoverflow.com/questions/27902232/cassandra-cluster-with-bad-insert-performance-and-insert-stability)を読んで、正しい方法であれば疑問に思いました私の問題を解決するには? – hemsbeach