Flinkには、濃縮のために使用できるいくつかのメカニズムがあります。
すべてのストリームが対応するアイテムに参加するために使用できる共通のキーを共有していると仮定します。
最も簡単な方法は、おそらくRichFlatmap
を使用し、open()メソッド(docs about rich functions)で静的濃縮データを読み込むことです。これは、濃縮データが静的である場合、または濃縮データを更新するたびに濃縮作業を再開する意思がある場合にのみ適しています。
以下に説明する他の方法では、濃縮されたデータを管理されたキー状態(docs about working with state in Flinkを参照)として保存する必要があります。これにより、失敗した場合にFlinkが豊富なジョブを復元して再開できるようになります。
濃縮データに実際にストリームしたいと仮定すると、RichCoFlatmap
が適切です。これは、2つの接続されたストリームをマージまたは結合するために使用できるステートフル演算子です。しかし、RichCoFlatmap
では、ストリーム要素のタイミングを考慮する能力がありません。たとえば、一方のストリームが他方のストリームよりも先に進んでいるか遅れているのかについて懸念があり、反復可能で決定論的な方法で濃縮を実行したい場合は、CoProcessFunction
を使用するのが適切な方法です。
Apache Flink training materialsに詳細な例とコードが記載されています。
参加するストリーム数が多い場合(たとえば10)、2入力の一連の2入力CoProcessFunction
演算子をカスケードすることはできますが、それはある点ではかなり不愉快になります。代わりに、結合演算子を使用してすべてのメタデータストリームを結合します(これにはすべてのストリームが同じタイプでなければならないことに注意してください)。RichCoFlatmap
またはCoProcessFunction
は、このユニファイドエンリッチメントストリームをプライマリストリームに結合します。
更新:
FLINKの表とSQL APIは、ストリームの濃縮のためにも使用することができ、かつFLINK 1.4は、内側のストリーミングタイムウィンドウをジョイン追加することにより、このサポートを拡張します。 Table API joinsおよびSQL joinsを参照してください。たとえば、次の出荷が配置されるための4桁以内に発生した場合
SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.orderId AND
o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
この例では、それらの対応する出荷と注文を結合します。
ありがとうございました!私は、独自のCoProcessFunctionとカスタム状態管理のセットを使ってユースケースを解決することができました.PlinkはDSLスタイルのAPIに欠けている2つの問題を解決しました。スライディングウィンドウ(つまり、重なり合ったウィンドウの重なりではなく、残念ながら、フリンクコミュニティはスライディングウィンドウと呼ばれます)。しかし、Jiraにはこれらの機能の両方に発行チケットがあると私は信じています。 – averas
あなたは解決策を見つけてうれしいです。私は、あなたのユースケースに適しているかもしれない時間ウィンドウの結合のためのテーブルとSQL APIの使用についてのアップデートを追加しました。今週末にリリースされるはずのFlink 1.4にこれが登場しています。 –