2015-10-29 21 views
13

Apache Flinkでは、私はタプルのストリームを持っています。本当に単純なものと仮定しましょうTuple1<String>タプルは、値フィールドに任意の値(たとえば、 'P1'、 'P2'など)を持つことができます。可能な値のセットは有限ですが、私は事前にフルセットを知りません(したがって、 'P362'がある可能性があります)。タプルの内部の値に応じて、特定の出力場所にそのタプルを書きたいと思います。だから私は事前に知っている場所(例えばstream.writeCsv("/output/somewhere"))に書き込むために、私は唯一の可能性を発見したドキュメントでFlinkストリーミング:データに応じて1つのデータストリームを異なる出力に出力する方法は?

  • /output/P1
  • /output/P2

、ない方法は:私は、次のファイル構造を持っていると思いますデータの内容が実際にどこで終わっているのかを決定させるということです。

ドキュメントの出力分割については読んでいますが、これは出力先を別の宛先にリダイレクトする方法を提供していないようです(またはこれがどのように機能するのか分かりません) 。

Flink APIでこれを行うことはできますか?そうでない場合は、それを行うことができる第三者の図書館があるのでしょうか、それとも自分でそのようなものを構築しなければなりませんか?

更新

マティアスの提案に続いて、私は、出力パスを決定し、それをシリアル化した後、それぞれのファイルにタプルを書き込みふるい分けシンク機能を思い付きました。私はここに参考にしています。他の人にとっては便利かもしれません。

public class SiftingSinkFunction<IT> extends RichSinkFunction<IT> { 

    private final OutputSelector<IT> outputSelector; 
    private final MapFunction<IT, String> serializationFunction; 
    private final String basePath; 
    Map<String, TextOutputFormat<String>> formats = new HashMap<>(); 

    /** 
    * @param outputSelector  the selector which determines into which output(s) a record is written. 
    * @param serializationFunction a function which serializes the record to a string. 
    * @param basePath    the base path for writing the records. It will be appended with the output selector. 
    */ 
    public SiftingSinkFunction(OutputSelector<IT> outputSelector, MapFunction<IT, String> serializationFunction, String basePath) { 
     this.outputSelector = outputSelector; 
     this.serializationFunction = serializationFunction; 
     this.basePath = basePath; 
    } 


    @Override 
    public void invoke(IT value) throws Exception { 
     // find out where to write. 
     Iterable<String> selection = outputSelector.select(value); 
     for (String s : selection) { 
      // ensure we have a format for this. 
      TextOutputFormat<String> destination = ensureDestinationExists(s); 
      // then serialize and write. 
      destination.writeRecord(serializationFunction.map(value)); 
     } 
    } 

    private TextOutputFormat<String> ensureDestinationExists(String selection) throws IOException { 
     // if we know the destination, we just return the format. 
     if (formats.containsKey(selection)) { 
      return formats.get(selection); 
     } 

     // create a new output format and initialize it from the context. 
     TextOutputFormat<String> format = new TextOutputFormat<>(new Path(basePath, selection)); 
     StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); 
     format.configure(context.getTaskStubParameters()); 
     format.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks()); 

     // put it into our map. 
     formats.put(selection, format); 
     return format; 
    } 

    @Override 
    public void close() throws IOException { 
     Exception lastException = null; 
     try { 
      for (TextOutputFormat<String> format : formats.values()) { 
       try { 
        format.close(); 
       } catch (Exception e) { 
        lastException = e; 
        format.tryCleanupOnError(); 
       } 
      } 
     } finally { 
      formats.clear(); 
     } 

     if (lastException != null) { 
      throw new IOException("Close failed.", lastException); 
     } 
    } 
} 

答えて

6

カスタムシンクを実装できます。あなたのプログラムの使用では

  • org.apache.flink.streaming.api.functions.sink.SinkFunction
  • org.apache.flink.streaming.api.functions.sink.RichSinkFunction

:両方のいずれかから継承

stream.addSink(SinkFunction<T> sinkFunction); 

の代わりstream.writeCsv("/output/somewhere")

+4

ありがとうございました!私は 'FileSinkFunction'の実装をチェックして、私自身も同様のものを思いついた。私は実装を参照のために私の質問に追加しました。 –

関連する問題