GCSバケットからファイルを読み込む必要があります。私はGCS API /クライアントライブラリを使用しなければならないことは知っていますが、関連するサンプルは見つかりません。Apache BeamのGCSからファイルを読み込みます。
私はGCSドキュメントのこのリンクを参照しています: GCS Client Librariesしかし、本当に窪みを作ることはできませんでした。もし誰かが本当に助けになる例を提供できれば。おかげさまで
GCSバケットからファイルを読み込む必要があります。私はGCS API /クライアントライブラリを使用しなければならないことは知っていますが、関連するサンプルは見つかりません。Apache BeamのGCSからファイルを読み込みます。
私はGCSドキュメントのこのリンクを参照しています: GCS Client Librariesしかし、本当に窪みを作ることはできませんでした。もし誰かが本当に助けになる例を提供できれば。おかげさまで
OK。 PCollectionではなく通常のファイルとしてGCSからファイルを読み込みたい場合や、GCS Javaクライアントライブラリに問題がある場合は、Apache Beam FileSystems APIを使用することもできます。
まず、 gs://
ファイルシステムの実装が含まれているあなたはbeam-sdks-java-extensions-google-cloud-platform-core
にあなたのpom.xml
にMavenの依存関係を持っていることを確認する:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-extensions-google-cloud-platform-core</artifactId>
</dependency>
そして、ファイルシステムのAPIを(それが全てのパイプラインで、デフォルトで設定されていますが」設定それをパイプライン外で使用する場合は、手動で行う必要があります)。
PipelineOptions options = PipelineOptionsFactory.create();
// ...Optionally fill in options such as GCP credentials...
// (see GcpOptions class)
FileSystems.setDefaultPipelineOptions(options);
次に、あなたがそれを使用することができます。
ReadableByteChannel chan = FileSystems.open(FileSystems.matchNewResource(
"gs://path/to/your/file", false /* is_directory */));
try (InputStream stream = Channels.newInputStream(chan)) {
// Use regular Java utilities to work with the input stream.
}
私はこのアプローチに従っていて、ローカルマシン上でプログラムを実行していて、以下のエラーが発生しました。 'スレッドの例外 "main" java.lang.IllegalStateException:org.apache.beamのgs \tのレジストラを見つけることができません。 sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:447) \t at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:517) '。どんな助け? –
私は私の答えを編集しました、もう一度やり直してください。 – jkff
ありがとう@jkffこれは動作します! – rish0097
AvroIOなどTextIO、のようなすべての変換は、デフォルトではGCSファイルで作業することができ、例えばTextIO.read()。from( "gs://あなたのバケツ/パス/ to/your-files/*")。彼らはあなたのために働いていないのですか? – jkff
こんにちは@jkff私はそれを知っています。なぜ私がクライアントライブラリを使うのか忘れていました。実際に私が直面している問題は、TextIO.read()を使用してファイルを読み込むと、取得するデータがファイルに表示される順序と同じではないということです。私はそれがファイルにあるのとまったく同じ順序でデータを持つ必要があります。これを行う方法? – rish0097
正しいものです.PCollectionは並列に処理され、順序は並列処理とは逆のため、順不同です。つまり、並列パイプラインの中で何らかのシーケンシャルな処理をしたいというケースは間違いありません。あなたのユースケースと注文が必要な理由についてもっと教えてください。 – jkff