智能快递柜湿度检测与防潮预警系统一、实际应用场景描述随着电商行业的蓬勃发展智能快递柜已成为城市末端配送的核心基础设施。这些柜体通常部署在小区、写字楼、地铁站等公共场所24小时不间断服务。然而现实环境中快递柜面临着严峻的防潮挑战雨季空气湿度飙升、地下室通风不良、沿海地区盐雾侵蚀、冬季温差导致的冷凝水等问题时刻威胁着包裹内物品的安全。想象这样一个场景李女士网购了一台昂贵的单反相机快递员将其存入小区智能快递柜。恰逢连日阴雨柜内湿度悄然攀升至85%。第二天李女士取件时发现相机的包装盒已经受潮变形内部电路板出现霉斑最终造成数千元的损失。类似的情况在全国各地的快递柜中每天都在发生——电子产品短路、纸质文件发霉、药品失效、衣物异味...本系统专门针对这一痛点通过在智能快递柜内部署高精度湿度传感器实时监控柜内环境当湿度超过安全阈值时立即触发多级防潮预警并通过LED指示灯、蜂鸣器和云端推送通知管理人员及时处理最大程度保护包裹内物品免受湿气侵害。二、引入痛点痛点类型 具体表现 潜在损失 本方案解决方式环境湿度不可控 雨季/梅雨季柜内湿度可达90% 电子产品损坏、文档发霉 实时监测阈值预警冷凝水隐患 昼夜温差大时内壁结露滴落 包裹湿损、细菌滋生 温湿度联动分析冷凝预警通风死角 密闭空间水汽难以扩散 长期存放物品变质 通风建议除湿提醒预警滞后 依赖人工巡检发现潮湿 批量包裹受损 自动检测即时报警无法追溯 不知何时湿度超标 责任界定困难 数据记录历史查询维护被动 故障后才发现问题 运营成本高 预防性维护提醒三、核心逻辑讲解graph TDA[系统初始化] -- B[传感器校准与环境检测]B -- C[加载湿度阈值配置]C -- D[启动数据采集线程]D -- E[进入主循环监控]E -- F{读取当前湿度值}F -- G[数据滤波处理]G -- H{湿度警戒阈值?}H -- 是 -- I[触发一级防潮预警]I -- J[LED黄色闪烁蜂鸣器间歇鸣叫]J -- K[记录湿度超标事件]K -- L{湿度危险阈值?}L -- 是 -- M[触发二级紧急预警]M -- N[LED红色快闪蜂鸣器连续报警]N -- O[发送云端告警通知]O -- P[启动强制通风建议]L -- 否 -- Q[维持一级预警状态]H -- 否 -- R{湿度正常范围?}R -- 是 -- S[LED绿色常亮系统正常]R -- 否 -- T[更新湿度趋势分析]S -- U{检查历史数据?}U -- 是 -- V[生成湿度分析报告]U -- 否 -- ET -- EQ -- EV -- E关键逻辑点1. 多重阈值策略设置警戒阈值70%和危险阈值85%分级响应不同严重程度2. 数据滤波算法采用滑动平均中值滤波组合消除传感器瞬时波动干扰3. 温湿度联动结合温度变化预测冷凝风险提前预警结露可能性4. 趋势分析通过历史数据识别湿度上升模式预测未来风险时段5. 多级通知本地声光提示云端推送运维平台告警确保信息触达四、代码模块化实现项目结构smart_delivery_locker_humidity/├── main.py # 主程序入口├── config.py # 配置文件├── sensors/ # 传感器模块│ ├── __init__.py│ └── humidity_sensor.py # DHT22湿度传感器├── processing/ # 数据处理模块│ ├── __init__.py│ └── data_filter.py # 数据滤波与校准├── alerts/ # 预警模块│ ├── __init__.py│ ├── local_alerter.py # 本地声光报警│ └── cloud_notifier.py # 云端通知推送├── storage/ # 数据存储模块│ ├── __init__.py│ └── humidity_logger.py # 湿度数据记录├── analysis/ # 数据分析模块│ ├── __init__.py│ └── trend_analyzer.py # 湿度趋势分析├── utils/ # 工具函数│ ├── __init__.py│ └── time_utils.py # 时间工具└── web/ # Web管理界面(可选)├── __init__.py└── dashboard.py # 监控仪表板1. 配置文件 (config.py)智能快递柜湿度检测系统配置文件包含传感器参数、预警阈值、设备配置等# 系统基本信息SYSTEM_NAME SmartLockerHumidityMonitorSYSTEM_VERSION 1.0.0DEVICE_ID LOCKER_HUMIDITY_001DEVICE_NAME A区3号快递柜# 湿度传感器配置HUMIDITY_SENSOR_TYPE DHT22 # 支持DHT11/DHT22/SHT30DHT_PIN 4 # DHT传感器数据引脚SHT30_I2C_ADDR 0x44 # SHT30 I2C地址SENSOR_READ_INTERVAL 30 # 传感器读取间隔秒DATA_FILTER_WINDOW 5 # 数据滤波窗口大小# 湿度阈值配置HUMIDITY_NORMAL_MAX 60.0 # 正常湿度上限%HUMIDITY_WARNING_THRESHOLD 70.0 # 警戒阈值%HUMIDITY_DANGER_THRESHOLD 85.0 # 危险阈值%HUMIDITY_CRITICAL_THRESHOLD 95.0 # 临界阈值%# 温度联动配置用于冷凝风险预测TEMPERATURE_COLD_THRESHOLD 5.0 # 低温阈值℃TEMPERATURE_HOT_THRESHOLD 35.0 # 高温阈值℃CONDENSATION_RISK_THRESHOLD 10.0 # 温差冷凝风险℃# 预警配置ENABLE_LOCAL_ALERTS True # 启用本地声光预警ENABLE_CLOUD_ALERTS True # 启用云端通知ENABLE_SMS_ALERTS False # 启用短信通知WARNING_LED_PIN 17 # 预警LED引脚WARNING_BUZZER_PIN 27 # 预警蜂鸣器引脚LED_YELLOW True # 警戒状态LED颜色LED_RED True # 危险状态LED颜色BUZZER_WARNING_FREQ 800 # 警戒蜂鸣频率HzBUZZER_DANGER_FREQ 1200 # 危险蜂鸣频率Hz# 通风控制配置VENTILATION_RECOMMENDATION_ENABLED True # 启用通风建议MIN_VENTILATION_INTERVAL 3600 # 最小通风建议间隔秒AUTO_VENTILATION_DURATION 300 # 自动通风时长秒# 数据存储配置HUMIDITY_LOG_FILE humidity_data.csvHUMIDITY_LOG_RETENTION_DAYS 30 # 数据保留天数ENABLE_DATABASE_STORAGE False # 启用数据库存储DATABASE_PATH humidity_data.db# 云端通信配置CLOUD_API_ENDPOINT https://api.smartlocker.com/humidity/alertAPI_KEY your_api_key_hereDEVICE_TOKEN locker_device_token_hereUPLOAD_INTERVAL 300 # 数据上传间隔秒# 维护配置CALIBRATION_INTERVAL 86400 # 传感器校准间隔秒- 24小时MAINTENANCE_CHECK_INTERVAL 3600 # 维护检查间隔秒SENSOR_ERROR_THRESHOLD 5 # 连续错误次数阈值# 环境参数CABINET_VOLUME 0.5 # 柜体容积立方米AIR_EXCHANGE_RATE 0.1 # 空气交换率DEHUMIDIFIER_CAPACITY 200 # 除湿机容量g/h# 调试配置DEBUG_MODE TrueLOG_LEVEL DEBUGSIMULATION_MODE False # 模拟模式MOCK_HUMIDITY_BASE 65.0 # 模拟基础湿度MOCK_HUMIDITY_VARIATION 5.0 # 模拟湿度波动范围2. 湿度传感器模块 (sensors/humidity_sensor.py)DHT22高精度湿度传感器模块提供温度和湿度的精确测量支持数据校验import Adafruit_DHTimport timeimport randomfrom config import (HUMIDITY_SENSOR_TYPE, DHT_PIN, SHT30_I2C_ADDR,SENSOR_READ_INTERVAL, SIMULATION_MODE,MOCK_HUMIDITY_BASE, MOCK_HUMIDITY_VARIATION,DEBUG_MODE, CALIBRATION_INTERVAL)from utils.time_utils import get_timestamp, retry_on_exceptionclass HumiditySensor:DHT22数字温湿度传感器驱动提供高精度(±2%RH)的湿度测量和环境温度监测支持数据有效性验证和异常值过滤# DHT22技术参数DHT22_HUMIDITY_ACCURACY ±2.0 # 湿度精度%RHDHT22_HUMIDITY_RANGE (0, 100) # 湿度测量范围DHT22_TEMPERATURE_ACCURACY ±0.5 # 温度精度℃DHT22_MIN_READ_INTERVAL 2.0 # 最小读取间隔秒def __init__(self):初始化湿度传感器self.sensor_type HUMIDITY_SENSOR_TYPEself.pin DHT_PINself.i2c_addr SHT30_I2C_ADDRself.last_read_time 0self.last_valid_reading Noneself.read_count 0self.error_count 0self.calibration_offset 0.0self.needs_calibration Trueself.calibration_time 0if not SIMULATION_MODE:self._initialize_sensor()else:print( 模拟模式湿度传感器已初始化模拟DHT22)print(f✅ 湿度传感器初始化成功)print(f 传感器类型: {self.sensor_type})print(f 数据引脚: GPIO{DHT_PIN})print(f 测量精度: ±2%RH, ±0.5℃)print(f 读取间隔: {SENSOR_READ_INTERVAL}秒)def _initialize_sensor(self):初始化物理传感器try:if self.sensor_type DHT22:# 测试传感器连接test_humidity, test_temp Adafruit_DHT.read_retry(Adafruit_DHT.DHT22, self.pin)if test_humidity is not None:print( DHT22传感器连接正常)else:print(⚠️ DHT22传感器连接异常请检查接线)elif self.sensor_type SHT30:# SHT30 I2C初始化import smbusself.bus smbus.SMBus(1)# 软复位self.bus.write_i2c_block_data(self.i2c_addr, 0x30, [0xA2])time.sleep(0.01)print( SHT30传感器连接正常)except ImportError:print(⚠️ 传感器库未安装切换到模拟模式)global SIMULATION_MODESIMULATION_MODE Trueexcept Exception as e:print(f⚠️ 传感器初始化失败: {e})print( 切换到模拟模式运行)SIMULATION_MODE Trueretry_on_exception(retries3, delay1.0)def read_humidity_temperature(self) - tuple:读取湿度和温度数据return: (humidity, temperature) 元组失败返回(None, None)current_time time.time()# 检查读取间隔if current_time - self.last_read_time self.DHT22_MIN_READ_INTERVAL:if self.last_valid_reading:return self.last_valid_readingself.last_read_time current_timeif SIMULATION_MODE:return self._read_simulated_data()try:if self.sensor_type DHT22:humidity, temperature Adafruit_DHT.read_retry(Adafruit_DHT.DHT22, self.pin, retries3, delay_seconds0.5)elif self.sensor_type SHT30:humidity, temperature self._read_sht30()else:return self._read_simulated_data()# 数据有效性验证if self._validate_reading(humidity, temperature):# 应用校准偏移calibrated_humidity humidity self.calibration_offset# 限制有效范围calibrated_humidity max(0, min(100, calibrated_humidity))if temperature is not None:temperature max(-40, min(80, temperature))reading (round(calibrated_humidity, 1),round(temperature, 1) if temperature else None)self.last_valid_reading readingself.read_count 1if DEBUG_MODE:print(f 传感器读数: 湿度{reading[0]}%, 温度{reading[1]}℃)return readingelse:self.error_count 1if DEBUG_MODE:print(⚠️ 传感器数据无效使用上次有效值)return self.last_valid_readingexcept Exception as e:self.error_count 1if DEBUG_MODE:print(f❌ 传感器读取错误: {e})return self.last_valid_readingdef _read_sht30(self) - tuple:读取SHT30传感器数据import smbus# 触发测量self.bus.write_i2c_block_data(self.i2c_addr, 0x24, [0x00])time.sleep(0.015) # 等待测量完成# 读取6字节数据data self.bus.read_i2c_block_data(self.i2c_addr, 0xE0, 6)# 解析湿度16位单位0.01%RHhumidity_raw (data[3] 8) | data[4]humidity (humidity_raw / 65535.0) * 100.0# 解析温度16位单位0.01℃temp_raw (data[0] 8) | data[1]temperature -45.0 (175.0 * temp_raw / 65535.0)return humidity, temperaturedef _read_simulated_data(self) - tuple:生成模拟湿度数据用于测试base_humidity MOCK_HUMIDITY_BASEvariation MOCK_HUMIDITY_VARIATION# 添加时间相关的变化模式hour time.localtime().tm_hourif 6 hour 18: # 白天湿度略低base_humidity - 5else: # 夜间湿度略高base_humidity 3# 添加随机波动humidity base_humidity random.uniform(-variation, variation)temperature 20.0 random.uniform(-3, 3)# 模拟偶发异常值if random.random() 0.02: # 2%概率产生异常humidity random.choice([None, 105.0, -5.0])# 数据有效性验证if humidity is not None and 0 humidity 100:reading (round(humidity, 1), round(temperature, 1))self.last_valid_reading readingreturn readingelse:return self.last_valid_readingdef _validate_reading(self, humidity: float, temperature: float) - bool:验证传感器数据有效性humidity: 湿度值temperature: 温度值return: 数据是否有效# 检查None值if humidity is None:return False# 检查湿度范围DHT22理论范围0-100%if not (0 humidity 100):if DEBUG_MODE:print(f 湿度超出范围: {humidity}%)return False# 检查温度范围DHT22理论范围-40~80℃if temperature is not None and not (-40 temperature 80):if DEBUG_MODE:print(f 温度超出范围: {temperature}℃)return False# 检查物理合理性高温高湿的极端情况if temperature is not None and temperature 35 and humidity 90:if DEBUG_MODE:print(f 物理上不太可能的组合: {temperature}℃, {humidity}%)return False# 检查突变与前次读数差异过大if self.last_valid_reading:last_humidity self.last_valid_reading[0]if abs(humidity - last_humidity) 15: # 15%突变阈值if DEBUG_MODE:print(f 湿度突变过大: {last_humidity}% → {humidity}%)# 可能是传感器故障但仍返回True让其通过滤波处理return Truedef calibrate(self, reference_humidity: float None):传感器校准reference_humidity: 参考湿度值如使用专业仪器测得print( 开始传感器校准...)if reference_humidity is None:# 自动校准连续读取多次取平均作为基准readings []print( 采集校准数据...)for i in range(10):result self.read_humidity_temperature()if result[0] is not None:readings.append(result[0])time.sleep(1.0)if len(readings) 5:avg_humidity sum(readings) / len(readings)# 假设环境湿度在50-60%之间计算偏移if 45 avg_humidity 65:self.calibration_offset 55.0 - avg_humidityprint(f 自动校准完成偏移量: {self.calibration_offset:.2f}%)else:print(f ⚠️ 环境湿度可能异常({avg_humidity:.1f}%)使用默认校准)self.calibration_offset 0.0else:print( ⚠️ 校准数据不足使用默认校准)self.calibration_offset 0.0else:# 使用参考值校准current_reading self.read_humidity_temperature()[0]if current_reading:self.calibration_offset reference_humidity - current_readingprint(f 参考校准完成偏移量: {self.calibration_offset:.2f}%)self.calibration_time time.time()self.needs_calibration Falseprint(f✅ 校准完成当前偏移量: {self.calibration_offset:.2f}%)def check_calibration_needed(self):检查是否需要重新校准current_time time.time()if current_time - self.calibration_time CALIBRATION_INTERVAL:self.needs_calibration Trueprint(⏰ 传感器需要重新校准24小时周期)def get_sensor_status(self) - dict:获取传感器状态信息error_rate (self.error_count / max(1, self.read_count)) * 100return {sensor_type: self.sensor_type,is_simulated: SIMULATION_MODE,read_count: self.read_count,error_count: self.error_count,error_rate: round(error_rate, 2),calibration_offset: round(self.calibration_offset, 2),needs_calibration: self.needs_calibration,last_read_time: get_timestamp(),last_valid_reading: self.last_valid_reading}def cleanup(self):清理传感器资源if not SIMULATION_MODE and self.sensor_type SHT30:try:self.bus.close()except:passprint( 湿度传感器资源已清理)3. 数据滤波模块 (processing/data_filter.py)数据滤波与校准模块对原始传感器数据进行去噪、平滑和异常值处理import numpy as npfrom collections import dequefrom config import DATA_FILTER_WINDOW, DEBUG_MODEfrom utils.time_utils import moving_average, median_filterclass DataFilter:多阶数据滤波器结合滑动平均、中值滤波和卡尔曼滤波的优点提供稳定可靠的湿度数据输出def __init__(self, window_size: int None):初始化数据滤波器window_size: 滤波窗口大小self.window_size window_size or DATA_FILTER_WINDOWself.humidity_buffer deque(maxlenself.window_size)self.temperature_buffer deque(maxlenself.window_size)self.filtered_humidity Noneself.filtered_temperature Noneself.kalman_gain 0.3 # 卡尔曼增益self.estimated_humidity Noneprint(f✅ 数据滤波器初始化成功)print(f 滤波窗口: {self.window_size}个采样点)def add_reading(self, humidity: float, temperature: float None):添加新读数到滤波器humidity: 湿度值temperature: 温度值if humidity is not None:self.humidity_buffer.append(humidity)if temperature is not None:self.temperature_buffer.append(temperature)def get_filtered_humidity(self) - float:获取滤波后的湿度值使用三级滤波中值滤波→滑动平均→卡尔曼平滑return: 滤波后的湿度值if len(self.humidity_buffer) 3:return self.humidity_buffer[-1] if self.humidity_buffer else None# 第一级中值滤波去除异常值raw_data list(self.humidity_buffer)median_filtered median_filter(raw_data, kernel_size3)# 第二级滑动平均平滑average_filtered moving_average(median_filtered, window_sizemin(3, len(median_filtered)))# 第三级卡尔曼滤波进一步平滑filtered_value self._kalman_filter(average_filtered[-1] if average_filtered else median_filtered[-1])self.filtered_humidity round(filtered_value, 1)return self.filtered_humiditydef _kalman_filter(self, measurement: float) - float:简化的卡尔曼滤波器measurement: 测量值return: 滤波估计值if self.estimated_humidity is None:self.estimated_humidity measurementreturn measurement# 预测步骤假设过程噪声很小predicted self.estimated_humidity# 更新步骤kalman_gain self.kalman_gainself.estimated_humidity predicted kalman_gain * (measurement - predicted)# 动态调整卡尔曼增益if abs(measurement - predicted) 5: # 测量突变kalman_gain min(0.8, kalman_gain 0.1)else:kalman_gain max(0.1, kalman_gain * 0.95)self.kalman_gain kalman_gainreturn self.estimated_humiditydef get_filtered_temperature(self) - float:获取滤波后的温度值if len(self.temperature_buffer) 3:return self.temperature_buffer[-1] if self.temperature_buffer else Noneraw_data list(self.temperature_buffer)median_filtered median_filter(raw_data, kernel_size3)average_filtered moving_average(median_filtered, window_sizemin(3, len(median_filtered)))self.filtered_temperature round(average_filtered[-1], 1)return self.filtered_temperaturedef detect_anomaly(self, current_value: float, threshold: float 10.0) - bool:检测数据异常current_value: 当前值threshold: 异常阈值与历史均值的偏差return: 是否异常if len(self.humidity_buffer) 5:return Falsehistorical_mean np.mean(list(self.humidity_buffer)[:-1]) # 排除当前值deviation abs(current_value - historical_mean)return deviation thresholddef get_trend(self) - str:分析湿度变化趋势return: 趋势描述 (rising, falling, stable)if len(self.humidity_buffer) 5:return insufficient_datadata list(self.humidity_buffer)# 计算最近几个点的斜率recent_points data[-5:]x np.arange(len(recent_points))slope, _ np.polyfit(x, recent_points, 1)if slope 0.5:return risingelif slope -0.5:return fallingelse:return stabledef get_volatility(self) - float:计算数据波动性return: 变异系数标准差/均值if len(self.humidity_buffer) 3:return 0.0data list(self.humidity_buffer)mean_val np.mean(data)if mean_val 0:return 0.0std_val np.std(data)coefficient_of_variation std_val / mean_valreturn round(coefficient_of_variation, 3)def reset(self):重置滤波器状态self.humidity_buffer.clear()self.temperature_buffer.clear()self.filtered_humidity Noneself.filtered_temperature Noneself.estimated_humidity Noneself.kalman_gain 0.3def get_filter_status(self) - dict:获取滤波器状态return {window_size: self.window_size,利用AI解决实际问题如果你觉得这个工具好用欢迎关注长安牧笛
编写程序实现智能快递柜湿度检测,湿度过高,提示“防潮”,保护包裹内物品。
智能快递柜湿度检测与防潮预警系统一、实际应用场景描述随着电商行业的蓬勃发展智能快递柜已成为城市末端配送的核心基础设施。这些柜体通常部署在小区、写字楼、地铁站等公共场所24小时不间断服务。然而现实环境中快递柜面临着严峻的防潮挑战雨季空气湿度飙升、地下室通风不良、沿海地区盐雾侵蚀、冬季温差导致的冷凝水等问题时刻威胁着包裹内物品的安全。想象这样一个场景李女士网购了一台昂贵的单反相机快递员将其存入小区智能快递柜。恰逢连日阴雨柜内湿度悄然攀升至85%。第二天李女士取件时发现相机的包装盒已经受潮变形内部电路板出现霉斑最终造成数千元的损失。类似的情况在全国各地的快递柜中每天都在发生——电子产品短路、纸质文件发霉、药品失效、衣物异味...本系统专门针对这一痛点通过在智能快递柜内部署高精度湿度传感器实时监控柜内环境当湿度超过安全阈值时立即触发多级防潮预警并通过LED指示灯、蜂鸣器和云端推送通知管理人员及时处理最大程度保护包裹内物品免受湿气侵害。二、引入痛点痛点类型 具体表现 潜在损失 本方案解决方式环境湿度不可控 雨季/梅雨季柜内湿度可达90% 电子产品损坏、文档发霉 实时监测阈值预警冷凝水隐患 昼夜温差大时内壁结露滴落 包裹湿损、细菌滋生 温湿度联动分析冷凝预警通风死角 密闭空间水汽难以扩散 长期存放物品变质 通风建议除湿提醒预警滞后 依赖人工巡检发现潮湿 批量包裹受损 自动检测即时报警无法追溯 不知何时湿度超标 责任界定困难 数据记录历史查询维护被动 故障后才发现问题 运营成本高 预防性维护提醒三、核心逻辑讲解graph TDA[系统初始化] -- B[传感器校准与环境检测]B -- C[加载湿度阈值配置]C -- D[启动数据采集线程]D -- E[进入主循环监控]E -- F{读取当前湿度值}F -- G[数据滤波处理]G -- H{湿度警戒阈值?}H -- 是 -- I[触发一级防潮预警]I -- J[LED黄色闪烁蜂鸣器间歇鸣叫]J -- K[记录湿度超标事件]K -- L{湿度危险阈值?}L -- 是 -- M[触发二级紧急预警]M -- N[LED红色快闪蜂鸣器连续报警]N -- O[发送云端告警通知]O -- P[启动强制通风建议]L -- 否 -- Q[维持一级预警状态]H -- 否 -- R{湿度正常范围?}R -- 是 -- S[LED绿色常亮系统正常]R -- 否 -- T[更新湿度趋势分析]S -- U{检查历史数据?}U -- 是 -- V[生成湿度分析报告]U -- 否 -- ET -- EQ -- EV -- E关键逻辑点1. 多重阈值策略设置警戒阈值70%和危险阈值85%分级响应不同严重程度2. 数据滤波算法采用滑动平均中值滤波组合消除传感器瞬时波动干扰3. 温湿度联动结合温度变化预测冷凝风险提前预警结露可能性4. 趋势分析通过历史数据识别湿度上升模式预测未来风险时段5. 多级通知本地声光提示云端推送运维平台告警确保信息触达四、代码模块化实现项目结构smart_delivery_locker_humidity/├── main.py # 主程序入口├── config.py # 配置文件├── sensors/ # 传感器模块│ ├── __init__.py│ └── humidity_sensor.py # DHT22湿度传感器├── processing/ # 数据处理模块│ ├── __init__.py│ └── data_filter.py # 数据滤波与校准├── alerts/ # 预警模块│ ├── __init__.py│ ├── local_alerter.py # 本地声光报警│ └── cloud_notifier.py # 云端通知推送├── storage/ # 数据存储模块│ ├── __init__.py│ └── humidity_logger.py # 湿度数据记录├── analysis/ # 数据分析模块│ ├── __init__.py│ └── trend_analyzer.py # 湿度趋势分析├── utils/ # 工具函数│ ├── __init__.py│ └── time_utils.py # 时间工具└── web/ # Web管理界面(可选)├── __init__.py└── dashboard.py # 监控仪表板1. 配置文件 (config.py)智能快递柜湿度检测系统配置文件包含传感器参数、预警阈值、设备配置等# 系统基本信息SYSTEM_NAME SmartLockerHumidityMonitorSYSTEM_VERSION 1.0.0DEVICE_ID LOCKER_HUMIDITY_001DEVICE_NAME A区3号快递柜# 湿度传感器配置HUMIDITY_SENSOR_TYPE DHT22 # 支持DHT11/DHT22/SHT30DHT_PIN 4 # DHT传感器数据引脚SHT30_I2C_ADDR 0x44 # SHT30 I2C地址SENSOR_READ_INTERVAL 30 # 传感器读取间隔秒DATA_FILTER_WINDOW 5 # 数据滤波窗口大小# 湿度阈值配置HUMIDITY_NORMAL_MAX 60.0 # 正常湿度上限%HUMIDITY_WARNING_THRESHOLD 70.0 # 警戒阈值%HUMIDITY_DANGER_THRESHOLD 85.0 # 危险阈值%HUMIDITY_CRITICAL_THRESHOLD 95.0 # 临界阈值%# 温度联动配置用于冷凝风险预测TEMPERATURE_COLD_THRESHOLD 5.0 # 低温阈值℃TEMPERATURE_HOT_THRESHOLD 35.0 # 高温阈值℃CONDENSATION_RISK_THRESHOLD 10.0 # 温差冷凝风险℃# 预警配置ENABLE_LOCAL_ALERTS True # 启用本地声光预警ENABLE_CLOUD_ALERTS True # 启用云端通知ENABLE_SMS_ALERTS False # 启用短信通知WARNING_LED_PIN 17 # 预警LED引脚WARNING_BUZZER_PIN 27 # 预警蜂鸣器引脚LED_YELLOW True # 警戒状态LED颜色LED_RED True # 危险状态LED颜色BUZZER_WARNING_FREQ 800 # 警戒蜂鸣频率HzBUZZER_DANGER_FREQ 1200 # 危险蜂鸣频率Hz# 通风控制配置VENTILATION_RECOMMENDATION_ENABLED True # 启用通风建议MIN_VENTILATION_INTERVAL 3600 # 最小通风建议间隔秒AUTO_VENTILATION_DURATION 300 # 自动通风时长秒# 数据存储配置HUMIDITY_LOG_FILE humidity_data.csvHUMIDITY_LOG_RETENTION_DAYS 30 # 数据保留天数ENABLE_DATABASE_STORAGE False # 启用数据库存储DATABASE_PATH humidity_data.db# 云端通信配置CLOUD_API_ENDPOINT https://api.smartlocker.com/humidity/alertAPI_KEY your_api_key_hereDEVICE_TOKEN locker_device_token_hereUPLOAD_INTERVAL 300 # 数据上传间隔秒# 维护配置CALIBRATION_INTERVAL 86400 # 传感器校准间隔秒- 24小时MAINTENANCE_CHECK_INTERVAL 3600 # 维护检查间隔秒SENSOR_ERROR_THRESHOLD 5 # 连续错误次数阈值# 环境参数CABINET_VOLUME 0.5 # 柜体容积立方米AIR_EXCHANGE_RATE 0.1 # 空气交换率DEHUMIDIFIER_CAPACITY 200 # 除湿机容量g/h# 调试配置DEBUG_MODE TrueLOG_LEVEL DEBUGSIMULATION_MODE False # 模拟模式MOCK_HUMIDITY_BASE 65.0 # 模拟基础湿度MOCK_HUMIDITY_VARIATION 5.0 # 模拟湿度波动范围2. 湿度传感器模块 (sensors/humidity_sensor.py)DHT22高精度湿度传感器模块提供温度和湿度的精确测量支持数据校验import Adafruit_DHTimport timeimport randomfrom config import (HUMIDITY_SENSOR_TYPE, DHT_PIN, SHT30_I2C_ADDR,SENSOR_READ_INTERVAL, SIMULATION_MODE,MOCK_HUMIDITY_BASE, MOCK_HUMIDITY_VARIATION,DEBUG_MODE, CALIBRATION_INTERVAL)from utils.time_utils import get_timestamp, retry_on_exceptionclass HumiditySensor:DHT22数字温湿度传感器驱动提供高精度(±2%RH)的湿度测量和环境温度监测支持数据有效性验证和异常值过滤# DHT22技术参数DHT22_HUMIDITY_ACCURACY ±2.0 # 湿度精度%RHDHT22_HUMIDITY_RANGE (0, 100) # 湿度测量范围DHT22_TEMPERATURE_ACCURACY ±0.5 # 温度精度℃DHT22_MIN_READ_INTERVAL 2.0 # 最小读取间隔秒def __init__(self):初始化湿度传感器self.sensor_type HUMIDITY_SENSOR_TYPEself.pin DHT_PINself.i2c_addr SHT30_I2C_ADDRself.last_read_time 0self.last_valid_reading Noneself.read_count 0self.error_count 0self.calibration_offset 0.0self.needs_calibration Trueself.calibration_time 0if not SIMULATION_MODE:self._initialize_sensor()else:print( 模拟模式湿度传感器已初始化模拟DHT22)print(f✅ 湿度传感器初始化成功)print(f 传感器类型: {self.sensor_type})print(f 数据引脚: GPIO{DHT_PIN})print(f 测量精度: ±2%RH, ±0.5℃)print(f 读取间隔: {SENSOR_READ_INTERVAL}秒)def _initialize_sensor(self):初始化物理传感器try:if self.sensor_type DHT22:# 测试传感器连接test_humidity, test_temp Adafruit_DHT.read_retry(Adafruit_DHT.DHT22, self.pin)if test_humidity is not None:print( DHT22传感器连接正常)else:print(⚠️ DHT22传感器连接异常请检查接线)elif self.sensor_type SHT30:# SHT30 I2C初始化import smbusself.bus smbus.SMBus(1)# 软复位self.bus.write_i2c_block_data(self.i2c_addr, 0x30, [0xA2])time.sleep(0.01)print( SHT30传感器连接正常)except ImportError:print(⚠️ 传感器库未安装切换到模拟模式)global SIMULATION_MODESIMULATION_MODE Trueexcept Exception as e:print(f⚠️ 传感器初始化失败: {e})print( 切换到模拟模式运行)SIMULATION_MODE Trueretry_on_exception(retries3, delay1.0)def read_humidity_temperature(self) - tuple:读取湿度和温度数据return: (humidity, temperature) 元组失败返回(None, None)current_time time.time()# 检查读取间隔if current_time - self.last_read_time self.DHT22_MIN_READ_INTERVAL:if self.last_valid_reading:return self.last_valid_readingself.last_read_time current_timeif SIMULATION_MODE:return self._read_simulated_data()try:if self.sensor_type DHT22:humidity, temperature Adafruit_DHT.read_retry(Adafruit_DHT.DHT22, self.pin, retries3, delay_seconds0.5)elif self.sensor_type SHT30:humidity, temperature self._read_sht30()else:return self._read_simulated_data()# 数据有效性验证if self._validate_reading(humidity, temperature):# 应用校准偏移calibrated_humidity humidity self.calibration_offset# 限制有效范围calibrated_humidity max(0, min(100, calibrated_humidity))if temperature is not None:temperature max(-40, min(80, temperature))reading (round(calibrated_humidity, 1),round(temperature, 1) if temperature else None)self.last_valid_reading readingself.read_count 1if DEBUG_MODE:print(f 传感器读数: 湿度{reading[0]}%, 温度{reading[1]}℃)return readingelse:self.error_count 1if DEBUG_MODE:print(⚠️ 传感器数据无效使用上次有效值)return self.last_valid_readingexcept Exception as e:self.error_count 1if DEBUG_MODE:print(f❌ 传感器读取错误: {e})return self.last_valid_readingdef _read_sht30(self) - tuple:读取SHT30传感器数据import smbus# 触发测量self.bus.write_i2c_block_data(self.i2c_addr, 0x24, [0x00])time.sleep(0.015) # 等待测量完成# 读取6字节数据data self.bus.read_i2c_block_data(self.i2c_addr, 0xE0, 6)# 解析湿度16位单位0.01%RHhumidity_raw (data[3] 8) | data[4]humidity (humidity_raw / 65535.0) * 100.0# 解析温度16位单位0.01℃temp_raw (data[0] 8) | data[1]temperature -45.0 (175.0 * temp_raw / 65535.0)return humidity, temperaturedef _read_simulated_data(self) - tuple:生成模拟湿度数据用于测试base_humidity MOCK_HUMIDITY_BASEvariation MOCK_HUMIDITY_VARIATION# 添加时间相关的变化模式hour time.localtime().tm_hourif 6 hour 18: # 白天湿度略低base_humidity - 5else: # 夜间湿度略高base_humidity 3# 添加随机波动humidity base_humidity random.uniform(-variation, variation)temperature 20.0 random.uniform(-3, 3)# 模拟偶发异常值if random.random() 0.02: # 2%概率产生异常humidity random.choice([None, 105.0, -5.0])# 数据有效性验证if humidity is not None and 0 humidity 100:reading (round(humidity, 1), round(temperature, 1))self.last_valid_reading readingreturn readingelse:return self.last_valid_readingdef _validate_reading(self, humidity: float, temperature: float) - bool:验证传感器数据有效性humidity: 湿度值temperature: 温度值return: 数据是否有效# 检查None值if humidity is None:return False# 检查湿度范围DHT22理论范围0-100%if not (0 humidity 100):if DEBUG_MODE:print(f 湿度超出范围: {humidity}%)return False# 检查温度范围DHT22理论范围-40~80℃if temperature is not None and not (-40 temperature 80):if DEBUG_MODE:print(f 温度超出范围: {temperature}℃)return False# 检查物理合理性高温高湿的极端情况if temperature is not None and temperature 35 and humidity 90:if DEBUG_MODE:print(f 物理上不太可能的组合: {temperature}℃, {humidity}%)return False# 检查突变与前次读数差异过大if self.last_valid_reading:last_humidity self.last_valid_reading[0]if abs(humidity - last_humidity) 15: # 15%突变阈值if DEBUG_MODE:print(f 湿度突变过大: {last_humidity}% → {humidity}%)# 可能是传感器故障但仍返回True让其通过滤波处理return Truedef calibrate(self, reference_humidity: float None):传感器校准reference_humidity: 参考湿度值如使用专业仪器测得print( 开始传感器校准...)if reference_humidity is None:# 自动校准连续读取多次取平均作为基准readings []print( 采集校准数据...)for i in range(10):result self.read_humidity_temperature()if result[0] is not None:readings.append(result[0])time.sleep(1.0)if len(readings) 5:avg_humidity sum(readings) / len(readings)# 假设环境湿度在50-60%之间计算偏移if 45 avg_humidity 65:self.calibration_offset 55.0 - avg_humidityprint(f 自动校准完成偏移量: {self.calibration_offset:.2f}%)else:print(f ⚠️ 环境湿度可能异常({avg_humidity:.1f}%)使用默认校准)self.calibration_offset 0.0else:print( ⚠️ 校准数据不足使用默认校准)self.calibration_offset 0.0else:# 使用参考值校准current_reading self.read_humidity_temperature()[0]if current_reading:self.calibration_offset reference_humidity - current_readingprint(f 参考校准完成偏移量: {self.calibration_offset:.2f}%)self.calibration_time time.time()self.needs_calibration Falseprint(f✅ 校准完成当前偏移量: {self.calibration_offset:.2f}%)def check_calibration_needed(self):检查是否需要重新校准current_time time.time()if current_time - self.calibration_time CALIBRATION_INTERVAL:self.needs_calibration Trueprint(⏰ 传感器需要重新校准24小时周期)def get_sensor_status(self) - dict:获取传感器状态信息error_rate (self.error_count / max(1, self.read_count)) * 100return {sensor_type: self.sensor_type,is_simulated: SIMULATION_MODE,read_count: self.read_count,error_count: self.error_count,error_rate: round(error_rate, 2),calibration_offset: round(self.calibration_offset, 2),needs_calibration: self.needs_calibration,last_read_time: get_timestamp(),last_valid_reading: self.last_valid_reading}def cleanup(self):清理传感器资源if not SIMULATION_MODE and self.sensor_type SHT30:try:self.bus.close()except:passprint( 湿度传感器资源已清理)3. 数据滤波模块 (processing/data_filter.py)数据滤波与校准模块对原始传感器数据进行去噪、平滑和异常值处理import numpy as npfrom collections import dequefrom config import DATA_FILTER_WINDOW, DEBUG_MODEfrom utils.time_utils import moving_average, median_filterclass DataFilter:多阶数据滤波器结合滑动平均、中值滤波和卡尔曼滤波的优点提供稳定可靠的湿度数据输出def __init__(self, window_size: int None):初始化数据滤波器window_size: 滤波窗口大小self.window_size window_size or DATA_FILTER_WINDOWself.humidity_buffer deque(maxlenself.window_size)self.temperature_buffer deque(maxlenself.window_size)self.filtered_humidity Noneself.filtered_temperature Noneself.kalman_gain 0.3 # 卡尔曼增益self.estimated_humidity Noneprint(f✅ 数据滤波器初始化成功)print(f 滤波窗口: {self.window_size}个采样点)def add_reading(self, humidity: float, temperature: float None):添加新读数到滤波器humidity: 湿度值temperature: 温度值if humidity is not None:self.humidity_buffer.append(humidity)if temperature is not None:self.temperature_buffer.append(temperature)def get_filtered_humidity(self) - float:获取滤波后的湿度值使用三级滤波中值滤波→滑动平均→卡尔曼平滑return: 滤波后的湿度值if len(self.humidity_buffer) 3:return self.humidity_buffer[-1] if self.humidity_buffer else None# 第一级中值滤波去除异常值raw_data list(self.humidity_buffer)median_filtered median_filter(raw_data, kernel_size3)# 第二级滑动平均平滑average_filtered moving_average(median_filtered, window_sizemin(3, len(median_filtered)))# 第三级卡尔曼滤波进一步平滑filtered_value self._kalman_filter(average_filtered[-1] if average_filtered else median_filtered[-1])self.filtered_humidity round(filtered_value, 1)return self.filtered_humiditydef _kalman_filter(self, measurement: float) - float:简化的卡尔曼滤波器measurement: 测量值return: 滤波估计值if self.estimated_humidity is None:self.estimated_humidity measurementreturn measurement# 预测步骤假设过程噪声很小predicted self.estimated_humidity# 更新步骤kalman_gain self.kalman_gainself.estimated_humidity predicted kalman_gain * (measurement - predicted)# 动态调整卡尔曼增益if abs(measurement - predicted) 5: # 测量突变kalman_gain min(0.8, kalman_gain 0.1)else:kalman_gain max(0.1, kalman_gain * 0.95)self.kalman_gain kalman_gainreturn self.estimated_humiditydef get_filtered_temperature(self) - float:获取滤波后的温度值if len(self.temperature_buffer) 3:return self.temperature_buffer[-1] if self.temperature_buffer else Noneraw_data list(self.temperature_buffer)median_filtered median_filter(raw_data, kernel_size3)average_filtered moving_average(median_filtered, window_sizemin(3, len(median_filtered)))self.filtered_temperature round(average_filtered[-1], 1)return self.filtered_temperaturedef detect_anomaly(self, current_value: float, threshold: float 10.0) - bool:检测数据异常current_value: 当前值threshold: 异常阈值与历史均值的偏差return: 是否异常if len(self.humidity_buffer) 5:return Falsehistorical_mean np.mean(list(self.humidity_buffer)[:-1]) # 排除当前值deviation abs(current_value - historical_mean)return deviation thresholddef get_trend(self) - str:分析湿度变化趋势return: 趋势描述 (rising, falling, stable)if len(self.humidity_buffer) 5:return insufficient_datadata list(self.humidity_buffer)# 计算最近几个点的斜率recent_points data[-5:]x np.arange(len(recent_points))slope, _ np.polyfit(x, recent_points, 1)if slope 0.5:return risingelif slope -0.5:return fallingelse:return stabledef get_volatility(self) - float:计算数据波动性return: 变异系数标准差/均值if len(self.humidity_buffer) 3:return 0.0data list(self.humidity_buffer)mean_val np.mean(data)if mean_val 0:return 0.0std_val np.std(data)coefficient_of_variation std_val / mean_valreturn round(coefficient_of_variation, 3)def reset(self):重置滤波器状态self.humidity_buffer.clear()self.temperature_buffer.clear()self.filtered_humidity Noneself.filtered_temperature Noneself.estimated_humidity Noneself.kalman_gain 0.3def get_filter_status(self) - dict:获取滤波器状态return {window_size: self.window_size,利用AI解决实际问题如果你觉得这个工具好用欢迎关注长安牧笛