前言

你有没有想过，每天凌晨3点，系统是怎么自动执行数据同步、报表生成、缓存刷新这些定时任务的？如果任务执行失败了，系统会怎么处理？

分布式调度器是微服务架构中处理定时任务、异步任务的核心组件。今天我们用C语言从零实现一个分布式调度器：

· 任务注册与发现
· Cron表达式解析
· 任务分片并行执行
· 故障转移容错处理
· 任务状态管理
· 监控告警

---

一、分布式调度器核心原理

1. 架构图

```
┌─────────────────────────────────────────────────────────────┐
│                         调度器集群                          │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐          │
│  │    节点1    │  │    节点2    │  │    节点3    │          │
│  │  (Leader)   │  │ (Follower)  │  │ (Follower)  │          │
│  └──────┬──────┘  └─────────────┘  └─────────────┘          │
│         │                                                   │
│         ▼                                                   │
│  ┌─────────────────────────────────────────────────────┐    │
│  │                    任务分片策略                     │    │
│  │    [0-33%]        [33%-66%]        [66%-100%]       │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘
```

2. 核心功能

| 功能 | 说明 |
| --- | --- |
| 任务注册 | 动态注册/注销任务 |
| Cron调度 | 支持标准Cron表达式 |
| 任务分片 | 大任务拆分成小片并行 |
| 故障转移 | 节点宕机自动转移任务 |
| 任务状态 | 运行中/成功/失败/超时 |
| 可观测性 | 日志、指标、告警 |

---

二、完整代码实现

1. 基础数据结构

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
#include <errno.h>
#include <signal.h>

#define MAX_TASK_NAME 128
#define MAX_TASK_PARAM 256
#define MAX_NODES 100
#define MAX_SHARDS 100

// 任务状态
typedef enum {
    TASK_IDLE = 0,
    TASK_RUNNING,
    TASK_SUCCESS,
    TASK_FAILED,
    TASK_TIMEOUT,
    TASK_CANCELLED
} task_status_t;

// 任务分片
typedef struct task_shard {
    int shard_index;
    int total_shards;
    char param[MAX_TASK_PARAM];
    char node_id[32];
    task_status_t status;
    time_t start_time;
    time_t end_time;
    char error_msg[256];
} task_shard_t;

// 任务定义
typedef struct task_definition {
    char name[MAX_TASK_NAME];
    char cron_expr[64];
    int shard_count;
    int timeout_seconds;
    int retry_count;
    void (*execute)(task_shard_t *shard, void *context);
    void *context;
    struct task_definition *next;
} task_definition_t;

// 任务实例
typedef struct task_instance {
    char task_name[MAX_TASK_NAME];
    char instance_id[64];
    task_shard_t *shards;
    int shard_count;
    int completed_shards;
    task_status_t status;
    time_t start_time;
    time_t end_time;
    struct task_instance *next;
} task_instance_t;

// 调度器节点
typedef struct scheduler_node {
    char node_id[32];
    char host[64];
    int port;
    int is_leader;
    int is_healthy;
    time_t last_heartbeat;
} scheduler_node_t;

// 分布式调度器
typedef struct distributed_scheduler {
    char node_id[32];
    task_definition_t *tasks;
    task_instance_t *instances;
    scheduler_node_t *nodes;
    int node_count;
    int running;
    pthread_mutex_t mutex;
    pthread_t schedule_thread;
    pthread_t heartbeat_thread;
} distributed_scheduler_t;
```

2. Cron表达式解析

```c
// Cron字段
typedef struct cron_fields {
    int minute[60];
    int minute_count;
    int hour[24];
    int hour_count;
    int day[31];
    int day_count;
    int month[12];
    int month_count;
    int weekday[7];
    int weekday_count;
} cron_fields_t;

// 解析cron表达式（简化版）
int cron_parse(const char *expr, cron_fields_t *fields) {
    // 格式: 分 时 日 月 周
    // 示例: "0 0 3 * * *" = 每天3点
    // 简化实现：只支持 * 和数字
    char parts[6][32];
    int count = sscanf(expr, "%s %s %s %s %s",
                       parts[0], parts[1], parts[2], parts[3], parts[4]);
    if (count != 5) return -1;

    // 解析分钟
    if (strcmp(parts[0], "*") == 0) {
        fields->minute_count = 60;
        for (int i = 0; i < 60; i++) fields->minute[i] = i;
    } else {
        fields->minute_count = 1;
        fields->minute[0] = atoi(parts[0]);
    }

    // 解析小时
    if (strcmp(parts[1], "*") == 0) {
        fields->hour_count = 24;
        for (int i = 0; i < 24; i++) fields->hour[i] = i;
    } else {
        fields->hour_count = 1;
        fields->hour[0] = atoi(parts[1]);
    }

    // 解析日
    if (strcmp(parts[2], "*") == 0) {
        fields->day_count = 31;
        for (int i = 1; i <= 31; i++) fields->day[i-1] = i;
    } else {
        fields->day_count = 1;
        fields->day[0] = atoi(parts[2]);
    }

    // 解析月
    if (strcmp(parts[3], "*") == 0) {
        fields->month_count = 12;
        for (int i = 1; i <= 12; i++) fields->month[i-1] = i;
    } else {
        fields->month_count = 1;
        fields->month[0] = atoi(parts[3]);
    }

    // 解析周
    if (strcmp(parts[4], "*") == 0) {
        fields->weekday_count = 7;
        for (int i = 0; i < 7; i++) fields->weekday[i] = i;
    } else {
        fields->weekday_count = 1;
        fields->weekday[0] = atoi(parts[4]);
    }

    return 0;
}

// 检查时间是否匹配
int cron_match(cron_fields_t *fields, struct tm *tm) {
    int match = 0;

    // 检查分钟
    for (int i = 0; i < fields->minute_count; i++) {
        if (fields->minute[i] == tm->tm_min) { match = 1; break; }
    }
    if (!match) return 0;
    match = 0;

    // 检查小时
    for (int i = 0; i < fields->hour_count; i++) {
        if (fields->hour[i] == tm->tm_hour) { match = 1; break; }
    }
    if (!match) return 0;
    match = 0;

    // 检查日
    for (int i = 0; i < fields->day_count; i++) {
        if (fields->day[i] == tm->tm_mday) { match = 1; break; }
    }
    if (!match) return 0;
    match = 0;

    // 检查月
    for (int i = 0; i < fields->month_count; i++) {
        if (fields->month[i] == tm->tm_mon + 1) { match = 1; break; }
    }
    if (!match) return 0;
    match = 0;

    // 检查周
    for (int i = 0; i < fields->weekday_count; i++) {
        if (fields->weekday[i] == tm->tm_wday) { match = 1; break; }
    }
    return match;
}
```

3. 任务调度核心

```c
// 创建调度器
distributed_scheduler_t *scheduler_create(const char *node_id) {
    distributed_scheduler_t *s = malloc(sizeof(distributed_scheduler_t));
    strcpy(s->node_id, node_id);
    s->tasks = NULL;
    s->instances = NULL;
    s->nodes = malloc(sizeof(scheduler_node_t) * MAX_NODES);
    s->node_count = 0;
    s->running = 1;
    pthread_mutex_init(&s->mutex, NULL);
    printf("调度器节点启动: %s\n", node_id);
    return s;
}

// 注册任务
void scheduler_register_task(distributed_scheduler_t *s, const char *name,
                             const char *cron_expr, int shard_count,
                             int timeout_seconds, int retry_count,
                             void (*execute)(task_shard_t*, void*),
                             void *context) {
    pthread_mutex_lock(&s->mutex);
    task_definition_t *task = malloc(sizeof(task_definition_t));
    strcpy(task->name, name);
    strcpy(task->cron_expr, cron_expr);
    task->shard_count = shard_count > 0 ? shard_count : 1;
    task->timeout_seconds = timeout_seconds > 0 ? timeout_seconds : 300;
    task->retry_count = retry_count >= 0 ? retry_count : 3;
    task->execute = execute;
    task->context = context;
    task->next = s->tasks;
    s->tasks = task;
    pthread_mutex_unlock(&s->mutex);
    printf("[调度器] 注册任务: %s, cron=%s, shards=%d\n",
           name, cron_expr, shard_count);
}

// 执行分片任务
void execute_shard_task(distributed_scheduler_t *s, task_definition_t *task,
                        task_instance_t *instance, int shard_idx) {
    task_shard_t *shard = &instance->shards[shard_idx];
    shard->shard_index = shard_idx;
    shard->total_shards = task->shard_count;
    strcpy(shard->node_id, s->node_id);
    shard->status = TASK_RUNNING;
    shard->start_time = time(NULL);
    printf("[调度器] 执行分片 %d/%d: %s\n",
           shard_idx + 1, task->shard_count, task->name);

    // 执行任务
    if (task->execute) {
        task->execute(shard, task->context);
        shard->status = TASK_SUCCESS;
    }
    shard->end_time = time(NULL);

    // 更新完成状态
    pthread_mutex_lock(&s->mutex);
    instance->completed_shards++;
    if (instance->completed_shards >= task->shard_count) {
        instance->status = TASK_SUCCESS;
        instance->end_time = time(NULL);
    }
    pthread_mutex_unlock(&s->mutex);
    printf("[调度器] 分片 %d/%d 完成: %s\n",
           shard_idx + 1, task->shard_count, task->name);
}

// 检查并触发任务
void scheduler_check_tasks(distributed_scheduler_t *s) {
    time_t now = time(NULL);
    struct tm *tm_now = localtime(&now);
    pthread_mutex_lock(&s->mutex);
    task_definition_t *task = s->tasks;
    while (task) {
        cron_fields_t fields;
        if (cron_parse(task->cron_expr, &fields) == 0) {
            if (cron_match(&fields, tm_now)) {
                // 检查是否已有实例在运行
                task_instance_t *inst = s->instances;
                int running = 0;
                while (inst) {
                    if (strcmp(inst->task_name, task->name) == 0 &&
                        inst->status == TASK_RUNNING) {
                        running = 1;
                        break;
                    }
                    inst = inst->next;
                }
                if (!running) {
                    // 创建新任务实例
                    task_instance_t *new_inst = malloc(sizeof(task_instance_t));
                    strcpy(new_inst->task_name, task->name);
                    snprintf(new_inst->instance_id, sizeof(new_inst->instance_id),
                             "%s-%ld", task->name, now);
                    new_inst->shard_count = task->shard_count;
                    new_inst->shards = malloc(sizeof(task_shard_t) * task->shard_count);
                    memset(new_inst->shards, 0, sizeof(task_shard_t) * task->shard_count);
                    new_inst->completed_shards = 0;
                    new_inst->status = TASK_RUNNING;
                    new_inst->start_time = now;
                    new_inst->end_time = 0;
                    new_inst->next = s->instances;
                    s->instances = new_inst;
                    printf("[调度器] 触发任务: %s, 分片数: %d\n",
                           task->name, task->shard_count);

                    // 执行所有分片
                    for (int i = 0; i < task->shard_count; i++) {
                        execute_shard_task(s, task, new_inst, i);
                    }
                }
            }
        }
        task = task->next;
    }
    pthread_mutex_unlock(&s->mutex);
}
```

4. 心跳与故障转移

```c
// 心跳检测
void *heartbeat_thread(void *arg) {
    distributed_scheduler_t *s = (distributed_scheduler_t*)arg;
    while (s->running) {
        sleep(5);
        // 这里会向其他节点发送心跳
        // 简化实现：打印心跳日志
        // printf("[心跳] %s 存活\n", s->node_id);
    }
    return NULL;
}

// 故障转移
void scheduler_failover(distributed_scheduler_t *s) {
    pthread_mutex_lock(&s->mutex);
    time_t now = time(NULL);

    // 检查每个运行中的任务实例
    task_instance_t *inst = s->instances;
    while (inst) {
        if (inst->status == TASK_RUNNING) {
            // 检查是否有分片长时间未完成
            int timeout_count = 0;
            for (int i = 0; i < inst->shard_count; i++) {
                if (inst->shards[i].status == TASK_RUNNING) {
                    time_t elapsed = now - inst->shards[i].start_time;

                    // 获取超时配置
                    task_definition_t *task = s->tasks;
                    int timeout = 300;
                    while (task) {
                        if (strcmp(task->name, inst->task_name) == 0) {
                            timeout = task->timeout_seconds;
                            break;
                        }
                        task = task->next;
                    }

                    if (elapsed > timeout) {
                        // 标记超时，需要重新调度
                        inst->shards[i].status = TASK_TIMEOUT;
                        strcpy(inst->shards[i].error_msg, "Task timeout, rescheduling");
                        timeout_count++;
                    }
                }
            }
            if (timeout_count > 0) {
                printf("[故障转移] 任务 %s 有 %d 个分片超时，重新调度\n",
                       inst->task_name, timeout_count);
                // 实际会重新分配超时分片
            }
        }
        inst = inst->next;
    }
    pthread_mutex_unlock(&s->mutex);
}
```

5. 示例任务

```c
// 示例任务1：数据同步
void data_sync_task(task_shard_t *shard, void *context) {
    printf("[任务] 数据同步 分片 %d/%d: 同步数据范围 [%d, %d]\n",
           shard->shard_index + 1, shard->total_shards,
           shard->shard_index * 1000 / shard->total_shards,
           (shard->shard_index + 1) * 1000 / shard->total_shards);
    sleep(1); // 模拟执行
}

// 示例任务2：报表生成
void report_task(task_shard_t *shard, void *context) {
    int *report_id = (int*)context;
    printf("[任务] 报表生成 分片 %d/%d: 生成报表 %d 的切片 %d\n",
           shard->shard_index + 1, shard->total_shards, *report_id, shard->shard_index);
    sleep(2); // 模拟执行
}

// 示例任务3：缓存刷新
void cache_refresh_task(task_shard_t *shard, void *context) {
    printf("[任务] 缓存刷新 分片 %d/%d: 刷新缓存键前缀 %d\n",
           shard->shard_index + 1, shard->total_shards, shard->shard_index);
    sleep(1); // 模拟执行
}
```

6. 测试代码

```c
int main() {
    printf("=== 分布式调度器测试 ===\n\n");

    // 创建调度器
    distributed_scheduler_t *s = scheduler_create("node-001");

    // 注册任务
    scheduler_register_task(s, "data-sync", "0 0 3 * * *", 4, 300, 3,
                            data_sync_task, NULL);
    int report_id = 1001;
    scheduler_register_task(s, "report-gen", "0 30 2 * * *", 2, 600, 2,
                            report_task, &report_id);
    scheduler_register_task(s, "cache-refresh", "*/10 * * * *", 3, 60, 5,
                            cache_refresh_task, NULL);

    // 启动心跳线程
    pthread_t heartbeat_tid;
    pthread_create(&heartbeat_tid, NULL, heartbeat_thread, s);

    // 主调度循环
    printf("\n[调度器] 开始调度循环...\n");
    int loop_count = 0;
    while (s->running && loop_count < 20) {
        scheduler_check_tasks(s);
        scheduler_failover(s);
        sleep(10);
        loop_count++;

        // 打印任务实例状态
        printf("\n=== 任务实例状态 ===\n");
        pthread_mutex_lock(&s->mutex);
        task_instance_t *inst = s->instances;
        while (inst) {
            printf("任务: %s, 状态: %d, 完成: %d/%d\n",
                   inst->task_name, inst->status,
                   inst->completed_shards, inst->shard_count);
            inst = inst->next;
        }
        pthread_mutex_unlock(&s->mutex);
    }

    s->running = 0;
    pthread_join(heartbeat_tid, NULL);
    return 0;
}
```

---

三、编译和运行

```bash
gcc -o scheduler scheduler.c -lpthread
./scheduler
```

---

四、调度器 vs 其他方案

| 特性 | 本实现 | Quartz | XXL-JOB |
| --- | --- | --- | --- |
| 任务分片 | ✅ | ✅ | ✅ |
| 故障转移 | ✅ | ✅ | ✅ |
| Cron调度 | ✅ | ✅ | ✅ |
| 任务编排 | ❌ | ✅ | ✅ |
| 可视化 | ❌ | ✅ | ✅ |
| 依赖 | 无 | JDBC | MySQL |

---

五、总结

通过这篇文章，你学会了：

· 分布式调度器的核心原理
· Cron表达式解析
· 任务分片与并行执行
· 故障转移机制
· 任务状态管理
· 心跳检测

分布式调度器是大数据平台和微服务架构的核心组件。掌握它，你就理解了数据同步、报表生成、定时清理等系统的设计原理。

下一篇预告：《从零实现一个服务注册中心：Eureka与Consul》

---

评论区分享一下：你用调度器解决过什么场景？
从零实现一个分布式调度器:任务分片与容错
前言

你有没有想过，每天凌晨3点，系统是怎么自动执行数据同步、报表生成、缓存刷新这些定时任务的？如果任务执行失败了，系统会怎么处理？

分布式调度器是微服务架构中处理定时任务、异步任务的核心组件。今天我们用C语言从零实现一个分布式调度器：

· 任务注册与发现
· Cron表达式解析
· 任务分片并行执行
· 故障转移容错处理
· 任务状态管理
· 监控告警

---

一、分布式调度器核心原理

1. 架构图

```
┌─────────────────────────────────────────────────────────────┐
│                         调度器集群                          │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐          │
│  │    节点1    │  │    节点2    │  │    节点3    │          │
│  │  (Leader)   │  │ (Follower)  │  │ (Follower)  │          │
│  └──────┬──────┘  └─────────────┘  └─────────────┘          │
│         │                                                   │
│         ▼                                                   │
│  ┌─────────────────────────────────────────────────────┐    │
│  │                    任务分片策略                     │    │
│  │    [0-33%]        [33%-66%]        [66%-100%]       │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘
```

2. 核心功能

| 功能 | 说明 |
| --- | --- |
| 任务注册 | 动态注册/注销任务 |
| Cron调度 | 支持标准Cron表达式 |
| 任务分片 | 大任务拆分成小片并行 |
| 故障转移 | 节点宕机自动转移任务 |
| 任务状态 | 运行中/成功/失败/超时 |
| 可观测性 | 日志、指标、告警 |

---

二、完整代码实现

1. 基础数据结构

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
#include <errno.h>
#include <signal.h>

#define MAX_TASK_NAME 128
#define MAX_TASK_PARAM 256
#define MAX_NODES 100
#define MAX_SHARDS 100

// 任务状态
typedef enum {
    TASK_IDLE = 0,
    TASK_RUNNING,
    TASK_SUCCESS,
    TASK_FAILED,
    TASK_TIMEOUT,
    TASK_CANCELLED
} task_status_t;

// 任务分片
typedef struct task_shard {
    int shard_index;
    int total_shards;
    char param[MAX_TASK_PARAM];
    char node_id[32];
    task_status_t status;
    time_t start_time;
    time_t end_time;
    char error_msg[256];
} task_shard_t;

// 任务定义
typedef struct task_definition {
    char name[MAX_TASK_NAME];
    char cron_expr[64];
    int shard_count;
    int timeout_seconds;
    int retry_count;
    void (*execute)(task_shard_t *shard, void *context);
    void *context;
    struct task_definition *next;
} task_definition_t;

// 任务实例
typedef struct task_instance {
    char task_name[MAX_TASK_NAME];
    char instance_id[64];
    task_shard_t *shards;
    int shard_count;
    int completed_shards;
    task_status_t status;
    time_t start_time;
    time_t end_time;
    struct task_instance *next;
} task_instance_t;

// 调度器节点
typedef struct scheduler_node {
    char node_id[32];
    char host[64];
    int port;
    int is_leader;
    int is_healthy;
    time_t last_heartbeat;
} scheduler_node_t;

// 分布式调度器
typedef struct distributed_scheduler {
    char node_id[32];
    task_definition_t *tasks;
    task_instance_t *instances;
    scheduler_node_t *nodes;
    int node_count;
    int running;
    pthread_mutex_t mutex;
    pthread_t schedule_thread;
    pthread_t heartbeat_thread;
} distributed_scheduler_t;
```

2. Cron表达式解析

```c
// Cron字段
typedef struct cron_fields {
    int minute[60];
    int minute_count;
    int hour[24];
    int hour_count;
    int day[31];
    int day_count;
    int month[12];
    int month_count;
    int weekday[7];
    int weekday_count;
} cron_fields_t;

// 解析cron表达式（简化版）
int cron_parse(const char *expr, cron_fields_t *fields) {
    // 格式: 分 时 日 月 周
    // 示例: "0 0 3 * * *" = 每天3点
    // 简化实现：只支持 * 和数字
    char parts[6][32];
    int count = sscanf(expr, "%s %s %s %s %s",
                       parts[0], parts[1], parts[2], parts[3], parts[4]);
    if (count != 5) return -1;

    // 解析分钟
    if (strcmp(parts[0], "*") == 0) {
        fields->minute_count = 60;
        for (int i = 0; i < 60; i++) fields->minute[i] = i;
    } else {
        fields->minute_count = 1;
        fields->minute[0] = atoi(parts[0]);
    }

    // 解析小时
    if (strcmp(parts[1], "*") == 0) {
        fields->hour_count = 24;
        for (int i = 0; i < 24; i++) fields->hour[i] = i;
    } else {
        fields->hour_count = 1;
        fields->hour[0] = atoi(parts[1]);
    }

    // 解析日
    if (strcmp(parts[2], "*") == 0) {
        fields->day_count = 31;
        for (int i = 1; i <= 31; i++) fields->day[i-1] = i;
    } else {
        fields->day_count = 1;
        fields->day[0] = atoi(parts[2]);
    }

    // 解析月
    if (strcmp(parts[3], "*") == 0) {
        fields->month_count = 12;
        for (int i = 1; i <= 12; i++) fields->month[i-1] = i;
    } else {
        fields->month_count = 1;
        fields->month[0] = atoi(parts[3]);
    }

    // 解析周
    if (strcmp(parts[4], "*") == 0) {
        fields->weekday_count = 7;
        for (int i = 0; i < 7; i++) fields->weekday[i] = i;
    } else {
        fields->weekday_count = 1;
        fields->weekday[0] = atoi(parts[4]);
    }

    return 0;
}

// 检查时间是否匹配
int cron_match(cron_fields_t *fields, struct tm *tm) {
    int match = 0;

    // 检查分钟
    for (int i = 0; i < fields->minute_count; i++) {
        if (fields->minute[i] == tm->tm_min) { match = 1; break; }
    }
    if (!match) return 0;
    match = 0;

    // 检查小时
    for (int i = 0; i < fields->hour_count; i++) {
        if (fields->hour[i] == tm->tm_hour) { match = 1; break; }
    }
    if (!match) return 0;
    match = 0;

    // 检查日
    for (int i = 0; i < fields->day_count; i++) {
        if (fields->day[i] == tm->tm_mday) { match = 1; break; }
    }
    if (!match) return 0;
    match = 0;

    // 检查月
    for (int i = 0; i < fields->month_count; i++) {
        if (fields->month[i] == tm->tm_mon + 1) { match = 1; break; }
    }
    if (!match) return 0;
    match = 0;

    // 检查周
    for (int i = 0; i < fields->weekday_count; i++) {
        if (fields->weekday[i] == tm->tm_wday) { match = 1; break; }
    }
    return match;
}
```

3. 任务调度核心

```c
// 创建调度器
distributed_scheduler_t *scheduler_create(const char *node_id) {
    distributed_scheduler_t *s = malloc(sizeof(distributed_scheduler_t));
    strcpy(s->node_id, node_id);
    s->tasks = NULL;
    s->instances = NULL;
    s->nodes = malloc(sizeof(scheduler_node_t) * MAX_NODES);
    s->node_count = 0;
    s->running = 1;
    pthread_mutex_init(&s->mutex, NULL);
    printf("调度器节点启动: %s\n", node_id);
    return s;
}

// 注册任务
void scheduler_register_task(distributed_scheduler_t *s, const char *name,
                             const char *cron_expr, int shard_count,
                             int timeout_seconds, int retry_count,
                             void (*execute)(task_shard_t*, void*),
                             void *context) {
    pthread_mutex_lock(&s->mutex);
    task_definition_t *task = malloc(sizeof(task_definition_t));
    strcpy(task->name, name);
    strcpy(task->cron_expr, cron_expr);
    task->shard_count = shard_count > 0 ? shard_count : 1;
    task->timeout_seconds = timeout_seconds > 0 ? timeout_seconds : 300;
    task->retry_count = retry_count >= 0 ? retry_count : 3;
    task->execute = execute;
    task->context = context;
    task->next = s->tasks;
    s->tasks = task;
    pthread_mutex_unlock(&s->mutex);
    printf("[调度器] 注册任务: %s, cron=%s, shards=%d\n",
           name, cron_expr, shard_count);
}

// 执行分片任务
void execute_shard_task(distributed_scheduler_t *s, task_definition_t *task,
                        task_instance_t *instance, int shard_idx) {
    task_shard_t *shard = &instance->shards[shard_idx];
    shard->shard_index = shard_idx;
    shard->total_shards = task->shard_count;
    strcpy(shard->node_id, s->node_id);
    shard->status = TASK_RUNNING;
    shard->start_time = time(NULL);
    printf("[调度器] 执行分片 %d/%d: %s\n",
           shard_idx + 1, task->shard_count, task->name);

    // 执行任务
    if (task->execute) {
        task->execute(shard, task->context);
        shard->status = TASK_SUCCESS;
    }
    shard->end_time = time(NULL);

    // 更新完成状态
    pthread_mutex_lock(&s->mutex);
    instance->completed_shards++;
    if (instance->completed_shards >= task->shard_count) {
        instance->status = TASK_SUCCESS;
        instance->end_time = time(NULL);
    }
    pthread_mutex_unlock(&s->mutex);
    printf("[调度器] 分片 %d/%d 完成: %s\n",
           shard_idx + 1, task->shard_count, task->name);
}

// 检查并触发任务
void scheduler_check_tasks(distributed_scheduler_t *s) {
    time_t now = time(NULL);
    struct tm *tm_now = localtime(&now);
    pthread_mutex_lock(&s->mutex);
    task_definition_t *task = s->tasks;
    while (task) {
        cron_fields_t fields;
        if (cron_parse(task->cron_expr, &fields) == 0) {
            if (cron_match(&fields, tm_now)) {
                // 检查是否已有实例在运行
                task_instance_t *inst = s->instances;
                int running = 0;
                while (inst) {
                    if (strcmp(inst->task_name, task->name) == 0 &&
                        inst->status == TASK_RUNNING) {
                        running = 1;
                        break;
                    }
                    inst = inst->next;
                }
                if (!running) {
                    // 创建新任务实例
                    task_instance_t *new_inst = malloc(sizeof(task_instance_t));
                    strcpy(new_inst->task_name, task->name);
                    snprintf(new_inst->instance_id, sizeof(new_inst->instance_id),
                             "%s-%ld", task->name, now);
                    new_inst->shard_count = task->shard_count;
                    new_inst->shards = malloc(sizeof(task_shard_t) * task->shard_count);
                    memset(new_inst->shards, 0, sizeof(task_shard_t) * task->shard_count);
                    new_inst->completed_shards = 0;
                    new_inst->status = TASK_RUNNING;
                    new_inst->start_time = now;
                    new_inst->end_time = 0;
                    new_inst->next = s->instances;
                    s->instances = new_inst;
                    printf("[调度器] 触发任务: %s, 分片数: %d\n",
                           task->name, task->shard_count);

                    // 执行所有分片
                    for (int i = 0; i < task->shard_count; i++) {
                        execute_shard_task(s, task, new_inst, i);
                    }
                }
            }
        }
        task = task->next;
    }
    pthread_mutex_unlock(&s->mutex);
}
```

4. 心跳与故障转移

```c
// 心跳检测
void *heartbeat_thread(void *arg) {
    distributed_scheduler_t *s = (distributed_scheduler_t*)arg;
    while (s->running) {
        sleep(5);
        // 这里会向其他节点发送心跳
        // 简化实现：打印心跳日志
        // printf("[心跳] %s 存活\n", s->node_id);
    }
    return NULL;
}

// 故障转移
void scheduler_failover(distributed_scheduler_t *s) {
    pthread_mutex_lock(&s->mutex);
    time_t now = time(NULL);

    // 检查每个运行中的任务实例
    task_instance_t *inst = s->instances;
    while (inst) {
        if (inst->status == TASK_RUNNING) {
            // 检查是否有分片长时间未完成
            int timeout_count = 0;
            for (int i = 0; i < inst->shard_count; i++) {
                if (inst->shards[i].status == TASK_RUNNING) {
                    time_t elapsed = now - inst->shards[i].start_time;

                    // 获取超时配置
                    task_definition_t *task = s->tasks;
                    int timeout = 300;
                    while (task) {
                        if (strcmp(task->name, inst->task_name) == 0) {
                            timeout = task->timeout_seconds;
                            break;
                        }
                        task = task->next;
                    }

                    if (elapsed > timeout) {
                        // 标记超时，需要重新调度
                        inst->shards[i].status = TASK_TIMEOUT;
                        strcpy(inst->shards[i].error_msg, "Task timeout, rescheduling");
                        timeout_count++;
                    }
                }
            }
            if (timeout_count > 0) {
                printf("[故障转移] 任务 %s 有 %d 个分片超时，重新调度\n",
                       inst->task_name, timeout_count);
                // 实际会重新分配超时分片
            }
        }
        inst = inst->next;
    }
    pthread_mutex_unlock(&s->mutex);
}
```

5. 示例任务

```c
// 示例任务1：数据同步
void data_sync_task(task_shard_t *shard, void *context) {
    printf("[任务] 数据同步 分片 %d/%d: 同步数据范围 [%d, %d]\n",
           shard->shard_index + 1, shard->total_shards,
           shard->shard_index * 1000 / shard->total_shards,
           (shard->shard_index + 1) * 1000 / shard->total_shards);
    sleep(1); // 模拟执行
}

// 示例任务2：报表生成
void report_task(task_shard_t *shard, void *context) {
    int *report_id = (int*)context;
    printf("[任务] 报表生成 分片 %d/%d: 生成报表 %d 的切片 %d\n",
           shard->shard_index + 1, shard->total_shards, *report_id, shard->shard_index);
    sleep(2); // 模拟执行
}

// 示例任务3：缓存刷新
void cache_refresh_task(task_shard_t *shard, void *context) {
    printf("[任务] 缓存刷新 分片 %d/%d: 刷新缓存键前缀 %d\n",
           shard->shard_index + 1, shard->total_shards, shard->shard_index);
    sleep(1); // 模拟执行
}
```

6. 测试代码

```c
int main() {
    printf("=== 分布式调度器测试 ===\n\n");

    // 创建调度器
    distributed_scheduler_t *s = scheduler_create("node-001");

    // 注册任务
    scheduler_register_task(s, "data-sync", "0 0 3 * * *", 4, 300, 3,
                            data_sync_task, NULL);
    int report_id = 1001;
    scheduler_register_task(s, "report-gen", "0 30 2 * * *", 2, 600, 2,
                            report_task, &report_id);
    scheduler_register_task(s, "cache-refresh", "*/10 * * * *", 3, 60, 5,
                            cache_refresh_task, NULL);

    // 启动心跳线程
    pthread_t heartbeat_tid;
    pthread_create(&heartbeat_tid, NULL, heartbeat_thread, s);

    // 主调度循环
    printf("\n[调度器] 开始调度循环...\n");
    int loop_count = 0;
    while (s->running && loop_count < 20) {
        scheduler_check_tasks(s);
        scheduler_failover(s);
        sleep(10);
        loop_count++;

        // 打印任务实例状态
        printf("\n=== 任务实例状态 ===\n");
        pthread_mutex_lock(&s->mutex);
        task_instance_t *inst = s->instances;
        while (inst) {
            printf("任务: %s, 状态: %d, 完成: %d/%d\n",
                   inst->task_name, inst->status,
                   inst->completed_shards, inst->shard_count);
            inst = inst->next;
        }
        pthread_mutex_unlock(&s->mutex);
    }

    s->running = 0;
    pthread_join(heartbeat_tid, NULL);
    return 0;
}
```

---

三、编译和运行

```bash
gcc -o scheduler scheduler.c -lpthread
./scheduler
```

---

四、调度器 vs 其他方案

| 特性 | 本实现 | Quartz | XXL-JOB |
| --- | --- | --- | --- |
| 任务分片 | ✅ | ✅ | ✅ |
| 故障转移 | ✅ | ✅ | ✅ |
| Cron调度 | ✅ | ✅ | ✅ |
| 任务编排 | ❌ | ✅ | ✅ |
| 可视化 | ❌ | ✅ | ✅ |
| 依赖 | 无 | JDBC | MySQL |

---

五、总结

通过这篇文章，你学会了：

· 分布式调度器的核心原理
· Cron表达式解析
· 任务分片与并行执行
· 故障转移机制
· 任务状态管理
· 心跳检测

分布式调度器是大数据平台和微服务架构的核心组件。掌握它，你就理解了数据同步、报表生成、定时清理等系统的设计原理。

下一篇预告：《从零实现一个服务注册中心：Eureka与Consul》

---

评论区分享一下：你用调度器解决过什么场景？