2017-06-09 5 views
0

Apache Flink 1.3のTable APIにおけるrowtimeの奇妙な挙動

public class TumblingWindow { 

    /**
     * Builds a small in-memory event-time stream and declares a tumbling-window
     * aggregation on it through the Table API.
     *
     * <p>Seven {@link Content} records are created with only {@code recordTime} and
     * {@code urlKey} set. Timestamps are taken from {@link Content#getRecordTime()} by a
     * {@code BoundedOutOfOrdernessTimestampExtractor} constructed with
     * {@code Time.milliseconds(1)}. The stream is then converted to a {@code Table} using a
     * field expression that ends in {@code rowtime.rowtime}, and a {@code 1.hours} tumbling
     * window grouped by window and {@code urlKey} is declared. The result of the windowed
     * {@code select} is neither stored nor written anywhere before {@code env.execute()}.
     *
     * @param args command-line arguments; not read
     * @throws Exception propagated from the Flink calls made here
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // Sample input: (recordTime, urlKey); every other Content field keeps its default value.
        List<Content> records = new ArrayList<>();
        records.add(new Content(1L, "Hi"));
        records.add(new Content(2L, "Hallo"));
        records.add(new Content(3L, "Hello"));
        records.add(new Content(4L, "Hello"));
        records.add(new Content(7L, "Hello"));
        records.add(new Content(8L, "Hello world"));
        records.add(new Content(16L, "Hello world"));

        // The event timestamp of a record is its recordTime field.
        BoundedOutOfOrdernessTimestampExtractor<Content> timestampExtractor =
                new BoundedOutOfOrdernessTimestampExtractor<Content>(Time.milliseconds(1)) {

                    private static final long serialVersionUID = 410512296011057717L;

                    @Override
                    public long extractTimestamp(Content element) {
                        return element.getRecordTime();
                    }
                };

        DataStream<Content> source = env.fromCollection(records);
        DataStream<Content> stream2 = source.assignTimestampsAndWatermarks(timestampExtractor);

        // Field list handed to the Table API; the last entry declares the rowtime attribute.
        String fieldExpression = "urlKey,httpGetMessageCount,httpPostMessageCount"
                + ",uplink,downlink,statusCode,statusCodeCount,rowtime.rowtime";
        Table table = tableEnv.fromDataStream(stream2, fieldExpression);

        // The windowed aggregation is only declared here; its result is discarded.
        table.window(Tumble.over("1.hours").on("rowtime").as("w"))
                .groupBy("w, urlKey")
                .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum ");

        env.execute();
    }

    /**
     * Mutable bean describing one record: a URL key, a record time and six
     * {@code long} counters. Every property has a getter and a setter, and a public
     * no-argument constructor is provided.
     */
    public static class Content implements Serializable {

        private String urlKey;
        private long recordTime;

        private long httpGetMessageCount;
        private long httpPostMessageCount;
        private long uplink;
        private long downlink;
        private long statusCode;
        private long statusCodeCount;

        /** Creates a record with a {@code null} URL key and all numeric fields at zero. */
        public Content() {
        }

        /**
         * Creates a record with the given time and key; the counters stay at zero.
         *
         * @param recordTime value stored as the record time
         * @param urlKey value stored as the URL key
         */
        public Content(long recordTime, String urlKey) {
            this.urlKey = urlKey;
            this.recordTime = recordTime;
        }

        /** @return the URL key */
        public String getUrlKey() {
            return this.urlKey;
        }

        /** @param urlKey the new URL key */
        public void setUrlKey(String urlKey) {
            this.urlKey = urlKey;
        }

        /** @return the record time */
        public long getRecordTime() {
            return this.recordTime;
        }

        /** @param recordTime the new record time */
        public void setRecordTime(long recordTime) {
            this.recordTime = recordTime;
        }

        /** @return the HTTP GET message count */
        public long getHttpGetMessageCount() {
            return this.httpGetMessageCount;
        }

        /** @param httpGetMessageCount the new HTTP GET message count */
        public void setHttpGetMessageCount(long httpGetMessageCount) {
            this.httpGetMessageCount = httpGetMessageCount;
        }

        /** @return the HTTP POST message count */
        public long getHttpPostMessageCount() {
            return this.httpPostMessageCount;
        }

        /** @param httpPostMessageCount the new HTTP POST message count */
        public void setHttpPostMessageCount(long httpPostMessageCount) {
            this.httpPostMessageCount = httpPostMessageCount;
        }

        /** @return the uplink value */
        public long getUplink() {
            return this.uplink;
        }

        /** @param uplink the new uplink value */
        public void setUplink(long uplink) {
            this.uplink = uplink;
        }

        /** @return the downlink value */
        public long getDownlink() {
            return this.downlink;
        }

        /** @param downlink the new downlink value */
        public void setDownlink(long downlink) {
            this.downlink = downlink;
        }

        /** @return the status code */
        public long getStatusCode() {
            return this.statusCode;
        }

        /** @param statusCode the new status code */
        public void setStatusCode(long statusCode) {
            this.statusCode = statusCode;
        }

        /** @return the status code count */
        public long getStatusCodeCount() {
            return this.statusCodeCount;
        }

        /** @param statusCodeCount the new status code count */
        public void setStatusCodeCount(long statusCodeCount) {
            this.statusCodeCount = statusCodeCount;
        }

    }

    private class TimestampWithEqualWatermark implements AssignerWithPunctuatedWatermarks<Object[]> { 

     /** 
     * 
     */ 
     private static final long serialVersionUID = 1L; 

     @Override 
     public long extractTimestamp(Object[] element, long previousElementTimestamp) { 
      // TODO Auto-generated method stub 
      return (long) element[0]; 
     } 

     @Override 
     public Watermark checkAndGetNextWatermark(Object[] lastElement, long extractedTimestamp) { 
      return new Watermark(extractedTimestamp); 
     } 

    } 
} 

上記のサンプルコードはFlink 1.3では動作せず、次の例外

 
Exception in thread "main" org.apache.flink.table.api.TableException: The rowtime attribute can only be replace a field with a valid time type, such as Timestamp or Long. 
    at org.apache.flink.table.api.StreamTableEnvironment$$anonfun$validateAndExtractTimeAttributes$1.apply(StreamTableEnvironment.scala:450) 
    at org.apache.flink.table.api.StreamTableEnvironment$$anonfun$validateAndExtractTimeAttributes$1.apply(StreamTableEnvironment.scala:440) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) 
    at org.apache.flink.table.api.StreamTableEnvironment.validateAndExtractTimeAttributes(StreamTableEnvironment.scala:440) 
    at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:401) 
    at org.apache.flink.table.api.java.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:88) 
    at com.taiwanmobile.cep.noc.TumblingWindow.main(TumblingWindow.java:53) 

を発生させます。しかし、fromDataStreamからstatusCodeCountを削除すると、このサンプルは例外なく正常に実行されます。

// Excerpt of the table definition and tumbling-window query from main().
// NOTE(review): the field list below still contains statusCodeCount, although the
// surrounding text says the variant that runs without the exception removes it — confirm
// which field list was intended here.
Table table = tableEnv.fromDataStream(stream2, 
       "urlKey,httpGetMessageCount,httpPostMessageCount" + ",uplink,downlink,statusCode,statusCodeCount,rowtime.rowtime"); 
     table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, urlKey") 
       .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum "); 

何か提案はありますか?

+0

これは、最近導入された時刻属性のバグである可能性があります。調査したうえで、改めてご連絡します。 – twalthr

答えて

0

これはFLINK-6881として報告されているバグです。回避策として、DefinedRowtimeAttributeを実装する独自のStreamTableSourceを定義することができます(this documentation draftも参照)。また、テーブルソースは基礎となるDataStream APIをうまく隠蔽してくれるため、テーブルプログラムがよりコンパクトになります。