居然能通过大保健来理解Flink中的时间语义
#
本篇要解决的问题
Processing Time、Event time、Ingestion Time的不同点?
请解释下Watermark。
如何解决延迟数据问题?
在Flink中如果Source并行度大于1,Watermark是一致的吗?
如果一个Task会从多个输入消费数据,因为涉及到多个数据流入,如何确定它的时间呢?
如果某个Source的Task,一直闲置,而另外一个Task是运行状态,会出现什么样的问题?如何解决?
近两年,Flink状态流处理变得越来越流行起来。其实最开始的时候,流处理都是无状态的。例如:Storm。
流处理在当时的定义就是更快地批处理。所以当时流处理基本是有很多是作为ETL的管道。而随着流处理变得越来越重要,流处理开始处理一些复杂业务,必须要进行有状态地处理。现在的框架也都开始支持有状态的流处理。例如:Flink、Structured Streaming等。
在我们学习更多关于状态的知识之前,我们要先来理解Flink中的时间语义。这在Flink中可以说非常核心的内容。
1
Flink中的时间语义
在流处理应用中,时间是很关键的。我们可以利用时间来对流中的事件分组、关联。而且,Flink中的window也与时间关联密切。因为,流处理应用是一个一直不停运行的应用程序。我们很多时候,会每隔一段时间了解下数据的情况,所以,需要对流进行定期地快照,也就是按照时间来进行快照。例如:当电商中发布一个秒杀活动时,想要每隔10分钟了解流量数据,这就需要采集10分钟内到达流处理应用中的所有事件,并进行各种计算。
而这个10分钟,可以有不同的解读方式。就像在按摩店上钟的时候,我们总是觉得按摩店的钟走得就是快,一会90分钟就到了。
Processing Time
Processing Time也就是处理时间,它是最容易理解的。例如:Flink引擎运行在的Linux系统上,这个系统上有一个时钟。我们可以基于这个始终来记录时间。我们根据该系统时间为准,来确定时间范围。例如:当前系统时间是12:00,那么12:00 – 12:10之间到达的数据认为10分钟内有效的数据。
听起来感觉挺有道理的,没啥问题啊。但大家考虑以下场景,假如说,某个传感器采集到数据之后,通过网络的方式发送数据。但偏偏网络抖动,导致要发送过来的数据产生了延迟,如果严格按照系统时间来计算,延迟的数据就会导致数据计算问题。而系统时间根本不关心,只需要看它自己的时钟,到点就开始计算。
这种体验是不好的,大家想想,如果你去按摩,预约是凌晨2点,但是因为在公司加班,晚了一会。但技师正常从2点开始上钟,管你来了没来,反正到底了下钟走人。这也太不靠谱了,哪里受得了这种!
Event Time
上面我说了说Processing Time的糟糕体验,只要稍有延迟,就可能会导致数据无法正确计算。而事件时间为围绕事件来的,就是根据数据实际发生的时间作为时钟的标准。针对事件时间的10分钟,是指的传感器那一端:12:00 – 12:10 之间产生的所有数据。此时的10分钟不再和Flink计算引擎所在的操作系统时间有关。而且,Eventime对事件延迟到达非常有用。
这种体验是很好的,如果你去按摩,预约是凌晨2点,但是因为在公司加班,晚到了5分钟。体贴的技师能够等你一会,等到开始给你按摩了,正式上钟打开,然后按照标准的时间给你服务,最后下钟。请问你是选Process Time还是Eventtime?
Ingestion Time
摄入时间是事件进入Flink流处理系统的时间。它介于Event time和Processing Time中间。进入到Flink中的事件都以进入到source operator系统给的时间为准,后续的时间操作也会基于这个时间。与Processing Time对比,它可以确保更稳定的计算结果。与Event time相比,它没法处理延迟数据。在内部看来,它和Event time非常类似,只不过事件的时间戳不是从事件自身提取的,而是根据source operator上自动分配的。
还是之前的例子,你还是去按摩,预约时间还是从凌晨2点开始,还是因为公司加班,晚到了5分钟。但现在的方式是你进到按摩师店,技师就开始打卡上钟。但你突然觉得肚子不舒服,要去厕所拉个屎,等你回来的时候,发现已经10分钟过去了。其实,技师的服务时间根本不是你真正开始享受服务的时间!差评!
2
Watermark(水印)
Watermark与时钟
通过前面的内容,我们知道了Flink中有三种时间语义。Processing Time使用的是系统时间,系统时间是不断变化的。如果使用Processing Time,流处理应用是有一个不断流逝的时钟。但如果是Event time和Ingestion Time呢?使用了这两种时间,就表示不再使用Linux系统上的时钟了。一下没有了时钟,这事大了,难道时光不再流逝?这可不行,时光不流逝就表示我们后续无法针对时间做任何计算。所以,流式系统中时光必须流逝,就必须要有时钟。
我们所要开始聊的Watermark,就是要让流式处理应用能够拥有时间流逝的机制,就是要为Flink应用构建一个时钟。Watermark是流的一部分,是一种特殊的控制事件。它其实是Flink在使用Eventtime(Ingestion Time一样)时,提供的一个逻辑时钟(之前的Processing Time是物理时钟)。
明确两点:
- 水印是要构建一个逻辑时钟,既然是时钟是一定不会回退的,一定是前进的
- 水印和时间戳息息相关,基于它Operator才会有流逝、不断推进的时钟。
大家注意我画的图。可以清晰地看到水印是流中的一部分。而且水印一定是随着时间流逝的。水印中有一个数字,这就是时间戳。通过水印,就构建起了Flink的逻辑时钟。让我们后续做Window要取指定事件时间范围的数据成为可能。
在流中分配Watermark
val text = senv.socketTextStream("localhost", 9999)
.assignTimestampsAndWatermarks(new TimestampExtractor)
上面的代码,就是给流中分配水印。
跟一下源码,我们可以看到是一个名为TimeStampsAndWatermarksOperator在处理,它就是根据传入的WatermarkStrategy类中,提取事件时间,然后生成Watermarker,并发送到流中。
并行执行下的Watermark
了解了Watermark,接下来我们需要进入到分布式环境中。我们知道每一个Task都有可能运行在不同的服务器中,所以,每个Task都必须有一个属于自己的时钟(忘记Linux系统时钟吧)。大家看一下Flink下面这张经典的图。
我们来解读一下,这个部分非常关键,是理解Watermark的核心。
-
图中,并行的任务是独立生成Watermark的。
所以,不存在Watermark同步一说。
不同的Task Watermark是不同的。 - 假设在Source Task上定义了Watermark,那么该Task就是以Watermark作为它的Event time。
-
水印是不断在整个任务图中流动的。
每当Task接收到一个Watermark,都会推进它的时钟(Event time),并且这个Task会继续向下游生成一个新的水印。 -
当一个Task消费多个并行输入的数据时,每个输入中都有Watermark,那Task到底以谁为准呢?
Flink中定义当使用join或者window、keyBy这一类操作,取输入中事件的Watermark时间最小值。 - 所有的Task会随着Source Task不断发出的Watermark以Event time方式来更新自己的时钟
数据延迟问题
如果某个Task的时间被一个Watermark更新为W(12),但后续又一些Event time为8、9、10的事件到达Task,这种情况是有可能出现的。而且,在一些业务中,还有可能事件会无限延迟。我们当然是可以让Watermark延迟更长时间,但这肯定会导致计算延迟,这不是我们希望看到的。
所以,我们需要明确最大能够延迟的时间,超过这个时间,一概不接受。没有人愿意无限制地等待。
IDLE Source(空闲的Source)
Flink也提供了一种非常方便的方式来解决该问题:
WatermarkStrategy.withIdleness(Duration.ofMinutes(1));
看一下源码:
核心就是这个WatermarksWithIdleness类,它创建一个定时器。
如果检测到超过指定的空闲时间,就会将输出流标记为IDLE
当输出流标记为IDLE后,下游Task将不再等待输出的watermark了。
#
总结:
Processing Time、Event time、Ingestion Time的不同点?
Flink的一些Operator例如:window是需要按照时间来触发计算和计算大小的。而在Flink流处理应用中,不同的时间有不同的时间语义。Processing Time表示使用Task运行的Linux系统时间、Event time表示使用事件时间、Ingestion time表示摄入时间。绝大多数场景都是使用Event time。
请解释下Watermark。
Watermark是数据流中的一部分。我们必须要让每个Operator有自己的时钟,而因为当前使用的语义是Event time,无法使用系统时钟,所以需要我们需要一种机制,来构建每个Operator自己的时钟。而Watermark就是这样一种机制,通过Watermark可以构建每个Operator的逻辑时钟。
如何解决延迟数据问题?
将Watermark设置为当前Event time – 允许延迟的最大时间,但注意Watermark的时间是不能往前的,必须前进。但不是无限的延迟,需要根据业务控制允许延迟最大的时间,因为它同时也会延迟计算时间。
在Flink中如果Source并行度大于1,Watermark是一致的吗?
不一致,每个Source Operator的Task都会独立生成Watermark,每一个Operator的Task都有自己的时钟。而时钟的推进就是基于Watermark来实现的。
如果一个Task会从多个输入消费数据,因为涉及到多个数据流入,如何确定它的时间呢?
多个流取最小值。
如果某个Source的Task,一直闲置,而另外一个Task是运行状态,会出现什么样的问题?如何解决?
会出现Windows窗口无法实现计算,因为Windows所在的Operator Task会一致等待空间的Source Task,然后取最小值。只需要在Watermark策略中指定IDLE的超时时间即可。一旦Source Task达到超时时间,会标记为IDLE,下游就不再等待上游的Watermark了。