本文将结合SkeyeVSS开源项目代码详细讲解设备 / 通道状态如何获取与设置。项目源码地址https://github.com/openskeye/go-vss1. 角色划分层级含义特点DBdevices.online、channels.online业务与列表的数据来源由 RPCDeviceUpsert/DeviceUpdate/ChannelUpdate/ChannelUpsert写入ServiceContext.DeviceOnlineState所有设备在线状态集合由FetchDataLogic.deviceOnlineState周期性拉 RPCOnlineState填充SSE 读取这一份不需要直连 DB前端通过SSEtypedevice_online_state拿到的是数据与 DB 之间最多存在拉取周期默认约 10s的延迟。2. SSE如何「获取」设备 / 通道在线状态2.1 入口与参数路由注册internal/handler/sse/routers.gotype为device_online_state时构造DeviceOnlineStateLogic。请求体SSEDeviceOnlineStatesReqdeviceType1→ 设备2→ 通道。逻辑文件internal/logic/sse/device_online_state.go。2.2 获取DO先do(req)推送一轮数据随后循环ctx取消则退出否则time.After(5*time.Second)再调do即首次立即推送之后约每 5 秒推送一次。do若svcCtx.DeviceOnlineState nil向客户端回ErrdeviceInlineState 为空并带DelayClose结束。deviceType 1推送DeviceOnlineState.Devices。否则推送DeviceOnlineState.Channels。数据结构来自公共类型core/common/types.DeviceOnlineStateResptypeDeviceOnlineStateRespstruct{Channelsmap[string]uintjson:channelsDevicesmap[string]uintjson:devices}DevicesdeviceUniqueId → online0离线1在线。Channels复合键→online键格式在 DB 服务组装为{deviceUniqueId}-{channelUniqueId}。因此SSE 不做计算只把内存里已由拉取任务灌好的两个 map按需设备 / 通道推给订阅端。3. 数据来源FetchDataLogic RPCOnlineState3.1 何时刷新DeviceOnlineState文件internal/logic/proc/fetch_data_proc.go方法deviceOnlineState。启动与设置、媒体服务、ONVIF 发现等并行在初始化路径里会执行第一次deviceOnlineState()。周期主循环里now%100时每 10 秒go l.deviceOnlineState()。成功后将l.svcCtx.DeviceOnlineState res.Data。3.2 DB RPCDB 服务core/app/sev/db/internal/logic/deviceservice/online_state_logic.goOnlineState。DevicesModel.OnlineStateList查设备表仅id、deviceUniqueId、online建成deviceUniqueId → online。ChannelsModel.OnlineStateList查通道表字段含uniqueId、deviceUniqueId、online建成fmt.Sprintf(%s-%s, deviceUniqueId, uniqueId) → online。这里需要注意的是在同一网络下设备iddeviceUniqueId能保证唯一但通道channelUniqueId无法保证唯一。所以这里将设备和通道id拼接组成唯一id4. 设置状态在线状态最终落在devices.online、channels.online及onlineAt、keepaliveAt等辅助字段。按来源可分几类。4.1 国标 GB28181注册直接写设备心跳走聚合队列场景路径说明REGISTERgbs_sip/register.goDeviceUpsert写入设备记录含online、expire等不经过SetDeviceOnline。下线时另发SipCatalogLoop/SipHeartbeatLoop清理定时任务。心跳 Messagegbs_sip/keepalive.goSetDeviceOnline - DCOnlineReq{DeviceUniqueId, Online:true}无通道字段由下游批量刷新DeviceUpdate更新keepaliveAt、onlineAt等见setDeviceOnlineState_loop。注册超时 / 心跳超时gbs_proc/heartbeat_offline_loop.go当now - RegisterExpireAt 10或心跳间隔 ≥Sip.HeartbeatTimeout时SetDeviceOnline离线同设备从 **SipHeartbeatLoopMap移除。目录 Catalog 应答gbs_sip/catalog.go解析StatusON→ 通道online1否则0先按条件把长时间未更新的通道批量置离线再ChannelUpsert同步目录并可能DeviceUpdate更新通道数量。国标设备级在线主要来自注册 upsert与心跳驱动的setDevice经队列通道级在线主要来自Catalog里的目录项状态。4.2SetDeviceOnline聚合SetDeviceOnlineStateLogic文件internal/logic/gbs_proc/set_device_online_state_loop.go。SetDeviceOnlinechannel收到DCOnlineReqDeviceUniqueId、ChannelUniqueId、CId、Online。DeviceOnlineStateUpdateMap.Set(v.DeviceUniqueId, v)—— 以设备 ID 为唯一键覆盖写入。每秒Ticker把 map 内条目分成设备上/下线、通道上/下线四组分别go setDevice/go setChannel然后Clear()map。setDeviceDevice.DeviceUpdate更新devices.online等设备下线时顺带ChannelUpdate将该设备下通道置离线、并发SipCatalogLoop删除 Catalog 任务。setChannel按channels.idCId条件ChannelUpdate更新channels.online。注意同一秒内、同一设备多条通道的DCOnlineReq在 map 里会互相覆盖键只有DeviceUniqueId。流探测是每条通道单独 goroutine 投递若同一秒多通道同时更新理论上可能丢中间状态属于并发与批处理设计的折中排障时可知悉。4.3 主动流探测非国标文件internal/logic/gbs_proc/check_device_online_state_loop.go。周期调用 DB RPCRtspStreamGroups对ONVIF / 流媒体源 / RTMP / HTTP等待遇的StreamUrl做RTSP / RTMP / HTTP探测。每个结果SetDeviceOnline - DCOnlineReq带CId、ChannelUniqueId、DeviceUniqueId、Online进入上节通道维度更新链路。4.4 MS notify如 RTMP推流ONVIF协议internal/logic/http/notify/common.go在推拉流状态变更时ChannelUpdatestreamState、online、onlineAt等不经过DeviceOnlineState但会改变 DB下一轮deviceOnlineState拉取后SSE 客户端才能看到更新。5. 获取状态Database devices/channelsDB OnlineStateFetchDataLogicDeviceOnlineStateVSS DeviceOnlineStateLogic前端 SSEDatabase devices/channelsDB OnlineStateFetchDataLogicDeviceOnlineStateVSS DeviceOnlineStateLogic前端 SSEloop[每 10s及启动]loop[每 5s / 首次]OnlineStateOnlineStateList x2DeviceOnlineStateResp指针替换typedevice_online_statedeviceType1|2读 Devices / ChannelsSSEResponse.Data6. 设置状态国标写 DBREGISTER DeviceUpsertCatalog ChannelUpsertKeepalive SetDeviceOnlineHeartbeatOffline SetDeviceOnlineSetDeviceOnlineStateLogic 每秒批量DeviceUpdateChannelUpdatedevices7. 小结与排障问题建议SSE 一直报deviceInlineState 为空确认FetchDataLogic已跑完首次deviceOnlineState且 RPC 成功检查 DB 与Licensed DB RPC。SSE 与界面列表不一致正常数据10s、SSE5s存在延迟以 DB / 业务列表为准做最终一致。国标设备在线但通道长期不对查Catalog是否上报、Status是否为ON查ChannelUpsert与过滤ChannelFilters。8. 源码索引环节路径SSE 订阅逻辑core/app/sev/vss/internal/logic/sse/device_online_state.goSSE 路由core/app/sev/vss/internal/handler/sse/routers.go内存数据拉取core/app/sev/vss/internal/logic/proc/fetch_data_proc.go→deviceOnlineStateRPC 实现core/app/sev/db/internal/logic/deviceservice/online_state_logic.go响应结构core/common/types/devices.go→DeviceOnlineStateResp在线更新队列core/app/sev/vss/internal/types/types.go→SetDeviceOnline、DCOnlineReq批量写 DBcore/app/sev/vss/internal/logic/gbs_proc/set_device_online_state_loop.go国标注册 / 心跳 / 目录core/app/sev/vss/internal/logic/gbs_sip/register.go、keepalive.go、catalog.go心跳超时下线core/app/sev/vss/internal/logic/gbs_proc/heartbeat_offline_loop.go流探测core/app/sev/vss/internal/logic/gbs_proc/check_device_online_state_loop.goDB 列表查询core/repositories/models/devices/db.go、channels/db.go→OnlineStateList
SkeyeVSS国标视频平台开源项目-设备与通道在线状态详解
本文将结合SkeyeVSS开源项目代码详细讲解设备 / 通道状态如何获取与设置。项目源码地址https://github.com/openskeye/go-vss1. 角色划分层级含义特点DBdevices.online、channels.online业务与列表的数据来源由 RPCDeviceUpsert/DeviceUpdate/ChannelUpdate/ChannelUpsert写入ServiceContext.DeviceOnlineState所有设备在线状态集合由FetchDataLogic.deviceOnlineState周期性拉 RPCOnlineState填充SSE 读取这一份不需要直连 DB前端通过SSEtypedevice_online_state拿到的是数据与 DB 之间最多存在拉取周期默认约 10s的延迟。2. SSE如何「获取」设备 / 通道在线状态2.1 入口与参数路由注册internal/handler/sse/routers.gotype为device_online_state时构造DeviceOnlineStateLogic。请求体SSEDeviceOnlineStatesReqdeviceType1→ 设备2→ 通道。逻辑文件internal/logic/sse/device_online_state.go。2.2 获取DO先do(req)推送一轮数据随后循环ctx取消则退出否则time.After(5*time.Second)再调do即首次立即推送之后约每 5 秒推送一次。do若svcCtx.DeviceOnlineState nil向客户端回ErrdeviceInlineState 为空并带DelayClose结束。deviceType 1推送DeviceOnlineState.Devices。否则推送DeviceOnlineState.Channels。数据结构来自公共类型core/common/types.DeviceOnlineStateResptypeDeviceOnlineStateRespstruct{Channelsmap[string]uintjson:channelsDevicesmap[string]uintjson:devices}DevicesdeviceUniqueId → online0离线1在线。Channels复合键→online键格式在 DB 服务组装为{deviceUniqueId}-{channelUniqueId}。因此SSE 不做计算只把内存里已由拉取任务灌好的两个 map按需设备 / 通道推给订阅端。3. 数据来源FetchDataLogic RPCOnlineState3.1 何时刷新DeviceOnlineState文件internal/logic/proc/fetch_data_proc.go方法deviceOnlineState。启动与设置、媒体服务、ONVIF 发现等并行在初始化路径里会执行第一次deviceOnlineState()。周期主循环里now%100时每 10 秒go l.deviceOnlineState()。成功后将l.svcCtx.DeviceOnlineState res.Data。3.2 DB RPCDB 服务core/app/sev/db/internal/logic/deviceservice/online_state_logic.goOnlineState。DevicesModel.OnlineStateList查设备表仅id、deviceUniqueId、online建成deviceUniqueId → online。ChannelsModel.OnlineStateList查通道表字段含uniqueId、deviceUniqueId、online建成fmt.Sprintf(%s-%s, deviceUniqueId, uniqueId) → online。这里需要注意的是在同一网络下设备iddeviceUniqueId能保证唯一但通道channelUniqueId无法保证唯一。所以这里将设备和通道id拼接组成唯一id4. 设置状态在线状态最终落在devices.online、channels.online及onlineAt、keepaliveAt等辅助字段。按来源可分几类。4.1 国标 GB28181注册直接写设备心跳走聚合队列场景路径说明REGISTERgbs_sip/register.goDeviceUpsert写入设备记录含online、expire等不经过SetDeviceOnline。下线时另发SipCatalogLoop/SipHeartbeatLoop清理定时任务。心跳 Messagegbs_sip/keepalive.goSetDeviceOnline - DCOnlineReq{DeviceUniqueId, Online:true}无通道字段由下游批量刷新DeviceUpdate更新keepaliveAt、onlineAt等见setDeviceOnlineState_loop。注册超时 / 心跳超时gbs_proc/heartbeat_offline_loop.go当now - RegisterExpireAt 10或心跳间隔 ≥Sip.HeartbeatTimeout时SetDeviceOnline离线同设备从 **SipHeartbeatLoopMap移除。目录 Catalog 应答gbs_sip/catalog.go解析StatusON→ 通道online1否则0先按条件把长时间未更新的通道批量置离线再ChannelUpsert同步目录并可能DeviceUpdate更新通道数量。国标设备级在线主要来自注册 upsert与心跳驱动的setDevice经队列通道级在线主要来自Catalog里的目录项状态。4.2SetDeviceOnline聚合SetDeviceOnlineStateLogic文件internal/logic/gbs_proc/set_device_online_state_loop.go。SetDeviceOnlinechannel收到DCOnlineReqDeviceUniqueId、ChannelUniqueId、CId、Online。DeviceOnlineStateUpdateMap.Set(v.DeviceUniqueId, v)—— 以设备 ID 为唯一键覆盖写入。每秒Ticker把 map 内条目分成设备上/下线、通道上/下线四组分别go setDevice/go setChannel然后Clear()map。setDeviceDevice.DeviceUpdate更新devices.online等设备下线时顺带ChannelUpdate将该设备下通道置离线、并发SipCatalogLoop删除 Catalog 任务。setChannel按channels.idCId条件ChannelUpdate更新channels.online。注意同一秒内、同一设备多条通道的DCOnlineReq在 map 里会互相覆盖键只有DeviceUniqueId。流探测是每条通道单独 goroutine 投递若同一秒多通道同时更新理论上可能丢中间状态属于并发与批处理设计的折中排障时可知悉。4.3 主动流探测非国标文件internal/logic/gbs_proc/check_device_online_state_loop.go。周期调用 DB RPCRtspStreamGroups对ONVIF / 流媒体源 / RTMP / HTTP等待遇的StreamUrl做RTSP / RTMP / HTTP探测。每个结果SetDeviceOnline - DCOnlineReq带CId、ChannelUniqueId、DeviceUniqueId、Online进入上节通道维度更新链路。4.4 MS notify如 RTMP推流ONVIF协议internal/logic/http/notify/common.go在推拉流状态变更时ChannelUpdatestreamState、online、onlineAt等不经过DeviceOnlineState但会改变 DB下一轮deviceOnlineState拉取后SSE 客户端才能看到更新。5. 获取状态Database devices/channelsDB OnlineStateFetchDataLogicDeviceOnlineStateVSS DeviceOnlineStateLogic前端 SSEDatabase devices/channelsDB OnlineStateFetchDataLogicDeviceOnlineStateVSS DeviceOnlineStateLogic前端 SSEloop[每 10s及启动]loop[每 5s / 首次]OnlineStateOnlineStateList x2DeviceOnlineStateResp指针替换typedevice_online_statedeviceType1|2读 Devices / ChannelsSSEResponse.Data6. 设置状态国标写 DBREGISTER DeviceUpsertCatalog ChannelUpsertKeepalive SetDeviceOnlineHeartbeatOffline SetDeviceOnlineSetDeviceOnlineStateLogic 每秒批量DeviceUpdateChannelUpdatedevices7. 小结与排障问题建议SSE 一直报deviceInlineState 为空确认FetchDataLogic已跑完首次deviceOnlineState且 RPC 成功检查 DB 与Licensed DB RPC。SSE 与界面列表不一致正常数据10s、SSE5s存在延迟以 DB / 业务列表为准做最终一致。国标设备在线但通道长期不对查Catalog是否上报、Status是否为ON查ChannelUpsert与过滤ChannelFilters。8. 源码索引环节路径SSE 订阅逻辑core/app/sev/vss/internal/logic/sse/device_online_state.goSSE 路由core/app/sev/vss/internal/handler/sse/routers.go内存数据拉取core/app/sev/vss/internal/logic/proc/fetch_data_proc.go→deviceOnlineStateRPC 实现core/app/sev/db/internal/logic/deviceservice/online_state_logic.go响应结构core/common/types/devices.go→DeviceOnlineStateResp在线更新队列core/app/sev/vss/internal/types/types.go→SetDeviceOnline、DCOnlineReq批量写 DBcore/app/sev/vss/internal/logic/gbs_proc/set_device_online_state_loop.go国标注册 / 心跳 / 目录core/app/sev/vss/internal/logic/gbs_sip/register.go、keepalive.go、catalog.go心跳超时下线core/app/sev/vss/internal/logic/gbs_proc/heartbeat_offline_loop.go流探测core/app/sev/vss/internal/logic/gbs_proc/check_device_online_state_loop.goDB 列表查询core/repositories/models/devices/db.go、channels/db.go→OnlineStateList