
I need to compress dynamically generated data with a GZIP stream and upload it to S3, at roughly ±1 GB of data per compressed file.

Since the files are large and I plan to process several files in parallel, I can't hold the whole data set in memory, and I want to stream the data to S3 as soon as possible.

Also, I can't know the exact size of the compressed data in advance. I read the question "Can I stream a file upload to S3 without a content-length header?", but I can't figure out how to combine that approach with GZIPing.

I found Amazon S3: Multipart upload, and I was thinking it would work if I could create a GZIPOutputStream that feeds the multipart upload part by part, reading chunks of the compressed data (preferably ~5 MB each) as they become available and uploading them to S3 concurrently. Is what I'm trying to do possible? Or is my only option to compress the data to local storage (my hard disk) first and then upload the finished file?
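
Roughly, the composition I have in mind looks like this (a minimal sketch; S3UploadStream, hasMoreData() and nextChunkOfData() are hypothetical placeholders for the part I don't know how to build):

    // Hypothetical: an OutputStream that turns writes into multipart-upload 
    // parts, wrapped by GZIPOutputStream so compressed bytes flow straight to S3. 
    try (OutputStream s3out = new S3UploadStream(s3client, "my-bucket", "data.gz", 3); 
         GZIPOutputStream gzip = new GZIPOutputStream(s3out)) { 
        while (hasMoreData()) { 
            gzip.write(nextChunkOfData()); 
        } 
    } 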

Answer


Since I didn't get any answer, this is how I ended up doing it:

package roee.gavriel; 

import java.io.ByteArrayInputStream; 
import java.io.IOException; 
import java.io.InputStream; 
import java.io.OutputStream; 
import java.util.LinkedList; 
import java.util.List; 
import java.util.concurrent.LinkedBlockingQueue; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 

import com.amazonaws.services.s3.AmazonS3; 
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; 
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; 
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; 
import com.amazonaws.services.s3.model.PartETag; 
import com.amazonaws.services.s3.model.UploadPartRequest; 

public class S3UploadStream extends OutputStream { 

    private static final int PART_SIZE = 5 * 1024 * 1024; 

    private final AmazonS3 s3client; 
    private final String bucket; 
    private final String key; 

    // The upload id AWS assigns to this multipart upload. 
    private final String uploadId; 
    // A tag list. AWS returns one tag per part and expects all of them when the upload is completed. 
    private final List<PartETag> partETags = new LinkedList<>(); 
    // A buffer to collect the data before sending it to AWS. 
    private byte[] partData = new byte[PART_SIZE]; 
    // The index of the next free byte in the buffer. 
    private int partDataIndex = 0; 
    // Total number of parts uploaded so far. 
    private int totalPartCountIndex = 0; 
    private volatile boolean closed = false; 
    // Internal thread pool which will handle the actual part uploading. 
    private final ThreadPoolExecutor executor; 

    public S3UploadStream(AmazonS3 s3client, String bucket, String key, int uploadThreadsCount) { 
     this.s3client = s3client; 
     this.bucket = bucket; 
     this.key = key; 
     InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucket, key); 
     InitiateMultipartUploadResult initResponse = s3client.initiateMultipartUpload(initRequest); 
     this.uploadId = initResponse.getUploadId(); 
     this.executor = new ThreadPoolExecutor(uploadThreadsCount, uploadThreadsCount, 60, TimeUnit.SECONDS, 
       new LinkedBlockingQueue<Runnable>(100)); 
    } 


    @Override 
    public synchronized void write(int b) throws IOException { 
     if (closed) { 
      throw new IOException("Trying to write to a closed S3UploadStream"); 
     } 
     partData[partDataIndex++] = (byte)b; 
     uploadPart(false); 
    } 

    @Override 
    public synchronized void close() { 
     if (closed) { 
      return; 
     } 
     closed = true; 

     // Flush the current data in the buffer 
     uploadPart(true); 

     executor.shutdown(); 
     try { 
      executor.awaitTermination(2, TimeUnit.MINUTES); 
     } catch (InterruptedException e) { 
      // Restore the interrupt flag; fall through and complete the upload anyway. 
      Thread.currentThread().interrupt(); 
     } 

     // Close the multiple part upload 
     CompleteMultipartUploadRequest compRequest = 
       new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags); 

     s3client.completeMultipartUpload(compRequest); 

    } 

    private synchronized void uploadPart(boolean force) { 

     if (!force && partDataIndex < PART_SIZE) { 
      // The API requires every part except the last one to be at least 5 MB. 
      return; 
     } 

     // Actually start the upload 
     createUploadPartTask(partData, partDataIndex); 

     // We are going to upload the current part, so start buffering data to new part 
     partData = new byte[PART_SIZE]; 
     partDataIndex = 0;   
    } 

    private synchronized void createUploadPartTask(byte[] partData, int partDataIndex) { 
     // Create an Input stream of the data 
     InputStream stream = new ByteArrayInputStream(partData, 0, partDataIndex); 

     // Build the upload request 
     UploadPartRequest uploadRequest = new UploadPartRequest() 
       .withBucketName(bucket) 
       .withKey(key) 
       .withUploadId(uploadId) 
       .withPartNumber(++totalPartCountIndex) 
       .withInputStream(stream) 
       .withPartSize(partDataIndex); 

     // Upload part and add response to our tag list. 
     // Make the actual upload in a different thread 
     executor.execute(() -> { 
      PartETag partETag = s3client.uploadPart(uploadRequest).getPartETag(); 
      synchronized (partETags) { 
       partETags.add(partETag); 
      } 
     }); 
    } 
} 
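
One caveat about the code above: if s3client.uploadPart throws inside the executor task, the failure is silently swallowed and completeMultipartUpload will later fail because part tags are missing. Below is a sketch (my own addition, not part of the original class) of how the task body could abort the upload on failure using the SDK's AbortMultipartUploadRequest, so S3 does not keep the orphaned parts:

     // Sketch only: assumes an extra import of 
     // com.amazonaws.services.s3.model.AbortMultipartUploadRequest. 
     executor.execute(() -> { 
      try { 
       PartETag partETag = s3client.uploadPart(uploadRequest).getPartETag(); 
       synchronized (partETags) { 
        partETags.add(partETag); 
       } 
      } catch (RuntimeException e) { 
       // Abort the whole multipart upload so the uploaded parts are discarded. 
       s3client.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, key, uploadId)); 
       throw e; 
      } 
     }); 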

And here is a small snippet that uses it to write a large number of GUIDs to a GZIP file on S3:

int writeThreads = 3; 
int genThreads = 10; 
int guidPerThread = 200_000; 
try (S3UploadStream uploadStream = new S3UploadStream(s3client, "<YourBucket>", "<YourKey>.gz", writeThreads)) { 
    try (GZIPOutputStream stream = new GZIPOutputStream(uploadStream)) { 
     Semaphore s = new Semaphore(0); 
     for (int t = 0; t < genThreads; ++t) { 
      new Thread(() -> { 
       for (int i = 0; i < guidPerThread; ++i) { 
        try { 
         // GZIPOutputStream is not thread-safe, so serialize the writes 
         // to keep each GUID and its newline contiguous in the stream. 
         synchronized (stream) { 
          stream.write(java.util.UUID.randomUUID().toString().getBytes()); 
          stream.write('\n'); 
         } 
        } catch (IOException e) { 
         // Ignored in this demo. 
        } 
       } 
       s.release(); 
      }).start(); 
     } 
     s.acquire(genThreads); 
    } 
} 
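
For comparison, if the records do not have to be produced from multiple threads, a single writer loop avoids the locking around the GZIPOutputStream entirely (same placeholder bucket and key as above):

    try (S3UploadStream uploadStream = new S3UploadStream(s3client, "<YourBucket>", "<YourKey>.gz", writeThreads); 
         GZIPOutputStream stream = new GZIPOutputStream(uploadStream)) { 
        for (int i = 0; i < genThreads * guidPerThread; ++i) { 
            stream.write(java.util.UUID.randomUUID().toString().getBytes()); 
            stream.write('\n'); 
        } 
    } 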