2017-06-22 24 views
2

多くのファイルをS3にアップロードする必要があります。そのジョブを順番に完了するまでに数時間かかることがあります。それがまさにコトルの新しいコルーチンの優れた点です。そこで、私は、スレッドベースの実行サービスをもう一度やってみるのではなく、まず最初に試してみたかったのです。Kotlin Coroutinesによる同時S3ファイルアップロード

fun upload(superTiles: Map<Int, Map<Int, SuperTile>>) = runBlocking { 
    val s3 = AmazonS3ClientBuilder.standard().withRegion("eu-west-1").build() 
    for ((x, ys) in superTiles) { 
     val jobs = mutableListOf<Deferred<Any>>() 
     for ((y, superTile) in ys) { 
      val job = async(CommonPool) { 
       uploadTile(s3, x, y, superTile) 
      } 
      jobs.add(job) 
     } 
     jobs.map { it.await() } 
    } 
} 

suspend fun uploadTile(s3: AmazonS3, x: Int, y: Int, superTile: SuperTile) { 
    val json: String = "{}" 
    val key = "$s3Prefix/x4/$z/$x/$y.json" 
    s3.putObject(PutObjectRequest("my_bucket", ByteArrayInputStream(json.toByteArray()), metadata)) 
} 

問題:ここで

は私の(簡体字)のコードであるコードはまだ非常に遅く、ロギングは要求がまだ順次実行されていることが明らかになった:次のいずれかが作成される前に、ジョブが終了します。非常に少数のケース(10のうち1)だけで、同時に実行されているジョブがあります。

なぜコードはそれほど速く/同時に実行されませんか?それについて私は何ができますか?

+0

無教育推測:複数のクライアントを持つように非同期セクション内の 'val s3 = AmazonS3ClientBuilder ... 'を移動しますか? –

+0

はどちらもうまくいきませんでした。私の無教養の推測は今、 'putObject'が要求をブロックしていることです。コルーチンは何も変更できません。 – linqu

+0

正確です。 S3 SDKはノンブロッキングIO(NIO経由)をサポートしていないようですので、アップロードごとに1つのスレッドが必要です。複数のパラレルで実行することはできますが、並行して実行することはお勧めできません。ある時点では、ネットワーク帯域幅によっても制限されます。 – diesieben07

答えて

5

あなたは非同期 APIで作業するときに使用しているAmazonS3.putObject APIは、古い学校のブロック、同期APIですので、あなたがそのCommonPool内のスレッドの数と同じくらい多くの同時アップロードを取得しながら、Kotlinのコルーチンは、エクセルあなたは使用しています。 uploadTileの機能にsuspendという機能をマーキングすることは、本体に中断機能を使用していないため変更されません。

アップロードタスクでより多くのスループットを得るための最初のステップは、そのための非同期APIの使用を開始することです。私はAmazon S3 TransferManagerその財布を見てお勧めします。それがあなたの問題を最初に解決するかどうかを確認してください。

コトルリンコルーチンは、非同期APIを使いやすい論理ワークフローに組み合わせるのに役立つように設計されています。たとえば、次の拡張機能を書き込むことによって、コルーチンで使用するためにTransferManagerの非同期APIを適用することは容易である:

suspend fun Upload.await(): UploadResult = suspendCancellableCoroutine { cont -> 
    addProgressListener { 
     if (isDone) { 
      // we know it should not actually wait when done 
      try { cont.resume(waitForUploadResult()) } 
      catch (e: Throwable) { cont.resumeWithException(e) } 
     } 
    } 
    cont.invokeOnCompletion { abort() } 
} 

この拡張機能は、あなたがTransferManagerで動作する非常に流暢なコードを書くことができ、あなたのuploadTile機能を書き換えることができますuploadTileのこの新しいバージョンはabovを定義した懸濁機能awaitをどのように使用するか

suspend fun uploadTile(tm: TransferManager, x: Int, y: Int, superTile: SuperTile) { 
    val json: String = "{}" 
    val key = "$s3Prefix/x4/$z/$x/$y.json" 
    tm.upload(PutObjectRequest("my_bucket", ByteArrayInputStream(json.toByteArray()), metadata)) 
     .await() 
} 

お知らせ:TransferManager代わりAmazonS3のインターフェイスをブロックで作業するので動作するようにe。

関連する問題