Flink的窗口函数
官网地址:https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/windows.html
在流系统中通常会使用到Windows来统计一定范围的数据,比如按时间、按个数等进行统计。
Flink中主要有4种窗口函数:
- Tumbling Windows - 滚动窗口
- Sliding Windows - 滑动窗口
- Session Windows - 会话窗口
- Global Windows - 全局窗口
其中比较常用的是前面的两种。
Tumbling Windows和Sliding Windows之间最大的区别就是:Tumbling Windows是不可能重叠的,而Sliding Windows是存在重叠的。
我们可以将Tumbling Windows看作是Sliding Windows的特殊情况,当Sliding Windows的滑动时间和窗口时间一样的时候,这时候Sliding Windows窗口之间就不会重叠,这就是Tumbling Windows。
Tumbling Windows - 滚动窗口
Tumbling Window在流数据中进行滚动,这种窗口不存在重叠,也就是说一个events/data只可能出现在一个窗口中,如下图所示:
可以看到,Tumbling Window之间是不存在重叠的。
Tumbling Window的创建可以基于数量(比如每5个元素构成一个窗口)或者基于时间(每隔10s创建一个窗口)。
创建基于时间(每隔5s)的Tumbling Window:
1 | import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment |
2 | import org.apache.flink.streaming.api.windowing.time.Time |
3 | |
4 | object WindowsApp { |
5 | def main(args: Array[String]): Unit = { |
6 | import org.apache.flink.api.scala._ |
7 | |
8 | val env = StreamExecutionEnvironment.getExecutionEnvironment |
9 | |
10 | val text = env.socketTextStream("hadoop002", 9999) |
11 | |
12 | text.flatMap(_.split(",")) |
13 | .map((_, 1)) |
14 | .keyBy(0) |
15 | .timeWindow(Time.seconds(5)) |
16 | .sum(1) |
17 | .print() |
18 | .setParallelism(1) |
19 | |
20 | env.execute("WindowsApp") |
21 | } |
22 | } |
然后Linux机器上启动netcat,并输入数据:
1 | [root@hadoop002 ~]# nc -lk 9999 |
2 | hello,world |
3 | hello,java |
在控制台上可以看到结果:
(world,1)
(hello,1)
(hello,1)
(java,1)
Sliding Windows - 滑动窗口
Sliding Window是在流数据中进行滑动,窗口之间可以重叠,它可以在传入的数据流中进行平滑聚合,如下图所示:
和Tumbling Window一样,我们可以根据需求创建基于数量或者基于时间的Sliding Window。
创建基于时间(窗口大小10s,滑动距离5s)的Sliding Windows:
1 | import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment |
2 | import org.apache.flink.streaming.api.windowing.time.Time |
3 | |
4 | object WindowsApp { |
5 | def main(args: Array[String]): Unit = { |
6 | import org.apache.flink.api.scala._ |
7 | |
8 | val env = StreamExecutionEnvironment.getExecutionEnvironment |
9 | |
10 | val text = env.socketTextStream("hadoop002", 9999) |
11 | |
12 | text.flatMap(_.split(",")) |
13 | .map((_, 1)) |
14 | .keyBy(0) |
15 | // .timeWindow(Time.seconds(5)) |
16 | .timeWindow(Time.seconds(10), Time.seconds(5)) |
17 | .sum(1) |
18 | .print() |
19 | .setParallelism(1) |
20 | |
21 | env.execute("WindowsApp") |
22 | } |
23 | } |