2017-06-28 20 views
1

実行時にFlinkアプリケーションを設定することはできますか?たとえば、入力を読み取り、変換を行った後、特定のしきい値以下のすべての要素をフィルタリングするストリーミングアプリケーションがあります。しかし、私は実行時にこのしきい値を設定できるようにしておきます。つまり、私のフリンクジョブを再起動することなくこれを変更することができます。例コード:実行時にflinkジョブを設定する方法は?

DataStream<MyModel> myModelDataStream = // get input ... 
       // do some stuff ... 
       .filter(new RichFilterFunction<MyModel>() { 
        @Override 
        public boolean filter(MyModel value) throws Exception { 
         return value.someValue() > someGlobalState.getThreshold(); 
        } 
       }) 
       // write to some sink ... 

DataStream<MyConfig> myConfigDataStream = // get input ... 
       // ... 
       .process(new RichProcessFunction<MyConfig>() { 
         someGlobalState.setThreshold(MyConfig.getThreshold()); 
       }) 
       // ... 

これは何らかの可能性がありますか?たとえば、構成ストリームを通じて変更できるグローバル状態と同じです。

答えて

4

はい、これはRichCoFlatMapで行うことができます。おおよそこのようなもの:

DataStream<MyModel> myModelDataStream = // get input ... 
DataStream<Long> controlStream = // get input ... 

DataStream<MyModel> result = controlStream 
    .broadcast() 
    .connect(myModelDataStream) 
    .flatMap(new MyCoFlatMap()); 

public class MyCoFlatMap extends RichCoFlatMapFunction<Long, MyModel, MyModel> { 
    private ValueState<Long> threshold; 

    @Override 
    public void open(Configuration conf) { 
     ValueStateDescriptor<Long> descriptor = 
      new ValueStateDescriptor<>("configuration", Long.class); 
     threshold = getRuntimeContext().getState(descriptor); 
    } 

    @Override 
    public void flatMap1(Long newthreshold, Collector<MyModel> out) { 
     threshold.update(newthreshold); 
    } 

    @Override 
    public void flatMap2(MyModel model, Collector<MyModel> out) { 
     if (threshold.value() == null || model.getData() > threshold.value()) { 
      out.collect(model); 
     } 
    } 
} 
関連する問題