flink事件时间引起的问题

350人浏览 / 0人评论

Flink引入了事件时间(eventTime)这个重要概念,从而提升数据统计的准确性。但是,引入事件时间后在具体业务实现时存在一些问题必需要合理去解决,否则会造成非常严重的问题。事件时间存在什么样的问题呢?下面先看一个简单的业务场景。比如:要统计APP上搜索按钮每1分钟的点击次数,通常我们是这样设计如下水印处理器:

public static class TimestampExtractor implements AssignerWithPeriodicWatermarks {
 
 private long currentMaxTimestamp = 0L;
 
 public Watermark getCurrentWatermark() {
  return new Watermark(currentMaxTimestamp -3000);
 }
 
 public long extractTimestamp(Tuple2 tuple, long previousElementTimestamp) {
  long eventTime = tuple.f1;
  currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTime);
  return eventTime;
 }
}

extractTimestamp方法会拿事件时间和上一次事件时间比较,并取较大值来更新当前水印值。但是,假设前端发送了以下这些数据,方便直观看数据直接采用格式化后的值,并以逗号分隔数据。

事件类型 事件发生时间
ClickSearchButton 2020-12-17 13:30:00
ClickSearchButton 2020-12-17 13:30:01
ClickSearchButton 2020-12-17 13:30:02
ClickSearchButton 2020-12-17 13:30:03
ClickSearchButton 2020-12-17 13:35:00
ClickSearchButton 2020-12-17 13:30:04
ClickSearchButton 2020-12-17 13:30:05

正常数据都是13:30,突然来了一条13:33的数据,再结合上面的水印逻辑,一旦出现这种问题数据,直接导致水位上升到13:33,后面再来13:30的数据全部无法处理。针对业务来讲这样的错误是致命的,统计结果出现断层。

针对以上问题我们可以对水印实现类做如下改造:

public static class TimestampExtractor implements AssignerWithPeriodicWatermarks {
 
 private long currentMaxTimestamp = 0L;
 
 public Watermark getCurrentWatermark() {
  return new Watermark(currentMaxTimestamp -3000);
 }
 
 public long extractTimestamp(Tuple2 tuple, long previousElementTimestamp) {
  long eventTime = tuple.f1;
  if((currentMaxTimestamp == 0) || (eventTime - currentMaxTimestamp < MESSAGE_FORWARD_TIME)) {
            currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTime);
        }
  return eventTime;
 }
}

MESSAGE_FORWARD_TIME 变量是自定义的消息最大跳跃时间,如果超出这个范围则不更新最大水印时间。

全部评论

晴天下起了小雨
2017-10-01 18:00
很喜欢,果断关注了
wjmyly7336064
2017-10-01 18:00
相当实用,赞美了
橘大佬
2017-10-01 18:00
就是有些细节再到位点就好了…