1

私は、Pickleファイルを使用するDataFlowパイプラインをリモートで実行しようとしています。 ローカルでは、以下のコードを使用してファイルを呼び出すことができます。パスは、クラウド・ストレージ(:// ... GS):約あるときGoogle Cloud DataFlowジョブのGCSからBLOB(ピクル)ファイルを読み取る方法は?

with open (known_args.file_path, 'rb') as fp: 
    file = pickle.load(fp) 

は、しかし、私はそれが有効ではありません見つける

IOError: [Errno 2] No such file or directory: 'gs://.../.pkl' 

私は種類のそれが機能していないが、私はできない理由を理解しますそれを行う正しい方法を見つける。

答えて

0

あなたはGCSバケット内のファイルをpickeしている場合、あなたはのBLOBとしてそれらを読み込むことができますし、(pickle.load()を使用して)さらに、コードのようにそれらを処理:

class ReadGcsBlobs(beam.DoFn): 
    def process(self, element, *args, **kwargs): 
     from apache_beam.io.gcp import gcsio 
     gcs = gcsio.GcsIO() 
     yield (element, gcs.open(element).read()) 


# usage example: 
files = (p 
     | "Initialize" >> beam.Create(["gs://your-bucket-name/pickle_file_path.pickle"]) 
     | "Read blobs" >> beam.ParDo(ReadGcsBlobs()) 
     ) 
0

open()は、Google Cloud Storageのパスを認識しない標準のPythonライブラリ関数です。代わりにBeam FileSystems APIを使用する必要があります。このファイルは、Beamでサポートされているその他のファイルシステムと認識されています。

関連する問題