2016-06-21 20 views
1

Pyspark RDDのワーカーからSQSキューにデータを送信しようとしています.boto3を使用してAWSと通信しています。 RDDを収集してドライバからデータを送信するのではなく、パーティションから直接データを送信する必要があります。boto3はpysparkワーカーでクライアントを作成できませんか?

sparkドライバからboto3ローカル&経由でSQSにメッセージを送信できます。また、私はboto3をインポートし、パーティション上にboto3セッションを作成することができます。しかし、パーティションからクライアントまたはリソースを作成しようとすると、エラーが発生します。私はboto3がクライアントを正しく作成していないと信じていますが、その点については完全にはわかりません。私のコードは次のようになります。

def get_client(x): #the x is required to use pyspark's mapPartitions 
    import boto3 
    client = boto3.client('sqs', region_name="us-east-1", aws_access_key_id="myaccesskey", aws_secret_access_key="mysecretaccesskey") 
    return x 

rdd_with_client = rdd.mapPartitions(get_client) 

エラー:

DataNotFoundError: Unable to load data for: endpoints 

長いトレースバック:

File "<stdin>", line 4, in get_client 
    File "./rebuilt.zip/boto3/session.py", line 250, in client 
    aws_session_token=aws_session_token, config=config) 
    File "./rebuilt.zip/botocore/session.py", line 810, in create_client 
    endpoint_resolver = self.get_component('endpoint_resolver') 
    File "./rebuilt.zip/botocore/session.py", line 691, in get_component 
    return self._components.get_component(name) 
    File "./rebuilt.zip/botocore/session.py", line 872, in get_component 
    self._components[name] = factory() 
    File "./rebuilt.zip/botocore/session.py", line 184, in create_default_resolver 
    endpoints = loader.load_data('endpoints') 
    File "./rebuilt.zip/botocore/loaders.py", line 123, in _wrapper 
    data = func(self, *args, **kwargs) 
    File "./rebuilt.zip/botocore/loaders.py", line 382, in load_data 
    raise DataNotFoundError(data_path=name) 
DataNotFoundError: Unable to load data for: endpoints 

私も代わりに、明示的にリソースを作成するために私の機能を変更しようとしましたクライアントで、&がデフォルトのクライアント設定を使用しているかどうかを確認します。その場合には、私のコードは次のとおりです。

def get_resource(x): 
    import boto3 
    sqs = boto3.resource('sqs', region_name="us-east-1", aws_access_key_id="myaccesskey", aws_secret_access_key="mysecretaccesskey") 
    return x 

rdd_with_client = rdd.mapPartitions(get_resource) 

私は、クライアントが存在しないため、トリガされhas_low_level_clientパラメータ、を指してエラーを受け取ります。トレースバックには次のように書かれています:

File "/usr/lib/spark/python/pyspark/rdd.py", line 2253, in pipeline_func 
    File "/usr/lib/spark/python/pyspark/rdd.py", line 270, in func 
    File "/usr/lib/spark/python/pyspark/rdd.py", line 689, in func 
    File "<stdin>", line 4, in session_resource 
    File "./rebuilt.zip/boto3/session.py", line 329, in resource 
    has_low_level_client) 
ResourceNotExistsError: The 'sqs' resource does not exist. 
The available resources are: 
    - 

私には、それらを収容するクライアントがないと考えられるため、利用可能なリソースはありません。

私は数日間、これに対して私の頭を叩いていました。どんな助けにも感謝!

+0

botocoreがインストールされている場所を確認し、 'data'サブディレクトリを確認してください。また、ディスクから読み取る能力があることを確認する必要があります。 –

+0

こんにちはJordan、データのサブディレクトリで何を探していますか?私はそこにendpoints.jsonというファイルを持っていますが、それはこのトレースバックに関連しているようです。 – EmmaOnThursday

+0

どんな理由であれ、botocoreはその 'endpoints.json'ファイルにアクセスすることができず、' boto3'も同様にそのディレクトリ内のデータにアクセスすることができません。私の考えは、それがまったく存在しなかったか、またはあなたの環境がそれがアクセスされるのを妨げるということでした。 –

答えて

0

これは、boto3バンドルがzipファイルであるためです。

"./rebuilt.zip/boto3"

boto3とは、バンチファイルをダウンロードして配布用フォルダに保存することです。あなたのboto3はzipパッケージに収められているので、明らかにそれらのファイルはそこにアクセスできません。

解決策は、zoto内にboto3を配布するのではなく、Spoto環境にboto3をインストールすることです。ここで注意してください。マスターノードとワーカーノードの両方にboto3をインストールしたい場合は、アプリケーションの実装方法によって異なります。安全な賭けは両方にインストールされます。

EMRを使用している場合は、ブートストラップステップを使用して行うことができます。

+0

また、これらのデータファイルを "./rebuilt.zip/boto3"にバンドルすることもできます。 –

+0

ここで同じ問題に直面しています。これは本当にうまくいきます。私はemrにインストールされているので、デフォルトではクラスタです。 – avocado

+0

また、ブートストラップアクションを使用して "pip install boto3"を実行することもできます。日没になるので、botoにフォールバックする必要はありません。 –

関連する問題