2016-05-11 17 views
1

AWS Kinesisからデータを読み込む簡単なアプリケーションを構築しようとしています。私は1つの断片を使ってデータを読み取ることができましたが、4つの異なる断片からデータを取得したいと思います。キネシスが複数のシャードからデータを取得する

問題は、シャードがアクティブである限り、繰り返し実行されるwhileループがあり、異なるシャードからデータを読み取ることができなくなります。これまでのところ、私は代替アルゴリズムを見つけることができず、KCLベースのソリューションを実装することもできませんでした。事前に 多くのおかげで

public static void DoSomething() { 
     AmazonKinesisClient client = new AmazonKinesisClient(); 
     //noinspection deprecation 
     client.setEndpoint(endpoint, serviceName, regionId); 
     /** get shards from the stream using describe stream method*/ 

     DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); 
     describeStreamRequest.setStreamName(streamName); 
     List<Shard> shards = new ArrayList<>(); 
     String exclusiveStartShardId = null; 
     do { 
      describeStreamRequest.setExclusiveStartShardId(exclusiveStartShardId); 
      DescribeStreamResult describeStreamResult = client.describeStream(describeStreamRequest); 
      shards.addAll(describeStreamResult.getStreamDescription().getShards()); 
      if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) { 
       exclusiveStartShardId = shards.get(shards.size() - 1).getShardId(); 
      } else { 
       exclusiveStartShardId = null; 
      } 
     }while (exclusiveStartShardId != null); 

     /** shards obtained */ 
     String shardIterator; 

     GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); 
     getShardIteratorRequest.setStreamName(streamName); 
     getShardIteratorRequest.setShardId(shards.get(0).getShardId()); 
     getShardIteratorRequest.setShardIteratorType("LATEST"); 

     GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest); 
     shardIterator = getShardIteratorResult.getShardIterator(); 
     GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); 

     while (!shardIterator.equals(null)) { 
      getRecordsRequest.setShardIterator(shardIterator); 
      getRecordsRequest.setLimit(250); 
      GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest); 
      List<Record> records = getRecordsResult.getRecords(); 

      shardIterator = getRecordsResult.getNextShardIterator(); 
      if(records.size()!=0) { 
       for(Record r : records) { 
        System.out.println(r.getPartitionKey()); 
       } 
      } 
      try { 
       Thread.sleep(1000); 
      } catch (InterruptedException e) { 

      } 
     } 
    } 

答えて

1

あなたが複数の破片から、単一のプロセス/労働者から読まないであろうことをお勧めします。まず、コードの複雑さに加えて、さらに重要なことに、スケールアップの問題があることがわかります。

スケーラビリティの「秘密」は、小さく独立した労働者または他のそのようなユニットを持つことです。このような設計はAWSのHadoop、DynamoDB、Kinesisで見ることができます。これにより、必要に応じて簡単に拡大縮小することができる小型システム(マイクロサービス)を構築することができます。あなたのサービスがより成功を収めるように、またはその使用法のその他の変動によって、より多くの作業/データの単位を簡単に追加することができます。

これらのAWSサービスからわかるように、DynamoDBのように自動的にこのスケーラビリティを得ることができます。また、キネシスストリームにシャードを追加する必要が生じることもあります。しかし、アプリケーションのためには、スケーラビリティを何とか制御する必要があります。

キネシスの場合、AWSラムダまたはKinesisクライアントライブラリ(KCL)を使用して拡大縮小できます。どちらもあなたのストリームのステータス(シャードとイベントの数)を聞き、それを使用してワーカーを追加または削除し、処理するイベントを配信します。これらの両方のソリューションでは、単一のシャードに対して作業している作業者を作成する必要があります。

複数のシャードのイベントを整列させる必要がある場合は、RedisやDynamoDBなどの状態サービスを使用してイベントを整列できます。

+0

問題はデータを読み取ることができないため、ワーカーを使用するとシャードを取得できません。 "consumeShard"メソッドを使いたいと思ったが、十分ではなかった。私はkinesisアプリケーションのawsサンプルをカスタマイズしようとしましたが、DynamoDBに依存しすぎていました。私は自分のオブジェクトや数をthoneのdynamoテーブルに保存したくありません。 – emrahozkan

関連する問題