一、ProcessFunction 与 MapFunction 区别 1、功能和区别 MapFunction:纯数据转换,一条进一条出,无状态、无时间、无侧输出,只能做简单映射。ProcessFunction:全能处理,一条进可以 0/1/N 条出,支持状态、定时器、侧输出、访问时间,能实现复杂业务逻辑。简单说:Map 能干的,Process 全能干;Process 能干的,Map 不一定干得了。 2、ProcessFunction 用法 2.1、初级:基础过滤与数据增强 场景:监控传感器读数,检测异常值并添加标记 package com.self.map; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; public class BasicProcessFunctionExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 简化输出 // 模拟传感器数据流 (传感器ID, 温度值) DataStream<SensorReading> sensorStream = env .fromElements( new SensorReading("sensor_1", 25.5), new SensorReading("sensor_1", 95.0), // 异常高温 new SensorReading("sensor_2", 22.3), new SensorReading("sensor_2", -5.0) // 异常低温 ); // 使用 ProcessFunction 进行异常检测和标记 DataStream<EnrichedReading> processedStream = sensorStream .process(new AnomalyDetectionProcessFunction()); processedStream.print(); env.execute("Basic ProcessFunction Demo"); } // 传感器数据类 public static class SensorReading { public String sensorId; public double temperature; public SensorReading(String sensorId, double temperature) { this.sensorId = sensorId; this.temperature = temperature; } @Override public String toString() { return String.format("SensorReading{sensor='%s', temp=%.1f}", sensorId, temperature); } } // 增强后的数据类 public static class EnrichedReading { public String sensorId; public double temperature; public boolean isAnomaly; public String status; public EnrichedReading(String sensorId, double temperature, boolean isAnomaly, String status) { this.sensorId = sensorId; this.temperature = temperature; this.isAnomaly = isAnomaly; this.status = status; } @Override public String toString() { return String.format("EnrichedReading{sensor='%s', temp=%.1f, 
anomaly=%s, status='%s'}", sensorId, temperature, isAnomaly, status); } } /** * 初级 ProcessFunction:检测异常值并添加标记 * 核心 API: * - processElement():处理每条记录 * - ctx.output():可输出到侧输出流(本例未使用) * - ctx.timestamp():获取事件时间戳 */ public static class AnomalyDetectionProcessFunction extends ProcessFunction<SensorReading, EnrichedReading> { private static final double MAX_NORMAL_TEMP = 80.0; private static final double MIN_NORMAL_TEMP = 0.0; @Override public void processElement( SensorReading reading, Context ctx, Collector<EnrichedReading> out) throws Exception { boolean isAnomaly = reading.temperature > MAX_NORMAL_TEMP || reading.temperature < MIN_NORMAL_TEMP; String status; if (reading.temperature > MAX_NORMAL_TEMP) { status = "HIGH_TEMP_ALERT"; } else if (reading.temperature < MIN_NORMAL_TEMP) { status = "LOW_TEMP_ALERT"; } else { status = "NORMAL"; } // 可以访问事件时间戳(如果存在) Long timestamp = ctx.timestamp(); System.out.println("Processing at timestamp: " + timestamp); // 输出处理后的结果 out.collect(new EnrichedReading( reading.sensorId, reading.temperature, isAnomaly, status )); } } } 关键学习点:processElement 的基础用法;Context 对象获取时间戳;Collector 输出数据。 2.2、中级:使用定时器和状态进行延迟告警 场景:监测传感器,若 5 秒内没有收到新数据则触发告警 package com.self.map; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.util.Collector; import java.time.Duration; public class IntermediateProcessFunctionExample { public static void main(String[] args) throws Exception { // 获取流处理环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置并行度 env.setParallelism(1); 
// 设置水位线 = 最大时间 - 2s;使用数据自带的时间作为事件时间 DataStream<SensorEvent> eventStream = env .addSource(new SensorSource()) .assignTimestampsAndWatermarks( WatermarkStrategy.<SensorEvent>forBoundedOutOfOrderness(Duration.ofSeconds(2)) .withTimestampAssigner((event, timestamp) -> event.timestamp) ); // 根据id分组 + 处理数据 DataStream<Alert> alertStream = eventStream .keyBy(event -> event.sensorId) .process(new InactivityAlertProcessFunction(5000L)); //打印数据 alertStream.print(); //执行 env.execute("Intermediate ProcessFunction Demo"); } public static class SensorEvent { public String sensorId; public double value; public long timestamp; public SensorEvent(String sensorId, double value, long timestamp) { this.sensorId = sensorId; this.value = value; this.timestamp = timestamp; } @Override public String toString() { return String.format("Event{sensor='%s', value=%.2f, time=%d}", sensorId, value, timestamp); } } public static class Alert { public String sensorId; public String message; public long alertTime; public Alert(String sensorId, String message, long alertTime) { this.sensorId = sensorId; this.message = message; this.alertTime = alertTime; } @Override public String toString() { return String.format("ALERT! 
sensor='%s', msg='%s', at=%d", sensorId, message, alertTime); } } /** * 使用 KeyedProcessFunction 而不是 ProcessFunction * 这样才能使用 getCurrentKey() 方法 */ public static class InactivityAlertProcessFunction extends KeyedProcessFunction<String, SensorEvent, Alert> { private final long inactivityThresholdMs; //最大静默时间 private transient ValueState<Long> lastEventTimeState; //最后活跃时间 private transient ValueState<Long> registeredTimerState; //闹钟时间 // 构造函数 public InactivityAlertProcessFunction(long inactivityThresholdMs) { this.inactivityThresholdMs = inactivityThresholdMs; } @Override public void open(Configuration parameters) throws Exception { //状态说明书 获取最后活跃时间 ValueStateDescriptor<Long> lastEventDesc = new ValueStateDescriptor<>( "lastEventTime", Long.class ); //从上下文获取状态值 lastEventTimeState = getRuntimeContext().getState(lastEventDesc); //状态说明书 获取闹钟时间 ValueStateDescriptor<Long> timerDesc = new ValueStateDescriptor<>( "registeredTimer", Long.class ); //从上下文获取状态值 registeredTimerState = getRuntimeContext().getState(timerDesc); } /** * 更新闹钟和事件时间 * @param event * @param ctx * @param out * @throws Exception */ @Override public void processElement( SensorEvent event, Context ctx, Collector<Alert> out) throws Exception { // 获取getCurrentKey() String currentKey = ctx.getCurrentKey(); // 获取/更新最后活跃时间 long currentTime = event.timestamp; lastEventTimeState.update(currentTime); //获取旧闹钟,若存在删除 Long oldTimer = registeredTimerState.value(); if (oldTimer != null) { ctx.timerService().deleteEventTimeTimer(oldTimer); } //设置新闹钟时间 long alarmTime = currentTime + inactivityThresholdMs; //注册闹钟 ctx.timerService().registerEventTimeTimer(alarmTime); //更新闹钟时间 registeredTimerState.update(alarmTime); //打印数据 System.out.println(String.format( "[%s] Processed event at %d, registered alarm at %d", currentKey, currentTime, alarmTime )); } /** * 定时器,定时检测数据有没有到 * @param timestamp * @param ctx * @param out * @throws Exception */ @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) throws Exception { // 
获取当前数据的key String key = ctx.getCurrentKey(); //获取最后一次活跃时间和闹钟时间 Long lastTime = lastEventTimeState.value(); Long registeredTimer = registeredTimerState.value(); //若没有闹钟,则无须继续 if (registeredTimer == null || timestamp != registeredTimer) { return; } //若没有最后活跃时间,则无须继续 if (lastTime == null) { return; } //最后一次活跃时间 + 静默时间 <= 当前时间 ,报警 if (lastTime + inactivityThresholdMs <= timestamp) { out.collect(new Alert( key, String.format("No data received for %d ms!", inactivityThresholdMs), timestamp )); } //清除闹钟 registeredTimerState.clear(); } } // 模拟数据源 public static class SensorSource implements SourceFunction<SensorEvent> { private volatile boolean running = true; @Override public void run(SourceContext<SensorEvent> ctx) throws Exception { long startTime = System.currentTimeMillis(); System.out.println("=== Starting data source ==="); for (int i = 0; i < 3 && running; i++) { long eventTime = startTime + i * 1000; ctx.collectWithTimestamp(new SensorEvent("sensor_1", 100 + i, eventTime), eventTime); System.out.println("Sent: sensor_1 at " + eventTime); Thread.sleep(500); } System.out.println("=== sensor_1 going silent for 6 seconds ==="); Thread.sleep(6000); long resumeTime = System.currentTimeMillis(); ctx.collectWithTimestamp(new SensorEvent("sensor_1", 103, resumeTime), resumeTime); System.out.println("Sent: sensor_1 at " + resum
Flink 流处理核心算子深度剖析
一、ProcessFunction 与 MapFunction 区别 1、功能和区别 MapFunction:纯数据转换,一条进一条出,无状态、无时间、无侧输出,只能做简单映射。ProcessFunction:全能处理,一条进可以 0/1/N 条出,支持状态、定时器、侧输出、访问时间,能实现复杂业务逻辑。简单说:Map 能干的,Process 全能干;Process 能干的,Map 不一定干得了。 2、ProcessFunction 用法 2.1、初级:基础过滤与数据增强 场景:监控传感器读数,检测异常值并添加标记 package com.self.map; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; public class BasicProcessFunctionExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 简化输出 // 模拟传感器数据流 (传感器ID, 温度值) DataStream<SensorReading> sensorStream = env .fromElements( new SensorReading("sensor_1", 25.5), new SensorReading("sensor_1", 95.0), // 异常高温 new SensorReading("sensor_2", 22.3), new SensorReading("sensor_2", -5.0) // 异常低温 ); // 使用 ProcessFunction 进行异常检测和标记 DataStream<EnrichedReading> processedStream = sensorStream .process(new AnomalyDetectionProcessFunction()); processedStream.print(); env.execute("Basic ProcessFunction Demo"); } // 传感器数据类 public static class SensorReading { public String sensorId; public double temperature; public SensorReading(String sensorId, double temperature) { this.sensorId = sensorId; this.temperature = temperature; } @Override public String toString() { return String.format("SensorReading{sensor='%s', temp=%.1f}", sensorId, temperature); } } // 增强后的数据类 public static class EnrichedReading { public String sensorId; public double temperature; public boolean isAnomaly; public String status; public EnrichedReading(String sensorId, double temperature, boolean isAnomaly, String status) { this.sensorId = sensorId; this.temperature = temperature; this.isAnomaly = isAnomaly; this.status = status; } @Override public String toString() { return String.format("EnrichedReading{sensor='%s', temp=%.1f, 
anomaly=%s, status='%s'}", sensorId, temperature, isAnomaly, status); } } /** * 初级 ProcessFunction:检测异常值并添加标记 * 核心 API: * - processElement():处理每条记录 * - ctx.output():可输出到侧输出流(本例未使用) * - ctx.timestamp():获取事件时间戳 */ public static class AnomalyDetectionProcessFunction extends ProcessFunction<SensorReading, EnrichedReading> { private static final double MAX_NORMAL_TEMP = 80.0; private static final double MIN_NORMAL_TEMP = 0.0; @Override public void processElement( SensorReading reading, Context ctx, Collector<EnrichedReading> out) throws Exception { boolean isAnomaly = reading.temperature > MAX_NORMAL_TEMP || reading.temperature < MIN_NORMAL_TEMP; String status; if (reading.temperature > MAX_NORMAL_TEMP) { status = "HIGH_TEMP_ALERT"; } else if (reading.temperature < MIN_NORMAL_TEMP) { status = "LOW_TEMP_ALERT"; } else { status = "NORMAL"; } // 可以访问事件时间戳(如果存在) Long timestamp = ctx.timestamp(); System.out.println("Processing at timestamp: " + timestamp); // 输出处理后的结果 out.collect(new EnrichedReading( reading.sensorId, reading.temperature, isAnomaly, status )); } } } 关键学习点:processElement 的基础用法;Context 对象获取时间戳;Collector 输出数据。 2.2、中级:使用定时器和状态进行延迟告警 场景:监测传感器,若 5 秒内没有收到新数据则触发告警 package com.self.map; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.util.Collector; import java.time.Duration; public class IntermediateProcessFunctionExample { public static void main(String[] args) throws Exception { // 获取流处理环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置并行度 env.setParallelism(1); 
// 设置水位线 = 最大时间 - 2s;使用数据自带的时间作为事件时间 DataStream<SensorEvent> eventStream = env .addSource(new SensorSource()) .assignTimestampsAndWatermarks( WatermarkStrategy.<SensorEvent>forBoundedOutOfOrderness(Duration.ofSeconds(2)) .withTimestampAssigner((event, timestamp) -> event.timestamp) ); // 根据id分组 + 处理数据 DataStream<Alert> alertStream = eventStream .keyBy(event -> event.sensorId) .process(new InactivityAlertProcessFunction(5000L)); //打印数据 alertStream.print(); //执行 env.execute("Intermediate ProcessFunction Demo"); } public static class SensorEvent { public String sensorId; public double value; public long timestamp; public SensorEvent(String sensorId, double value, long timestamp) { this.sensorId = sensorId; this.value = value; this.timestamp = timestamp; } @Override public String toString() { return String.format("Event{sensor='%s', value=%.2f, time=%d}", sensorId, value, timestamp); } } public static class Alert { public String sensorId; public String message; public long alertTime; public Alert(String sensorId, String message, long alertTime) { this.sensorId = sensorId; this.message = message; this.alertTime = alertTime; } @Override public String toString() { return String.format("ALERT! 
sensor='%s', msg='%s', at=%d", sensorId, message, alertTime); } } /** * 使用 KeyedProcessFunction 而不是 ProcessFunction * 这样才能使用 getCurrentKey() 方法 */ public static class InactivityAlertProcessFunction extends KeyedProcessFunction<String, SensorEvent, Alert> { private final long inactivityThresholdMs; //最大静默时间 private transient ValueState<Long> lastEventTimeState; //最后活跃时间 private transient ValueState<Long> registeredTimerState; //闹钟时间 // 构造函数 public InactivityAlertProcessFunction(long inactivityThresholdMs) { this.inactivityThresholdMs = inactivityThresholdMs; } @Override public void open(Configuration parameters) throws Exception { //状态说明书 获取最后活跃时间 ValueStateDescriptor<Long> lastEventDesc = new ValueStateDescriptor<>( "lastEventTime", Long.class ); //从上下文获取状态值 lastEventTimeState = getRuntimeContext().getState(lastEventDesc); //状态说明书 获取闹钟时间 ValueStateDescriptor<Long> timerDesc = new ValueStateDescriptor<>( "registeredTimer", Long.class ); //从上下文获取状态值 registeredTimerState = getRuntimeContext().getState(timerDesc); } /** * 更新闹钟和事件时间 * @param event * @param ctx * @param out * @throws Exception */ @Override public void processElement( SensorEvent event, Context ctx, Collector<Alert> out) throws Exception { // 获取getCurrentKey() String currentKey = ctx.getCurrentKey(); // 获取/更新最后活跃时间 long currentTime = event.timestamp; lastEventTimeState.update(currentTime); //获取旧闹钟,若存在删除 Long oldTimer = registeredTimerState.value(); if (oldTimer != null) { ctx.timerService().deleteEventTimeTimer(oldTimer); } //设置新闹钟时间 long alarmTime = currentTime + inactivityThresholdMs; //注册闹钟 ctx.timerService().registerEventTimeTimer(alarmTime); //更新闹钟时间 registeredTimerState.update(alarmTime); //打印数据 System.out.println(String.format( "[%s] Processed event at %d, registered alarm at %d", currentKey, currentTime, alarmTime )); } /** * 定时器,定时检测数据有没有到 * @param timestamp * @param ctx * @param out * @throws Exception */ @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) throws Exception { // 
获取当前数据的key String key = ctx.getCurrentKey(); //获取最后一次活跃时间和闹钟时间 Long lastTime = lastEventTimeState.value(); Long registeredTimer = registeredTimerState.value(); //若没有闹钟,则无须继续 if (registeredTimer == null || timestamp != registeredTimer) { return; } //若没有最后活跃时间,则无须继续 if (lastTime == null) { return; } //最后一次活跃时间 + 静默时间 <= 当前时间 ,报警 if (lastTime + inactivityThresholdMs <= timestamp) { out.collect(new Alert( key, String.format("No data received for %d ms!", inactivityThresholdMs), timestamp )); } //清除闹钟 registeredTimerState.clear(); } } // 模拟数据源 public static class SensorSource implements SourceFunction<SensorEvent> { private volatile boolean running = true; @Override public void run(SourceContext<SensorEvent> ctx) throws Exception { long startTime = System.currentTimeMillis(); System.out.println("=== Starting data source ==="); for (int i = 0; i < 3 && running; i++) { long eventTime = startTime + i * 1000; ctx.collectWithTimestamp(new SensorEvent("sensor_1", 100 + i, eventTime), eventTime); System.out.println("Sent: sensor_1 at " + eventTime); Thread.sleep(500); } System.out.println("=== sensor_1 going silent for 6 seconds ==="); Thread.sleep(6000); long resumeTime = System.currentTimeMillis(); ctx.collectWithTimestamp(new SensorEvent("sensor_1", 103, resumeTime), resumeTime); System.out.println("Sent: sensor_1 at " + resum