Google Cloud Storageにデータを保存してデータをアーカイブするためのデータフロープロセスを構築しようとしています。私は、client_idといくつかのメタデータを含むイベントデータのPubSubストリームを持っています。このプロセスはすべての着信イベントをアーカイブする必要があるため、これはストリーミングパイプラインである必要があります。データフローを使用したGoogle Cloud Storageへの要素値の書き込み
私は受け取った各イベントをgs://archive/client_id/eventdata.jsonのようなバケットに入れてイベントをアーカイブすることができます。それはdataflow/apacheビーム内で行うことが可能ですか?具体的には、PCollectionの各イベントに異なる名前を割り当てることができますか?
EDIT: だから私のコードは、現在のようになります。私は、新しいクライアントが自分のデータを保存することができるようにするために起こるたびに再デプロイする必要があるため
public static class PerWindowFiles extends FileBasedSink.FilenamePolicy {
private String customerId;
public PerWindowFiles(String customerId) {
this.customerId = customerId;
}
@Override
public ResourceId windowedFilename(ResourceId outputDirectory, WindowedContext context, String extension) {
String filename = bucket+"/"+customerId;
return outputDirectory.resolve(filename, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
}
@Override
public ResourceId unwindowedFilename(
ResourceId outputDirectory, Context context, String extension) {
throw new UnsupportedOperationException("Unsupported.");
}
}
public static void main(String[] args) throws IOException {
DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setStreaming(true);
Pipeline p = Pipeline.create(options);
PCollection<Event> set = p.apply(PubsubIO.readStrings()
.fromTopic("topic"))
.apply(new ConvertToEvent()));
PCollection<KV<String, Event>> events = labelEvents(set);
PCollection<KV<String, EventGroup>> sessions = groupEvents(events);
String customers = System.getProperty("CUSTOMERS");
JSONArray custList = new JSONArray(customers);
for (Object cust : custList) {
if (cust instanceof String) {
String customerId = (String) cust;
PCollection<KV<String, EventGroup>> custCol = sessions.apply(new FilterByCustomer(customerId));
stringifyEvents(custCol)
.apply(TextIO.write()
.to("gs://archive/")
.withFilenamePolicy(new PerWindowFiles(customerId))
.withWindowedWrites()
.withNumShards(3));
} else {
LOG.info("Failed to create TextIO: customerId was not String");
}
}
p.run()
.waitUntilFinish();
}
このコードは醜いです。顧客データを適切なバケットに動的に割り当てることができるようにしたいと考えています。
あなたの試したことのコード例と、ソフトウェア開発で直面している困難を教えてください。 – Fabien
これは、私が持っている(編集された)作業コードです。これは、配布するmavenコマンドの一部として渡されるすべてのクライアントのJSON配列に依存しています。明らかに最適ではないコードです。 –