2017-12-22 12 views
0

私はconfigure a TableSource with a rowtime attributeのドキュメントに従っています。Flink 1.4カラム 'rowtime'がテーブルに見つかりません

私は

KafkaTableSource source = Kafka08JsonTableSource.builder()// set Kafka topic 
      .forTopic("alerting") 
      // set Kafka consumer properties 
      .withKafkaProperties(getKafkaProperties()) 
      // set Table schema 
      .withSchema(TableSchema.builder() 
        .field("tenant", Types.STRING()) 
        .field("message", Types.STRING()) 
        .field("frequency", Types.LONG()) 
        .field("timestamp", Types.SQL_TIMESTAMP()).build()) 
      .failOnMissingField(true) 
      .withRowtimeAttribute(
        // "timestamp" is rowtime attribute 
        "timestamp", 
        // value of "timestamp" is extracted from existing field with same name 
        new ExistingField("timestamp"), 
        // values of "timestamp" are at most out-of-order by 30 seconds 
        new BoundedOutOfOrderTimestamps(TimeUnit.DAYS.toMillis(1))) 
      .build(); 

    //register the alerting topic as kafka 
    tEnv.registerTableSource("kafka", source); 

    Table results = tEnv.sqlQuery("SELECT tenant, message, SUM(frequency) " + 
      "FROM kafka " + 
      "GROUP BY HOP(rowtime, INTERVAL '1' SECOND, INTERVAL '5' SECOND), tenant, message"); 

    tEnv.toAppendStream(results, Row.class).print(); 

を次のようにtimestampフィールドを登録し、次のエラーを取得:kafkatimestampなくrowtimeと呼ばれ、あなたのテーブルのフィールドを

Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 64 to line 1, column 70: Column 'rowtime' not found in any table at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:93) at org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:561) at oracle.flink.demo.KafkaSQLStream.main(KafkaSQLStream.java:62) Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 64 to line 1, column 70: Column 'rowtime' not found in any table at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)

答えて

0

を。したがって、rowtimeの代わりにtimestampという名前で属性を呼び出す必要があります。あなたがtimestamp属性の名前を変更するか、バッククォート( `)で、属性名をエスケープする必要がありますいずれかのようTIMESTAMPは、SQLのキーワードであることを

注:ところで

tEnv.sqlQuery(
    "SELECT tenant, message, SUM(frequency) " + 
    "FROM kafka " + 
    "GROUP BY HOP(`timestamp`, INTERVAL '1' SECOND, INTERVAL '5' SECOND), tenant, message"); 

。 1日のBoundedOutOfOrderTimestampsは実際にはかなり多いです。これにより、結果が出されて状態が破棄されるまでに1日分のデータが収集されるため、処理のレイテンシと状態のサイズが大きくなる可能性があります。

+0

"スレッドで例外が発生しました"メイン "org.apache.flink.runtime.client.JobSubmissionException:BlobServerアドレスを取得できませんでした。"タイムスタンプを更新するように変更した後 – Xuan

+0

"SELECTテナント、メッセージ、SUM(周波数)" + "カフカFROM" + "HOP BY GROUP(更新、INTERVAL '1' SECOND、INTERVAL '5' SECOND)、テナント、メッセージ" スレッド内 例外」メイン "org.apache.flink.runtime.client.JobSubmissionException:BlobServerアドレスを取得できませんでした。 org.apache.flink.runtime.client.JobSubmissionClientActor $ 1.callで \t(JobSubmissionClientActor.java:166)akka.dispatch.Futures $$ anonfun $将来の$ 1.applyで \t(Future.scala:97) \tでscala.concurrent.impl.Future $ PromiseCompletingRunnable.liftedTree1 $ 1(Future.scala:24) – Xuan

+0

Kafka ## JsonTableSource + SQL + windowingのサンプル/ githubがあれば、それは素晴らしいでしょう! – Xuan

関連する問題