2017-08-28 8 views
0

Flinkを使用してキネシスストリームを読みます。タイムウィンドウとキーに基づいて特定のイベントを集約します。コードはreduceの後に何もしません。出力csvにputのデータがマップされていません。私は数分待っています(時間窓がわずか2分であっても)。FlinkおよびKinesisストリームによるストリームウィンドウ処理が機能していません。

public static void main(String[] args) throws Exception { 

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
    env.enableCheckpointing(CommonTimeConstants.TWO_MINUTES.toMilliseconds()); 
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); 
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(1, TimeUnit.MINUTES))); 

    Properties consumerConfig = new Properties(); 
    consumerConfig.put(ConsumerConfigConstants.AWS_REGION, PropertyFileUtils.get("aws.region", "")); 
    consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, PropertyFileUtils.get("aws.accessKeyId", "")); 
    consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, PropertyFileUtils.get("aws.secretAccessKey", "")); 
    consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON"); 

    DataStream<APIActionLog> apiLogRecords = env.addSource(new FlinkKinesisConsumer<>(
      ProjectProperties.SOURCE_ENV_PREFIX, // stream name 
      new StreamedApiLogRecordDeserializationSchema(), 
      consumerConfig)); 

    apiLogRecords.assignTimestampsAndWatermarks(API_LOG_RECORD_BOUNDED_OUT_OF_ORDERNESS_TIMESTAMP_EXTRACTOR); 

    DataStream<Tuple7<String, String, String, String, Timestamp, String, Integer>> skuPlatformTsCount = 
      apiLogRecords.flatMap(collecting events...) 
        .keyBy(Key based on some parameters of the event...) 
        .timeWindow(TWO_MINUTES) 
        .reduce(adding up event parameter..., window function...) 
        .map(Map to get a different tuple format...); 

    skuPlatformTsCount.writeAsCsv("/Users/uday/Desktop/out.csv", FileSystem.WriteMode.OVERWRITE); 

    env.execute("Processing ATC Log Stream"); 
} 

private static final BoundedOutOfOrdernessTimestampExtractor<APIActionLog> API_LOG_RECORD_BOUNDED_OUT_OF_ORDERNESS_TIMESTAMP_EXTRACTOR = 
     new BoundedOutOfOrdernessTimestampExtractor<APIActionLog>(TEN_SECONDS) { 
      private static final long serialVersionUID = 1L; 

      @Override 
      public long extractTimestamp(APIActionLog apiActionLog) { 
       return apiActionLog.getTs().getTime(); 
      } 
     }; 

答えて

0

愚かな間違いでした。

apiLogRecords.assignTimestampsAndWatermarks(API_LOG_RECORD_BOUNDED_OUT_OF_ORDERNESS_TIMESTAMP_EXTRACTOR); 

呼び出しは、割り当てられた透かしを含む新しいストリームを返します。この戻り値は、後の操作で使用する必要があります。

関連する問題