InfluxDB 数据库迁移与增量数据同步实战一、项目背景在企业级应用中数据库迁移是一项常见的运维任务。本文将详细介绍如何将 InfluxDB 数据库从一台服务器A服务器迁移到另一台服务器B服务器并重点讲解迁移过程中时间段增量数据同步的实现方案。1.1 迁移场景源服务器A服务器:172.16.231.218运行 InfluxDB 服务端口8087目标服务器B服务器:172.16.231.219部署新的 InfluxDB 服务端口8086涉及数据库:objiot物联网设备数据、accdetect加速度检测数据1.2 数据同步需求由于迁移过程中业务系统仍在运行需要同步迁移前后的数据差异同步时间范围2025-05-15 16:00:00 —— 2025-05-18 11:00:00二、技术实现方案2.1 整体架构┌─────────────────┐ 备份 ┌─────────────────┐ │ A服务器 │ ────────────────── │ 备份文件 │ │ (172.16.231.218)│ │ (/mnt/nas/) │ └─────────────────┘ └────────┬────────┘ │ scp ▼ ┌─────────────────┐ 恢复 ┌─────────────────┐ │ B服务器 │ ───────────────── │ 备份文件 │ │ (172.16.231.219)│ └─────────────────┘ └────────┬────────┘ │ │ 增量同步时间段数据 ▼ ┌─────────────────┐ │ C#同步工具 │ │ (InfluxdbTool) │ └─────────────────┘2.2 技术选型技术组件版本用途InfluxDB2.x时序数据库.NET10.0同步工具开发框架InfluxDB.ClientLatestInfluxDB .NET SDKSerilogLatest日志框架三、关键代码解析3.1 数据模型定义3.1.1 DeviceState终端状态[Measurement(DeviceState)]publicclassDeviceState{[Column(IsTimestamptrue)]publicDateTimeCreaTime{get;set;}[Column(IsTagtrue)]publicstringKey{get;set;}[Column(Value)]publicstringValue{get;set;}}关键点说明[Measurement]特性指定 InfluxDB 中的表名[Column(IsTimestamp true)]标记时间戳字段[Column(IsTag true)]标记索引标签字段3.1.2 HGRealtimeData终端实时数据[Measurement(HGRealtimeData)]publicclassHGRealtimeData{[Column(IsTimestamptrue)]publicDateTimeCreaTime{get;set;}[Column(IsTagtrue)]publicstringKey{get;set;}[Column(Value)]publicstringValue{get;set;}}3.2 主程序入口staticvoidMain(string[]args){try{Dijing.SerilogExt.InitLog.SetLog(Dijing.SerilogExt.RunModeEnum.Debug);// 定义同步时间段varbeginTimeDateTime.Parse(2025-05-15 16:00:00);varendTimeDateTime.Parse(2025-05-18 11:00:00);// 初始化源端和目标端配置varinfluxFromOptionInitInfluxdbFromOption();varinfluxToOptionInitInfluxdbToOption();// 同步 DeviceState 数据SyncMeasurementDeviceState(DeviceState,beginTime,endTime,influxFromOption,influxToOption);// 同步 HGRealtimeData 数据SyncMeasurementHGRealtimeData(HGRealtimeData,beginTime,endTime,influxFromOption,influxToOption);}catch(Exceptionex){Log.Error(ex,数据迁移失败);}}3.3 InfluxDB 配置初始化privatestaticInfluxOptionInitInfluxdbFromOption(){returnnewInfluxOption{Urlhttp://SOURCE_SERVER:8086,// 源服务器地址TokenYOUR_API_TOKEN,Bucketobjiot,Orgruiyun};}privatestaticInfluxOptionInitInfluxdbToOption(){returnnewInfluxOption{Urlhttp://TARGET_SERVER:8086,// 目标服务器地址TokenYOUR_API_TOKEN,Bucketobjiot,Orgruiyun};}3.4 增量同步核心逻辑// 获取指定时间段内的所有设备KeyprivatestaticListstringGetInlfuxdbKeys(stringmeasurement,DateTimebeginTime,DateTimeendTime,InfluxOptioninfluxOption){varkeysDijing.InfluxdbExt.Influxdb2Helper.GetKeys(measurement,beginTime,endTime,influxOption,1);returnkeys??newListstring();}// 查询指定Key的DeviceState数据privatestaticListDeviceStateGetDeviceStateList(stringkey,DateTimebeginTime,DateTimeendTime,InfluxOptioninfluxOption){returnDijing.InfluxdbExt.Influxdb2Helper.GetAllDeviceState(key,beginTime,endTime,influxOption);}// 批量插入DeviceState数据privatestaticvoidBatchInsertDeviceState(ListDeviceStatelist,InfluxOptioninfluxOption){Dijing.InfluxdbExt.Influxdb2Helper.WritesDeviceState(list,influxOption);}四、操作步骤4.1 数据库备份A服务器# 备份 objiot 数据库influx backup /mnt/nas--bucketobjiot-tYOUR_API_TOKEN# 备份 accdetect 数据库influx backup /mnt/nas/accalarm--bucketaccdetect-tYOUR_API_TOKEN4.2 传输备份文件scp-r/mnt/nas/influxdbbackup20260515/* rootTARGET_SERVER_IP:/mnt/nas/influxdbbackup20260515/scp-r/mnt/nas/accalarm/* rootTARGET_SERVER_IP:/mnt/nas/accalarm/4.3 数据库恢复B服务器# 恢复 objiot 数据库influx restore--bucketobjiot --new-bucket objiot--orgruiyun /mnt/nas/influxdbbackup20260515-tYOUR_API_TOKEN# 恢复 accdetect 数据库influx restore--bucketaccdetect --new-bucket accdetect--orgruiyun /mnt/nas/accalarm/-tYOUR_API_TOKEN4.4 增量数据同步# 配置端口转发将远程端口映射到本地ssh-L8087:127.0.0.1:8086 root172.16.231.218ssh-L8086:127.0.0.1:8086 root172.16.231.219# 运行同步工具dotnet run--projectInfluxdbTool.csproj五、服务健康检测# 检测 A 服务器curl-vhttp://172.16.231.218:8086/ping# 检测 B 服务器curl-vhttp://172.16.231.219:8086/ping正常响应HTTP/1.1 204 No Content Content-Type: application/json六、注意事项6.1 数据一致性停机窗口在增量同步完成前避免写入新数据到源数据库时间精度确保源端和目标端的系统时间一致建议使用 NTP 同步重复数据增量同步可能产生重复数据建议在应用层做去重处理6.2 网络安全Token 管理API Token 具有管理员权限需妥善保管传输加密建议使用 HTTPS 或 SSH 隧道进行数据传输访问控制限制 InfluxDB 服务只允许内网访问6.3 性能优化批量操作使用批量写入 API 提高写入效率时间段分片对于超大数据量建议按时间段分片同步索引优化在目标库创建必要的标签索引七、实际应用效果7.1 迁移结果指标数值迁移数据库数量2 个同步数据时间段~69 小时DeviceState 记录数~50,000 条HGRealtimeData 记录数~120,000 条同步成功率100%7.2 日志示例[INF] 开始获取InfluxDB DeviceState keys... [INF] 获取DeviceState keys完成共获取到 156 个key [INF] 开始查询DeviceStatekeydevice_001 [INF] 查询完成keydevice_001共获取到 328 条数据 [INF] 开始批量插入DeviceState数据数量328 [INF] 批量插入DeviceState数据完成 ... [INF] key总数: 156附录配置参数汇总参数值A 服务器地址SOURCE_SERVER_IPB 服务器地址TARGET_SERVER_IP组织名称orgnameAPI TokenYOUR_API_TOKEN数据库名称objiot,accdetect总结本文详细介绍了 InfluxDB 数据库迁移的完整流程重点讲解了基于 C# 实现的时间段增量数据同步方案。通过合理的备份策略和增量同步机制可以在保证数据一致性的前提下实现零停机或最小停机时间的数据库迁移。
InfluxDB 数据库迁移与增量数据同步实战
InfluxDB 数据库迁移与增量数据同步实战一、项目背景在企业级应用中数据库迁移是一项常见的运维任务。本文将详细介绍如何将 InfluxDB 数据库从一台服务器A服务器迁移到另一台服务器B服务器并重点讲解迁移过程中时间段增量数据同步的实现方案。1.1 迁移场景源服务器A服务器:172.16.231.218运行 InfluxDB 服务端口8087目标服务器B服务器:172.16.231.219部署新的 InfluxDB 服务端口8086涉及数据库:objiot物联网设备数据、accdetect加速度检测数据1.2 数据同步需求由于迁移过程中业务系统仍在运行需要同步迁移前后的数据差异同步时间范围2025-05-15 16:00:00 —— 2025-05-18 11:00:00二、技术实现方案2.1 整体架构┌─────────────────┐ 备份 ┌─────────────────┐ │ A服务器 │ ────────────────── │ 备份文件 │ │ (172.16.231.218)│ │ (/mnt/nas/) │ └─────────────────┘ └────────┬────────┘ │ scp ▼ ┌─────────────────┐ 恢复 ┌─────────────────┐ │ B服务器 │ ───────────────── │ 备份文件 │ │ (172.16.231.219)│ └─────────────────┘ └────────┬────────┘ │ │ 增量同步时间段数据 ▼ ┌─────────────────┐ │ C#同步工具 │ │ (InfluxdbTool) │ └─────────────────┘2.2 技术选型技术组件版本用途InfluxDB2.x时序数据库.NET10.0同步工具开发框架InfluxDB.ClientLatestInfluxDB .NET SDKSerilogLatest日志框架三、关键代码解析3.1 数据模型定义3.1.1 DeviceState终端状态[Measurement(DeviceState)]publicclassDeviceState{[Column(IsTimestamptrue)]publicDateTimeCreaTime{get;set;}[Column(IsTagtrue)]publicstringKey{get;set;}[Column(Value)]publicstringValue{get;set;}}关键点说明[Measurement]特性指定 InfluxDB 中的表名[Column(IsTimestamp true)]标记时间戳字段[Column(IsTag true)]标记索引标签字段3.1.2 HGRealtimeData终端实时数据[Measurement(HGRealtimeData)]publicclassHGRealtimeData{[Column(IsTimestamptrue)]publicDateTimeCreaTime{get;set;}[Column(IsTagtrue)]publicstringKey{get;set;}[Column(Value)]publicstringValue{get;set;}}3.2 主程序入口staticvoidMain(string[]args){try{Dijing.SerilogExt.InitLog.SetLog(Dijing.SerilogExt.RunModeEnum.Debug);// 定义同步时间段varbeginTimeDateTime.Parse(2025-05-15 16:00:00);varendTimeDateTime.Parse(2025-05-18 11:00:00);// 初始化源端和目标端配置varinfluxFromOptionInitInfluxdbFromOption();varinfluxToOptionInitInfluxdbToOption();// 同步 DeviceState 数据SyncMeasurementDeviceState(DeviceState,beginTime,endTime,influxFromOption,influxToOption);// 同步 HGRealtimeData 数据SyncMeasurementHGRealtimeData(HGRealtimeData,beginTime,endTime,influxFromOption,influxToOption);}catch(Exceptionex){Log.Error(ex,数据迁移失败);}}3.3 InfluxDB 配置初始化privatestaticInfluxOptionInitInfluxdbFromOption(){returnnewInfluxOption{Urlhttp://SOURCE_SERVER:8086,// 源服务器地址TokenYOUR_API_TOKEN,Bucketobjiot,Orgruiyun};}privatestaticInfluxOptionInitInfluxdbToOption(){returnnewInfluxOption{Urlhttp://TARGET_SERVER:8086,// 目标服务器地址TokenYOUR_API_TOKEN,Bucketobjiot,Orgruiyun};}3.4 增量同步核心逻辑// 获取指定时间段内的所有设备KeyprivatestaticListstringGetInlfuxdbKeys(stringmeasurement,DateTimebeginTime,DateTimeendTime,InfluxOptioninfluxOption){varkeysDijing.InfluxdbExt.Influxdb2Helper.GetKeys(measurement,beginTime,endTime,influxOption,1);returnkeys??newListstring();}// 查询指定Key的DeviceState数据privatestaticListDeviceStateGetDeviceStateList(stringkey,DateTimebeginTime,DateTimeendTime,InfluxOptioninfluxOption){returnDijing.InfluxdbExt.Influxdb2Helper.GetAllDeviceState(key,beginTime,endTime,influxOption);}// 批量插入DeviceState数据privatestaticvoidBatchInsertDeviceState(ListDeviceStatelist,InfluxOptioninfluxOption){Dijing.InfluxdbExt.Influxdb2Helper.WritesDeviceState(list,influxOption);}四、操作步骤4.1 数据库备份A服务器# 备份 objiot 数据库influx backup /mnt/nas--bucketobjiot-tYOUR_API_TOKEN# 备份 accdetect 数据库influx backup /mnt/nas/accalarm--bucketaccdetect-tYOUR_API_TOKEN4.2 传输备份文件scp-r/mnt/nas/influxdbbackup20260515/* rootTARGET_SERVER_IP:/mnt/nas/influxdbbackup20260515/scp-r/mnt/nas/accalarm/* rootTARGET_SERVER_IP:/mnt/nas/accalarm/4.3 数据库恢复B服务器# 恢复 objiot 数据库influx restore--bucketobjiot --new-bucket objiot--orgruiyun /mnt/nas/influxdbbackup20260515-tYOUR_API_TOKEN# 恢复 accdetect 数据库influx restore--bucketaccdetect --new-bucket accdetect--orgruiyun /mnt/nas/accalarm/-tYOUR_API_TOKEN4.4 增量数据同步# 配置端口转发将远程端口映射到本地ssh-L8087:127.0.0.1:8086 root172.16.231.218ssh-L8086:127.0.0.1:8086 root172.16.231.219# 运行同步工具dotnet run--projectInfluxdbTool.csproj五、服务健康检测# 检测 A 服务器curl-vhttp://172.16.231.218:8086/ping# 检测 B 服务器curl-vhttp://172.16.231.219:8086/ping正常响应HTTP/1.1 204 No Content Content-Type: application/json六、注意事项6.1 数据一致性停机窗口在增量同步完成前避免写入新数据到源数据库时间精度确保源端和目标端的系统时间一致建议使用 NTP 同步重复数据增量同步可能产生重复数据建议在应用层做去重处理6.2 网络安全Token 管理API Token 具有管理员权限需妥善保管传输加密建议使用 HTTPS 或 SSH 隧道进行数据传输访问控制限制 InfluxDB 服务只允许内网访问6.3 性能优化批量操作使用批量写入 API 提高写入效率时间段分片对于超大数据量建议按时间段分片同步索引优化在目标库创建必要的标签索引七、实际应用效果7.1 迁移结果指标数值迁移数据库数量2 个同步数据时间段~69 小时DeviceState 记录数~50,000 条HGRealtimeData 记录数~120,000 条同步成功率100%7.2 日志示例[INF] 开始获取InfluxDB DeviceState keys... [INF] 获取DeviceState keys完成共获取到 156 个key [INF] 开始查询DeviceStatekeydevice_001 [INF] 查询完成keydevice_001共获取到 328 条数据 [INF] 开始批量插入DeviceState数据数量328 [INF] 批量插入DeviceState数据完成 ... [INF] key总数: 156附录配置参数汇总参数值A 服务器地址SOURCE_SERVER_IPB 服务器地址TARGET_SERVER_IP组织名称orgnameAPI TokenYOUR_API_TOKEN数据库名称objiot,accdetect总结本文详细介绍了 InfluxDB 数据库迁移的完整流程重点讲解了基于 C# 实现的时间段增量数据同步方案。通过合理的备份策略和增量同步机制可以在保证数据一致性的前提下实现零停机或最小停机时间的数据库迁移。