上一期我们聊到了Flink知识点四Watermark水位线本期我们继续一起聊下Flink的窗口。一、为什么需要窗口Flink 处理的是无界流数据永远不会结束。窗口把无界流切成有界的数据块才能做聚合统计。Window是无限数据流处理的核心Window 将一个无限的stream拆分成有限大小的“buckets”桶我们可以在这些桶上做计算操作。还是外卖的例子统计每10分钟下了多少单就需要把数据按时间切成一段一段来计算。二、窗口的分类2.1 按键分KeyedStream → keyBy() 之后 → .window() 每个 key 独立维护自己的窗口 Non-Keyed → 不 keyBy() → .windowAll() 所有数据进同一个窗口并行度强制为1慎用2.2 按照窗口分配数据的规则分类2.2.1 滚动窗口Tumbling固定大小不重叠每条数据只属于一个窗口。注图片来源于网上stream.keyBy(Order::getUserId).window(TumblingEventTimeWindows.of(Time.minutes(10))).sum(amount);外卖场景统计每10分钟的订单量11:00-11:10 一个窗口11:10-11:20 一个窗口互不干扰。2.2.2 滑动窗口Sliding Window固定大小有重叠一条数据可能属于多个窗口。由窗口大小和滑动步长共同决定。注图片来源于网上stream.keyBy(Order::getUserId)// 窗口大小10分钟每5分钟滑动一次.window(SlidingEventTimeWindows.of(Time.minutes(10),Time.minutes(5))).sum(amount);外卖场景每5分钟统计一次过去10分钟的订单量用来做实时趋势监控。注意窗口大小 / 滑动步长 每条数据被计算的次数步长越小计算开销越大。2.2.3 会话窗口Session Window没有固定大小按照数据间的间隔来切分。超过指定时间没有数据窗口关闭。注图片来源于网上stream.keyBy(Order::getUserId)// 超过30分钟没有数据关闭窗口.window(EventTimeSessionWindows.withGap(Time.minutes(30))).sum(amount);外卖场景统计用户一次点餐会话的总消费用户开始浏览到下单完成算一次会话中间超过30分钟没操作就结束。扩展动态间隔// 不同用户可以有不同的 gap.window(EventTimeSessionWindows.withDynamicGap(order-order.getUserLevel().equals(VIP)?60_000L:30_000L))2.2.4 全局窗口Global Window所有数据进同一个窗口永远不自动触发必须配合自定义 Trigger 使用。注图片来源于网上stream.keyBy(Order::getUserId).window(GlobalWindows.create())// 每累积100条数据触发一次.trigger(CountTrigger.of(100)).sum(amount);外卖场景每个用户累计下了100单触发一次统计。扩展把按键分和按照窗口分配数据的规则分类组合起来看滚动滑动会话全局Keyed✅ 最常用✅✅✅Non-Keyed慎用慎用慎用慎用实际生产中基本都是Keyed 滚动/滑动/会话这三种组合。2.3 按照驱动类型分时间窗口Time Window以时间点来定义窗口的开始和结束。定点发车计数窗口Count Window以事件的个数来截取数据达到固定的个数预置就出发计算并关闭窗口。人齐发车三、窗口函数窗口收集到数据后用窗口函数来计算。3.1 增量聚合函数数据来一条处理一条不存储原始数据内存占用小。3.1.1 ReduceFunctionstream.keyBy(Order::getUserId).window(TumblingEventTimeWindows.of(Time.minutes(10))).reduce((a,b)-{a.setAmount(a.getAmount()b.getAmount());returna;});3.1.2 AggregateFunction更灵活输入输出类型可以不同stream.keyBy(Order::getUserId).window(TumblingEventTimeWindows.of(Time.minutes(10))).aggregate(newAggregateFunctionOrder,Tuple2Double,Integer,Double(){OverridepublicTuple2Double,IntegercreateAccumulator(){returnTuple2.of(0.0,0);// (总金额, 订单数)}OverridepublicTuple2Double,Integeradd(Orderorder,Tuple2Double,Integeracc){returnTuple2.of(acc.f0order.getAmount(),acc.f11);}OverridepublicDoublegetResult(Tuple2Double,Integeracc){returnacc.f0/acc.f1;// 返回平均金额}OverridepublicTuple2Double,Integermerge(Tuple2Double,Integera,Tuple2Double,Integerb){returnTuple2.of(a.f0b.f0,a.f1b.f1);}});3.2 全量窗口函数窗口触发时才处理能拿到窗口内所有数据和窗口元信息。3.2.1 ProcessWindowFunctionstream.keyBy(Order::getUserId).window(TumblingEventTimeWindows.of(Time.minutes(10))).process(newProcessWindowFunctionOrder,String,String,TimeWindow(){Overridepublicvoidprocess(StringuserId,Contextctx,IterableOrderorders,CollectorStringout){longwindowStartctx.window().getStart();longwindowEndctx.window().getEnd();doubletotalStreamSupport.stream(orders.spliterator(),false).mapToDouble(Order::getAmount).sum();out.collect(String.format(用户%s 在 %s~%s 消费了 %.2f 元,userId,windowStart,windowEnd,total));}});3.2.2 WindowFunctionstream.keyBy(Order::getUserId).window(TumblingEventTimeWindows.of(Time.minutes(10))).process(newProcessWindowFunctionOrder,String,String,TimeWindow(){Overridepublicvoidprocess(StringuserId,Contextctx,IterableOrderorders,CollectorStringout){doubletotal0;for(Orderorder:orders){totalorder.getAmount();}out.collect(userId 消费 total);}});3.3 增量和全量结合(推荐使用)用 AggregateFunction 做增量聚合再用 ProcessWindowFunction 拿窗口元信息兼顾性能和灵活性stream.keyBy(Order::getUserId).window(TumblingEventTimeWindows.of(Time.minutes(10))).aggregate(newOrderAggregateFunction(),newOrderProcessWindowFunction());四、触发器Trigger决定窗口什么时候触发计算默认不需要手动配置。// 内置触发器EventTimeTrigger.create()// 默认Watermark 超过窗口结束时间触发ProcessingTimeTrigger.create()// 处理时间触发CountTrigger.of(100)// 累积100条触发PurgingTrigger.of(...)// 触发后清空窗口数据// 自定义触发器.trigger(newTriggerOrder,TimeWindow(){OverridepublicTriggerResultonElement(Orderorder,longtimestamp,TimeWindowwindow,TriggerContextctx){// 每来一条数据都触发实时输出但开销大returnTriggerResult.FIRE;}OverridepublicTriggerResultonEventTime(longtime,TimeWindowwindow,TriggerContextctx){returnTriggerResult.FIRE_AND_PURGE;// 触发并清空}OverridepublicTriggerResultonProcessingTime(longtime,TimeWindowwindow,TriggerContextctx){returnTriggerResult.CONTINUE;}});TriggerResult 的四种结果结果含义CONTINUE什么都不做FIRE触发计算保留数据PURGE清空数据不触发FIRE_AND_PURGE触发计算并清空数据五、迟到数据处理那就使用侧输出流OutputTagOrderlateTagnewOutputTagOrder(late-orders){};SingleOutputStreamOperatorStringresultstream.keyBy(Order::getUserId).window(TumblingEventTimeWindows.of(Time.minutes(10))).allowedLateness(Time.minutes(1))// 窗口关闭后再等1分钟.sideOutputLateData(lateTag)// 超过1分钟的数据发到侧输出.process(newOrderProcessWindowFunction());// 单独处理极端迟到的数据result.getSideOutput(lateTag).print(极端迟到的订单);六、整体流程总结数据流 → keyBy() 按 key 分区 → window() 指定窗口类型滚动/滑动/会话/全局 → trigger() 指定触发条件可选有默认值 → allowedLateness() 处理迟到数据可选 → aggregate/process 窗口函数计算结果七、怎么选窗口类型固定周期统计每小时、每天 → 滚动窗口 实时趋势、移动平均 → 滑动窗口 用户行为分析、会话统计 → 会话窗口 按条数或自定义条件触发 → 全局窗口 自定义 Trigger八、Flink SQL中的窗口使用TVFTable-Valued Function窗口8.1 滚动窗口 TUMBLE-- 每10分钟统计一次各用户的订单量和总金额SELECTwindow_start,window_end,user_id,COUNT(*)ASorder_cnt,SUM(amount)AStotal_amountFROMTABLE(TUMBLE(TABLEorders,DESCRIPTOR(order_time),INTERVAL10MINUTE))GROUPBYwindow_start,window_end,user_id;8.2 滑动窗口 HOP-- 窗口大小10分钟每5分钟滑动一次SELECTwindow_start,window_end,user_id,COUNT(*)ASorder_cntFROMTABLE(HOP(TABLEorders,DESCRIPTOR(order_time),INTERVAL5MINUTE,INTERVAL10MINUTE))GROUPBYwindow_start,window_end,user_id;8.3 会话窗口 SESSION-- 超过30分钟没有数据关闭窗口SELECTwindow_start,window_end,user_id,COUNT(*)ASorder_cntFROMTABLE(SESSION(TABLEorders,DESCRIPTOR(order_time),INTERVAL30MINUTE))GROUPBYwindow_start,window_end,user_id;8.4 累计窗口 CUMULATETVF 特有这是 TVF 新增的窗口类型DataStream API 没有对应实现。-- 每天从0点开始每1小时输出一次当天累计订单量-- 比如 01:00 输出 0~1 点的累计02:00 输出 0~2 点的累计SELECTwindow_start,window_end,user_id,COUNT(*)AScumulative_cntFROMTABLE(CUMULATE(TABLEorders,DESCRIPTOR(order_time),INTERVAL1HOUR,INTERVAL1DAY))GROUPBYwindow_start,window_end,user_id;CUMULATE 参数表名、时间字段、累计步长、最大窗口大小效果window_start00:00 window_end01:00 → 统计 0~1 点window_start00:00 window_end02:00 → 统计 0~2 点window_start00:00 window_end03:00 → 统计 0~3 点…window_start00:00 window_end24:00 → 统计全天外卖场景大屏展示今日累计订单量每小时刷新一次。九、举两个Flink SQL的例子9.1 窗口 TopN统计每10分钟内下单金额最高的前3名用户SELECT*FROM(SELECTwindow_start,window_end,user_id,total_amount,ROW_NUMBER()OVER(PARTITIONBYwindow_start,window_endORDERBYtotal_amountDESC)ASrnFROM(SELECTwindow_start,window_end,user_id,SUM(amount)AStotal_amountFROMTABLE(TUMBLE(TABLEorders,DESCRIPTOR(order_time),INTERVAL10MINUTE))GROUPBYwindow_start,window_end,user_id))WHERErn3;9.2 窗口 Join订单表和用户表按相同窗口做 JoinJoin不太清楚的可以看之前的文章Flink知识点一Flink中的双流关联SELECTo.window_start,o.window_end,o.user_id,u.user_name,o.total_amountFROM(SELECTwindow_start,window_end,user_id,SUM(amount)AStotal_amountFROMTABLE(TUMBLE(TABLEorders,DESCRIPTOR(order_time),INTERVAL10MINUTE))GROUPBYwindow_start,window_end,user_id)oJOIN(SELECTwindow_start,window_end,user_id,user_nameFROMTABLE(TUMBLE(TABLEusers,DESCRIPTOR(login_time),INTERVAL10MINUTE))GROUPBYwindow_start,window_end,user_id,user_name)uONo.window_startu.window_startANDo.window_endu.window_endANDo.user_idu.user_id;
Flink知识点(五)|Window(窗口)
上一期我们聊到了Flink知识点四Watermark水位线本期我们继续一起聊下Flink的窗口。一、为什么需要窗口Flink 处理的是无界流数据永远不会结束。窗口把无界流切成有界的数据块才能做聚合统计。Window是无限数据流处理的核心Window 将一个无限的stream拆分成有限大小的“buckets”桶我们可以在这些桶上做计算操作。还是外卖的例子统计每10分钟下了多少单就需要把数据按时间切成一段一段来计算。二、窗口的分类2.1 按键分KeyedStream → keyBy() 之后 → .window() 每个 key 独立维护自己的窗口 Non-Keyed → 不 keyBy() → .windowAll() 所有数据进同一个窗口并行度强制为1慎用2.2 按照窗口分配数据的规则分类2.2.1 滚动窗口Tumbling固定大小不重叠每条数据只属于一个窗口。注图片来源于网上stream.keyBy(Order::getUserId).window(TumblingEventTimeWindows.of(Time.minutes(10))).sum(amount);外卖场景统计每10分钟的订单量11:00-11:10 一个窗口11:10-11:20 一个窗口互不干扰。2.2.2 滑动窗口Sliding Window固定大小有重叠一条数据可能属于多个窗口。由窗口大小和滑动步长共同决定。注图片来源于网上stream.keyBy(Order::getUserId)// 窗口大小10分钟每5分钟滑动一次.window(SlidingEventTimeWindows.of(Time.minutes(10),Time.minutes(5))).sum(amount);外卖场景每5分钟统计一次过去10分钟的订单量用来做实时趋势监控。注意窗口大小 / 滑动步长 每条数据被计算的次数步长越小计算开销越大。2.2.3 会话窗口Session Window没有固定大小按照数据间的间隔来切分。超过指定时间没有数据窗口关闭。注图片来源于网上stream.keyBy(Order::getUserId)// 超过30分钟没有数据关闭窗口.window(EventTimeSessionWindows.withGap(Time.minutes(30))).sum(amount);外卖场景统计用户一次点餐会话的总消费用户开始浏览到下单完成算一次会话中间超过30分钟没操作就结束。扩展动态间隔// 不同用户可以有不同的 gap.window(EventTimeSessionWindows.withDynamicGap(order-order.getUserLevel().equals(VIP)?60_000L:30_000L))2.2.4 全局窗口Global Window所有数据进同一个窗口永远不自动触发必须配合自定义 Trigger 使用。注图片来源于网上stream.keyBy(Order::getUserId).window(GlobalWindows.create())// 每累积100条数据触发一次.trigger(CountTrigger.of(100)).sum(amount);外卖场景每个用户累计下了100单触发一次统计。扩展把按键分和按照窗口分配数据的规则分类组合起来看滚动滑动会话全局Keyed✅ 最常用✅✅✅Non-Keyed慎用慎用慎用慎用实际生产中基本都是Keyed 滚动/滑动/会话这三种组合。2.3 按照驱动类型分时间窗口Time Window以时间点来定义窗口的开始和结束。定点发车计数窗口Count Window以事件的个数来截取数据达到固定的个数预置就出发计算并关闭窗口。人齐发车三、窗口函数窗口收集到数据后用窗口函数来计算。3.1 增量聚合函数数据来一条处理一条不存储原始数据内存占用小。3.1.1 ReduceFunctionstream.keyBy(Order::getUserId).window(TumblingEventTimeWindows.of(Time.minutes(10))).reduce((a,b)-{a.setAmount(a.getAmount()b.getAmount());returna;});3.1.2 AggregateFunction更灵活输入输出类型可以不同stream.keyBy(Order::getUserId).window(TumblingEventTimeWindows.of(Time.minutes(10))).aggregate(newAggregateFunctionOrder,Tuple2Double,Integer,Double(){OverridepublicTuple2Double,IntegercreateAccumulator(){returnTuple2.of(0.0,0);// (总金额, 订单数)}OverridepublicTuple2Double,Integeradd(Orderorder,Tuple2Double,Integeracc){returnTuple2.of(acc.f0order.getAmount(),acc.f11);}OverridepublicDoublegetResult(Tuple2Double,Integeracc){returnacc.f0/acc.f1;// 返回平均金额}OverridepublicTuple2Double,Integermerge(Tuple2Double,Integera,Tuple2Double,Integerb){returnTuple2.of(a.f0b.f0,a.f1b.f1);}});3.2 全量窗口函数窗口触发时才处理能拿到窗口内所有数据和窗口元信息。3.2.1 ProcessWindowFunctionstream.keyBy(Order::getUserId).window(TumblingEventTimeWindows.of(Time.minutes(10))).process(newProcessWindowFunctionOrder,String,String,TimeWindow(){Overridepublicvoidprocess(StringuserId,Contextctx,IterableOrderorders,CollectorStringout){longwindowStartctx.window().getStart();longwindowEndctx.window().getEnd();doubletotalStreamSupport.stream(orders.spliterator(),false).mapToDouble(Order::getAmount).sum();out.collect(String.format(用户%s 在 %s~%s 消费了 %.2f 元,userId,windowStart,windowEnd,total));}});3.2.2 WindowFunctionstream.keyBy(Order::getUserId).window(TumblingEventTimeWindows.of(Time.minutes(10))).process(newProcessWindowFunctionOrder,String,String,TimeWindow(){Overridepublicvoidprocess(StringuserId,Contextctx,IterableOrderorders,CollectorStringout){doubletotal0;for(Orderorder:orders){totalorder.getAmount();}out.collect(userId 消费 total);}});3.3 增量和全量结合(推荐使用)用 AggregateFunction 做增量聚合再用 ProcessWindowFunction 拿窗口元信息兼顾性能和灵活性stream.keyBy(Order::getUserId).window(TumblingEventTimeWindows.of(Time.minutes(10))).aggregate(newOrderAggregateFunction(),newOrderProcessWindowFunction());四、触发器Trigger决定窗口什么时候触发计算默认不需要手动配置。// 内置触发器EventTimeTrigger.create()// 默认Watermark 超过窗口结束时间触发ProcessingTimeTrigger.create()// 处理时间触发CountTrigger.of(100)// 累积100条触发PurgingTrigger.of(...)// 触发后清空窗口数据// 自定义触发器.trigger(newTriggerOrder,TimeWindow(){OverridepublicTriggerResultonElement(Orderorder,longtimestamp,TimeWindowwindow,TriggerContextctx){// 每来一条数据都触发实时输出但开销大returnTriggerResult.FIRE;}OverridepublicTriggerResultonEventTime(longtime,TimeWindowwindow,TriggerContextctx){returnTriggerResult.FIRE_AND_PURGE;// 触发并清空}OverridepublicTriggerResultonProcessingTime(longtime,TimeWindowwindow,TriggerContextctx){returnTriggerResult.CONTINUE;}});TriggerResult 的四种结果结果含义CONTINUE什么都不做FIRE触发计算保留数据PURGE清空数据不触发FIRE_AND_PURGE触发计算并清空数据五、迟到数据处理那就使用侧输出流OutputTagOrderlateTagnewOutputTagOrder(late-orders){};SingleOutputStreamOperatorStringresultstream.keyBy(Order::getUserId).window(TumblingEventTimeWindows.of(Time.minutes(10))).allowedLateness(Time.minutes(1))// 窗口关闭后再等1分钟.sideOutputLateData(lateTag)// 超过1分钟的数据发到侧输出.process(newOrderProcessWindowFunction());// 单独处理极端迟到的数据result.getSideOutput(lateTag).print(极端迟到的订单);六、整体流程总结数据流 → keyBy() 按 key 分区 → window() 指定窗口类型滚动/滑动/会话/全局 → trigger() 指定触发条件可选有默认值 → allowedLateness() 处理迟到数据可选 → aggregate/process 窗口函数计算结果七、怎么选窗口类型固定周期统计每小时、每天 → 滚动窗口 实时趋势、移动平均 → 滑动窗口 用户行为分析、会话统计 → 会话窗口 按条数或自定义条件触发 → 全局窗口 自定义 Trigger八、Flink SQL中的窗口使用TVFTable-Valued Function窗口8.1 滚动窗口 TUMBLE-- 每10分钟统计一次各用户的订单量和总金额SELECTwindow_start,window_end,user_id,COUNT(*)ASorder_cnt,SUM(amount)AStotal_amountFROMTABLE(TUMBLE(TABLEorders,DESCRIPTOR(order_time),INTERVAL10MINUTE))GROUPBYwindow_start,window_end,user_id;8.2 滑动窗口 HOP-- 窗口大小10分钟每5分钟滑动一次SELECTwindow_start,window_end,user_id,COUNT(*)ASorder_cntFROMTABLE(HOP(TABLEorders,DESCRIPTOR(order_time),INTERVAL5MINUTE,INTERVAL10MINUTE))GROUPBYwindow_start,window_end,user_id;8.3 会话窗口 SESSION-- 超过30分钟没有数据关闭窗口SELECTwindow_start,window_end,user_id,COUNT(*)ASorder_cntFROMTABLE(SESSION(TABLEorders,DESCRIPTOR(order_time),INTERVAL30MINUTE))GROUPBYwindow_start,window_end,user_id;8.4 累计窗口 CUMULATETVF 特有这是 TVF 新增的窗口类型DataStream API 没有对应实现。-- 每天从0点开始每1小时输出一次当天累计订单量-- 比如 01:00 输出 0~1 点的累计02:00 输出 0~2 点的累计SELECTwindow_start,window_end,user_id,COUNT(*)AScumulative_cntFROMTABLE(CUMULATE(TABLEorders,DESCRIPTOR(order_time),INTERVAL1HOUR,INTERVAL1DAY))GROUPBYwindow_start,window_end,user_id;CUMULATE 参数表名、时间字段、累计步长、最大窗口大小效果window_start00:00 window_end01:00 → 统计 0~1 点window_start00:00 window_end02:00 → 统计 0~2 点window_start00:00 window_end03:00 → 统计 0~3 点…window_start00:00 window_end24:00 → 统计全天外卖场景大屏展示今日累计订单量每小时刷新一次。九、举两个Flink SQL的例子9.1 窗口 TopN统计每10分钟内下单金额最高的前3名用户SELECT*FROM(SELECTwindow_start,window_end,user_id,total_amount,ROW_NUMBER()OVER(PARTITIONBYwindow_start,window_endORDERBYtotal_amountDESC)ASrnFROM(SELECTwindow_start,window_end,user_id,SUM(amount)AStotal_amountFROMTABLE(TUMBLE(TABLEorders,DESCRIPTOR(order_time),INTERVAL10MINUTE))GROUPBYwindow_start,window_end,user_id))WHERErn3;9.2 窗口 Join订单表和用户表按相同窗口做 JoinJoin不太清楚的可以看之前的文章Flink知识点一Flink中的双流关联SELECTo.window_start,o.window_end,o.user_id,u.user_name,o.total_amountFROM(SELECTwindow_start,window_end,user_id,SUM(amount)AStotal_amountFROMTABLE(TUMBLE(TABLEorders,DESCRIPTOR(order_time),INTERVAL10MINUTE))GROUPBYwindow_start,window_end,user_id)oJOIN(SELECTwindow_start,window_end,user_id,user_nameFROMTABLE(TUMBLE(TABLEusers,DESCRIPTOR(login_time),INTERVAL10MINUTE))GROUPBYwindow_start,window_end,user_id,user_name)uONo.window_startu.window_startANDo.window_endu.window_endANDo.user_idu.user_id;