2017-03-26 4 views
0

私はこの男に似たパイプラインで連結すること、マップに減らすの出力を送信しようとしている: I would like to chain multiple mapreduce jobs in google app engine in Python は、私は彼の解決策を試してみましたが、それはうまくいきませんでした。 私のパイプラインの流れは次のとおりです。
マップMap1
Reduce1
マップ2
Reduce2
は私がblob_key下ブロブストアにReduce1の出力を保存し、その後、マップMap2からブロブにアクセスしようとしています。しかし、2番目のマップを実行中に次のエラーが発生します。"BadReaderParamsError: Could not find blobinfo for key <blob_key here>"チェーンMapReduces - GoogleのAppEngineの

ここではパイプラインのコードです:

class SongsPurchasedTogetherPipeline(base_handler.PipelineBase): 

    def run(self, filekey, blobkey): 
    bucket_name = app_identity.get_default_gcs_bucket_name() 
    intermediate_output = yield mapreduce_pipeline.MapreducePipeline(
     "songs_purchased_together_intermediate", 
     "main.songs_purchased_together_map1", 
     "main.songs_purchased_together_reduce1", 
     "mapreduce.input_readers.BlobstoreLineInputReader", 
     "mapreduce.output_writers.GoogleCloudStorageOutputWriter", 
     mapper_params={ 
      "blob_keys": blobkey, 
     }, 
     reducer_params={ 
      "output_writer": { 
       "bucket_name": bucket_name, 
       "content_type": "text/plain", 
      } 
     }, 
     shards=1) 
    yield StoreOutput("SongsPurchasedTogetherIntermediate", filekey, intermediate_output) 

    intermediate_output_key = yield BlobKey(intermediate_output) 
    output = yield mapreduce_pipeline.MapreducePipeline(
     "songs_purchased_together", 
     "main.songs_purchased_together_map2", 
     "main.songs_purchased_together_reduce2", 
     "mapreduce.input_readers.BlobstoreLineInputReader", 
     "mapreduce.output_writers.GoogleCloudStorageOutputWriter", 
     mapper_params=(intermediate_output_key), 
     reducer_params={ 
      "output_writer": { 
       "bucket_name": bucket_name, 
       "content_type": "text/plain", 
      } 
     }, 
     shards=1) 
    yield StoreOutput("SongsPurchasedTogether", filekey, output) 

、ここでは、中間出力を受け取り、マップ2が使用するblobキーを生成blobKeyにクラスです:

class BlobKey(base_handler.PipelineBase): 

    def run(self, output): 
    blobstore_filename = "/gs" + output[0] 
    blobstore_gs_key = blobstore.create_gs_key(blobstore_filename) 
    return { 
     "blob_keys": blobstore_gs_key 
    } 

StoreOutputクラスが同じですGoogleのMapReduceデモのhttps://github.com/GoogleCloudPlatform/appengine-mapreduce/blob/master/python/demo/main.pyのもので、BlobKeyクラスと同じことを行いますが、blobのURLをリンクとしてHTMLに追加送信します。

URL appname/blobstore/<blob_key>を手動でブラウザーに入力すると(Reduce1が成功した後にMap2が失敗した後)、Reduce1からの出力が表示されます。 Map2でBLOBを見つけることができないのはなぜですか?申し訳ありませんが、私はAppEngineの初心者です。ブロブストレージを完全に理解していないため、おそらくどこかに間違っています。

答えて

0

さて、私は、GoogleがGAE GitHubリポジトリの標準ライターのリストからBlobstoreOutputWriterを削除したことを知りました。これは少し複雑になります。私はGoogle Cloud Storeに書き込んでそこから読まなければなりませんでした。私は、GoogleCloudStorageInputReaderのマッパーパラメータを生成するヘルパークラスを作成しました。

class GCSMapperParams(base_handler.PipelineBase): 

    def run(self, GCSPath): 
    bucket_name = app_identity.get_default_gcs_bucket_name() 
    return { 
      "input_reader": { 
       "bucket_name": bucket_name, 
       "objects": [path.split('/', 2)[2] for path in GCSPath], 
      } 
     } 

関数は、引数としてGoogleCloudStorageOutputWriterを使用するもののMapReduce段の出力を受け取り、次のMapReduceステージのmapper_paramsに割り当てることができる辞書を返します。

基本的に、最初のMapReduceステージの出力の値は、<app_name>/<pipeline_name>/key/output-[i]を含むリストです(iはシャードの数です)。 GoogleCloudStorageInputReaderを使用するには、データのキーを変数mapper_paramsに渡す必要があります。キーの形式はkey/output-[i]なので、ヘルパークラスは単に<app_name>/<pipeline_name>/を削除します。

関連する問題