本文是 Linux 服务器定时器系列的第二篇在上一篇升序链表定时器的基础上讲解性能更优的小根堆定时器实现。 小根堆定时器将添加定时器的时间复杂度从 O (n) 优化到 O (logn)完美解决了升序链表在高并发下的性能瓶颈是工业界中小型服务器的首选方案。(如果没有学习过升序链表实现定时器推荐先学习一下很多细节在这篇里讲很详细了重复的就不再赘述了 高并发服务器必备升序链表定时器从设计到实现全流程-CSDN博客)一、小根堆定时器核心架构小根堆的核心性质父节点的超时时间永远小于等于左右子节点的超时时间因此堆顶永远是最早超时的定时器。 和升序链表一样采用双向绑定懒删除统一事件源的设计思想保证性能和安全性。1.1 核心数据结构定义// 绑定socket和定时器 struct client_data { sockaddr_in address; int socket; char buf[BUFFER_SIZE]; heap_timer *timer; };用户信息结构体和升序链表完全一致实现了客户端与定时器的双向绑定。// 定时器类 class heap_timer { public: heap_timer(int delay) { expire time(NULL) delay; } public: time_t expire; // 定时器绝对超时时间 void (*cb_func)(client_data *); // 超时回调函数 client_data *user_data; // 绑定的用户数据 };定时器类通过构造函数直接初始化超时时间其余字段和升序链表一致。1.2 时间堆类核心成员private: heap_timer **array; // 堆数组二级指针存储定时器指针 int capacity; // 堆数组的最大容量 int cur_size; // 堆中当前实际元素个数用指针数组存储定时器堆操作需要频繁交换元素交换指针8 字节比交换整个定时器对象快得多也避免了对象拷贝开销new 类型[N]分配连续 N 个元素的数组new 类型仅分配单个元素二、时间堆核心函数实现2.1 构造函数构造函数 1初始化空堆// 构造函数初始化一个大小为cap的空堆 time_heap(int cap) throw(std::exception) : capacity(cap), cur_size(0) { array new heap_timer *[capacity]; // 创建指针数组 if (!array) { throw std::exception(); } // 初始化所有元素为NULL避免野指针 for (int i 0; i capacity; i) { array[i] NULL; } }传入堆的初始容量分配连续内存并初始化所有指针为 NULL。这里的array是time_heap私有的二级指针变量作为指针数组管理定时器。构造函数 2用已有数组初始化堆// 构造函数用已有数组来初始化堆 time_heap(heap_timer **init_array, int size, int capacity) throw(std::exception) : cur_size(size), capacity(capacity) { if (capacity size) { throw std::exception(); // 容量不足直接抛出异常 } array new heap_timer *[capacity]; if (!array) { throw std::exception(); } for (int i 0; i capacity; i) { array[i] NULL; } if (size ! 0) { // 拷贝已有数组元素 for (int i 0; i size; i) { array[i] init_array[i]; } // 从最后一个父节点开始自下而上堆化 for (int i (cur_size - 1) / 2; i 0; i--) { percolate_down(i); } } }(cur_size - 1) / 2是最后一个非叶子节点父节点的索引叶子节点本身就是合法的堆只需对所有父节点执行下滤操作即可完成堆化2.2 析构函数// 销毁时间堆 ~time_heap() { // 先释放每个定时器对象 for (int i 0; i cur_size; i) { delete array[i]; } // 再释放堆数组本身必须用delete[] delete[] array; }先释放数组中每个指针指向的定时器对象再释放数组本身避免内存泄漏。2.3 添加定时器上滤操作// 添加目标定时器timer void add_timer(heap_timer *timer) throw(std::exception) { if (!timer) { return; } // 堆满了自动扩容一倍 if (cur_size capacity) { resize(); } // 新元素先放在堆尾空穴位置 int hole cur_size; int parent 0; // 上滤循环从空穴向上找合适位置 for (; hole 0; hole parent) { parent (hole - 1) / 2; // 计算父节点索引 // 父节点超时时间更小位置合法退出 if (array[parent]-expire timer-expire) break; // 父节点更大将父节点下移到空穴位置 array[hole] array[parent]; } // 空穴最终位置就是新定时器的正确位置 array[hole] timer; }时间复杂度O (logn)仅需遍历二叉树的高度远优于升序链表的 O (n)。在add_timer中如果当前实际数组容量已满触发扩容机制调用resize函数扩容1倍。在堆尾插入空穴通过比较新元素expire和它的父节点不断互换进行上滤操作直到父节点的expire子节点为止新元素插入最后的空穴中。2.4 删除定时器懒删除// 删除目标定时器timer void del_timer(heap_timer *timer) { if (!timer) return; // 延迟销毁仅将回调函数置空标记定时器失效 // 节省删除定时器的O(logn)开销缺点是会造成轻微的数组膨胀 timer-cb_func NULL; }这是时间堆最经典的优化不调整堆结构仅标记失效后续由心跳函数统一清理实现 O (1) 删除。2.5 删除堆顶定时器// 删除堆顶的定时器 void pop_timer() { if (empty()) return; if (array[0]) { delete array[0]; // 释放堆顶定时器内存 // 用堆底最后一个元素替换堆顶 array[0] array[--cur_size]; // 对新堆顶执行下滤操作维持小根堆性质 percolate_down(0); } }2.6 心跳函数处理超时定时器// 心跳函数每5秒执行一次处理所有超时定时器 void tick() { heap_timer *tmp array[0]; time_t cur time(NULL); while (!empty()) { if (!tmp) break; // 堆顶未超时 → 后面所有节点都未超时直接退出 if (tmp-expire cur) break; // 定时器有效未被懒删除→ 执行回调函数 if (array[0]-cb_func) { array[0]-cb_func(array[0]-user_data); } // 删除堆顶自动调整堆结构 pop_timer(); tmp array[0]; } }时间复杂度摊销 O (1)只处理已经超时的节点遇到第一个未超时的就退出。如果定时器到期执行堆顶的定时器任务(在业务函数main里是删除socketfd注册和关闭fd)。在del_timer中把那些因为其他情况(客户端下线等已经提前调用了cb_func处理sockfd)的回调函数设为了NULL也可以正常处理。最后将堆顶元素删除设为下个堆顶继续循环。2.7 下滤操作堆化核心// 下滤操作将指定节点向下调整到合适位置 void percolate_down(int hole) { heap_timer *temp array[hole]; // 保存当前节点 int child 0; for (; ((hole * 2 1) (cur_size - 1)); hole child) { child hole * 2 1; // 左孩子索引 // 找到左右孩子中更小的那个 if ((child (cur_size - 1)) (array[child 1]-expire array[child]-expire)) { child; } // 孩子节点更小 → 孩子上移 if (array[child]-expire temp-expire) { array[hole] array[child]; } else { break; // 位置合法退出 } } // 把原节点放到最终位置 array[hole] temp; }从temp下标的左孩子(hole*21)开始向下遍历数组直到堆尾为止。在循环中检查temp是否有右孩子右孩子expire是否比左孩子小选出两孩子中最小的一个跟temp的expire比较。temp孩子的expire直接跳出循环。否则让孩子节点跟temp(hole位置)互换,temp位置变成child。最后将空出的hole位置赋值为temp。2.8 堆扩容// 将堆数组扩大一倍 void resize() throw(std::exception) { heap_timer **temp new heap_timer *[2 * capacity]; for (int i 0; i 2 * capacity; i) { temp[i] NULL; } if (!temp) { throw std::exception(); } capacity 2 * capacity; // 复制原数组元素到新数组 for (int i 0; i cur_size; i) { temp[i] array[i]; } // 释放原数组指向新数组 delete[] array; array temp; }每次扩容为原来的 2 倍避免频繁扩容的开销。三、服务器集成定时器大部分代码和升序链表定时器完全一致重点讲解不同的地方。3.1 全局变量初始化// 利用小根堆来管理定时器初始容量和最大文件描述符数一致 static time_heap t_heap(FD_LIMIT); static int epollfd 0;声明全局静态变量t_heap来管理所有的定时器3.2 定时任务处理函数void timer_handler() { // 调用心跳函数处理所有超时定时器 t_heap.tick(); // 重新设置闹钟5秒后再次触发 alarm(TIMESLOT); }3.3 新客户端连接处理// 处理新到的客户连接 if(sockfd listenfd) { struct sockaddr_in client_address; socklen_t client_addrlength sizeof(client_address); int connfd accept(listenfd, (struct sockaddr*)client_address, client_addrlength); addfd(epollfd, connfd); users[connfd].address client_address; users[connfd].socket connfd; // 创建定时器超时时间15秒 heap_timer* timer new heap_timer(15); timer-user_data users[connfd]; timer-cb_func cb_func; // 双向绑定 users[connfd].timer timer; // 添加到时间堆 t_heap.add_timer(timer); }初始化timer需要传入delay延时时间因为还没插入到小根堆中也可以直接修改expire配制完后插入小根堆。3.4 客户端读事件处理else if(events[i].events EPOLLIN) { memset(users[sockfd].buf, \0, BUFFER_SIZE); ret recv(sockfd, users[sockfd].buf, BUFFER_SIZE-1, 0); printf(get %d bytes of client data %s from %d\n, ret, users[sockfd].buf, sockfd); heap_timer* timer users[sockfd].timer; if(ret 0) { // 发生致命错误关闭连接 if(errno ! EAGAIN) { cb_func(users[sockfd]); // 关闭fd、从epoll移除 if(timer){ t_heap.del_timer(timer); // 给tick看的tick到时直接跳过执行、释放内存 users[sockfd].timer NULL; // 清空指针防止野指针 } } } else if(ret 0) { // 对端关闭连接这边也关闭连接并移除对应定时器 cb_func(users[sockfd]); if(timer) { t_heap.del_timer(timer); users[sockfd].timer NULL; } } else { // 如果某个客户连接有数据可读也需要调整对应的timer延迟关闭的时间 if(timer) { // 懒删除旧定时器 t_heap.del_timer(timer); users[sockfd].timer NULL; //adjust_timer(timer); // 创建新定时器设置新的超时时间 heap_timer* new_timer new heap_timer(0); time_t cur time(NULL); new_timer-expire cur 3*TIMESLOT; new_timer-user_data users[sockfd]; new_timer-cb_func cb_func; // 添加新定时器到堆 t_heap.add_timer(new_timer); users[sockfd].timer new_timer; printf(adjust timer once\n); } } }接收客户端数据时发生错误或者客户端下线先调用回调函数取消epoll注册事件并关闭socketfd调用del_timer将timer的回调函数设为NULL再清空客户端的定时器指针设为NULL防止野指针。如果正常读取到数据延时客户端的定时器expire时间在这里不像升序链表提供了adjust_timer函数可以调整在链表内的顺序只能通过新创建一个heap_timer将值拷贝过去重新设置expire的方式来更新延时。将旧的heap_timer的回调函数设为NULL新的heap_timer赋给users的timers。和升序链表的最大区别时间堆没有实现adjust_timer函数而是通过「删除旧定时器 添加新定时器」的方式实现超时时间刷新这是时间堆最标准的实现方式。四、完整可运行代码min_heap.h#ifndef MIN_HEAP #define MIN_HEAP #include time.h #include sys/socket.h #include netinet/in.h #include arpa/inet.h #include assert.h #include stdio.h #include unistd.h #include stdlib.h #include errno.h #include string.h #include sys/types.h #include sys/stat.h #include fcntl.h #include sys/sendfile.h #include sys/epoll.h #include pthread.h #include signal.h using std::exception; #define BUFFER_SIZE 64 class heap_timer; // 提前声明 // 绑定socket和定时器 struct client_data { sockaddr_in address; int socket; char buf[BUFFER_SIZE]; heap_timer *timer; }; // 定时器类 class heap_timer { public: heap_timer(int delay) { expire time(NULL) delay; } public: time_t expire; // 定时器绝对时间 void (*cb_func)(client_data *); // 定时器的回调函数 client_data *user_data; // 用户数据 }; // 时间堆类 class time_heap { public: // 构造函数初始化一个大小为cap的空堆 time_heap(int cap) throw(std::exception) : capacity(cap), cur_size(0) { array new heap_timer *[capacity]; // 创建堆数组(指针数组) if (!array) { throw std::exception(); } for (int i 0; i capacity; i) { array[i] NULL; } } // 构造函数用已有数组来初始化堆 time_heap(heap_timer **init_array, int size, int capacity) throw(std::exception) : cur_size(size), capacity(capacity) { if (capacity size) { throw std::exception(); // 如果实际容量堆数组容量直接抛出异常 } array new heap_timer *[capacity]; // 创建堆数组 if (!array) { throw std::exception(); } for (int i 0; i capacity; i) { array[i] NULL; } if (size ! 0) { // 初始化堆数组 for (int i 0; i size; i) { array[i] init_array[i]; } // (cur_size - 1) / 2是最后一个父节点(非叶子节点) for (int i (cur_size - 1) / 2; i 0; i--) { // 对数组的第(cur_size-1)/2~0个元素做下滤操作 percolate_down(i); } } } // 销毁时间堆 ~time_heap() { for (int i 0; i cur_size; i) { delete array[i]; } delete[] array; } public: // 添加目标定时器timer void add_timer(heap_timer *timer) throw(std::exception) { if (!timer) { return; } if (cur_size capacity) { // 如果当前堆数组容量不够就扩大一倍 resize(); } // 新插入了一个元素当前堆大小1hole是新建空穴的位置 int hole cur_size; int parent 0; // 对空穴到根节点的路径下的所有节点执行上滤操作 for (; hole 0; hole parent) { parent (hole - 1) / 2; // 找父节点 if (array[parent]-expire timer-expire) break; // 父节点expire子节点是合适位置 array[hole] array[parent]; // 父节点互换到原来的洞里 } array[hole] timer; } // 删除目标定时器timer void del_timer(heap_timer *timer) { if (!timer) return; // 仅仅将目标定时器的回调函数设为空即所谓的延迟销毁 // 可以节省删除定时器开销但也会造成数组膨胀 timer-cb_func NULL; } // 获得堆顶的定时器 heap_timer *top() const { if (empty()) return NULL; return array[0]; } // 删除堆顶的定时器 void pop_timer() { if (empty()) return; if (array[0]) { delete array[0]; // 将原来的堆顶元素替换成堆元素中最后一个元素 array[0] array[--cur_size]; percolate_down(0); // 对新的堆顶做下滤 } } // 心跳函数 void tick() { heap_timer *tmp array[0]; time_t cur time(NULL); while (!empty()) { if (!tmp) break; // 如果定时器没到期退出循环 if (tmp-expire cur) break; // 到期执行堆顶定时器任务 if (array[0]-cb_func) { array[0]-cb_func(array[0]-user_data); } // 堆顶元素删除同时生成新的堆顶定时器 pop_timer(); tmp array[0]; } } bool empty() const { return cur_size 0; } private: // 下滤操作 void percolate_down(int hole) { heap_timer *temp array[hole]; int child 0; for (; ((hole * 2 1) (cur_size - 1)); hole child) { child hole * 2 1; // 左孩子 if ((child (cur_size - 1)) (array[child 1]-expire array[child]-expire)) { child; } if (array[child]-expire temp-expire) { array[hole] array[child]; } else { break; } } array[hole] temp; } // 将堆数组扩大一倍 void resize() throw(std::exception) { heap_timer **temp new heap_timer *[2 * capacity]; for (int i 0; i 2 * capacity; i) { temp[i] NULL; } if (!temp) { throw std::exception(); } capacity 2 * capacity; for (int i 0; i cur_size; i) { temp[i] array[i]; } delete[] array; array temp; } private: heap_timer **array; // 堆数组二级指针 int capacity; // 堆数组的容量 int cur_size; // 堆数组中实际元素个数 }; #endifmain.cpp#include min_heap.h using namespace std; #define FD_LIMIT 65535 #define MAX_EVENT_NUMBER 1024 #define TIMESLOT 5 static int pipefd[2]; // 利用小根堆来管理定时器 static time_heap t_heap(FD_LIMIT); static int epollfd 0; int setnonblocking(int fd) { int old_option fcntl(fd, F_GETFL); int new_option old_option | O_NONBLOCK; fcntl(fd, F_SETFL, new_option); return old_option; } void addfd(int epollfd, int fd) { epoll_event event; event.data.fd fd; event.events EPOLLIN | EPOLLET; epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, event); setnonblocking(fd); } void sig_handler(int sig) { int save_errno errno; int msg sig; send(pipefd[1], (char*)msg, 1, 0); errno save_errno; } void addsig(int sig) { struct sigaction sa; memset(sa, \0, sizeof(sa)); sa.sa_handler sig_handler; sa.sa_flags | SA_RESTART; sigfillset(sa.sa_mask); assert(sigaction(sig, sa, NULL) ! -1); } void timer_handler() { // 定时处理任务实际上就是使用tick函数 t_heap.tick(); // 因为一次alarm调用只会引起一次SIGALRM信号所以要重新定时 alarm(TIMESLOT); } // 定时器回调函数删除非活动连接socket上的注册事件并关闭fd void cb_func(client_data* user_data) { // 安全判断防止重复关闭 if(user_data-socket 0) return; epoll_ctl(epollfd, EPOLL_CTL_DEL, user_data-socket, 0); assert(user_data); close(user_data-socket); printf(close fd %d\n, user_data-socket); // 关闭后清空fd防止重复关闭 user_data-socket -1; } int main(int argc, char* argv[]) { if(argc 2) { printf(usage: %s ip port\n, argv[0]); return -1; } const char* ip argv[1]; int port atoi(argv[2]); int ret 0; struct sockaddr_in address; bzero(address, sizeof(address)); address.sin_family AF_INET; inet_pton(AF_INET, ip, address.sin_addr); address.sin_port htons(port); int listenfd socket(PF_INET, SOCK_STREAM, 0); assert(listenfd 0); ret bind(listenfd, (struct sockaddr*)address, sizeof(address)); assert(ret ! -1); ret listen(listenfd, 5); assert(ret ! -1); epoll_event events[MAX_EVENT_NUMBER]; epollfd epoll_create1(0); assert(epollfd ! -1); addfd(epollfd, listenfd); // 创建全双工无名管道 ret socketpair(PF_UNIX, SOCK_STREAM, 0, pipefd); assert(ret ! -1); // 避免信号处理函数阻塞 setnonblocking(pipefd[1]); addfd(epollfd, pipefd[0]); // 设置信号处理函数 addsig(SIGALRM); // 闹钟定时信号 addsig(SIGTERM); // 程序终止信号 bool stop_server false; // 用户信息数组 client_data* users new client_data[FD_LIMIT]; bool timeout false; alarm(TIMESLOT); // 定时 while(!stop_server) { int number epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1); if((number 0) (errno ! EINTR)) { printf(epoll failure\n); break; } for(int i 0; i number; i) { int sockfd events[i].data.fd; // 处理新到的客户连接 if(sockfd listenfd) { struct sockaddr_in client_address; socklen_t client_addrlength sizeof(client_address); int connfd accept(listenfd, (struct sockaddr*)client_address, client_addrlength); addfd(epollfd, connfd); users[connfd].address client_address; users[connfd].socket connfd; // 创建定时器设置回调函数和超时时间绑定定时器与用户数据 heap_timer* timer new heap_timer(15); timer-user_data users[connfd]; timer-cb_func cb_func; // 双向绑定 users[connfd].timer timer; t_heap.add_timer(timer); } else if((sockfd pipefd[0]) (events[i].events EPOLLIN)) { int sig; char signals[1024]; ret recv(pipefd[0], signals, sizeof(signals), 0); if(ret -1 || ret 0) { continue; } else{ for(int i 0; i ret; i) { switch(signals[i]) { case SIGALRM: { // 用timeout标记有定时任务优先处理IO事件 timeout true; break; } case SIGTERM: { stop_server true; break; } } } } } else if(events[i].events EPOLLIN) { memset(users[sockfd].buf, \0, BUFFER_SIZE); ret recv(sockfd, users[sockfd].buf, BUFFER_SIZE-1, 0); printf(get %d bytes of client data %s from %d\n, ret, users[sockfd].buf, sockfd); heap_timer* timer users[sockfd].timer; if(ret 0) { // 发生错误关闭连接并移除定时器 if(errno ! EAGAIN) { cb_func(users[sockfd]); if(timer){ t_heap.del_timer(timer); users[sockfd].timer NULL; } } } else if(ret 0) { // 对端关闭连接这边也关闭连接 cb_func(users[sockfd]); if(timer) { t_heap.del_timer(timer); users[sockfd].timer NULL; } } else{ // 客户端有数据可读延长超时时间 if(timer) { t_heap.del_timer(timer); users[sockfd].timer NULL; // 创建新定时器设置新的超时时间 heap_timer* new_timer new heap_timer(0); time_t cur time(NULL); new_timer-expire cur 3*TIMESLOT; new_timer-user_data users[sockfd]; new_timer-cb_func cb_func; t_heap.add_timer(new_timer); users[sockfd].timer new_timer; printf(adjust timer once\n); } } } } // 最后处理定时事件IO事件优先级更高 if(timeout){ timer_handler(); timeout false; } } close(listenfd); close(pipefd[1]); close(pipefd[0]); delete [] users; return 0; }编译运行# 编译 g main.cpp -o server -Wall # 运行 ./server ip 8888 # 测试 nc ip 8888(原笔记GameServer-Learning/00-Notes/Network/linux-high-performance at main · maomianbaobumoyu/GameServer-Learning)
高并发服务器必备:小根堆定时器从设计到实现全流程
本文是 Linux 服务器定时器系列的第二篇在上一篇升序链表定时器的基础上讲解性能更优的小根堆定时器实现。 小根堆定时器将添加定时器的时间复杂度从 O (n) 优化到 O (logn)完美解决了升序链表在高并发下的性能瓶颈是工业界中小型服务器的首选方案。(如果没有学习过升序链表实现定时器推荐先学习一下很多细节在这篇里讲很详细了重复的就不再赘述了 高并发服务器必备升序链表定时器从设计到实现全流程-CSDN博客)一、小根堆定时器核心架构小根堆的核心性质父节点的超时时间永远小于等于左右子节点的超时时间因此堆顶永远是最早超时的定时器。 和升序链表一样采用双向绑定懒删除统一事件源的设计思想保证性能和安全性。1.1 核心数据结构定义// 绑定socket和定时器 struct client_data { sockaddr_in address; int socket; char buf[BUFFER_SIZE]; heap_timer *timer; };用户信息结构体和升序链表完全一致实现了客户端与定时器的双向绑定。// 定时器类 class heap_timer { public: heap_timer(int delay) { expire time(NULL) delay; } public: time_t expire; // 定时器绝对超时时间 void (*cb_func)(client_data *); // 超时回调函数 client_data *user_data; // 绑定的用户数据 };定时器类通过构造函数直接初始化超时时间其余字段和升序链表一致。1.2 时间堆类核心成员private: heap_timer **array; // 堆数组二级指针存储定时器指针 int capacity; // 堆数组的最大容量 int cur_size; // 堆中当前实际元素个数用指针数组存储定时器堆操作需要频繁交换元素交换指针8 字节比交换整个定时器对象快得多也避免了对象拷贝开销new 类型[N]分配连续 N 个元素的数组new 类型仅分配单个元素二、时间堆核心函数实现2.1 构造函数构造函数 1初始化空堆// 构造函数初始化一个大小为cap的空堆 time_heap(int cap) throw(std::exception) : capacity(cap), cur_size(0) { array new heap_timer *[capacity]; // 创建指针数组 if (!array) { throw std::exception(); } // 初始化所有元素为NULL避免野指针 for (int i 0; i capacity; i) { array[i] NULL; } }传入堆的初始容量分配连续内存并初始化所有指针为 NULL。这里的array是time_heap私有的二级指针变量作为指针数组管理定时器。构造函数 2用已有数组初始化堆// 构造函数用已有数组来初始化堆 time_heap(heap_timer **init_array, int size, int capacity) throw(std::exception) : cur_size(size), capacity(capacity) { if (capacity size) { throw std::exception(); // 容量不足直接抛出异常 } array new heap_timer *[capacity]; if (!array) { throw std::exception(); } for (int i 0; i capacity; i) { array[i] NULL; } if (size ! 0) { // 拷贝已有数组元素 for (int i 0; i size; i) { array[i] init_array[i]; } // 从最后一个父节点开始自下而上堆化 for (int i (cur_size - 1) / 2; i 0; i--) { percolate_down(i); } } }(cur_size - 1) / 2是最后一个非叶子节点父节点的索引叶子节点本身就是合法的堆只需对所有父节点执行下滤操作即可完成堆化2.2 析构函数// 销毁时间堆 ~time_heap() { // 先释放每个定时器对象 for (int i 0; i cur_size; i) { delete array[i]; } // 再释放堆数组本身必须用delete[] delete[] array; }先释放数组中每个指针指向的定时器对象再释放数组本身避免内存泄漏。2.3 添加定时器上滤操作// 添加目标定时器timer void add_timer(heap_timer *timer) throw(std::exception) { if (!timer) { return; } // 堆满了自动扩容一倍 if (cur_size capacity) { resize(); } // 新元素先放在堆尾空穴位置 int hole cur_size; int parent 0; // 上滤循环从空穴向上找合适位置 for (; hole 0; hole parent) { parent (hole - 1) / 2; // 计算父节点索引 // 父节点超时时间更小位置合法退出 if (array[parent]-expire timer-expire) break; // 父节点更大将父节点下移到空穴位置 array[hole] array[parent]; } // 空穴最终位置就是新定时器的正确位置 array[hole] timer; }时间复杂度O (logn)仅需遍历二叉树的高度远优于升序链表的 O (n)。在add_timer中如果当前实际数组容量已满触发扩容机制调用resize函数扩容1倍。在堆尾插入空穴通过比较新元素expire和它的父节点不断互换进行上滤操作直到父节点的expire子节点为止新元素插入最后的空穴中。2.4 删除定时器懒删除// 删除目标定时器timer void del_timer(heap_timer *timer) { if (!timer) return; // 延迟销毁仅将回调函数置空标记定时器失效 // 节省删除定时器的O(logn)开销缺点是会造成轻微的数组膨胀 timer-cb_func NULL; }这是时间堆最经典的优化不调整堆结构仅标记失效后续由心跳函数统一清理实现 O (1) 删除。2.5 删除堆顶定时器// 删除堆顶的定时器 void pop_timer() { if (empty()) return; if (array[0]) { delete array[0]; // 释放堆顶定时器内存 // 用堆底最后一个元素替换堆顶 array[0] array[--cur_size]; // 对新堆顶执行下滤操作维持小根堆性质 percolate_down(0); } }2.6 心跳函数处理超时定时器// 心跳函数每5秒执行一次处理所有超时定时器 void tick() { heap_timer *tmp array[0]; time_t cur time(NULL); while (!empty()) { if (!tmp) break; // 堆顶未超时 → 后面所有节点都未超时直接退出 if (tmp-expire cur) break; // 定时器有效未被懒删除→ 执行回调函数 if (array[0]-cb_func) { array[0]-cb_func(array[0]-user_data); } // 删除堆顶自动调整堆结构 pop_timer(); tmp array[0]; } }时间复杂度摊销 O (1)只处理已经超时的节点遇到第一个未超时的就退出。如果定时器到期执行堆顶的定时器任务(在业务函数main里是删除socketfd注册和关闭fd)。在del_timer中把那些因为其他情况(客户端下线等已经提前调用了cb_func处理sockfd)的回调函数设为了NULL也可以正常处理。最后将堆顶元素删除设为下个堆顶继续循环。2.7 下滤操作堆化核心// 下滤操作将指定节点向下调整到合适位置 void percolate_down(int hole) { heap_timer *temp array[hole]; // 保存当前节点 int child 0; for (; ((hole * 2 1) (cur_size - 1)); hole child) { child hole * 2 1; // 左孩子索引 // 找到左右孩子中更小的那个 if ((child (cur_size - 1)) (array[child 1]-expire array[child]-expire)) { child; } // 孩子节点更小 → 孩子上移 if (array[child]-expire temp-expire) { array[hole] array[child]; } else { break; // 位置合法退出 } } // 把原节点放到最终位置 array[hole] temp; }从temp下标的左孩子(hole*21)开始向下遍历数组直到堆尾为止。在循环中检查temp是否有右孩子右孩子expire是否比左孩子小选出两孩子中最小的一个跟temp的expire比较。temp孩子的expire直接跳出循环。否则让孩子节点跟temp(hole位置)互换,temp位置变成child。最后将空出的hole位置赋值为temp。2.8 堆扩容// 将堆数组扩大一倍 void resize() throw(std::exception) { heap_timer **temp new heap_timer *[2 * capacity]; for (int i 0; i 2 * capacity; i) { temp[i] NULL; } if (!temp) { throw std::exception(); } capacity 2 * capacity; // 复制原数组元素到新数组 for (int i 0; i cur_size; i) { temp[i] array[i]; } // 释放原数组指向新数组 delete[] array; array temp; }每次扩容为原来的 2 倍避免频繁扩容的开销。三、服务器集成定时器大部分代码和升序链表定时器完全一致重点讲解不同的地方。3.1 全局变量初始化// 利用小根堆来管理定时器初始容量和最大文件描述符数一致 static time_heap t_heap(FD_LIMIT); static int epollfd 0;声明全局静态变量t_heap来管理所有的定时器3.2 定时任务处理函数void timer_handler() { // 调用心跳函数处理所有超时定时器 t_heap.tick(); // 重新设置闹钟5秒后再次触发 alarm(TIMESLOT); }3.3 新客户端连接处理// 处理新到的客户连接 if(sockfd listenfd) { struct sockaddr_in client_address; socklen_t client_addrlength sizeof(client_address); int connfd accept(listenfd, (struct sockaddr*)client_address, client_addrlength); addfd(epollfd, connfd); users[connfd].address client_address; users[connfd].socket connfd; // 创建定时器超时时间15秒 heap_timer* timer new heap_timer(15); timer-user_data users[connfd]; timer-cb_func cb_func; // 双向绑定 users[connfd].timer timer; // 添加到时间堆 t_heap.add_timer(timer); }初始化timer需要传入delay延时时间因为还没插入到小根堆中也可以直接修改expire配制完后插入小根堆。3.4 客户端读事件处理else if(events[i].events EPOLLIN) { memset(users[sockfd].buf, \0, BUFFER_SIZE); ret recv(sockfd, users[sockfd].buf, BUFFER_SIZE-1, 0); printf(get %d bytes of client data %s from %d\n, ret, users[sockfd].buf, sockfd); heap_timer* timer users[sockfd].timer; if(ret 0) { // 发生致命错误关闭连接 if(errno ! EAGAIN) { cb_func(users[sockfd]); // 关闭fd、从epoll移除 if(timer){ t_heap.del_timer(timer); // 给tick看的tick到时直接跳过执行、释放内存 users[sockfd].timer NULL; // 清空指针防止野指针 } } } else if(ret 0) { // 对端关闭连接这边也关闭连接并移除对应定时器 cb_func(users[sockfd]); if(timer) { t_heap.del_timer(timer); users[sockfd].timer NULL; } } else { // 如果某个客户连接有数据可读也需要调整对应的timer延迟关闭的时间 if(timer) { // 懒删除旧定时器 t_heap.del_timer(timer); users[sockfd].timer NULL; //adjust_timer(timer); // 创建新定时器设置新的超时时间 heap_timer* new_timer new heap_timer(0); time_t cur time(NULL); new_timer-expire cur 3*TIMESLOT; new_timer-user_data users[sockfd]; new_timer-cb_func cb_func; // 添加新定时器到堆 t_heap.add_timer(new_timer); users[sockfd].timer new_timer; printf(adjust timer once\n); } } }接收客户端数据时发生错误或者客户端下线先调用回调函数取消epoll注册事件并关闭socketfd调用del_timer将timer的回调函数设为NULL再清空客户端的定时器指针设为NULL防止野指针。如果正常读取到数据延时客户端的定时器expire时间在这里不像升序链表提供了adjust_timer函数可以调整在链表内的顺序只能通过新创建一个heap_timer将值拷贝过去重新设置expire的方式来更新延时。将旧的heap_timer的回调函数设为NULL新的heap_timer赋给users的timers。和升序链表的最大区别时间堆没有实现adjust_timer函数而是通过「删除旧定时器 添加新定时器」的方式实现超时时间刷新这是时间堆最标准的实现方式。四、完整可运行代码min_heap.h#ifndef MIN_HEAP #define MIN_HEAP #include time.h #include sys/socket.h #include netinet/in.h #include arpa/inet.h #include assert.h #include stdio.h #include unistd.h #include stdlib.h #include errno.h #include string.h #include sys/types.h #include sys/stat.h #include fcntl.h #include sys/sendfile.h #include sys/epoll.h #include pthread.h #include signal.h using std::exception; #define BUFFER_SIZE 64 class heap_timer; // 提前声明 // 绑定socket和定时器 struct client_data { sockaddr_in address; int socket; char buf[BUFFER_SIZE]; heap_timer *timer; }; // 定时器类 class heap_timer { public: heap_timer(int delay) { expire time(NULL) delay; } public: time_t expire; // 定时器绝对时间 void (*cb_func)(client_data *); // 定时器的回调函数 client_data *user_data; // 用户数据 }; // 时间堆类 class time_heap { public: // 构造函数初始化一个大小为cap的空堆 time_heap(int cap) throw(std::exception) : capacity(cap), cur_size(0) { array new heap_timer *[capacity]; // 创建堆数组(指针数组) if (!array) { throw std::exception(); } for (int i 0; i capacity; i) { array[i] NULL; } } // 构造函数用已有数组来初始化堆 time_heap(heap_timer **init_array, int size, int capacity) throw(std::exception) : cur_size(size), capacity(capacity) { if (capacity size) { throw std::exception(); // 如果实际容量堆数组容量直接抛出异常 } array new heap_timer *[capacity]; // 创建堆数组 if (!array) { throw std::exception(); } for (int i 0; i capacity; i) { array[i] NULL; } if (size ! 0) { // 初始化堆数组 for (int i 0; i size; i) { array[i] init_array[i]; } // (cur_size - 1) / 2是最后一个父节点(非叶子节点) for (int i (cur_size - 1) / 2; i 0; i--) { // 对数组的第(cur_size-1)/2~0个元素做下滤操作 percolate_down(i); } } } // 销毁时间堆 ~time_heap() { for (int i 0; i cur_size; i) { delete array[i]; } delete[] array; } public: // 添加目标定时器timer void add_timer(heap_timer *timer) throw(std::exception) { if (!timer) { return; } if (cur_size capacity) { // 如果当前堆数组容量不够就扩大一倍 resize(); } // 新插入了一个元素当前堆大小1hole是新建空穴的位置 int hole cur_size; int parent 0; // 对空穴到根节点的路径下的所有节点执行上滤操作 for (; hole 0; hole parent) { parent (hole - 1) / 2; // 找父节点 if (array[parent]-expire timer-expire) break; // 父节点expire子节点是合适位置 array[hole] array[parent]; // 父节点互换到原来的洞里 } array[hole] timer; } // 删除目标定时器timer void del_timer(heap_timer *timer) { if (!timer) return; // 仅仅将目标定时器的回调函数设为空即所谓的延迟销毁 // 可以节省删除定时器开销但也会造成数组膨胀 timer-cb_func NULL; } // 获得堆顶的定时器 heap_timer *top() const { if (empty()) return NULL; return array[0]; } // 删除堆顶的定时器 void pop_timer() { if (empty()) return; if (array[0]) { delete array[0]; // 将原来的堆顶元素替换成堆元素中最后一个元素 array[0] array[--cur_size]; percolate_down(0); // 对新的堆顶做下滤 } } // 心跳函数 void tick() { heap_timer *tmp array[0]; time_t cur time(NULL); while (!empty()) { if (!tmp) break; // 如果定时器没到期退出循环 if (tmp-expire cur) break; // 到期执行堆顶定时器任务 if (array[0]-cb_func) { array[0]-cb_func(array[0]-user_data); } // 堆顶元素删除同时生成新的堆顶定时器 pop_timer(); tmp array[0]; } } bool empty() const { return cur_size 0; } private: // 下滤操作 void percolate_down(int hole) { heap_timer *temp array[hole]; int child 0; for (; ((hole * 2 1) (cur_size - 1)); hole child) { child hole * 2 1; // 左孩子 if ((child (cur_size - 1)) (array[child 1]-expire array[child]-expire)) { child; } if (array[child]-expire temp-expire) { array[hole] array[child]; } else { break; } } array[hole] temp; } // 将堆数组扩大一倍 void resize() throw(std::exception) { heap_timer **temp new heap_timer *[2 * capacity]; for (int i 0; i 2 * capacity; i) { temp[i] NULL; } if (!temp) { throw std::exception(); } capacity 2 * capacity; for (int i 0; i cur_size; i) { temp[i] array[i]; } delete[] array; array temp; } private: heap_timer **array; // 堆数组二级指针 int capacity; // 堆数组的容量 int cur_size; // 堆数组中实际元素个数 }; #endifmain.cpp#include min_heap.h using namespace std; #define FD_LIMIT 65535 #define MAX_EVENT_NUMBER 1024 #define TIMESLOT 5 static int pipefd[2]; // 利用小根堆来管理定时器 static time_heap t_heap(FD_LIMIT); static int epollfd 0; int setnonblocking(int fd) { int old_option fcntl(fd, F_GETFL); int new_option old_option | O_NONBLOCK; fcntl(fd, F_SETFL, new_option); return old_option; } void addfd(int epollfd, int fd) { epoll_event event; event.data.fd fd; event.events EPOLLIN | EPOLLET; epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, event); setnonblocking(fd); } void sig_handler(int sig) { int save_errno errno; int msg sig; send(pipefd[1], (char*)msg, 1, 0); errno save_errno; } void addsig(int sig) { struct sigaction sa; memset(sa, \0, sizeof(sa)); sa.sa_handler sig_handler; sa.sa_flags | SA_RESTART; sigfillset(sa.sa_mask); assert(sigaction(sig, sa, NULL) ! -1); } void timer_handler() { // 定时处理任务实际上就是使用tick函数 t_heap.tick(); // 因为一次alarm调用只会引起一次SIGALRM信号所以要重新定时 alarm(TIMESLOT); } // 定时器回调函数删除非活动连接socket上的注册事件并关闭fd void cb_func(client_data* user_data) { // 安全判断防止重复关闭 if(user_data-socket 0) return; epoll_ctl(epollfd, EPOLL_CTL_DEL, user_data-socket, 0); assert(user_data); close(user_data-socket); printf(close fd %d\n, user_data-socket); // 关闭后清空fd防止重复关闭 user_data-socket -1; } int main(int argc, char* argv[]) { if(argc 2) { printf(usage: %s ip port\n, argv[0]); return -1; } const char* ip argv[1]; int port atoi(argv[2]); int ret 0; struct sockaddr_in address; bzero(address, sizeof(address)); address.sin_family AF_INET; inet_pton(AF_INET, ip, address.sin_addr); address.sin_port htons(port); int listenfd socket(PF_INET, SOCK_STREAM, 0); assert(listenfd 0); ret bind(listenfd, (struct sockaddr*)address, sizeof(address)); assert(ret ! -1); ret listen(listenfd, 5); assert(ret ! -1); epoll_event events[MAX_EVENT_NUMBER]; epollfd epoll_create1(0); assert(epollfd ! -1); addfd(epollfd, listenfd); // 创建全双工无名管道 ret socketpair(PF_UNIX, SOCK_STREAM, 0, pipefd); assert(ret ! -1); // 避免信号处理函数阻塞 setnonblocking(pipefd[1]); addfd(epollfd, pipefd[0]); // 设置信号处理函数 addsig(SIGALRM); // 闹钟定时信号 addsig(SIGTERM); // 程序终止信号 bool stop_server false; // 用户信息数组 client_data* users new client_data[FD_LIMIT]; bool timeout false; alarm(TIMESLOT); // 定时 while(!stop_server) { int number epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1); if((number 0) (errno ! EINTR)) { printf(epoll failure\n); break; } for(int i 0; i number; i) { int sockfd events[i].data.fd; // 处理新到的客户连接 if(sockfd listenfd) { struct sockaddr_in client_address; socklen_t client_addrlength sizeof(client_address); int connfd accept(listenfd, (struct sockaddr*)client_address, client_addrlength); addfd(epollfd, connfd); users[connfd].address client_address; users[connfd].socket connfd; // 创建定时器设置回调函数和超时时间绑定定时器与用户数据 heap_timer* timer new heap_timer(15); timer-user_data users[connfd]; timer-cb_func cb_func; // 双向绑定 users[connfd].timer timer; t_heap.add_timer(timer); } else if((sockfd pipefd[0]) (events[i].events EPOLLIN)) { int sig; char signals[1024]; ret recv(pipefd[0], signals, sizeof(signals), 0); if(ret -1 || ret 0) { continue; } else{ for(int i 0; i ret; i) { switch(signals[i]) { case SIGALRM: { // 用timeout标记有定时任务优先处理IO事件 timeout true; break; } case SIGTERM: { stop_server true; break; } } } } } else if(events[i].events EPOLLIN) { memset(users[sockfd].buf, \0, BUFFER_SIZE); ret recv(sockfd, users[sockfd].buf, BUFFER_SIZE-1, 0); printf(get %d bytes of client data %s from %d\n, ret, users[sockfd].buf, sockfd); heap_timer* timer users[sockfd].timer; if(ret 0) { // 发生错误关闭连接并移除定时器 if(errno ! EAGAIN) { cb_func(users[sockfd]); if(timer){ t_heap.del_timer(timer); users[sockfd].timer NULL; } } } else if(ret 0) { // 对端关闭连接这边也关闭连接 cb_func(users[sockfd]); if(timer) { t_heap.del_timer(timer); users[sockfd].timer NULL; } } else{ // 客户端有数据可读延长超时时间 if(timer) { t_heap.del_timer(timer); users[sockfd].timer NULL; // 创建新定时器设置新的超时时间 heap_timer* new_timer new heap_timer(0); time_t cur time(NULL); new_timer-expire cur 3*TIMESLOT; new_timer-user_data users[sockfd]; new_timer-cb_func cb_func; t_heap.add_timer(new_timer); users[sockfd].timer new_timer; printf(adjust timer once\n); } } } } // 最后处理定时事件IO事件优先级更高 if(timeout){ timer_handler(); timeout false; } } close(listenfd); close(pipefd[1]); close(pipefd[0]); delete [] users; return 0; }编译运行# 编译 g main.cpp -o server -Wall # 运行 ./server ip 8888 # 测试 nc ip 8888(原笔记GameServer-Learning/00-Notes/Network/linux-high-performance at main · maomianbaobumoyu/GameServer-Learning)