私は2つのストリームが1つのIntと他のjsonです.Jsonのスキーマには、いくつかのintである1つのキーがあります。私は他の整数ストリームとのキー比較を介してjsonストリームをフィルタリングする必要がありますフリンク?Apache Flinkストリームを他の方法でフィルタリングするにはどうすればよいですか?
0
A
答えて
1
はい、この種のストリーム処理はFlinkで行うことができます。あなたがFLINKから必要な基本的なビルディングブロックは、ストリーム、およびステートフル機能が接続されている - ここRichCoFlatMapを使用した例です:
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;
public class Connect {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> control = env.fromElements(
new Event(17),
new Event(42))
.keyBy("key");
DataStream<Event> data = env.fromElements(
new Event(2),
new Event(42),
new Event(6),
new Event(17),
new Event(8),
new Event(42)
)
.keyBy("key");
DataStream<Event> result = control
.connect(data)
.flatMap(new MyConnectedStreams());
result.print();
env.execute();
}
static final class MyConnectedStreams
extends RichCoFlatMapFunction<Event, Event, Event> {
private ValueState<Boolean> seen = null;
@Override
public void open(Configuration config) {
ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<>(
// state name
"have-seen-key",
// type information of state
TypeInformation.of(new TypeHint<Boolean>() {
}));
seen = getRuntimeContext().getState(descriptor);
}
@Override
public void flatMap1(Event control, Collector<Event> out) throws Exception {
seen.update(Boolean.TRUE);
}
@Override
public void flatMap2(Event data, Collector<Event> out) throws Exception {
if (seen.value() == Boolean.TRUE) {
out.collect(data);
}
}
}
public static final class Event {
public Event() {
}
public Event(int key) {
this.key = key;
}
public int key;
public String toString() {
return String.valueOf(key);
}
}
}
この例では、制御ストリーム上で見られているキーのみが通過していますデータストリーム - 他のすべてのイベントは除外されます。私はFlink's managed keyed stateとconnected streamsを利用しました。
これを単純にしておくと、データストリームにJSONが含まれているという要件は無視されていますが、JSONとFlinkを別の場所で使用する方法の例が見つかります。
2つのストリームのタイミングを制御することができないため、結果が非確定的であることに注意してください。イベントタイムスタンプをストリームに追加し、代わりにRichCoProcessFunctionを使用することで、これを管理できます。
関連する問題
- 1. 他のFlinkプログラムからクラスタにFlinkプログラムを提出するにはどうすればよいですか?
- 2. Apache Flinkでストリームをセッション化する方法は?
- 3. Apache FlinkでDynamoDBストリームを使用する
- 4. Apache Flink:ローカル事前集計でウィンドウを計算するにはどうすればよいですか?
- 5. Apache Flink:バックプレッシャーはどのように処理されますか?
- 6. Apache FlinkのFlatMapFunctionで型を忠実に派生させるにはどうすればいいですか?
- 7. Apache Edgentでタプルのストリームをフィルタリングする方法
- 8. Google Developer Tools:[ネットワーク]タブのリクエスト方法(POSTなど)でフィルタリングするにはどうすればよいですか?
- 9. Koaでマルチパートリクエストボディをストリームするにはどうすればよいですか?
- 10. コードカバレッジの分析方法をいくつかフィルタリングするにはどうすればよいですか?
- 11. マルチキャストメッセージのストリームを分析するにはどうすればよいですか?
- 12. テキストフィールドでSharePointリストをフィルタリングするにはどうすればよいですか?
- 13. javascriptでJSONデータをフィルタリングするにはどうすればよいですか?
- 14. Laravelのリレーションシップ列でフィルタリングするにはどうすればよいですか?
- 15. Apache Flinkでは、空のウィンドウに値が出力されるようにストリームをどのように交差させることができますか?
- 16. StreamWriterをストリームに変換するにはどうすればよいですか?
- 17. IntPtrをストリームに変換するにはどうすればよいですか?
- 18. オブジェクトからキーをフィルタリングするにはどうすればよいですか?
- 19. ユーザーリストテーブルのviews_editをフィルタリングするにはどうすればよいですか?
- 20. プライベートユーザをユーザインタラクションにフィルタリングするにはどうすればよいですか?
- 21. WindowsサーバーでApacheをアップグレードするにはどうすればよいですか?
- 22. Apacheでルーティングを設定するにはどうすればよいですか?
- 23. Apache Commons Mail content-typeを1つの方法で指定するにはどうすればよいですか?
- 24. SQLクエリをカーソルクエリまたは他の方法で使用するにはどうすればよいですか?
- 25. gzipをSSLやその他の方法で使用するにはどうすればよいですか?
- 26. DataListをフィルタリングするにはどうすればよいですか?
- 27. RxJSストリームを最新の別のストリームにマップするにはどうすればよいですか?
- 28. JavascriptでRTMPストリームを読むにはどうすればよいですか?
- 29. ApacheのFlinkコネクタを構築する最も良い方法は何ですか?
- 30. ストリームからClosedXMLワークブックを開くにはどうすればよいですか?