flink 水印 时间戳

300人浏览 / 0人评论
flink水印就是一个时间戳

flink水印就是一个时间戳,它给每个消息添加一个允许一定延迟的时间戳。窗口可以继续计算一定时间范围内延迟的消息,添加水印后,窗口会等 n 秒,再执行计算。若超过 n 秒,则舍弃。窗口执行计算时间由 水印时间 来触发,当接收到消息的 watermark >= endtime ,触发计算。Flink提供添加水印的API如下所示:

val watermarkData: DataStream[Message] =
clicklogDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Message]{

   var currentTimestamp: Long = 0L
   val maxDelayTime = 5000L
   var watermark: Watermark = null

   // 获取当前的水印
   override def getCurrentWatermark = {
    watermark = new Watermark(currentTimestamp - maxDelayTime)
    watermark
  }

   // 时间戳抽取操作
   override def extractTimestamp(t: Message, l: Long) = {
    val timeStamp = t.timestamp
    currentTimestamp = Math.max(timeStamp, currentTimestamp)
    currentTimestamp
  }

 })

全部评论

晴天下起了小雨
2017-10-01 18:00
很喜欢,果断关注了
wjmyly7336064
2017-10-01 18:00
相当实用,赞美了
橘大佬
2017-10-01 18:00
就是有些细节再到位点就好了…