私はいくつかのストリーミングデータを処理するためにFlink 1.2.0 Table APIを使用します。以下は私のコードです:Flink:実行計画のテーブルAPIコピー演算子
val dataTable = myDataStream
// table A
val tableA = dataTable
.window(Tumble over 5.minutes on 'rowtime as 'w)
.groupBy("w, group1, group2")
.select("w.start as time, group1, group2, data1.sum as data1, data2.sum as data2")
tableEnv.registerTable("tableA", tableA)
// table A sink
tableA.writeToSink(sinkTableA)
//...
// I shoul get some other outputs from TableA output
//...
val dataTable = tableEnv.ingest("tableA")
// table result1
val result1 = dataTable
.window(Tumble over 5.minutes on 'rowtime as 'w)
.groupBy("w, group1")
.select("w.start as time, group1, data1.sum as data1")
// result1 sink
result2.writeToSink(sinkResult1)
// table result2
val result2 = dataTable
.window(Tumble over 5.minutes on 'rowtime as 'w)
.groupBy("w, group2")
.select("w.start as time, group2, data2.sum as data1")
// result2 sink
result2.writeToSink(sinkResult2)
このツリーをフリンク実行計画で取得するのを待ちます。 他のFlinkジョブでFlinkストリーミングと同じです。
DataStream_Operators -> TableA_Operators -> TableA_Sink
|-> Result1_Operators -> Result1_Sink
|-> Result2_Operators -> Result2_Sink
しかし、私はTableAの同じオペレータの3つのコピーでこれを手に入れます!
DataStream_Operators -> TableA_Operators -> TableA_Sink
|-> Copy_of_TableA_Operators -> Result1_Operators -> Result1_Sink
|-> Copy_of_TableA_Operators -> Result2_Operators -> Result2_Sink
結果でこのジョブの大きな入力データでパフォーマンスが低下しています。
これをどのように修正して最適な実行計画を得ることができますか?
Flest Table APIとSQLは実験的な機能で、 は次のバージョンで修正される可能性があります。あなたはTable
がDataSet
またはDataStream
に変換またはTableSink
に書き込むたびに現在の状態では
グループと選択フィールドは私の仕事の設定ファイルから設定可能なので、 'TableA'の結果のためにケースクラスを' DataStream'に変換することはできません。これは原因の1つで、Flink Table APIを使用する理由です。Flink Streamingではなく、入力データと出力データのケースクラスを使用できます。ですから、DataStream変換には 'Row'クラスを使うべきです。しかし、 'resul1'と' resul2'のために 'Row'を次の選択に使うことはできません(グループと選択フィールドは設定可能です)! – SergeySA
'Row'を' val resultStream = result.toDataStream [Row];で使用すると、 "main" org.apache.flink.table.api.ValidationException:指定された入力[group1]を解決できません[f0、f1、 f2、f3、f4] 'となる。私はfiledsリスト 'tableEnv.registerDataStream(" TableA "、resultStream、fields)'で 'Row'を使用しようとしましたが、'例外 "メイン" org.apache.flink.table.api.TableException: (f0:タイムスタンプ、f1:ストリング、f2:ストリング、f3:ロング、f4:ロング)を表に変換できません。 – SergeySA