在流处理系统中,时间是一个非常重要的概念。Flink 可以设置下面三种事件时间。
ProcessingTime
ProcessingTime
意味着算子使用数据被处理时所在机器的系统时间。- 优点:延时低,因为算子不用等待按顺序来的数据
- 缺点:结果不是非确定的。因为每个窗口的数据内容取决于数据到达的速度。
EventTime
EventTime
指的是算子使用数据本身包含的时间信息作为当前时间。每个数据都携带一个时间戳和水位线。当一个水位通知说在给定的时间范围内所有数据已经到达时,event-time
窗口就会触发计算。
EventTime
窗口计算出的结果是确定的:结果和数据到达的顺序无关。IngestionTime
IngestionTime
是数据进入流处理器的时间。可以将IngestionTime
理解为source operator
的ProcessingTime
设置时间属性如下:
1 |
object { |
Event-Time 应用中的时间戳和水位
在基于 Event-Time
的流处理应用中,每个数据有两个必需的信息:
- 时间戳
事件发生的时间 - 水位
算子通过水位推断当前的事件时间。
水位用于通知算子没有比水位更小的时间戳的事件会发生了。
可以通过覆盖 SourceFunction
或者 UDF 来生成并分配时间戳和水位。
DataStream API
提同乐一个 TimestampAssigner
接口来从数据中提取时间戳。
一种最佳实践是:将分配时间戳和生成水位的操作离数据源越近越好,甚至就在 SourceFunction 内部更好。
TimestampAssigner
和其他 transformation
算子类似: 作用在一条数据流中的所有数据,并生成一条新的带有时间戳的数据和水位的数据流。
1 |
val env = StreamExecutionEnvironment.getExecutionEnvironment |
上面的例子中,MyAssigner
可以继承自 AssignerWithPeriodicWatermarks
或者 AssingerWithPuctuatedWatermarks
。区别如下:
AssignerWithPeriodicWatermarks
系统会以一个固定的时间值定期检查 event time 的进展。默认的时间是 200 ms。可以通过 env.getConfig.setAutoWatermarkInterval(interval)
来设置。
1 |
val env = StreamExecutionEnvironment.getExecutionEnvironment |
在上面的例子中,系统会每隔 5 秒钟检查当前的水位:
- Flink 先调用
getCurrentWatermark()
方法; - 如果上述方法返回一个时间戳更大的水位,则将新的水位发送。该步骤可以保证时间事件是持续增长的;
- 反之则不发送。
1 |
class PeriodicAssigner |
当事件的发生时间是单调递增的时候,可以在 DataStream
上调用 assignAscendingTimestamps
方法,该方法使用当前的时间戳生成水位: 因为不会出现更早的时间戳。
1 |
val stream: DataStream[MyEvent] = ... |
AssignerWithPunctuatedWatermarks
当水位可以基于数据的某些属性来决定时,Flink 提供了 AssignerWithPunctuatedWatermarks
接口,该接口包含 checkAndGetNextWatermark
方法,这个方法会在每次 extractTimestamp()
方法被调用后调用,它可以决定是否生成一个新的水位。另外,和上面一样,如果该方法返回了一个时间戳更大的水位时,发送一个新的水位。
水位,延时,准确性
水位是一种可以权衡一个流处理应用延时和结果准确性的机制:
- 松水位
如果水位时间戳远远小于最新事件时间戳,应用的延时会变得比较大,但是准确性会提升。 - 紧水位
如果水位时间戳和最新事件时间很接近,应用的延时会比较小,但是准确性会下降。
延时-准确性的权衡是所有流处理系统的一个基本特性。
Process Function
DataStream API
提供了一系列的底层可操控时间戳和水位的ProcessFunction
接口。这些接口还可以向系统注册 timer
用于在将来某些特定时间触发事件。另外,process functions
还可以用于将输出数据发送到多个输出。
ProcessFunction
接口适用于构建自定义逻辑的事件驱动的应用。对于预定义好的时间窗口的操作可能不太适用。
ProcessFunction
可以作用于 DataStream
和 KeyedStream
上,这个方法在 Stream
里的每个数据上调用。
ProcessFunction
提供了两个通用的方法:
processElement(v:IN, ctx: Context, out:Collector[OUT])
作用在数据流上的每个数据;
输出数据被发送到Collector
;
Context
提供了操控当前数据时间戳和TimeService的功能;Context
还可以向多个输出流发送数据。onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT])
该方法会在指定的已注册的timer
触发时被调用;
timestamp
参数是指定 timer 触发时的时间戳;
Collector
用于发送结果数据;
OnTimerContext
提供类似processElement
中 ctx 相同的功能。
TimerService 和 Timers
Context
和 OnTimerContext
中的 TimerService
提供了下述方法:
currentProcessingTime(): Long
currentWatermark(): Long
registerProcessingTimeTimer(timestamp: Long): Unit
注册一个作用于当前 key 的processing time timer
。
这个timer
会在机器的执行时间和timestamp
参数匹配时触发执行上述onTimer()
方法;registerEventTimeTimer(timestamp: Long): Unit
功能一样,只是换做了
Event Time`deleteProcessingTimeTimer(timestamp: Long): Unit
删除一个 processing timer,如果没有已注册的,则无效。deleteEventTimeTimer(timestamp: Long): Unit
当 timer
触发时,onTimer()
会被调用,而且 onTimer()
和 processElement
是被同步(synchronized) 调用的。
timers 只能被用于 keyedStream。
timers
的常用场景是:
- 在经过某段时间的不活跃后清除一个 key 对应的状态;
- 或者实现自定义的基于时间的窗口逻辑。
可以在每个数据上创建一个不重要的 key 来将一个普通 Stream 变成 KeyedStream。
触发到边输出
边输出(Side outputs) 是一种可以通过函数将一个输入流的数据处理后触发到多个输出流的机制:
- 除了主输出流外的边输出流的数量没有限制;
- 每个单独的边输出都用
OutputTag[X]
标记; - 一个
ProcessFunction
可以通过Context
将一个数据记录触发到一个或多个边输出。
1 |
val monitoredReadings: DataStream[SensorReading] = readings |
CoProcessFunction
对于两个输入流的操作,DataStream
API 提供了 CoProcessFunction
。
对于每个输入流,CoProcessFunction
都提供了一个转换函数:
processElement1()
processElement2()
同样的,CoProcessFunction
还提供了 TimerService
等相关接口。
窗口算子
在流处理系统中,窗口
是很常见的概念。窗口算子
可以在无限流数据中划分出一个有限范围,然后做各种处理。通常情况下,这个有限范围都是基于时间的。
定义窗口算子
窗口算子可以作用在
KeyedStream
上
并行计算non-keyed Stream
上
单线程上计算
要创建一个窗口算子,需要两个组件:
- Window Assigner
决定了数据流中的数据是如何被划分到窗口中的,并且返回一个(keyed)WindowedStream
或者 (non-keyed)AllWindowedStream
。 - Window Function
作用于WindowedStream
或者AllWindowedStream
上,然后处理窗口中的元素。
如下面代码所示:
1 |
// define a keyed window operator |
内置的 Window Assigners
基于时间的窗口会根据事件时间(event time 或者 processing time)将一个数据分配给某个窗口。每个时间窗口都有一个开始时间戳和结束时间戳。
所有内置的窗口分配器都会提供一个默认的触发器,一旦时间超过某个窗口的结束时间,触发器就会触发对这个窗口的计算。
Tumbling Windows
TumblingWindow
会将数据放在不重叠,大小固定的窗口中:
内置了两个分配器:
TumblingEventTimeWindows
和 TumblingProcessingTimeWindows
1 |
val sensorData: DataStream[SensorReading] = ... |
一种简写方式是:
1 |
val avgTemp = sensorData |
默认情况下,tumbling windows 和纪元时间(1970-01-01-00:00:00.000
)对齐,比如,一个大小为一小时的窗口的开始和结束时间为: 00:00:00, 01:0:00, 02:00:00 等。
所以,分配器还提供了第二个参数,可以控制偏移量:
1 |
val avgTemp = sensorData |
Sliding Windows
SlidingWindow 会将数据分配到长度固定,但可能会重叠的窗口中:
对于一个滑动窗口,需要声明两个参数: 窗口大小和新窗口生成频率:
1 |
// event-time sliding windows assigner |
有两种情况:
- 当窗口生成频率小于窗口大小时,窗口会重叠,数据可能会被分配给两个窗口;
- 当窗口生成频率大于窗口大小时,有些数据可能会得不到分配。
也可以传入第三个参数:偏移量
同样的,也可以用快捷方法:timeWindow(windowSize, slideSize)
来表示滑动窗口。
Session Windows
Session Window 将数据分配到 不覆盖,长度不固定的窗口中。
Session Window 的边界由不活跃间隔定义:例如,没收到任何数据的时间长度。
1 |
// event-time session windows assigner |
SessionWindows
会先将每个输入数据分配到单独的窗口,以数据的时间为窗口开始时间,并且以 session gap 为窗口大小,最后会将所有窗口合并。
将函数作用在窗口上
窗口上的函数有两种:
- 增量聚合函数
每当有输入数据加入窗口时,函数就作用在其上,并更新一个单独的窗口状态值。
空间效率非常高,最终会出发一个聚合后的结果。
ReduceFunction
和AggregateFunction
是常见的增量聚合函数。 - 全窗口函数
会先收集窗口中的所有数据,然后遍历计算。
需要更多空间。但是适用于更复杂的逻辑。
ProcessWindowFunction
是全窗口函数。
ReduceFunction
ReduceFunction
接收两个类型相同的数据,并将其结合输出为一个类型相同的值:单个窗口状态。
当有新数据加入窗口时:该函数被调用,并传入新数据和窗口状态值作为参数。
优点:
- 窗口状态恒定而且数据量小;
- 接口简单;
缺点:
- 输入和输出结果类型都是相同的。因此通常被限制为简单的聚合方法。
1 |
val minTempPerWindow: DataStream[(String, Double)] = sensorData |
AggregateFunction
增量聚合,窗口状态单一,但是接口更灵活也更复杂:
1 |
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable { |
可以看到,输出,输入,中间状态都可以是不同类型。
ProcessWindowFunction
有些场景是增量聚合函数很难实现的,比如:取中位数,词频等。
ProcessWindowFunction
可以先收集窗口中所有数据,然后在作用其上计算:
1 |
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> |
值得注意的是,和其他 ProcessFunction
一样,该方法也提供 Context
参数,这个参数除了普通的方法外,还提供窗口的一些原信息,比如:开始时间,结束时间,以及每个窗口的状态 windowState()和每个 key 的总状态 globalState()。
例子:
1 |
// output the lowest and highest temperature reading every 5 seconds |
增量聚合 和 ProcessWindowFunction
有这样的场景:窗口函数可以被表达为增量聚合函数,但同时也想获取窗口的元信息。
在这种场景下,可以将两者结合:
- 当数据进入窗口后,立即被增量聚合函数处理;
- 当触发器触发时,聚合结果被交给
ProcessWindowFunction
; ProcessWindowFunction.process
方法的vals: Iterable
参数只有一个值:上述的聚合结果。
在 DataStream API 中,调用方法为:
1 |
input |
上面的例子:
1 |
val minMaxTempPerWindow2: DataStream[MinMaxTemp] = sensorData |
自定义窗口算子
自定义窗口算子可以解决一些内置窗口无法解决的问题:
- 在窗口结束前发送结果;
- 当有迟到的数据进入窗口后,更新相应的状态;
- 当遇到特殊的数据条目时,开始或者结束一个窗口。
通过定义 assingers
, triggers
, evictors
和 window functions
可以定义一个窗口算子并处理其中的元素:
- 当有数据到达窗口算子,被
WindowAssigner
处理。这个方法决定了数据被分配到哪个窗口,如果这个窗口不存在,那么就会被新创建; - 如果
窗口函数
是增量聚合函数,那么新数据会被立即聚合,计算结果被存储为窗口的内容;反之,如果是全窗口函数
,新元素会被追加到ListState
用于保存窗口所有的函数。 - 当数据被分配给一个窗口时,同时也被传给窗口的
Trigger
。
触发器定义了一个窗口何时被认为准备好评估以及何时被关闭清除内容。
触发器的定义可以基于窗口中元素的数量和注册的timer
。
当触发器触发时,其具体行为由注册的窗口函数决定:- 如果是增量聚合函数,当前的聚合结果立即被触发。
- 如果是全窗口函数:
- 如果是混合型:
- 如果是增量聚合函数,当前的聚合结果立即被触发。
Evictor
是一个可选项,用于清除一个窗口中所有的元素,由于需要遍历,所有只能作用在全窗口函数上。
下例描述了如何定义:
1 |
stream |
窗口的生命周期
- 创建
当
WindowAssigner
分配第一个元素给窗口时,被创建。因此,不存在一个元素都不存在的窗口。一个窗口包含不同的状态:- 窗口内容
窗口内容包含了被分配的所有元素,或者聚合结果(如果是增量聚合函数)。 - 窗口对象
窗口对象保存了可以分辨不同窗口的信息。每个窗口对象都有一个结束时间戳用于定义窗口可以被删除的时间。 - 触发器的 timer
一个Trigger
可以注册定时器用于在特定时间被回调:评估窗口或者清除窗口内容。
由窗口算子维护。 - 触发器中自定义的状态
可以自定义每个窗口中的状态或者每个key对应的状态。
由触发器维护。
- 窗口内容
- 删除
当时间到达窗口的结束时间时,窗口被删除。
当窗口被删除后,窗口算子会清楚这个窗口的窗口内容,并丢弃窗口对象。
触发器状态和定时器不会被清除,因为它们是由窗口算子维护。
所以,需要调用Trigger.clear()
来清除状态以防止状态泄露。
窗口分配器
1 |
/** A custom window that groups events into 30 second tumbling windows. */ |
Triggers
触发器可以基于数据特性或者时间特性来决定计算一个窗口。
触发器可以接触到窗口的元数据,包括时间,定时器,状态等。
触发器可以做到如下述场景:
- 当窗口接收到固定数量的数据;
- 某个特定的数据到达窗口;
- 在当前时间未达到水位时触发计算返回结果(可以降低时延);
等等。
没当trigger被调用时,会返回一个 TriggerResult
,用于决定窗口的动作。
CONTINUE
: 不做任何操作;FIRE
: 如果窗口函数是 增量聚合函数,那么这个函数被调用,结果被触发;窗口状态不变;PURGE
: 清除窗口内容和状态;FIRE_AND_PURGE
: 先FIRE
再PURGE
。
Evictors
Evictors 可以在窗口函数被计算后或者计算前清除其中的元素。
接口如下:
1 |
public interface Evictor<T, W extends Window> extends Serializable { |
根据时间 Join 数据流
常见场景:Join 两个数据流中的数据。
Flink 内置了两个处理这种场景的算子。
Interval Join
Window Join
处理迟到的数据
Flink 基于水位来处理 event time 的进程。正如上面讨论的,总会有迟到的事件(事件时间小于水位)在事后才到达。
有以下几种处理方式:
丢弃迟到的数据
最简单的办法。
也是 EventTime 窗口的默认方法。
ProcessFunction 可以通过对比事件时间和当前水位决定是否丢弃。
重定向迟到的数据
通过 SideOutput
将迟到的数据重定向到另一个数据流,然后再取出来做业务上的处理:
1 |
// define a side output tag |
包含迟到的数据更新结果
将迟到的数据包括进来,然后重新计算一次。
问题多多:
- 为了重新计算所有的数据,算子必须保存所有的状态;
- 下游算子也要重新计算,甚至 Sink 到磁盘的算子也要覆盖;
Flink 给窗口算子提供了一个允许的迟到时间: allowedLateness
:
1 |
val readings: DataStream[SensorReading] = ??? |
近期评论