2016-10-31 14 views
1

私はDynamo DBストリーム+ lamdbaをトリガとして使用して、Dynamines DBデータをRedshiftに変換するkinesisを呼び出します。 Dynamoストリームを使用して、異なる地域のRedshiftにDynamo DBデータをロードする方法を提案できますか?RedhisftへのDynamo DBデータ

+1

あなたはキネシス消防ホースを使用していますか? –

答えて

0

私は、Dynamo dbからRedshiftにデータを移動できるプログラムを作成しましたが、ストリームなしでは動作しません。コードを見て、これがあなたのケースを助けるかどうか、またはこれに対応するアイデアを得ることができます。

1.レッドシフトによる接続の作成。 2.レディシフトに挿入するためのプリペアドステートメントを作成します。 3.ページ設定を使用してDynamoから一括してデータを取得します。 4.バッチデータをResdhiftにバッチで挿入します。

public void createConnectionWithRedshift() { 
     final String DB_URL = "jdbc:redshift://ao.cepuhmobd.us-west-2.redshift.amazonaws.com:5439/events"; 
     // final String DB_URL = args[0]; 
     // Database credentials 
     final String USER = "abc"; 
     final String PASS = "abc"; 
     Connection conn = null; 
     try { 
      // STEP 3: Open a connection 
      System.out.println("Connecting to database..."); 
      conn = DriverManager.getConnection(DB_URL, USER, PASS); 
      // createNewTable(conn); 
      // STEP 4: Execute a query 
      preparedStatement = conn.prepareStatement("insert into Events " + "(Vin,timestamp,eventtype,source,data)" + "VALUES (?,?,?,?,?)"); 
     } catch (SQLException se) { 
      se.printStackTrace(); 
     } 
    }// end main 


    public void replicateDynamoToRedshidt(int pages, int batchSize, int scanSize) 
      throws TableNeverTransitionedToStateException, InterruptedException { 
createConnectionWithRedshift();//Redshift Connection 
for (int i = 0; i < pages; i = i + 1) { 
      List<EventLogEntity> results = findAll(new PageRequest(i, batchSize));//Fetching the data from Dynamo in batches 
      List<HeadUnitData> headUnitDataList = headUnitEvents(results); 
      for (int j = 0; j < headUnitDataList.size(); j++) { 
       HeadUnitData headUnitData = headUnitDataList.get(j); 
       insertData(headUnitData.getVin(), headUnitData.getType(), headUnitData.getSource(), headUnitData.getData());//Inserting the data into Redshidt in batches 
      } 
      try { 
       preparedStatement.executeBatch(); 
       System.out.println("Inserted in Database : " + results.size()); 
      } catch (SQLException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
     } 
関連する問題