目录摘要一、时间序列引擎概述1.1 什么是时间序列引擎1.2 时间序列引擎特点1.3 适用场景二、创建时间序列引擎2.1 基本语法2.2 创建简单引擎2.3 创建分组引擎三、窗口类型3.1 固定窗口3.2 滑动窗口3.3 时间对齐四、聚合指标4.1 基本聚合4.2 百分位聚合4.3 自定义聚合4.4 多列聚合五、引擎管理5.1 查看引擎状态5.2 删除引擎5.3 引擎监控六、实战案例6.1 设备实时监控6.2 实时告警系统6.3 数据降采样七、性能优化7.1 窗口大小选择7.2 分组数量7.3 内存管理八、总结参考资料摘要本文深入讲解DolphinDB时间序列引擎。从引擎原理到创建配置从窗口聚合到滑动计算从多设备分组到实时输出全面介绍时间序列引擎的核心功能。通过丰富的代码示例帮助读者掌握实时聚合计算的核心技能。一、时间序列引擎概述1.1 什么是时间序列引擎时间序列引擎是DolphinDB内置的流计算引擎用于实时时间窗口聚合时间序列引擎数据流时间窗口窗口聚合实时输出核心功能固定窗口滑动窗口会话窗口1.2 时间序列引擎特点特点说明实时聚合毫秒级延迟窗口计算支持多种窗口分组聚合支持多设备分组自动触发窗口结束自动输出1.3 适用场景场景说明实时监控设备指标实时统计告警检测实时阈值告警趋势分析实时趋势计算数据降采样高频数据降采样二、创建时间序列引擎2.1 基本语法//创建时间序列引擎 aggcreateTimeSeriesEngine(engine_name,//引擎名称 windowSize,//窗口大小毫秒 metrics,//聚合指标 outputTable,//输出表 timeColumn,//时间列[keyColumn],//分组列可选[garbageSize],//垃圾回收阈值[roundTime]//时间对齐方式)2.2 创建简单引擎//创建输入流表 share streamTable(1:0,timestamptemperature,[TIMESTAMP,DOUBLE])asinput_stream//创建输出表 share table(1:0,timestampavg_tempmax_tempmin_tempcnt,[TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,LONG])asoutput_table//创建时间序列引擎 aggcreateTimeSeriesEngine(ts_engine,60000,//60秒窗口[avg(temperature)asavg_temp,max(temperature)asmax_temp,min(temperature)asmin_temp,count(temperature)ascnt],output_table,timestamp)//订阅流表 subscribeTable(,input_stream,ts_agg,-1,agg,true)2.3 创建分组引擎//创建输入流表 share streamTable(1:0,device_idtimestamptemperature,[INT,TIMESTAMP,DOUBLE])asinput_stream//创建输出表 share table(1:0,device_idtimestampavg_tempmax_tempmin_tempcnt,[INT,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,LONG])asoutput_table//创建分组时间序列引擎 aggcreateTimeSeriesEngine(grouped_ts_engine,60000,[avg(temperature)asavg_temp,max(temperature)asmax_temp,min(temperature)asmin_temp,count(temperature)ascnt],output_table,timestamp,device_id)//订阅流表 subscribeTable(,input_stream,grouped_ts_agg,-1,agg,true)三、窗口类型3.1 固定窗口//固定窗口窗口大小固定不重叠//例如每60秒一个窗口 aggcreateTimeSeriesEngine(fixed_window,60000,[avg(temperature)asavg_temp,count(*)ascnt],output_table,timestamp,device_id)//窗口划分//[00:00:00,00:01:00)-窗口1//[00:01:00,00:02:00)-窗口2//[00:02:00,00:03:00)-窗口33.2 滑动窗口//滑动窗口窗口按步长滑动//使用createTimeSeriesEngine的step参数 aggcreateTimeSeriesEngine(sliding_window,60000,//窗口大小60秒[avg(temperature)asavg_temp],output_table,timestamp,device_id,,,,10000)//step10秒//窗口划分每10秒输出一次//[00:00:00,00:01:00)-00:01:00输出//[00:00:10,00:01:10)-00:01:10输出//[00:00:20,00:01:20)-00:01:20输出3.3 时间对齐//时间对齐窗口边界对齐到整点时间 aggcreateTimeSeriesEngine(aligned_window,60000,[avg(temperature)asavg_temp],output_table,timestamp,device_id,,,true)//roundTimetrue时间对齐//对齐效果//数据时间00:00:30-窗口[00:00:00,00:01:00)//数据时间00:01:15-窗口[00:01:00,00:02:00)四、聚合指标4.1 基本聚合//基本聚合函数 aggcreateTimeSeriesEngine(basic_agg,60000,[avg(temperature)asavg_temp,//平均值sum(temperature)assum_temp,//总和max(temperature)asmax_temp,//最大值min(temperature)asmin_temp,//最小值 count(temperature)ascnt,//计数 std(temperature)asstd_temp,//标准差 var(temperature)asvar_temp],//方差 output_table,timestamp,device_id)4.2 百分位聚合//百分位聚合 aggcreateTimeSeriesEngine(percentile_agg,60000,[percentile(temperature,50)asmedian,//中位数 percentile(temperature,95)asp95,//95分位 percentile(temperature,99)asp99],//99分位 output_table,timestamp,device_id)4.3 自定义聚合//自定义聚合指标 aggcreateTimeSeriesEngine(custom_agg,60000,[avg(temperature)asavg_temp,max(temperature)-min(temperature)asrange,//极差 std(temperature)/avg(temperature)ascv],//变异系数 output_table,timestamp,device_id)4.4 多列聚合//多列聚合 aggcreateTimeSeriesEngine(multi_col_agg,60000,[avg(temperature)asavg_temp,avg(humidity)asavg_humidity,avg(pressure)asavg_pressure,corr(temperature,humidity)ascorr_temp_humid],output_table,timestamp,device_id)五、引擎管理5.1 查看引擎状态//查看引擎状态 getStreamEngineStat()//查看特定引擎 getStreamEngineStat(ts_engine)5.2 删除引擎//删除引擎 dropStreamEngine(ts_engine)//删除所有引擎 dropStreamEngine()5.3 引擎监控//引擎监控函数defmonitorEngine(){statgetStreamEngineStat()for(rowinstat){print(引擎: row.name)print( 状态: row.status)print( 处理行数: string(row.processedRows))print( 输出行数: string(row.outputRows))}}monitorEngine()六、实战案例6.1 设备实时监控//1.创建流表share streamTable(100000:0,device_idtimestamptemperaturehumiditypressure,[INT,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE])assensor_stream//2.创建输出表share table(1:0,device_idtimestampavg_tempmax_tempmin_tempavg_humiditycnt,[INT,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG])asmonitor_result//3.创建时间序列引擎aggcreateTimeSeriesEngine(monitor_engine,60000,//1分钟窗口[avg(temperature)asavg_temp,max(temperature)asmax_temp,min(temperature)asmin_temp,avg(humidity)asavg_humidity,count(*)ascnt],monitor_result,timestamp,device_id)//4.订阅流表subscribeTable(,sensor_stream,monitor_agg,-1,agg,true)//5.启用持久化enableTablePersistence(sensor_stream,true,true,1000000)//6.模拟数据写入defsimulateData(){for(iin1..100){sensor_stream.append!(table(take(1..100,1000)asdevice_id,take(now(),1000)astimestamp,rand(20.0..30.0,1000)astemperature,rand(40.0..60.0,1000)ashumidity,rand(1000.0..1020.0,1000)aspressure))sleep(100)}}//执行模拟 simulateData()//查看结果 select top20*frommonitor_result6.2 实时告警系统//1.创建流表share streamTable(100000:0,device_idtimestamptemperature,[INT,TIMESTAMP,DOUBLE])assensor_stream//2.创建输出表share table(1:0,device_idtimestampavg_tempalert,[INT,TIMESTAMP,DOUBLE,BOOL])asalert_result//3.创建告警引擎aggcreateTimeSeriesEngine(alert_engine,30000,//30秒窗口[avg(temperature)asavg_temp,avg(temperature)30asalert],//温度30告警 alert_result,timestamp,device_id)//4.订阅流表subscribeTable(,sensor_stream,alert_agg,-1,agg,true)//5.查询告警select*fromalert_result where alerttrue6.3 数据降采样//1.创建高频数据流表share streamTable(100000:0,device_idtimestampvalue,[INT,TIMESTAMP,DOUBLE])ashigh_freq_stream//2.创建降采样输出表share table(1:0,device_idtimestampopenhighlowclosecnt,[INT,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG])asohlc_result//3.创建降采样引擎aggcreateTimeSeriesEngine(ohlc_engine,60000,//1分钟降采样[first(value)asopen,//开盘max(value)ashigh,//最高min(value)aslow,//最低 last(value)asclose,//收盘 count(*)ascnt],//数量 ohlc_result,timestamp,device_id)//4.订阅流表subscribeTable(,high_freq_stream,ohlc_agg,-1,agg,true)七、性能优化7.1 窗口大小选择场景建议窗口大小实时监控10-60秒告警检测30-60秒趋势分析1-5分钟数据降采样1-10分钟7.2 分组数量//分组数量建议//单引擎分组数10000//如果分组数过多考虑//1.使用多个引擎//2.使用哈希分组7.3 内存管理//设置垃圾回收阈值 aggcreateTimeSeriesEngine(ts_engine,60000,[avg(temperature)asavg_temp],output_table,timestamp,device_id,100000)//garbageSize100000超过10万条触发GC八、总结本文详细介绍了DolphinDB时间序列引擎引擎原理实时时间窗口聚合创建方法简单引擎、分组引擎窗口类型固定窗口、滑动窗口、时间对齐聚合指标基本聚合、百分位、自定义引擎管理状态查看、删除、监控实战应用实时监控、告警系统、数据降采样思考题如何选择合适的窗口大小固定窗口和滑动窗口有什么区别如何设计实时告警系统参考资料DolphinDB时间序列引擎DolphinDB流计算
DolphinDB时间序列引擎:实时聚合计算
目录摘要一、时间序列引擎概述1.1 什么是时间序列引擎1.2 时间序列引擎特点1.3 适用场景二、创建时间序列引擎2.1 基本语法2.2 创建简单引擎2.3 创建分组引擎三、窗口类型3.1 固定窗口3.2 滑动窗口3.3 时间对齐四、聚合指标4.1 基本聚合4.2 百分位聚合4.3 自定义聚合4.4 多列聚合五、引擎管理5.1 查看引擎状态5.2 删除引擎5.3 引擎监控六、实战案例6.1 设备实时监控6.2 实时告警系统6.3 数据降采样七、性能优化7.1 窗口大小选择7.2 分组数量7.3 内存管理八、总结参考资料摘要本文深入讲解DolphinDB时间序列引擎。从引擎原理到创建配置从窗口聚合到滑动计算从多设备分组到实时输出全面介绍时间序列引擎的核心功能。通过丰富的代码示例帮助读者掌握实时聚合计算的核心技能。一、时间序列引擎概述1.1 什么是时间序列引擎时间序列引擎是DolphinDB内置的流计算引擎用于实时时间窗口聚合时间序列引擎数据流时间窗口窗口聚合实时输出核心功能固定窗口滑动窗口会话窗口1.2 时间序列引擎特点特点说明实时聚合毫秒级延迟窗口计算支持多种窗口分组聚合支持多设备分组自动触发窗口结束自动输出1.3 适用场景场景说明实时监控设备指标实时统计告警检测实时阈值告警趋势分析实时趋势计算数据降采样高频数据降采样二、创建时间序列引擎2.1 基本语法//创建时间序列引擎 aggcreateTimeSeriesEngine(engine_name,//引擎名称 windowSize,//窗口大小毫秒 metrics,//聚合指标 outputTable,//输出表 timeColumn,//时间列[keyColumn],//分组列可选[garbageSize],//垃圾回收阈值[roundTime]//时间对齐方式)2.2 创建简单引擎//创建输入流表 share streamTable(1:0,timestamptemperature,[TIMESTAMP,DOUBLE])asinput_stream//创建输出表 share table(1:0,timestampavg_tempmax_tempmin_tempcnt,[TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,LONG])asoutput_table//创建时间序列引擎 aggcreateTimeSeriesEngine(ts_engine,60000,//60秒窗口[avg(temperature)asavg_temp,max(temperature)asmax_temp,min(temperature)asmin_temp,count(temperature)ascnt],output_table,timestamp)//订阅流表 subscribeTable(,input_stream,ts_agg,-1,agg,true)2.3 创建分组引擎//创建输入流表 share streamTable(1:0,device_idtimestamptemperature,[INT,TIMESTAMP,DOUBLE])asinput_stream//创建输出表 share table(1:0,device_idtimestampavg_tempmax_tempmin_tempcnt,[INT,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,LONG])asoutput_table//创建分组时间序列引擎 aggcreateTimeSeriesEngine(grouped_ts_engine,60000,[avg(temperature)asavg_temp,max(temperature)asmax_temp,min(temperature)asmin_temp,count(temperature)ascnt],output_table,timestamp,device_id)//订阅流表 subscribeTable(,input_stream,grouped_ts_agg,-1,agg,true)三、窗口类型3.1 固定窗口//固定窗口窗口大小固定不重叠//例如每60秒一个窗口 aggcreateTimeSeriesEngine(fixed_window,60000,[avg(temperature)asavg_temp,count(*)ascnt],output_table,timestamp,device_id)//窗口划分//[00:00:00,00:01:00)-窗口1//[00:01:00,00:02:00)-窗口2//[00:02:00,00:03:00)-窗口33.2 滑动窗口//滑动窗口窗口按步长滑动//使用createTimeSeriesEngine的step参数 aggcreateTimeSeriesEngine(sliding_window,60000,//窗口大小60秒[avg(temperature)asavg_temp],output_table,timestamp,device_id,,,,10000)//step10秒//窗口划分每10秒输出一次//[00:00:00,00:01:00)-00:01:00输出//[00:00:10,00:01:10)-00:01:10输出//[00:00:20,00:01:20)-00:01:20输出3.3 时间对齐//时间对齐窗口边界对齐到整点时间 aggcreateTimeSeriesEngine(aligned_window,60000,[avg(temperature)asavg_temp],output_table,timestamp,device_id,,,true)//roundTimetrue时间对齐//对齐效果//数据时间00:00:30-窗口[00:00:00,00:01:00)//数据时间00:01:15-窗口[00:01:00,00:02:00)四、聚合指标4.1 基本聚合//基本聚合函数 aggcreateTimeSeriesEngine(basic_agg,60000,[avg(temperature)asavg_temp,//平均值sum(temperature)assum_temp,//总和max(temperature)asmax_temp,//最大值min(temperature)asmin_temp,//最小值 count(temperature)ascnt,//计数 std(temperature)asstd_temp,//标准差 var(temperature)asvar_temp],//方差 output_table,timestamp,device_id)4.2 百分位聚合//百分位聚合 aggcreateTimeSeriesEngine(percentile_agg,60000,[percentile(temperature,50)asmedian,//中位数 percentile(temperature,95)asp95,//95分位 percentile(temperature,99)asp99],//99分位 output_table,timestamp,device_id)4.3 自定义聚合//自定义聚合指标 aggcreateTimeSeriesEngine(custom_agg,60000,[avg(temperature)asavg_temp,max(temperature)-min(temperature)asrange,//极差 std(temperature)/avg(temperature)ascv],//变异系数 output_table,timestamp,device_id)4.4 多列聚合//多列聚合 aggcreateTimeSeriesEngine(multi_col_agg,60000,[avg(temperature)asavg_temp,avg(humidity)asavg_humidity,avg(pressure)asavg_pressure,corr(temperature,humidity)ascorr_temp_humid],output_table,timestamp,device_id)五、引擎管理5.1 查看引擎状态//查看引擎状态 getStreamEngineStat()//查看特定引擎 getStreamEngineStat(ts_engine)5.2 删除引擎//删除引擎 dropStreamEngine(ts_engine)//删除所有引擎 dropStreamEngine()5.3 引擎监控//引擎监控函数defmonitorEngine(){statgetStreamEngineStat()for(rowinstat){print(引擎: row.name)print( 状态: row.status)print( 处理行数: string(row.processedRows))print( 输出行数: string(row.outputRows))}}monitorEngine()六、实战案例6.1 设备实时监控//1.创建流表share streamTable(100000:0,device_idtimestamptemperaturehumiditypressure,[INT,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE])assensor_stream//2.创建输出表share table(1:0,device_idtimestampavg_tempmax_tempmin_tempavg_humiditycnt,[INT,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG])asmonitor_result//3.创建时间序列引擎aggcreateTimeSeriesEngine(monitor_engine,60000,//1分钟窗口[avg(temperature)asavg_temp,max(temperature)asmax_temp,min(temperature)asmin_temp,avg(humidity)asavg_humidity,count(*)ascnt],monitor_result,timestamp,device_id)//4.订阅流表subscribeTable(,sensor_stream,monitor_agg,-1,agg,true)//5.启用持久化enableTablePersistence(sensor_stream,true,true,1000000)//6.模拟数据写入defsimulateData(){for(iin1..100){sensor_stream.append!(table(take(1..100,1000)asdevice_id,take(now(),1000)astimestamp,rand(20.0..30.0,1000)astemperature,rand(40.0..60.0,1000)ashumidity,rand(1000.0..1020.0,1000)aspressure))sleep(100)}}//执行模拟 simulateData()//查看结果 select top20*frommonitor_result6.2 实时告警系统//1.创建流表share streamTable(100000:0,device_idtimestamptemperature,[INT,TIMESTAMP,DOUBLE])assensor_stream//2.创建输出表share table(1:0,device_idtimestampavg_tempalert,[INT,TIMESTAMP,DOUBLE,BOOL])asalert_result//3.创建告警引擎aggcreateTimeSeriesEngine(alert_engine,30000,//30秒窗口[avg(temperature)asavg_temp,avg(temperature)30asalert],//温度30告警 alert_result,timestamp,device_id)//4.订阅流表subscribeTable(,sensor_stream,alert_agg,-1,agg,true)//5.查询告警select*fromalert_result where alerttrue6.3 数据降采样//1.创建高频数据流表share streamTable(100000:0,device_idtimestampvalue,[INT,TIMESTAMP,DOUBLE])ashigh_freq_stream//2.创建降采样输出表share table(1:0,device_idtimestampopenhighlowclosecnt,[INT,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG])asohlc_result//3.创建降采样引擎aggcreateTimeSeriesEngine(ohlc_engine,60000,//1分钟降采样[first(value)asopen,//开盘max(value)ashigh,//最高min(value)aslow,//最低 last(value)asclose,//收盘 count(*)ascnt],//数量 ohlc_result,timestamp,device_id)//4.订阅流表subscribeTable(,high_freq_stream,ohlc_agg,-1,agg,true)七、性能优化7.1 窗口大小选择场景建议窗口大小实时监控10-60秒告警检测30-60秒趋势分析1-5分钟数据降采样1-10分钟7.2 分组数量//分组数量建议//单引擎分组数10000//如果分组数过多考虑//1.使用多个引擎//2.使用哈希分组7.3 内存管理//设置垃圾回收阈值 aggcreateTimeSeriesEngine(ts_engine,60000,[avg(temperature)asavg_temp],output_table,timestamp,device_id,100000)//garbageSize100000超过10万条触发GC八、总结本文详细介绍了DolphinDB时间序列引擎引擎原理实时时间窗口聚合创建方法简单引擎、分组引擎窗口类型固定窗口、滑动窗口、时间对齐聚合指标基本聚合、百分位、自定义引擎管理状态查看、删除、监控实战应用实时监控、告警系统、数据降采样思考题如何选择合适的窗口大小固定窗口和滑动窗口有什么区别如何设计实时告警系统参考资料DolphinDB时间序列引擎DolphinDB流计算