2017-04-22 5 views
0

私はカサンドラ3.10.0私のコードは、摂取私のCassandra Prepared Statementはなぜデータの摂取が遅いのですか?

でDatastaxエンタープライズ5.1を実行している私は3ノードカサンドラクラスタに摂取したい10万名前のJavaのリストを持っているが、それはlooooong時間がかかります。私はクラスタでストレステストを実行し、毎秒25,000回以上の書き込みを行うことができました。私の摂取コードでは、約200 /秒のひどいパフォーマンスを得ています。

My Java Listには100,000の名前があり、myListと呼ばれます。私はデータを取り込むために次の準備されたステートメントとセッションの実行を使用します。

PreparedStatement prepared = session.prepare("insert into names (id, name) values (?, ?)"); 

     int id = 0; 

     for(int i = 0; i < myList.size(); i++) { 
      id += 1; 
      session.execute(prepared.bind(id, myList.get(i))); 
     } 

私は自分のコードにクラスタモニターを追加して、何が起こっていたのかを確認しました。ここに私の監視コードがあります。

/// Monitoring Status of Cluster 
    final LoadBalancingPolicy loadBalancingPolicy = 
    cluster.getConfiguration().getPolicies().getLoadBalancingPolicy(); 
    ScheduledExecutorService scheduled = 
    Executors.newScheduledThreadPool(1); 
     scheduled.scheduleAtFixedRate(() -> { 
      Session.State state = session.getState(); 
      state.getConnectedHosts().forEach((host) -> { 
       HostDistance distance = loadBalancingPolicy.distance(host); 
       int connections = state.getOpenConnections(host); 
       int inFlightQueries = state.getInFlightQueries(host); 
       System.out.printf("%s connections=%d, current load=%d, maxload=%d%n", 
         host, connections, inFlightQueries, 
         connections * 
           poolingOptions.getMaxRequestsPerConnection(distance)); 
      }); 
    }, 5, 5, TimeUnit.SECONDS); 

モニタ5第2の出力は、3回の反復のために次のことを示しています

/192.168.20.25:9042 connections=1, current load=1, maxload=32768 
/192.168.20.26:9042 connections=1, current load=0, maxload=32768 
/192.168.20.34:9042 connections=1, current load=0, maxload=32768 
/192.168.20.25:9042 connections=1, current load=1, maxload=32768 
/192.168.20.26:9042 connections=1, current load=0, maxload=32768 
/192.168.20.34:9042 connections=1, current load=0, maxload=32768 
/192.168.20.25:9042 connections=1, current load=0, maxload=32768 
/192.168.20.26:9042 connections=1, current load=1, maxload=32768 
/192.168.20.34:9042 connections=1, current load=0, maxload=32768 

私は非常に効果的に自分のクラスタを利用していますということを表示されません。私は何が間違っているのか分かりませんし、ヒントを高く評価しています。

ありがとうございました!

答えて

3

executeAsyncを使用します。

提供されたクエリを非同期で実行します。このメソッドはブロックされません。クエリが基になるネットワークスタックに渡されるとすぐに戻ります。特に、このメソッドから戻っても、クエリが有効であるか、または実行中のノードに送信されたことさえ保証されません。 ResultSetFutureにアクセスすると、問合せの失敗に関する例外がスローされます。

大量のデータを挿入しています。 executeAsyncを使用していて、クラスタがそのようなデータ量を処理できなかった場合、例外がスローされます。 executeAsyncをセマフォで制限することができます。

例:

PreparedStatement prepared = session.prepare("insert into names (id, name) values (?, ?)"); 

int numberOfConcurrentQueries = 100; 
final Semaphore semaphore = new Semaphore(numberOfConcurrentQueries); 

int id = 0;  

for(int i = 0; i < myList.size(); i++) { 
    try { 
     id += 1; 
     semaphore.acquire(); 
     ResultSetFuture future = session.executeAsync(prepared.bind(id, myList.get(i))); 
     Futures.addCallback(future, new FutureCallback<ResultSet>() { 
      @Override 
      public void onSuccess(ResultSet result) { 
       semaphore.release(); 
      } 

      @Override 
      public void onFailure(Throwable t) { 
       semaphore.release(); 
      } 
     }); 
    } catch (Exception e) { 
     semaphore.release(); 
     e.printStackTrace(); 
    } 
} 

出典:
https://stackoverflow.com/a/30526719/2320144 https://docs.datastax.com/en/drivers/java/2.0/com/datastax/driver/core/Session.html#executeAsync-com.datastax.driver.core.Statement-

+0

なぜあなたはIDが必要?成功カウント? –

+0

idはパーティションキー – mithrix

+0

@mithrix回答を更新 –

関連する問題