AWS Kinesisからデータを読み込む簡単なアプリケーションを構築しようとしています。私は1つの断片を使ってデータを読み取ることができましたが、4つの異なる断片からデータを取得したいと思います。キネシスが複数のシャードからデータを取得する
問題は、シャードがアクティブである限り、繰り返し実行されるwhileループがあり、異なるシャードからデータを読み取ることができなくなります。これまでのところ、私は代替アルゴリズムを見つけることができず、KCLベースのソリューションを実装することもできませんでした。事前に 多くのおかげで
public static void DoSomething() {
AmazonKinesisClient client = new AmazonKinesisClient();
//noinspection deprecation
client.setEndpoint(endpoint, serviceName, regionId);
/** get shards from the stream using describe stream method*/
DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
describeStreamRequest.setStreamName(streamName);
List<Shard> shards = new ArrayList<>();
String exclusiveStartShardId = null;
do {
describeStreamRequest.setExclusiveStartShardId(exclusiveStartShardId);
DescribeStreamResult describeStreamResult = client.describeStream(describeStreamRequest);
shards.addAll(describeStreamResult.getStreamDescription().getShards());
if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) {
exclusiveStartShardId = shards.get(shards.size() - 1).getShardId();
} else {
exclusiveStartShardId = null;
}
}while (exclusiveStartShardId != null);
/** shards obtained */
String shardIterator;
GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
getShardIteratorRequest.setStreamName(streamName);
getShardIteratorRequest.setShardId(shards.get(0).getShardId());
getShardIteratorRequest.setShardIteratorType("LATEST");
GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest);
shardIterator = getShardIteratorResult.getShardIterator();
GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
while (!shardIterator.equals(null)) {
getRecordsRequest.setShardIterator(shardIterator);
getRecordsRequest.setLimit(250);
GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest);
List<Record> records = getRecordsResult.getRecords();
shardIterator = getRecordsResult.getNextShardIterator();
if(records.size()!=0) {
for(Record r : records) {
System.out.println(r.getPartitionKey());
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
問題はデータを読み取ることができないため、ワーカーを使用するとシャードを取得できません。 "consumeShard"メソッドを使いたいと思ったが、十分ではなかった。私はkinesisアプリケーションのawsサンプルをカスタマイズしようとしましたが、DynamoDBに依存しすぎていました。私は自分のオブジェクトや数をthoneのdynamoテーブルに保存したくありません。 – emrahozkan