2016-10-26 19 views
2

データベースに何百万ものデータ行を挿入しようとしています。私はこの目的のためにThreadPoolExecutorを使用しようとしています。私は9000レコードごとにバッチを作成し、各スレッドにバッチを送信しています。ここでは、ThreadPool Sizeを20に固定しました。サイズが大きくなると、失敗しています。 ThreadPoolExecutorで使用可能なスレッドの数を確認するにはどうしたらいいですか?また、スレッドプールに空きスレッドがあるまで待機するにはどうすればよいですか。マルチスレッドを使用してDBに百万データを挿入する

聴覚は私のコードです、間違っていると助けてください。

int threadCount=10; 
     ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(threadCount); 
     int i=0; 

     StringBuffer sb=new StringBuffer(); 
     sb.append("BEGIN BATCH"); 
     sb.append(System.lineSeparator()); 
     int cnt =metaData.getColumnCount(); 
     while(rs.next()) 
     {  
      String query ="INSERT INTO "+table+" ("+columnslist.get(1)+")VALUES("+i; 
      for (int j=1 ; j <= cnt ; j++) 
      { 
       if(metaData.getColumnTypeName(j).contains("int") || metaData.getColumnTypeName(j).contains("number")) 
       { 
         query +=","+ rs.getInt(j); 
       } 
       else if(metaData.getColumnTypeName(j).contains("varchar") || metaData.getColumnTypeName(j).contains("date") || metaData.getColumnTypeName(j).contains("getTimestamp")) 
       { 
         query +=",'"+parseColumnData(rs.getString(j))+"'"; 
       } 
       else 
       { 
         query +=",'"+parseColumnData(rs.getString(j))+"'"; 
       } 
      } 
       query +=");"; 
       sb.append(query);sb.append(System.lineSeparator()); 
       if(i%9000==0) 
       { 
        sb.append("APPLY BATCH"); 
        System.out.println(threadPool.getActiveCount()); 

        Thread t = new Thread(new ExcecuteTask(sb.toString(),session)); 
        threadPool.execute(t);    
        sb.setLength(0); 
        sb.append("BEGIN BATCH"); 
        sb.append(System.lineSeparator()); 

       } 
       i++; 
      } 
      sb.append("APPLY BATCH"); 

      Thread t = new Thread(new ExcecuteTask(sb.toString(),session)); 
      threadPool.execute(t); 
      sb.setLength(0); 

      threadPool.shutdown(); 
      while (threadPool.getTaskCount() != threadPool.getCompletedTaskCount()) 
      { 
      } 

      System.out.println(table+" Loaded sucessfully"); 





public class ExcecuteTask implements Runnable 
{ 
     private String sb; 
     private Session session; 

     public ExcecuteTask(String s,Session session) 
     { 
      sb = s; 
      this.session=session; 
     } 
     public void run() 
     { 
      session.executeAsync(sb.toString()); 
     } 
} 
+0

あなたは 'FixedThreadPool'を使っていますが、拡張可能なスレッド番号を持つ' CachedThreadPool'を探しているようです。 – Pievis

+0

あなたは間違っています。 'Connection.prepareStatement'を調べてコンパイルされたステートメント(' PreparedStatement'インスタンス)を取得してください。 'PreparedStatement.setObject'を使用して挿入する行の値を設定し、' Statement.addBatch'を使って行をバッチに追加します。バッチを実行するには 'Statement.executeBatch'を使います(つまり、あなたのケースに行を挿入します)。別のラウンドの挿入には 'Statement.clearBatch'を使用してください。 SQLステートメントの文字列を連結することは、多数の理由(SQLインジェクションがその1つである)のためにno-noです。文字列を '+ ='で連結することは、文字列連結のためのno-noです。... –

+0

SQLインジェクション防止の詳細については、こちらをご覧ください:[SQL Injection Preventionチートシート](https://www.owasp.org/index.php/SQL_Injection_Prevention_Cheat_Sheet ) –

答えて

1

あなたはそれにgetActiveCountメソッドを呼び出すことによってThreadPoolExecutor内のアクティブスレッドのおおよその数を見つけることができます。しかし、あなたはする必要はありません。 Java documentation for Executors.newFixedThreadPool

から

共有アンバウンド形式のキューなしで動作するスレッドの固定数を再利用するスレッドプールを作成します。どの時点でも、たいていのnThreadsスレッドはアクティブな処理タスクになります。すべてのスレッドがアクティブなときに追加のタスクがサブミットされると、スレッドが使用可能になるまでキュー内で待機します。シャットダウン前に実行中にエラーが発生してスレッドが終了した場合、必要に応じて後続のタスクを実行するために新しいスレッドが使用されます。プール内のスレッドは、明示的にシャットダウンするまで存在します。

スレッドプールにタスクを送信し続けることができるようになり、スレッドが利用可能になると、スレッドプールが選択されて実行されます。

Threadオブジェクトにタスクをラップしているので、スレッドプールに送信する必要はありません。

関連する問題