9

DynamoDBストリームを使用してDynamoDBテーブルの変更をキャプチャしようとしましたが、AWSが提供するJava DynamoDBストリームのKinesisアダプタ。 ScalaアプリでAWS Java SDKを使用しています。AWS Java DynamoDBストリームを使用したDynamoDBストリームの処理Kinesisアダプタ

AWS guideに続き、AWS公開code exampleを実行することから始めました。しかし、Amazonのコードを自分の環境で動かすことに問題があります。私の問題はKinesisClientLibConfigurationオブジェクトにあります。

例コードでは、KinesisClientLibConfigurationは、DynamoDBによって提供されるストリームARNで構成されています。

new KinesisClientLibConfiguration("streams-adapter-demo", 
    streamArn, 
    streamsCredentials, 
    "streams-demo-worker") 

Iが最初に私ダイナモテーブルから現在のARNを配置することにより、私のScalaのアプリで同様のパターンに続く:

lazy val streamArn = dynamoClient.describeTable(config.tableName) 
.getTable.getLatestStreamArn 

をそして提供ARNとKinesisClientLibConfiguration作成:

lazy val kinesisConfig :KinesisClientLibConfiguration = 
new KinesisClientLibConfiguration(
    "testProcess", 
    streamArn, 
    defaultProviderChain, 
    "testWorker" 
).withMaxRecords(1000) 
    .withRegionName("eu-west-1") 
    .withMetricsLevel(MetricsLevel.NONE) 
    .withIdleTimeBetweenReadsInMillis(500) 
    .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON) 

提供されたストリームのARNを確認しました。すべてがAWSコンソールに表示されているものと一致します。二番目のパラメータは以下のようにリストされているよう

com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncTask call 
SEVERE: Caught exception while sync'ing Kinesis shards and leases 
com.amazonaws.services.kinesis.model.AmazonKinesisException: 1 validation  
error detected: Value 'arn:aws:dynamodb:eu-west-1:STREAM ARN' at 
'streamName' failed to satisfy constraint: Member must satisfy regular 
expression pattern: [a-zA-Z0-9_.-]+ (Service: AmazonKinesis; Status Code: 
400; Error Code: ValidationException; Request ID:) 

意味を成さないKinesisClientLibConfigurationこの上で提供されているマニュアルを見て:私は提供ARNが有効なストリーム名ではないことを示す例外を取得し終わる実行時に

ストリーム名 ARNの記載なし。

KinesisClientLibConfigurationにはARNに関連するものが見つかりません。私はKinesisストリームではなく、DynamoDBストリームで作業しているので、ストリーム名を見つける方法もわかりません。

公開されているAWSの例で何が分からないのか分かりませんが、かなり古いバージョンのKCLを使用しているようです。私はamazon-kinesis-clientのバージョン1.7.0を使用しています。

答えて

3

問題は実際に私のKinesisClientLibConfigurationの外になってしまいました。

同じ設定を使用し、DynamoDBストリームアダプタライブラリに含まれるストリームアダプタと、DynamoDBとCloudWatchの両方のクライアントを提供することで、この問題を回避できました。

私の作業の解決策は次のようになります。

キネシスのクライアント設定を定義する。

//Kinesis config for DynamoDB streams 
lazy val kinesisConfig :KinesisClientLibConfiguration = 
    new KinesisClientLibConfiguration(
     getClass.getName, //DynamoDB shard lease table name 
     streamArn, //pulled from the dynamo table at runtime 
     dynamoCredentials, //DefaultAWSCredentialsProviderChain 
     KeywordTrackingActor.NAME //Lease owner name 
    ).withMaxRecords(1000) //using AWS recommended value 
    .withIdleTimeBetweenReadsInMillis(500) //using AWS recommended value 
    .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON) 

RecordProcessorFactoryのインスタンスを作成し、ストリームアダプタとCloudWatchのクライアント

val streamAdapterClient :AmazonDynamoDBStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(dynamoCredentials) 
streamAdapterClient.setRegion(region) 

val cloudWatchClient :AmazonCloudWatchClient = new AmazonCloudWatchClient(dynamoCredentials) 
cloudWatchClient.setRegion(region) 

を定義し、それがKCLがIRecordProcessorFactoryと返さIRecordProcessorを提供実装するクラスを定義するのはあなた次第です。

val recordProcessorFactory :RecordProcessorFactory = new RecordProcessorFactory(context, keywordActor, config.keywordColumnName) 

私が欠席していた部分は、すべてこの作業者に提供する必要があります。

val worker :Worker = 
    new Worker.Builder() 
    .recordProcessorFactory(recordProcessorFactory) 
    .config(kinesisConfig) 
    .kinesisClient(streamAdapterClient) 
    .dynamoDBClient(dynamoClient) 
    .cloudWatchClient(cloudWatchClient) 
    .build() 

//this will start record processing 
streamExecutorService.submit(worker) 
0

また、あなたの代わりに内部的に使用していますAmazonDynamoDBStreamsAdapterClientcom.amazonaws.services.kinesis.clientlibrary.lib.worker.Workercom.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorkerを使用することができます。

すなわち

lazy val kinesisConfig :KinesisClientLibConfiguration = 
new KinesisClientLibConfiguration(
    getClass.getName, //DynamoDB shard lease table name 
    streamArn, //pulled from the dynamo table at runtime 
    dynamoCredentials, //DefaultAWSCredentialsProviderChain 
    KeywordTrackingActor.NAME //Lease owner name 
).withMaxRecords(1000) //using AWS recommended value 
.withIdleTimeBetweenReadsInMillis(500) //using AWS recommended value 
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON) 

val worker = new com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorker(recordProcessorFactory, kinesisConfig) 
0

だけで何の問題だった答えるために - それはちょうどストリーム名を望んでいたときに、ARNを提供しました。

0

私は最近このプロジェクトgfc-aws-kinesisにPRしました。アダプタを渡してKinesisRecordAdapter実装を書くだけで、今すぐ使用できます。

私は

は、構成内のクライアント

val streamAdapterClient: AmazonDynamoDBStreamsAdapterClient = 
    new AmazonDynamoDBStreamsAdapterClient() 

パスを作成してハッシュマップを解析するScanamoを使用している例では

:適し暗黙レコードリーダを作成

val streamConfig = KinesisStreamConsumerConfig[Option[A]](
    applicationName, 
    config.stream, //the full dynamodb stream arn 
    regionName = Some(config.region), 
    checkPointInterval = config.checkpointInterval, 
    initialPositionInStream = config.streamPosition, 
    dynamoDBKinesisAdapterClient = Some(streamAdapterClient) 
) 
KinesisStreamSource(streamConfig).mapMaterializedValue(_ => NotUsed) 

を読み込みダイナモイベント:

implicit val kinesisRecordReader 
    : KinesisRecordReader[Option[A]] = 
    new KinesisRecordReader[Option[A]] { 
    override def apply(record: Record): Option[A] = { 
     record match { 
     case recordAdapter: RecordAdapter => 
      val dynamoRecord: DynamoRecord = 
      recordAdapter.getInternalObject 
      dynamoRecord.getEventName match { 
      case "INSERT" => 
       ScanamoFree 
       .read[A](
        dynamoRecord.getDynamodb.getNewImage) 
       .toOption 
      case _ => None 
      } 
     case _ => None 
     } 
    } 
    } 
+0

ここで例と簡単な説明を追加して、回答を改善してください。たぶん[これを読む](https://stackoverflow.com/help/how-to-answer)は、あなたの答えを改善するのに役立ちます。 – Markus

関連する問題