2017-05-26 3 views
0

私はいくつかのストリーミングデータを処理するためにFlink 1.2.0 Table APIを使用します。以下は私のコードです:Flink:実行計画のテーブルAPIコピー演算子

val dataTable = myDataStream 
// table A 
val tableA = dataTable 
    .window(Tumble over 5.minutes on 'rowtime as 'w) 
    .groupBy("w, group1, group2") 
    .select("w.start as time, group1, group2, data1.sum as data1, data2.sum as data2") 
tableEnv.registerTable("tableA", tableA) 
// table A sink 
tableA.writeToSink(sinkTableA) 
//... 
// I shoul get some other outputs from TableA output 
//... 
val dataTable = tableEnv.ingest("tableA") 
// table result1 
val result1 = dataTable 
.window(Tumble over 5.minutes on 'rowtime as 'w) 
.groupBy("w, group1") 
.select("w.start as time, group1, data1.sum as data1") 
// result1 sink 
result2.writeToSink(sinkResult1) 
// table result2 
val result2 = dataTable 
.window(Tumble over 5.minutes on 'rowtime as 'w) 
.groupBy("w, group2") 
.select("w.start as time, group2, data2.sum as data1") 
// result2 sink 
result2.writeToSink(sinkResult2) 

このツリーをフリンク実行計画で取得するのを待ちます。 他のFlinkジョブでFlinkストリーミングと同じです。

DataStream_Operators -> TableA_Operators -> TableA_Sink 
             |-> Result1_Operators -> Result1_Sink 
             |-> Result2_Operators -> Result2_Sink 

しかし、私はTableAの同じオペレータの3つのコピーでこれを手に入れます!

DataStream_Operators -> TableA_Operators -> TableA_Sink 
        |-> Copy_of_TableA_Operators -> Result1_Operators -> Result1_Sink 
        |-> Copy_of_TableA_Operators -> Result2_Operators -> Result2_Sink 

結果でこのジョブの大きな入力データでパフォーマンスが低下しています。

これをどのように修正して最適な実行計画を得ることができますか?

Flest Table APIとSQLは実験的な機能で、 は次のバージョンで修正される可能性があります。あなたはTableDataSetまたはDataStreamに変換またはTableSinkに書き込むたびに現在の状態では

答えて

0

、表のAPIは、全体のクエリを変換します。プログラムでは、writeToSinkの3回を呼び出します。これは、完全なクエリが翻訳されるたびに呼び出されることを意味します。

しかし、完全なクエリは何ですか? Tableには、すべてのTable API演算子が適用されています。 TableTableEnvironmentに登録すると、基本的にビューとして登録されます。つまり、その定義(テーブルを定義するすべての演算子)のみが登録されます。したがって、writeToSinkを2回目と3回目に呼び出すと、これらの演算子が再度変換されます。

あなたはDataStreamtableAを翻訳しTableとしてそれを登録する代わりにTableEnvironmentDataStreamを登録する場合は、この問題を解決することができます。これは以下のようになります。

私が知っている
val tableA = ... 
val streamA = tableA.toDataStream[X] // X should be a case class for rows of tableA 
val tableEnv.registerDataStream("tableA", streamA) 

tableEnv.ingest("tableA").writeToSink(sinkTableA) // emit tableA by ingesting the registered DataStream 

、これは表の繰り返し翻訳を回避するための唯一の方法は非常に便利ではありませんが、現時点では。

+0

グループと選択フィールドは私の仕事の設定ファイルから設定可能なので、 'TableA'の結果のためにケースクラスを' DataStream'に変換することはできません。これは原因の1つで、Flink Table APIを使用する理由です。Flink Streamingではなく、入力データと出力データのケースクラスを使用できます。ですから、DataStream変換には 'Row'クラスを使うべきです。しかし、 'resul1'と' resul2'のために 'Row'を次の選択に使うことはできません(グループと選択フィールドは設定可能です)! – SergeySA

+0

'Row'を' val resultStream = result.toDataStream [Row];で使用すると、 "main" org.apache.flink.table.api.ValidationException:指定された入力[group1]を解決できません[f0、f1、 f2、f3、f4] 'となる。私はfiledsリスト 'tableEnv.registerDataStream(" TableA "、resultStream、fields)'で 'Row'を使用しようとしましたが、'例外 "メイン" org.apache.flink.table.api.TableException: (f0:タイムスタンプ、f1:ストリング、f2:ストリング、f3:ロング、f4:ロング)を表に変換できません。 – SergeySA

関連する問題