Flink的Tumbling Windows和Sliding Windows

Flink的窗口函数

官网地址:https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/windows.html

在流系统中通常会使用到Windows来统计一定范围的数据,比如按时间、按个数等进行统计。

Flink中主要有4种窗口函数:

  • Tumbling Windows - 滚动窗口
  • Sliding Windows - 滑动窗口
  • Session Windows - 会话窗口
  • Global Windows - 全局窗口

其中比较常用的是前面的两种。

Tumbling Windows和Sliding Windows之间最大的区别就是:Tumbling Windows是不可能重叠的,而Sliding Windows是存在重叠的。

我们可以将Tumbling Windows看作是Sliding Windows的特殊情况,当Sliding Windows的滑动时间和窗口时间一样的时候,这时候Sliding Windows窗口之间就不会重叠,这就是Tumbling Windows。

Tumbling Windows - 滚动窗口

Tumbling Window在流数据中进行滚动,这种窗口不存在重叠,也就是说一个events/data只可能出现在一个窗口中,如下图所示:

Tumbling Windows

可以看到,Tumbling Window之间是不存在重叠的。

Tumbling Window的创建可以基于数量(比如每5个元素构成一个窗口)或者基于时间(每隔10s创建一个窗口)。

创建基于时间(每隔5s)的Tumbling Window:

1
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
2
import org.apache.flink.streaming.api.windowing.time.Time
3
4
object WindowsApp {
5
  def main(args: Array[String]): Unit = {
6
    import org.apache.flink.api.scala._
7
8
    val env = StreamExecutionEnvironment.getExecutionEnvironment
9
10
    val text = env.socketTextStream("hadoop002", 9999)
11
12
    text.flatMap(_.split(","))
13
      .map((_, 1))
14
      .keyBy(0)
15
      .timeWindow(Time.seconds(5))
16
      .sum(1)
17
      .print()
18
      .setParallelism(1)
19
20
    env.execute("WindowsApp")
21
  }
22
}

然后Linux机器上启动netcat,并输入数据:

1
[root@hadoop002 ~]# nc -lk 9999
2
hello,world
3
hello,java

在控制台上可以看到结果:

(world,1)
(hello,1)
(hello,1)
(java,1)

Sliding Windows - 滑动窗口

Sliding Window是在流数据中进行滑动,窗口之间可以重叠,它可以在传入的数据流中进行平滑聚合,如下图所示:

Sliding Windows

和Tumbling Window一样,我们可以根据需求创建基于数量或者基于时间的Sliding Window。

创建基于时间(窗口大小10s,滑动距离5s)的Sliding Windows:

1
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
2
import org.apache.flink.streaming.api.windowing.time.Time
3
4
object WindowsApp {
5
  def main(args: Array[String]): Unit = {
6
    import org.apache.flink.api.scala._
7
8
    val env = StreamExecutionEnvironment.getExecutionEnvironment
9
10
    val text = env.socketTextStream("hadoop002", 9999)
11
12
    text.flatMap(_.split(","))
13
      .map((_, 1))
14
      .keyBy(0)
15
//      .timeWindow(Time.seconds(5))
16
      .timeWindow(Time.seconds(10), Time.seconds(5))
17
      .sum(1)
18
      .print()
19
      .setParallelism(1)
20
21
    env.execute("WindowsApp")
22
  }
23
}
Author: VinxC
Link: https://vinxikk.github.io/2019/01/11/flink/flink-tumbling-window-vs-sliding-window/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.