0

kinesisアプリケーションでストリーム化された単純なバニラスパークをスケーラで実行する際に問題が発生しています。私はSnowplowWordCountASLのようなチュートリアルの基本的なガイダンスに従ってきました。Spark Streaming Kinesisの統合:ワーカーでLeaseCoordinatorを初期化中にエラーが発生しました

しかし、私はまだそれがあるため、このキネシスワーカーエラーで動作させることはできません。ここで

16/11/15 09:00:27 ERROR Worker: Caught exception when initializing LeaseCoordinator 
com.amazonaws.services.kinesis.leases.exceptions.DependencyException: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain 
    at com.amazonaws.services.kinesis.leases.impl.LeaseManager.createLeaseTableIfNotExists(LeaseManager.java:125) 
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.initialize(KinesisClientLibLeaseCoordinator.java:173) 
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.initialize(Worker.java:374) 
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:318) 
    at org.apache.spark.streaming.kinesis.KinesisReceiver$$anon$1.run(KinesisReceiver.scala:174) 
Caused by: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain 
    at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117) 
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:1758) 
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.createTable(AmazonDynamoDBClient.java:822) 
    at com.amazonaws.services.kinesis.leases.impl.LeaseManager.createLeaseTableIfNotExists(LeaseManager.java:118) 
    ... 4 more 

は、私のコードのサンプルです:

import com.amazonaws.auth.BasicAWSCredentials 
import com.amazonaws.internal.StaticCredentialsProvider 
import com.amazonaws.regions.RegionUtils 
import com.amazonaws.services.kinesis.AmazonKinesisClient 
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream 
import org.apache.spark.storage.StorageLevel 
import org.apache.spark.streaming.kinesis.KinesisUtils 
import org.apache.spark.SparkConf 
import org.apache.spark.streaming.{Milliseconds, StreamingContext} 

/** 
    * Created by franco on 11/11/16. 
    */ 
object TestApp { 
    // === Configurations for Kinesis streams === 
    val awsAccessKeyId = "XXXXXX" 
    val awsSecretKey = "XXXXXXX" 
    val kinesisStreamName = "MyStream" 
    val kinesisEndpointUrl = "https://kinesis.region.amazonaws.com" //example "https://kinesis.us-west-2.amazonaws.com" 
    val appName = "MyAppName" 

    def main(args: Array[String]): Unit = { 

    val credentials = new BasicAWSCredentials(awsAccessKeyId,awsSecretKey) 

    val provider = new StaticCredentialsProvider(credentials) 

    val kinesisClient = new AmazonKinesisClient(provider) 
    kinesisClient.setEndpoint(kinesisEndpointUrl) 

    val shards = kinesisClient.describeStream(kinesisStreamName).getStreamDescription.getShards.size() 

    val streams = shards 

    val batchInterval = Milliseconds(2000) 

    val kinesisCheckpointInterval = batchInterval 

    val regionName = RegionUtils.getRegionByEndpoint(kinesisEndpointUrl).getName 

    val cores : Int = Runtime.getRuntime.availableProcessors() 
    println("Available Cores : " + cores.toString) 
    val config = new SparkConf().setAppName("MyAppName").setMaster("local[" + (cores/2) + "]") 
    val ssc = new StreamingContext(config, batchInterval) 

    // Create the Kinesis DStreams 
    val kinesisStreams = (0 until streams).map { i => 
     KinesisUtils.createStream(ssc, appName, kinesisStreamName, kinesisEndpointUrl, regionName, 
     InitialPositionInStream.LATEST, kinesisCheckpointInterval * 2, StorageLevel.MEMORY_AND_DISK_2) 
    } 

    ssc.union(kinesisStreams).map(bytes => new String(bytes)).print() 
    // Start the streaming context and await termination 
    ssc.start() 
    ssc.awaitTermination() 
    } 


} 

そして、私のIAMポリシーは次のようになります。

{ 
    "Version": "2012-10-17", 
    "Statement": [ 
     { 
      "Sid": "Stmt123", 
      "Effect": "Allow", 
      "Action": [ 
       "kinesis:DescribeStream", 
       "kinesis:GetShardIterator", 
       "kinesis:GetRecords" 
      ], 
      "Resource": [ 
       "arn:aws:kinesis:region:account:stream/name" 
      ] 
     }, 
     { 
      "Sid": "Stmt456", 
      "Effect": "Allow", 
      "Action": [ 
       "dynamodb:CreateTable", 
       "dynamodb:DeleteItem", 
       "dynamodb:DescribeTable", 
       "dynamodb:GetItem", 
       "dynamodb:PutItem", 
       "dynamodb:Scan", 
       "dynamodb:UpdateItem" 
      ], 
      "Resource": [ 
       "arn:aws:dynamodb:region:account:table/name" 
      ] 
     }, 
     { 
      "Sid": "Stmt789", 
      "Effect": "Allow", 
      "Action": [ 
       "cloudwatch:PutMetricData" 
      ], 
      "Resource": [ 
       "*" 
      ] 
     } 
    ] 
} 

私はこのアプリで何が間違っている得ることができません。この件に関するガイダンスはすべて歓迎されます。

答えて

1

AWSアクセスキーとシークレットキーを渡すためのDStreamのコンストラクタがあります。

たとえば、以下のリンクの第1および第5のコンストラクタでは、コンストラクタに渡すことができます(システムに渡す必要があります)vs Systemプロパティを設定する必要があります。

1

最終的には、資格情報の値をシステムプロパティに設定することで機能します。

System.setProperty("aws.accessKeyId","XXXXXX") 
System.setProperty("aws.secretKey","XXXXXX") 

しかし、私はこの解決策では「幸せ」ではありません。

このアプローチに関する問題はありますか?

関連する問題