2017-07-18 11 views
1

Google Cloud Storageにデータを保存してデータをアーカイブするためのデータフロープロセスを構築しようとしています。私は、client_idといくつかのメタデータを含むイベントデータのPubSubストリームを持っています。このプロセスはすべての着信イベントをアーカイブする必要があるため、これはストリーミングパイプラインである必要があります。データフローを使用したGoogle Cloud Storageへの要素値の書き込み

私は受け取った各イベントをgs://archive/client_id/eventdata.jsonのようなバケットに入れてイベントをアーカイブすることができます。それはdataflow/apacheビーム内で行うことが可能ですか?具体的には、PCollectionの各イベントに異なる名前を割り当てることができますか?

EDIT: だから私のコードは、現在のようになります。私は、新しいクライアントが自分のデータを保存することができるようにするために起こるたびに再デプロイする必要があるため

public static class PerWindowFiles extends FileBasedSink.FilenamePolicy { 

private String customerId; 

public PerWindowFiles(String customerId) { 
    this.customerId = customerId; 
} 

@Override 
public ResourceId windowedFilename(ResourceId outputDirectory, WindowedContext context, String extension) { 
    String filename = bucket+"/"+customerId; 
    return outputDirectory.resolve(filename, ResolveOptions.StandardResolveOptions.RESOLVE_FILE); 
} 

@Override 
public ResourceId unwindowedFilename(
    ResourceId outputDirectory, Context context, String extension) { 
    throw new UnsupportedOperationException("Unsupported."); 
} 
} 


public static void main(String[] args) throws IOException { 
DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args) 
    .withValidation() 
    .as(DataflowPipelineOptions.class); 
options.setRunner(DataflowRunner.class); 
options.setStreaming(true); 
Pipeline p = Pipeline.create(options); 

PCollection<Event> set = p.apply(PubsubIO.readStrings() 
            .fromTopic("topic")) 
    .apply(new ConvertToEvent())); 

PCollection<KV<String, Event>> events = labelEvents(set); 
PCollection<KV<String, EventGroup>> sessions = groupEvents(events); 

String customers = System.getProperty("CUSTOMERS"); 
JSONArray custList = new JSONArray(customers); 
for (Object cust : custList) { 
    if (cust instanceof String) { 
    String customerId = (String) cust; 
    PCollection<KV<String, EventGroup>> custCol = sessions.apply(new FilterByCustomer(customerId)); 
      stringifyEvents(custCol) 
       .apply(TextIO.write() 
               .to("gs://archive/") 
               .withFilenamePolicy(new PerWindowFiles(customerId)) 
               .withWindowedWrites() 
               .withNumShards(3)); 
    } else { 
    LOG.info("Failed to create TextIO: customerId was not String"); 
    } 
} 

p.run() 
    .waitUntilFinish(); 
} 

このコードは醜いです。顧客データを適切なバケットに動的に割り当てることができるようにしたいと考えています。

+0

あなたの試したことのコード例と、ソフトウェア開発で直面している困難を教えてください。 – Fabien

+0

これは、私が持っている(編集された)作業コードです。これは、配布するmavenコマンドの一部として渡されるすべてのクライアントのJSON配列に依存しています。明らかに最適ではないコードです。 –

答えて

2

"ダイナミックデスティネーション" - 書き込まれる要素に基づいてファイル名を選択する - は、まだリリースされていないBeam 2.1.0で利用できる新しい機能になります。

+0

ちょっと残念ですが、答えに感謝します。 Beam 2.1.0リリースの予定タイムラインはありますか? –

+0

私たちは現在、リリース候補に投票しています。ブロッキングの問題がなければ、数日しかかかりません。 –

関連する問題