• 欢迎光临~

Flink 窗口API & 窗口分配器

开发技术 开发技术 2022-06-23 次浏览

1、窗口API

1,1、按键分区(Keyed)和非按键分区(Non-Keyed)

在定义窗口操作之前,首先需要确定,到底是基于按键分区(Keyed)的数据流KeyedStream来开窗,还是直接在没有按键分区的DataStream上开窗。也就是说,在调用窗口算子之前,是否有keyBy操作。

按键分区窗口(Keyed Windows)

经过按键分区keyBy操作后,数据流会按照key被分为多条逻辑流(logical streams),这就是KeyedStream。基于KeyedStream进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的处理。所以可以认为,每个key上都定义了一组窗口,各自独立地进行统计计算。在代码实现上,我们需要先对DataStream调用.keyBy()进行按键分区,然后再调用.window()定义窗口。

stream.keyBy(...)
      .window(...)

非按键分区(Non-Keyed Windows)

如果没有进行keyBy,那么原始的DataStream就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了1。所以在实际应用中一般不推荐使用这种方式。

stream.windowAll(...)

这里需要注意的是,对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll本身就是一个非并行的操作。

1.2、代码中窗口API的调用

有了前置的基础,接下来我们就可以真正在代码中实现一个窗口操作了。简单来说,窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。

tream.keyBy(<key selector>).window(<window assigner>).aggregate(<window function>)

其中.window()方法需要传入一个窗口分配器,它指明了窗口的类型;而后面的.aggregate()方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。窗口分配器有各种形式,而窗口函数的调用方法也不只.aggregate()一种,我们接下来就详细展开讲解。另外,在实际应用中,一般都需要并行执行任务,非按键分区很少用到,所以我们之后都以按键分区窗口为例;如果想要实现非按键分区窗口,只要前面不做keyBy,后面调用.window()时直接换成.windowAll()就可以了。

2、窗口分配器

定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被“分配”到哪个窗口。之前的介绍中我们知道,窗口分配数据的规则,其实就对应着不同的窗口类型。所以可以说,窗口分配器其实就是在指定窗口的类型。窗口分配器最通用的定义方式,就是调用.window()方法。这个方法需要传入一个WindowAssigner作为参数,返回WindowedStream。如果是非按键分区窗口,那么直接调用.windowAll()方法,同样传入一个WindowAssigner,返回的是AllWindowedStream。窗口按照驱动类型可以分成时间窗口和计数窗口,而按照具体的分配规则,又有滚动窗口、滑动窗口、会话窗口、全局窗口四种。除去需要自定义的全局窗口外,其他常用的类型Flink中都给出了内置的分配器实现,我们可以方便地调用实现各种需求。

2.1、时间窗口

时间窗口是最常用的窗口类型,又可以细分为滚动、滑动和会话三种。在较早的版本中,可以直接调用.timeWindow()来定义时间窗口;这种方式非常简洁,但使用事件时间语义时需要另外声明,程序员往往因为忘记这点而导致运行结果错误。所以在1.12版本之后,这种方式已经被弃用了,标准的声明方式就是直接调用.window(),在里面传入对应时间语义下的窗口分配器。这样一来,我们不需要专门定义时间语义,默认就是事件时间;如果想用处理时间,那么在这里传入处理时间的窗口分配器就可以了。

环境准备

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置全局并行度
        env.setParallelism(1);
        //设置水位线生成间隔
        env.getConfig().setAutoWatermarkInterval(100);
        SingleOutputStreamOperator<Event> eventStream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        //无序水位线:延迟2s
                        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            //指定水位线的字段:这里从 timestamp 读取时间信息
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));

滚动处理时间窗口

        /**
         * 滚动时间窗口
         * 窗口大小 10 min ,偏移 0
         */
        eventStream.map(data -> Tuple2.of(data.user, 1L)) 
                .keyBy(data -> data.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                }) ;
        /**
         * 滚动时间窗口
         * 窗口大小 10 min ,偏移 2 min
         */
        WindowedStream<Event, String, TimeWindow> window2 =
                eventStream.keyBy(data -> data.user)
                        .window(TumblingEventTimeWindows.of(Time.minutes(10), Time.minutes(2)));

偏移量说明:我们知道,不同国家分布在不同的时区。标准时间戳其实就是1970年1月1日0时0分0秒0毫秒开始计算的一个毫秒数,而这个时间是以UTC时间,也就是0时区(伦敦时间)为标准的。我们所在的时区是东八区,也就是UTC+8,跟UTC有8小时的时差。我们定义1天滚动窗口时,如果用默认的起始点,那么得到就是伦敦时间每天0点开启窗口,这时是北京时间早上8点。那怎样得到北京时间每天0点开启的滚动窗口呢?只要设置-8小时的偏移量就可以了:

.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))

滑动处理时间窗口

        /**
         * 滑动事件时间窗口
         * 窗口大小 30 min,步长 5 min 无偏移量
         * window(SlidingEventTimeWindows.of(Time.minutes(30), Time.minutes(10)))
         */
        WindowedStream<Event, String, TimeWindow> window1 = eventStream.keyBy(data -> data.user)
                .window(SlidingEventTimeWindows.of(Time.minutes(30), Time.minutes(10)));
         /**
         * 滑动事件时间窗口
         * 窗口大小 30 min,步长 5 min 偏移量 -8h
         * SlidingEventTimeWindows.of(Time.minutes(30), Time.minutes(5), Time.hours(8)
         */
        WindowedStream<Event, String, TimeWindow> window11 = eventStream.keyBy(data -> data.user)
                .window(SlidingEventTimeWindows.of(Time.minutes(30), Time.minutes(5), Time.hours(-8)));

其它一些窗口分配器

   //事件时间会话窗口:窗口大小 5s
        WindowedStream<Event, String, TimeWindow> window3 =
                eventStream.keyBy(data -> data.user)
                        .window(EventTimeSessionWindows.withGap(Time.seconds(5)));
        //滑动计数窗口 窗口大小 10  :滑动步长 2
        WindowedStream<Event, String, GlobalWindow> eventStringGlobalWindowWindowedStream =
                eventStream.keyBy(data -> data.user)
                        .countWindow(10, 2);
程序员灯塔
转载请注明原文链接:Flink 窗口API & 窗口分配器
喜欢 (0)