2016-06-15 9 views
0

ストリームのSQLクエリを評価するために、1.1-SNAPSHOTバージョンのApache FlinkのテーブルAPIを使用しています。続きorg.apache.flink.api.table.TableException: "フィールド参照式のエイリアスが必要です"

は私のコードです:私はこのプログラムを実行すると

private static final int MAX_RACK_ID = 10; 
private static final long PAUSE = 100; 
private static final double TEMP_STD = 20; 
private static final double TEMP_MEAN = 80; 

public static void main(String[] args) 
{ 
    StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment(); 
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
    StreamTableEnvironment tableEnv=TableEnvironment.getTableEnvironment(env); 

    DataStream<MonitoringEvent> dstream = env.addSource(new MonitoringEventSource(MAX_RACK_ID, PAUSE, TEMP_STD, TEMP_MEAN)); 
    tableEnv.registerDataStream("TemperatureData", dstream,"rackid,temperature,timestamp"); 

    Table tab1 = tableEnv.sql("select STREAM rackid,temperature,timestamp from TemperatureData where temperature>=100"); 
    DataStream<TemperatureEvent>tempstream=tableEnv.toDataStream(tab1, TemperatureEvent.class); 
    tempstream.print(); 
} 

、それは次の例外がスローされます。

Exception in thread "main" org.apache.flink.api.table.TableException: Alias on field reference expression expected. 
    at org.apache.flink.api.table.TableEnvironment$$anonfun$4.apply(TableEnvironment.scala:299) 
    at org.apache.flink.api.table.TableEnvironment$$anonfun$4.apply(TableEnvironment.scala:292) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) 
    at org.apache.flink.api.table.TableEnvironment.getFieldInfo(TableEnvironment.scala:292) 
    at org.apache.flink.api.table.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:212) 
    at org.apache.flink.api.java.table.StreamTableEnvironment.registerDataStream(StreamTableEnvironment.scala:130) 
    at com.yash.flink.Program.main(Program.java:31) 

私はいくつか質問があります:何ですか

  • をApache FlinkのTable APIを使ってストリーム上にSQLクエリを書く方法?
  • Flinkでこのクエリを実装するにはどうすればよいですか?
  • これはFlink's Table APIのバグですか?
+0

StreamSQLはApache Flinkの未リリースの機能です。 Stack OverflowはSNAPSHOTのバージョンが非常に一時的で、これらのバージョンに関する質問は他の人にはあまり役に立ちませんので、SNAPSHOTのバージョンに関する質問をするには良い場所ではありません。そのような場合は、メーリングリストにメールを投稿したり、バグレポートを開いたりするなど、開発者コミュニティと直接対話する方がよいでしょう。 –

+0

Ok .. Flink 1.1バージョンの正式リリース日は何ですか?ストリームSQLのサポートが含まれますか? –

+0

正確なリリース日はApache Flinkコミュニティによって異なります。最近数週間でリリースを切ることについての議論が始まった。現在のマスターブランチのすべてがリリースの一部(Stream SQLを含む)になります。 –

答えて

0

テーブルAPIには制限事項があります。この問題は、DataStream<MonitoringEvent>をテーブルとして登録することによって発生します。あなたはする必要があります

tableEnv.registerDataStream(
    "TemperatureData", 
    dstream, 
    "rackid AS rackid, temperature AS temperature, timestamp AS timestamp" 
); 

それを動作させる必要があります。 Stream SQLがFlink 1.1.0でリリースされる前に問題が解決されていることを確認します。

+0

あなたの解決策は、私がその問題を解決するのを助けました。しかし、その後、もう1つの例外が発生しました: '例外がメイン" java.lang.NoSuchMethodError:org.apache.calcite.tools.FrameworkConfig.getTraitDefs()Lorg/apache/flink/shaded/co.jp/google/common/collect/ImmutableList。 \t(org.apache.flink.api.table.FlinkPlannerImpl) (FlinkPlannerImpl.scala:50)org.apache.flink.api.table.StreamTableEnvironment.sql(StreamTableEnvironment.scala:127)で \t com.yash.flink.Program.mainで \t(Program.java:31) '..これは方解石の問題ですか?あなたはこれのための解決策を提供していただけますか?方解石に存在するメソッド –

+0

私にはバージョンの競合のように見えます。あなたのプロジェクトはグアバに依存していますか? –

+0

いいえ、自分のプロジェクトにGuavaの依存関係を追加していません。Flink Table APIはguavaバージョン18.0を使用しています。カルサイトバージョン1.7.0を使用しています。 –

関連する問題