私はApache Flink(1.3.1)で作業しているときに初めて質問をします。より詳しくは、私はflink-core、flink-cep、flink-streamingライブラリを扱っています。私のアプリケーションはAkka ActorSystemで、RabbitMQからのメッセージを消費し、さまざまなアクターがこのメッセージを処理します。いくつかのアクターでは、FlinkからStreamExecutionEnvironment
をインスタンス化して、着信メッセージを処理したいと思っています。したがって、RichSourceFunction
クラスを拡張するカスタムソースクラスを作成しました。 1つのことを除いてすべてが動作します:私はFlink拡張機能にデータを送信する方法がわかりません。ここに私のセットアップは次のとおりです。カスタムソースからデータを連続的に書き出します。
public class FlinkExtension {
private static StreamExecutionEnvironment environment;
private DataStream<ValueEvent> input;
private CustomSourceFunction function;
public FlinkExtension(){
environment = StreamExecutionEnvironment.getExecutionEnvironment();
function = new CustomSourceFunction();
input = environment.addSource(function);
PatternStream<ValueEvent> patternStream = CEP.pattern(input, _pattern());
DataStream<String> warnings = patternStream.select(new PatternSelectFunction<ValueEvent, String>() {
@Override
public String select(Map<String, List<ValueEvent>> pattern) throws Exception {
return null; //TODO
}
});
warnings.print();
try {
environment.execute();
} catch(Exception e){
e.printStackTrace();
}
}
private Pattern<ValueEvent, ?> _pattern(){
return Pattern.<ValueEvent>begin("first").where(new SimpleCondition<ValueEvent>() {
@Override
public boolean filter(ValueEvent value) throws Exception {
return value.getValue() > 10;
}
});
}
public void sendData(ValueEvent value){
function.sendData(value);
}
}
そして、これは私のカスタムソース関数である:
public class CustomSourceFunction extends RichSourceFunction<ValueEvent> {
private volatile boolean run = false;
private SourceContext<ValueEvent> context;
@Override
public void open(Configuration parameters){
run = true;
}
@Override
public void run(SourceContext<ValueEvent> ctx) throws Exception {
this.context = ctx;
while (run){
}
}
public void sendData(ValueEvent value){
this.context.collectWithTimestamp(value, Calendar.getInstance().getTimeInMillis());
}
@Override
public void cancel() {
run = false;
}
}
だから私は、私への連続的な方法でデータを書き込むために外部からの私のFlinkExtension
クラスのメソッドsendData
を呼び出したいですFlinkExtension
。私のJUnitテストでは、データをエクステンションに送信してから、SourceContext
にデータを書き込む必要があります。
@Test
public void testSendData(){
FlinkExtension extension = new FlinkExtension();
extension.sendData(new ValueEvent(30));
}
しかし、私は、テストを実行した場合、何も起こりません、アプリケーションがCustomSourceFunction
のrunメソッドでハングアップします。私はまた、CustomSourceFunction
runメソッドで新しいエンドレススレッドを作成しようとしました。
要約:誰かが、アプリケーションからのデータをFlinkインスタンスに連続的に書き込む方法を知っていますか?