Pyspark RDDのワーカーからSQSキューにデータを送信しようとしています.boto3を使用してAWSと通信しています。 RDDを収集してドライバからデータを送信するのではなく、パーティションから直接データを送信する必要があります。boto3はpysparkワーカーでクライアントを作成できませんか?
sparkドライバからboto3ローカル&経由でSQSにメッセージを送信できます。また、私はboto3をインポートし、パーティション上にboto3セッションを作成することができます。しかし、パーティションからクライアントまたはリソースを作成しようとすると、エラーが発生します。私はboto3がクライアントを正しく作成していないと信じていますが、その点については完全にはわかりません。私のコードは次のようになります。
def get_client(x): #the x is required to use pyspark's mapPartitions
import boto3
client = boto3.client('sqs', region_name="us-east-1", aws_access_key_id="myaccesskey", aws_secret_access_key="mysecretaccesskey")
return x
rdd_with_client = rdd.mapPartitions(get_client)
エラー:
DataNotFoundError: Unable to load data for: endpoints
長いトレースバック:
File "<stdin>", line 4, in get_client
File "./rebuilt.zip/boto3/session.py", line 250, in client
aws_session_token=aws_session_token, config=config)
File "./rebuilt.zip/botocore/session.py", line 810, in create_client
endpoint_resolver = self.get_component('endpoint_resolver')
File "./rebuilt.zip/botocore/session.py", line 691, in get_component
return self._components.get_component(name)
File "./rebuilt.zip/botocore/session.py", line 872, in get_component
self._components[name] = factory()
File "./rebuilt.zip/botocore/session.py", line 184, in create_default_resolver
endpoints = loader.load_data('endpoints')
File "./rebuilt.zip/botocore/loaders.py", line 123, in _wrapper
data = func(self, *args, **kwargs)
File "./rebuilt.zip/botocore/loaders.py", line 382, in load_data
raise DataNotFoundError(data_path=name)
DataNotFoundError: Unable to load data for: endpoints
私も代わりに、明示的にリソースを作成するために私の機能を変更しようとしましたクライアントで、&がデフォルトのクライアント設定を使用しているかどうかを確認します。その場合には、私のコードは次のとおりです。
def get_resource(x):
import boto3
sqs = boto3.resource('sqs', region_name="us-east-1", aws_access_key_id="myaccesskey", aws_secret_access_key="mysecretaccesskey")
return x
rdd_with_client = rdd.mapPartitions(get_resource)
私は、クライアントが存在しないため、トリガされhas_low_level_clientパラメータ、を指してエラーを受け取ります。トレースバックには次のように書かれています:
File "/usr/lib/spark/python/pyspark/rdd.py", line 2253, in pipeline_func
File "/usr/lib/spark/python/pyspark/rdd.py", line 270, in func
File "/usr/lib/spark/python/pyspark/rdd.py", line 689, in func
File "<stdin>", line 4, in session_resource
File "./rebuilt.zip/boto3/session.py", line 329, in resource
has_low_level_client)
ResourceNotExistsError: The 'sqs' resource does not exist.
The available resources are:
-
私には、それらを収容するクライアントがないと考えられるため、利用可能なリソースはありません。
私は数日間、これに対して私の頭を叩いていました。どんな助けにも感謝!
botocoreがインストールされている場所を確認し、 'data'サブディレクトリを確認してください。また、ディスクから読み取る能力があることを確認する必要があります。 –
こんにちはJordan、データのサブディレクトリで何を探していますか?私はそこにendpoints.jsonというファイルを持っていますが、それはこのトレースバックに関連しているようです。 – EmmaOnThursday
どんな理由であれ、botocoreはその 'endpoints.json'ファイルにアクセスすることができず、' boto3'も同様にそのディレクトリ内のデータにアクセスすることができません。私の考えは、それがまったく存在しなかったか、またはあなたの環境がそれがアクセスされるのを妨げるということでした。 –