Flink

本文介绍Flink的体系结构。

Flink是一个开源的分布式数据计算框架,可以用来处理stream data和batch data.它的核心是一个streaming dataflow engine ,这个engine对数据流(data streams)提供数据分布、通信、错误恢复等分布式数据计算的支持。Flink为用户提供的API同时支持stream data和batch data的处理,但它的计算核心只有一个,就是前面说的streaming dataflow engine,对batch data进行了一些封装。下面先介绍一下stream和batch的区别。

streaming dataflow engine,这里面有两个词streaming和dataflow,这两个词应该能解释它整个系统是用来做什么的,以及是怎么做的。首先streaming,说明它这个系统是用来处理stream data 的,或者说它实现的是一个针对stream data processing的系统,虽然它也能处理batch data,但它的本意、基本思想是要做stream data processing。第二个,dataflow指的是Google提出的dataflow model(The Dataflow Model: A Practical Approach to BalancingCorrectness, Latency, and Cost in Massive-Scale,Unbounded, Out-of-Order Data Processing),它的内部实现应该很多技术和方法都是参考这篇paper。

streaming

stream data字面意义是流式数据,与之相关的另外一个词unbounded data,认为数据是无界,它的大小不是确定的。传统的数据处理任务都是一堆数据放在哪里,已经知道他有多少个文件,一共占用多少磁盘,这些数据是有界的。为什么会有无界的数据,因为可能数据的大小是在不断增长的,或者你要处理的数据并不在自己的本地,要不断从别的地方接收过来,而你也不知道对方什么时候会发送结束等等。这种数据一般是实时增长,不断产生的。像网络日志、应用日志、传感器数据、用户的操作事务等。
streaming要解决的问题就是处理这种unbounded data。这里面会有许多问题,包括时延、正确性、事件的乱序问题,比如由于数据是分散的接收的,一个任务要确定合适的时间来处理接收得到的数据,如果任务跟事件的时间和顺序是无关的,可以接收到一个事件(这里的事件就是一条数据)就进行一次处理,得到结果,这样的计算方式时延是最低的,也不会产生乱序问题。如果一个任务跟数据的时间和顺序是有关的,那么对于接收到的数据就要谨慎地选择处理的顺序和时间。由于数据传输、通信等的原因,几条数据到达的顺序可能不是他们原来产生的顺序,这就是数据潜在的乱序问题。而对于接收到的数据如果不能进行立即处理,等待他前面发生的数据后到来,则会存在时延问题。

time:event time & processing time

因为stream data processing里大部分任务都是跟数据或者事件的时间信息是有关的,所以大多数情况下,每条数据都有一个时间信息。对于不同的数据,它们的时间信息要求的也会不一样。
  • event time 一条数据产生时的时间
  • processing time 数据被处理的时间,对应的是处理这条数据的机器时间
理想的情况下event time和processing time是相同的,但在现实的物理世界中,由于许许多多原因这是不可能的,实际的processing time总要比event time慢许多。
对于和数据时间关系不大的任务,可以使用数据的processing time,这样可以尽可能的减少延时,提高任务的处理效率。而对于和数据的发生时间非常敏感的任务,可以使用event time,可以一定程度上解决数据的时间相关问题,但会产生时延增加等问题。
在fink系统中,还增加了一个ingestion time的概念,它指的是数据到达flink系统的时间,可以一定程度上平衡前面的几个问题。这会在后面进行介绍。


Window

fixed window。传统的针对batch data 的数据处理引擎会使用它来处理stream data,主要的思想就是将接收到的数据根据固定的时间划分成一个个小的窗口,每次处理这个窗口中的数据,就可以将unbounded data变成bounded data 进行处理。但简单的固定窗口的大小会有许多问题不能解决,比如不能保证某个窗口里的数据在他结束的时间到来时,所有的数据都已经到达。等等。
flink也使用窗口机制来处理stream data,但它的window相对更加灵活,类型更多,可以定制的方式也很多。比如根据时间划分的窗口,event time window和processing time window;固定的窗口和滑动的窗口,tumbling window、sliding window;根据计时的窗口和计数的窗口等。这在后面会进行介绍。

1. 系统结构

主要分为四层架构:deployment, core, APIs, and libraries.

图1显示的是flink的software stack,核心部分是分布式数据流引擎(distributed dataflow engine),用来执行数据流程序。API部分分为两类,一类处理dataset,即有限的数据,批处理;另一类是datastream,处理流式数据。核心的runtime可以被看作是一个流式数据处理引擎,执行基于上面两种API编写的程序。

 图2 描绘的是一个flink集群的进程组织模型。主要包含三类进程:client,job manager, task manager(最少一个).
client接受程序代码,转换成数据流图,提交给job manager。这一步会检查每次变换的数据类型,and creates serializers and other type/schema specific code. 对于基于dataset的程序还会额外经过一个优化阶段(cost-based query)。similar to the physical optimizations performed by relational query optimizers (for more details see Section 4.1) 。
JobManager协调数据流的分布式执行。它跟踪每个操作和流的状态和进度,安排新的操作,并协调检查点和负责恢复。在高可用性设置中,job manager会将每个检查点的一组最小元数据进行保存,用于错误恢复。
数据的处理和执行在task manager中进行,包含一到多个operator,并向job manager汇报状态。task manager维护一个buffer pool来buffer和materialize数据流,以及网络连接进行不同operator间的数据交换。 

2 通用结构(The Common Fabric): Streaming Dataflows 

即使使用的API不同,所用的程序都会被转换成数据流图,flink 的runtime engine就是处理这个流图。

2.1 数据流图

图3 描绘了一个有向无环图,包括两个部分:
  1. stateful operator
  2. 代表由operator产生和被operator处理的数据的数据流
由于数据流图是以数据并行方式执行的,操作符被并行化为一个或多个称为子任务的并行实例,并且流被分成一个或多个流分区(每个生成子任务对应一个分区)。

2.2 通过中间数据流进行数据交换

flink把两个operator中间的数据流交换抽象为intermediate data streams,它代表的是由一个operator生成并可以被一个或者多个operator消费的中间数据。这些数据可以被序列化,也可能不被序列化。上层结构可以对这些数据的特定行为进行参数化。(data set api 对程序进行优化)

2.2.1 Pipelined and Blocking Data Exchange.

并发的生产者和消费者通过Pipelined intermediate streams交换数据。连续的流式任务使用pipelined streams,批处理任务里也尽可能的避免使用materialized streams。blocking stream缓存所有生产运营商的数据,然后才能被消费,从而将生产和消费运营商分为不同的执行阶段。
pipeline stream:生产者和消费者可以并发的执行
blocking stream:生产者和消费者没有并发。more memory;spill to secondary storage; and do not propagate backpressure 
在某些特殊情况下也会使用后者。

2.2.2 平衡延时和吞吐量

flink的数据交换机制使用buffer交换实现。生产者序列化到buffer,buffer传递给消费者:
  1. buffer满
  2. 时间到
设置大的buffer size可以提高吞吐量,降低buffer时间可以减少延时。
图4显示了缓冲区超时应用在30台机器(120个核心)上简单的流式grep作业中记录传输的吞吐量和延迟的影响。


2.2.3 控制事件

除了交换数据,flink的stream也会传递不同的控制事件。
  • checkpoint barriers 
  • watermarks
  • iteration barriers
控制事件认为流的分组保持了事件原有的顺序,一元operator同样。但是接受多个流的operator并不能保证接收的数据是顺序的,但为了提高效率(the streams’ rates) 减少前面operator的压力,operator不会保证事件的顺序问题,或者对于特定的operator,要在实现它的时候单独考虑。
这一设计更高效,由于很多operator并不需要考虑顺序问题(hash-join, map)。另外,flink还有一个时间窗的机制。

2.3 错误恢复

2.4 迭代数据流


Stream Analytics on Top of Dataflows 

3.1 时间概念

flink里有两种时间概念:
  1. 事件产生的时间,由事件源赋予时间戳
  2. 处理时间,在某个机器上被处理时的机器时间
文中说在分布式系统中,事件时间和处理时间之间存在任意的(arbitrary)偏差,即从事件发生到处理得到结果会等待 不确定的一段时间。为了解决这个等待问题,这些系统都使用一种watermark事件机制,就是前面控制事件提到的。
就是会向operator发送一个watermark事件,包含一个时间戳,表示这一时间戳之前的事件都已经到达了该operator。
watermark不断传递,map,filter简单的传递,某些operator 例如event-time window 会基于它进行一些计算再传递。当有多个输入包含watermark时,总是传递最小的到下一个operator。
基于处理时间的flink程序,有更小的时延,但在一致性、错误恢复方面会有问题;基于事件产生时间的程序更可靠,但时延会更高。flink还使用一种ingestion time(事件进入flink的时间),来平衡二者。

3.3 stream window

window是处理stream data的核心思想。
flink提供的API 使用window进行数据处理的一个基本过程如下面的代码所示,分为两种类型,一种是将stream先根据key进行划分,另一种不进行这一步的操作。使用keyedwindows可以提高整个程序的并发性。其中‘[]'中的操作是可选的。
Keyed Windows

stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"
Non-Keyed Windows

stream
       .windowAll(...)           <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"
整个window的使用流程包括三个核心的部分:assigner、trigger和evictor。

  • assigner负责将每条记录分配到一个窗口,
  • trigger负责确定与该窗口相关的operator何时被执行,
  • evictor在处理window的operator被调用前或调用后执行,负责筛选window里的一些数据。

3.3.1 window 的生命周期

简单讲,当一个window的第一个数据进入它是,window会被创建;当时间(event or processing)超过这个窗口的结束时间+最大lateness时,它会被销毁。此处的lateness不是watermark,是在上面allowedLateness(...)进行设置,后面介绍。


3.3.2 window类型


window assigner用来创建window,确定window类型。根据每个任务的需求不同,可以根据不同的方式来划分window:

  • 根据时间的不同:eventTime window、 processingTime window
  • 处理方式不同:tumblingWindow、 slidingWindow、 session
  • 计数不同:count window、time window
如下图所示:

3.3.3 Triggers

trigger用来确定一个window是否已经准备好被处理。所有的window assigner都自带默认的trigger,用户也可以根据需要自定义trigger。
这个接口的方法有:
  • The onElement() method is called for each element that is added to a window.
  • The onEventTime() method is called when a registered event-time timer fires.
  • The onProcessingTime() method is called when a registered processing-time timer fires.
  • The onMerge() method is relevant for stateful triggers and merges the states of two triggers when their corresponding windows merge, e.g. when using session windows.
  • Finally the clear() method performs any action needed upon removal of the corresponding window.
用户可以重新定义这些方法,来决定接下来的操作:
  • 继续 不操作
  • 发送窗口
  • 抹掉一个数据
  • 发送和抹掉

3.3.4 Evictor

evictor的作用是,在一个 window被发出后,在这个 window的processing function被执行前或执行后,将里面的某些数据剔除。

3.3.5 Allowed lateness

决定一个window被销毁的时刻。

3.3.6 window functions

将数据划分成窗口后,要定义要对数据进行的计算。Flink中function类型有ReduceFunctionAggregateFunctionFoldFunction or ProcessWindowFunction.
前面两类function可以有更高的执行效率,这类函数不需要等到整个 window被发送出来后在进行计算,同时每个window中存储的数据也会相应变少。而ProcessWindowFunction总是会将数据进行缓存,等到整个window数据满了之后才会进行后面的计算。可以将ProcessWindowFunction结合到前面的几类函数中进行优化,具体例子可以看Window Functions .
fsx

3.4 异步流迭代

Loops in streams are essential for several applications, such as incrementally building and training machine learning models, reinforcement learning and graph approximations [9, 15]. In most such cases, feedback loops need no coordination. Asynchronous iterations cover the communication needs for streaming applications and differ from parallel optimisation problems that are based on structured iterations on finite data. As presented in Section 3.4 and Figure 6, the execution model of Apache Flink already covers asynchronous iterations, when no iteration control mechanism is enabled. In addition, to comply with fault-tolerance guarantees, feedback streams are treated as operator state within the implicit-iteration head operator and are part of a global snapshot [7]. The DataStream API allows for an explicit definition of feedback streams and can trivially subsume support for structured loops over streams [23] as well as progress tracking [9].

Batch Analytics on Top of Dataflows
 Flink approaches batch processing as follows:
  • Batch computations are executed by the same runtime as streaming computations. The runtime executable may be parameterized with blocked data streams to break up large computations into isolated stages that are scheduled successively.
  • Periodic snapshotting is turned off when its overhead is high. Instead, fault recovery can be achieved by replaying the lost stream partitions from the latest materialized intermediate stream (possibly the source).
  • Blocking operators (e.g., sorts) are simply operator implementations that happen to block until they have consumed their entire input. The runtime is not aware of whether an operator is blocking or not. These operators use managed memory provided by Flink (either on or off the JVM heap) and can spill to disk if their inputs exceed their memory bounds.
  • A dedicated DataSet API provides familiar abstractions for batch computations, namely a bounded fault tolerant DataSet data structure and transformations on DataSets (e.g., joins, aggregations, iterations).
  • A query optimization layer transforms a DataSet program into an efficient executable. 

 


评论