2017-03-03 11 views
0

PostgreSQLサーバの1つのテーブル(合計4,000万行)の小さなデータを小さなバッチ(各csvの6000行)に読み込もうとしています。私はひかりCPがこの目的には理想的だと思いました。HikariCPでのPostgresqlのパフォーマンスの問題

Java 8(1.8.0_65)、Postgres JDBCドライバ9.4.1211およびHikariCP 2.4.3を使用してデータを挿入したときのスループットです。

6000行4分42秒。

私は間違っていますが、どのようにして挿入速度を改善できますか?私のセットアップについて

さらにいくつかの単語:

  • プログラムは、(株)ネットワークの後ろに私のラップトップで実行されます。
  • Postgresサーバー9.4は、db.m4.largeと50 GB SSDを使用したAmazon RDSです。
  • まだテーブルに定義されている明示的なインデックスまたは主キーは定義されていません。
  • プログラムは、以下のようにリクエストを保持するために大規模なスレッドプールと非同期各行を挿入:

    private static ExecutorService executorService = new ThreadPoolExecutor(5, 1000, 30L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100000)); 
    

DataSource設定である:

 private DataSource getDataSource() { 
       if (datasource == null) { 
        LOG.info("Establishing dataSource"); 
        HikariConfig config = new HikariConfig(); 
        config.setJdbcUrl(url); 
        config.setUsername(userName); 
        config.setPassword(password); 
        config.setMaximumPoolSize(600);// M4.large 648 connections tops 
        config.setAutoCommit(true); //I tried autoCommit=false and manually committed every 1000 rows but it only increased 2 minute and half for 6000 rows 
        config.addDataSourceProperty("dataSourceClassName","org.postgresql.ds.PGSimpleDataSource"); 
        config.addDataSourceProperty("dataSource.logWriter", new PrintWriter(System.out)); 
        config.addDataSourceProperty("cachePrepStmts", "true"); 
        config.addDataSourceProperty("prepStmtCacheSize", "1000"); 
        config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048"); 
        config.setConnectionTimeout(1000); 

        datasource = new HikariDataSource(config); 
       } 
       return datasource; 
      } 

このIは、ソースデータを読み取る場合:

private void readMetadata(String inputMetadata, String source) { 
      BufferedReader br = null; 
      FileReader fr = null; 
      try { 
       br = new BufferedReader(new FileReader(inputMetadata)); 
       String sCurrentLine = br.readLine();// skip header; 
       if (!sCurrentLine.startsWith("xxx") && !sCurrentLine.startsWith("yyy")) { 
        callAsyncInsert(sCurrentLine, source); 
       } 
       while ((sCurrentLine = br.readLine()) != null) { 
        callAsyncInsert(sCurrentLine, source); 
       } 
      } catch (IOException e) { 
       LOG.error(ExceptionUtils.getStackTrace(e)); 
      } finally { 
       try { 
        if (br != null) 
         br.close(); 

        if (fr != null) 
         fr.close(); 

       } catch (IOException ex) { 
        LOG.error(ExceptionUtils.getStackTrace(ex)); 
       } 
      } 
    } 

データを挿入しています非同期(またはJDBCをしよう!):

  private void callAsyncInsert(final String line, String source) { 
        Future<?> future = executorService.submit(new Runnable() { 
         public void run() { 
          try { 
           dataLoader.insertRow(line, source); 
          } catch (SQLException e) { 
           LOG.error(ExceptionUtils.getStackTrace(e)); 
           try { 
            errorBufferedWriter.write(line); 
            errorBufferedWriter.newLine(); 
            errorBufferedWriter.flush(); 
           } catch (IOException e1) { 
            LOG.error(ExceptionUtils.getStackTrace(e1)); 
           } 
          } 
         } 
        }); 
        try { 
         if (future.get() != null) { 
          LOG.info("$$$$$$$$" + future.get().getClass().getName()); 
         } 
        } catch (InterruptedException e) { 
         LOG.error(ExceptionUtils.getStackTrace(e)); 
        } catch (ExecutionException e) { 
         LOG.error(ExceptionUtils.getStackTrace(e)); 
        } 
       } 

私DataLoader.insertRowは以下の通りです:

  public void insertRow(String row, String source) throws SQLException { 
        String[] splits = getRowStrings(row); 
        Connection conn = null; 
        PreparedStatement preparedStatement = null; 
        try { 
         if (splits.length == 15) { 
          String ... = splits[0]; 
          //blah blah blah 

          String insertTableSQL = "insert into xyz(...) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) "; 
          conn = getConnection(); 
          preparedStatement = conn.prepareStatement(insertTableSQL); 
          preparedStatement.setString(1, column1); 
          //blah blah blah 
          preparedStatement.executeUpdate(); 
          counter.incrementAndGet(); 
          //if (counter.get() % 1000 == 0) { 
           //conn.commit(); 
          //} 
         } else { 
          LOG.error("Invalid row:" + row); 
         } 
        } finally { 
         /*if (conn != null) { 
          conn.close(); //Do preparedStatement.close(); rather connection.close 
         }*/ 
         if (preparedStatement != null) { 
          preparedStatement.close(); 
         } 
        } 
       } 

pgAdmin4で監視する場合、私はいくつかのことに気づいた:

  • 数が最も多いです秒あたりのトランザクション数は50に近い値でした。
  • アクティブなデータベースセッションは1つだけでした。セッションの合計数は15でした。
  • あまりにも多くのブロックI/Oは、(約500を打つ、わからないことは心配をする必要があります場合)あなたは絶対の外を準備中のステートメントで、バッチ挿入を使用したい

screenshot from pgAdmin

+0

接続プールのサイズを小さくし、使用するスレッド数を減らします。接続数が増えてもスレッド数が増えても必ずしもパフォーマンスが向上するとは限りません。接続(およびスレッド)によって実際にはパフォーマンスとスループットが低下します。また、**あなたのメソッドで接続を閉じて、接続プールに戻して再利用する必要があります。 –

+0

また、ボトルネックが非同期の挿入であるかどうかを実際に確認してください。表示されないコード(問題は 'callAsyncInsert'を呼び出します)に問題がある可能性があります。 –

+0

ありがとうございます。 – bkrish

答えて

2

ループ、自動コミットをオフにします。擬似コード:

PreparedStatement stmt = conn.prepareStatement("insert into xyz(...) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)") 
while (<data>) { 
    stmt.setString(1, column1); 
    //blah blah blah 
    stmt.addBatch(); 
} 
stmt.executeBatch(); 
conn.commit(); 

単一接続の単一のスレッドでも> 5000行/秒を挿入できます。

UPDATE:マルチスレッド化する場合、接続数はデータベースのCPUコア数x1.5または2にする必要があります。処理スレッドの数はそれに一致し、各処理スレッドは上記のパターンを使用して1つのCSVファイルを処理する必要があります。しかし、同じテーブルに同時に挿入されるものが多いと、DBにロック競合が発生しやすくなります。この場合、最適な並行性が見つかるまで、処理スレッドの数をバックオフする必要があります。

適切なサイズのプールと並行性は、> 20K行/秒を達成する上で簡単に上回るはずです。

また、HikariCP v2.6.0にアップグレードしてください。

+0

マルチスレッドインポートのスレッド数は、サーバー上のCPU数だけでなく、そのサーバー上のハードディスク数によっても決まります。 –

+0

@a_horse_with_no_name trueの場合、Amazon RDSではその番号を知る方法がありません。 – brettw

+0

私は提案ごとにプログラムを修正しました。 2.6.0にアップグレードされました。バッチ挿入を追加し、データをロードするための接続のみを使用しました。今は2種類のデータセットと大きな違いがあります。データセット#1は、1つのcsvファイル内の500K行です(正確には499951) - 00:02:08.670分です。データセット#2は83 CSVファイルで498Kで、6K行ごとに00:02:09.674分を要しました。だから私はスループット3840ish/secを得ることができます。私はマクロロギング、エラー処理などの重いフレームワークを持っていないと、もっと上手くいくかもしれませんが、私はこれに満足しています。このフレームワークのためのWoolridgeさん、ありがとうございました。 – bkrish