基于 Ray 的分布式 AI 训练与推理从单机到集群的弹性扩展一、AI 训练的单机天花板模型太大一张卡装不下7B 参数的模型 FP16 精度需要 14GB 显存训练时加上梯度和优化器状态至少需要 56GB——单张 A100 80GB 勉强能跑但 13B 及以上的模型就必须多卡并行。更关键的是推理服务也需要弹性扩展——白天高峰需要 10 个推理副本夜间低谷只需要 2 个手动调整效率太低。Ray 的核心价值是用统一的编程模型实现从单机到集群的无缝扩展。单机开发时用 Ray 本地模式部署时切换到集群模式代码无需修改。Ray 的 Actor 模型天然适合推理服务有状态的长连接Task 模型适合训练任务无状态的并行计算。二、Ray 分布式架构graph TB subgraph Ray 集群 A[Head Nodebr/GCS Dashboard] -- B[Worker Node 1br/GPU 0,1] A -- C[Worker Node 2br/GPU 2,3] A -- D[Worker Node 3br/CPU only] end subgraph 编程模型 E[Remote Functionbr/无状态并行任务] -- F[训练数据并行] G[Actorbr/有状态长连接] -- H[推理服务] I[Placement Groupbr/资源拓扑约束] -- J[GPU 亲和调度] end subgraph 弹性伸缩 K[Autoscalerbr/根据队列深度扩缩] K -- B K -- C endRay 集群由 Head Node全局控制服务Dashboard和多个 Worker Node 组成。Remote Function 用于无状态的并行任务数据并行训练Actor 用于有状态的长连接服务推理Placement Group 用于 GPU 拓扑约束调度。三、实现3.1 分布式训练import ray # 初始化 Ray本地模式或集群模式 ray.init() ray.remote(num_gpus1) class TrainingWorker: 训练 Worker每个 GPU 一个 Actor def __init__(self, model_config: dict): self.model_config model_config self.model None self.optimizer None def setup(self): 初始化模型和优化器 import torch # 实际场景中加载模型 self.model torch.nn.Linear(768, 768).cuda() self.optimizer torch.optim.Adam( self.model.parameters(), lr1e-4 ) def train_step(self, batch: dict) - dict: 执行一步训练 import torch self.model.train() inputs torch.tensor(batch[input]).cuda() labels torch.tensor(batch[label]).cuda() self.optimizer.zero_grad() outputs self.model(inputs) loss torch.nn.functional.mse_loss(outputs, labels) loss.backward() self.optimizer.step() return {loss: loss.item()} def get_gradients(self) - dict: 获取梯度用于 AllReduce import torch return { name: param.grad.clone() for name, param in self.model.named_parameters() if param.grad is not None } def apply_gradients(self, avg_grads: dict): 应用平均梯度 import torch with torch.no_grad(): for name, param in self.model.named_parameters(): if name in avg_grads: param.grad avg_grads[name].to(param.device) class DistributedTrainer: 分布式训练协调器 def __init__(self, num_workers: int 4): self.num_workers num_workers self.workers [] def setup(self): 创建训练 Workers for _ in range(self.num_workers): worker TrainingWorker.remote(model_config{}) ray.get(worker.setup.remote()) self.workers.append(worker) def train_epoch( self, data_shards: list, num_steps: int 100 ) - list: 执行一轮训练 losses [] for step in range(num_steps): # 并行执行前向反向 futures [] for i, worker in enumerate(self.workers): batch data_shards[i % len(data_shards)] futures.append( worker.train_step.remote(batch) ) # 等待所有 Worker 完成 step_results ray.get(futures) step_loss sum( r[loss] for r in step_results ) / len(step_results) losses.append(step_loss) # AllReduce收集梯度并平均 grad_futures [ w.get_gradients.remote() for w in self.workers ] all_grads ray.get(grad_futures) # 简化平均梯度 avg_grads self._average_gradients(all_grads) # 应用平均梯度 apply_futures [ w.apply_gradients.remote(avg_grads) for w in self.workers ] ray.get(apply_futures) return losses def _average_gradients(self, all_grads: list) - dict: 平均梯度简化版 AllReduce import torch avg {} for name in all_grads[0]: avg[name] torch.stack( [g[name] for g in all_grads] ).mean(0) return avg3.2 弹性推理服务ray.remote(num_gpus0.5) class InferenceActor: 推理 Actor有状态的推理服务 def __init__(self, model_path: str): self.model_path model_path self.model None self.request_count 0 def load_model(self): 加载模型 import torch # 实际场景中加载模型 self.model torch.nn.Linear(768, 768).cuda() self.model.eval() def predict(self, input_data: dict) - dict: 执行推理 import torch with torch.no_grad(): inputs torch.tensor( input_data[features] ).cuda() outputs self.model(inputs) self.request_count 1 return { output: outputs.cpu().tolist(), request_id: input_data.get(id), } def get_load(self) - int: 获取当前负载 return self.request_count class ElasticInferenceService: 弹性推理服务 def __init__( self, model_path: str, min_replicas: int 1, max_replicas: int 8, ): self.model_path model_path self.min_replicas min_replicas self.max_replicas max_replicas self.actors [] def start(self): 启动最小副本数 for _ in range(self.min_replicas): actor InferenceActor.remote(self.model_path) ray.get(actor.load_model.remote()) self.actors.append(actor) def predict(self, input_data: dict) - dict: 路由推理请求到负载最低的 Actor # 获取所有 Actor 的负载 load_futures [ a.get_load.remote() for a in self.actors ] loads ray.get(load_futures) # 选择负载最低的 Actor min_idx loads.index(min(loads)) return ray.get( self.actors[min_idx].predict.remote(input_data) ) def scale(self, target_replicas: int): 调整副本数 target max( self.min_replicas, min(self.max_replicas, target_replicas) ) current len(self.actors) if target current: # 扩容 for _ in range(target - current): actor InferenceActor.remote(self.model_path) ray.get(actor.load_model.remote()) self.actors.append(actor) elif target current: # 缩容 removed self.actors[target:] self.actors self.actors[:target] for actor in removed: ray.kill(actor)四、Ray 分布式方案的 Trade-offs 分析Ray vs. Horovod/DeepSpeedHorovod 和 DeepSpeed 是专门的分布式训练框架对通信优化更深入梯度压缩、混合精度 AllReduce。Ray 的优势是通用性——训练和推理用同一套编程模型部署和运维更简单。如果只需要训练Horovod/DeepSpeed 更优如果训练推理数据处理需要统一调度Ray 更合适。Actor 模型的内存开销每个 Actor 是独立的进程有自己的 Python 解释器和 GPU 上下文。大量小 Actor 会浪费 GPU 内存每个 Actor 至少占用 1-2GB 上下文内存。建议每个 GPU 运行 1-2 个 Actor避免过度拆分。Autoscaler 的延迟Ray Autoscaler 从检测到队列积压到新 Worker 加入集群约需 1-3 分钟启动 EC2 实例注册到集群。对于突发流量需要预热池或超额配置。GCS 单点Ray 的全局控制服务GCS运行在 Head Node 上是单点。Head Node 故障会导致整个集群不可用。生产环境建议 Head Node 部署在高可用实例上并定期备份 GCS 状态。五、总结Ray 的核心价值是用统一的编程模型实现从单机到集群的无缝扩展。Remote Function 用于无状态并行任务Actor 用于有状态推理服务Placement Group 用于 GPU 拓扑调度。单机开发和集群部署代码无需修改。落地建议先用 Ray 本地模式开发和调试确认逻辑正确后切换到集群模式。训练任务用 Remote Function 手动 AllReduce推理服务用 Actor 负载均衡路由。Autoscaler 配合预热池应对突发流量。Head Node 部署在高可用实例上。
基于 Ray 的分布式 AI 训练与推理:从单机到集群的弹性扩展
基于 Ray 的分布式 AI 训练与推理从单机到集群的弹性扩展一、AI 训练的单机天花板模型太大一张卡装不下7B 参数的模型 FP16 精度需要 14GB 显存训练时加上梯度和优化器状态至少需要 56GB——单张 A100 80GB 勉强能跑但 13B 及以上的模型就必须多卡并行。更关键的是推理服务也需要弹性扩展——白天高峰需要 10 个推理副本夜间低谷只需要 2 个手动调整效率太低。Ray 的核心价值是用统一的编程模型实现从单机到集群的无缝扩展。单机开发时用 Ray 本地模式部署时切换到集群模式代码无需修改。Ray 的 Actor 模型天然适合推理服务有状态的长连接Task 模型适合训练任务无状态的并行计算。二、Ray 分布式架构graph TB subgraph Ray 集群 A[Head Nodebr/GCS Dashboard] -- B[Worker Node 1br/GPU 0,1] A -- C[Worker Node 2br/GPU 2,3] A -- D[Worker Node 3br/CPU only] end subgraph 编程模型 E[Remote Functionbr/无状态并行任务] -- F[训练数据并行] G[Actorbr/有状态长连接] -- H[推理服务] I[Placement Groupbr/资源拓扑约束] -- J[GPU 亲和调度] end subgraph 弹性伸缩 K[Autoscalerbr/根据队列深度扩缩] K -- B K -- C endRay 集群由 Head Node全局控制服务Dashboard和多个 Worker Node 组成。Remote Function 用于无状态的并行任务数据并行训练Actor 用于有状态的长连接服务推理Placement Group 用于 GPU 拓扑约束调度。三、实现3.1 分布式训练import ray # 初始化 Ray本地模式或集群模式 ray.init() ray.remote(num_gpus1) class TrainingWorker: 训练 Worker每个 GPU 一个 Actor def __init__(self, model_config: dict): self.model_config model_config self.model None self.optimizer None def setup(self): 初始化模型和优化器 import torch # 实际场景中加载模型 self.model torch.nn.Linear(768, 768).cuda() self.optimizer torch.optim.Adam( self.model.parameters(), lr1e-4 ) def train_step(self, batch: dict) - dict: 执行一步训练 import torch self.model.train() inputs torch.tensor(batch[input]).cuda() labels torch.tensor(batch[label]).cuda() self.optimizer.zero_grad() outputs self.model(inputs) loss torch.nn.functional.mse_loss(outputs, labels) loss.backward() self.optimizer.step() return {loss: loss.item()} def get_gradients(self) - dict: 获取梯度用于 AllReduce import torch return { name: param.grad.clone() for name, param in self.model.named_parameters() if param.grad is not None } def apply_gradients(self, avg_grads: dict): 应用平均梯度 import torch with torch.no_grad(): for name, param in self.model.named_parameters(): if name in avg_grads: param.grad avg_grads[name].to(param.device) class DistributedTrainer: 分布式训练协调器 def __init__(self, num_workers: int 4): self.num_workers num_workers self.workers [] def setup(self): 创建训练 Workers for _ in range(self.num_workers): worker TrainingWorker.remote(model_config{}) ray.get(worker.setup.remote()) self.workers.append(worker) def train_epoch( self, data_shards: list, num_steps: int 100 ) - list: 执行一轮训练 losses [] for step in range(num_steps): # 并行执行前向反向 futures [] for i, worker in enumerate(self.workers): batch data_shards[i % len(data_shards)] futures.append( worker.train_step.remote(batch) ) # 等待所有 Worker 完成 step_results ray.get(futures) step_loss sum( r[loss] for r in step_results ) / len(step_results) losses.append(step_loss) # AllReduce收集梯度并平均 grad_futures [ w.get_gradients.remote() for w in self.workers ] all_grads ray.get(grad_futures) # 简化平均梯度 avg_grads self._average_gradients(all_grads) # 应用平均梯度 apply_futures [ w.apply_gradients.remote(avg_grads) for w in self.workers ] ray.get(apply_futures) return losses def _average_gradients(self, all_grads: list) - dict: 平均梯度简化版 AllReduce import torch avg {} for name in all_grads[0]: avg[name] torch.stack( [g[name] for g in all_grads] ).mean(0) return avg3.2 弹性推理服务ray.remote(num_gpus0.5) class InferenceActor: 推理 Actor有状态的推理服务 def __init__(self, model_path: str): self.model_path model_path self.model None self.request_count 0 def load_model(self): 加载模型 import torch # 实际场景中加载模型 self.model torch.nn.Linear(768, 768).cuda() self.model.eval() def predict(self, input_data: dict) - dict: 执行推理 import torch with torch.no_grad(): inputs torch.tensor( input_data[features] ).cuda() outputs self.model(inputs) self.request_count 1 return { output: outputs.cpu().tolist(), request_id: input_data.get(id), } def get_load(self) - int: 获取当前负载 return self.request_count class ElasticInferenceService: 弹性推理服务 def __init__( self, model_path: str, min_replicas: int 1, max_replicas: int 8, ): self.model_path model_path self.min_replicas min_replicas self.max_replicas max_replicas self.actors [] def start(self): 启动最小副本数 for _ in range(self.min_replicas): actor InferenceActor.remote(self.model_path) ray.get(actor.load_model.remote()) self.actors.append(actor) def predict(self, input_data: dict) - dict: 路由推理请求到负载最低的 Actor # 获取所有 Actor 的负载 load_futures [ a.get_load.remote() for a in self.actors ] loads ray.get(load_futures) # 选择负载最低的 Actor min_idx loads.index(min(loads)) return ray.get( self.actors[min_idx].predict.remote(input_data) ) def scale(self, target_replicas: int): 调整副本数 target max( self.min_replicas, min(self.max_replicas, target_replicas) ) current len(self.actors) if target current: # 扩容 for _ in range(target - current): actor InferenceActor.remote(self.model_path) ray.get(actor.load_model.remote()) self.actors.append(actor) elif target current: # 缩容 removed self.actors[target:] self.actors self.actors[:target] for actor in removed: ray.kill(actor)四、Ray 分布式方案的 Trade-offs 分析Ray vs. Horovod/DeepSpeedHorovod 和 DeepSpeed 是专门的分布式训练框架对通信优化更深入梯度压缩、混合精度 AllReduce。Ray 的优势是通用性——训练和推理用同一套编程模型部署和运维更简单。如果只需要训练Horovod/DeepSpeed 更优如果训练推理数据处理需要统一调度Ray 更合适。Actor 模型的内存开销每个 Actor 是独立的进程有自己的 Python 解释器和 GPU 上下文。大量小 Actor 会浪费 GPU 内存每个 Actor 至少占用 1-2GB 上下文内存。建议每个 GPU 运行 1-2 个 Actor避免过度拆分。Autoscaler 的延迟Ray Autoscaler 从检测到队列积压到新 Worker 加入集群约需 1-3 分钟启动 EC2 实例注册到集群。对于突发流量需要预热池或超额配置。GCS 单点Ray 的全局控制服务GCS运行在 Head Node 上是单点。Head Node 故障会导致整个集群不可用。生产环境建议 Head Node 部署在高可用实例上并定期备份 GCS 状态。五、总结Ray 的核心价值是用统一的编程模型实现从单机到集群的无缝扩展。Remote Function 用于无状态并行任务Actor 用于有状态推理服务Placement Group 用于 GPU 拓扑调度。单机开发和集群部署代码无需修改。落地建议先用 Ray 本地模式开发和调试确认逻辑正确后切换到集群模式。训练任务用 Remote Function 手动 AllReduce推理服务用 Actor 负载均衡路由。Autoscaler 配合预热池应对突发流量。Head Node 部署在高可用实例上。