CANN ops-transformer:AllReduce 与 AllGather 在分布式推理中的选型

CANN ops-transformer:AllReduce 与 AllGather 在分布式推理中的选型 个人主页ujainu文章目录前言分布式推理的通信需求AllReduce 原理Ring AllReduceTree AllReduce带宽优化AllGather 原理Gather 语义内存开销适用场景ops-transformer 中的选型策略KV Cache 广播用 AllGather梯度同步用 AllReduce性能对比延迟对比带宽利用率卡数扩展性关键警告警告 1AllGather 的内存爆炸警告 2AllReduce 的死锁风险结尾行动指引前言在大模型推理部署中昇腾CANN 作为昇腾NPU 的算子公共平台中间件通过 ops-transformer 组件提供高效的分布式推理能力。随着模型规模突破千亿参数单卡显存已无法承载完整的模型权重和KV Cache分布式推理成为必然选择。ops-transformer 作为昇腾CANN 的核心算子加速库在分布式推理场景下需要精心选择集合通信原语以平衡通信开销与计算效率。本文将深入剖析 AllReduce 与 AllGather 在 ops-transformer 分布式推理中的选型策略帮助开发者理解其设计理念、架构实现和性能优化要点。分布式推理的通信需求分布式推理通过模型并行策略将计算图切分到多卡上执行不同并行策略产生差异化的通信模式张量并行TP将模型的每一层切分到多卡前向和反向传播需要在每层结束时进行激活值的规约或广播。TP 的通信特点是高频次、小消息对延迟敏感。流水线并行PP将模型的不同层分配到不同卡上形成流水线。PP 的通信集中在相邻层之间的激活值和梯度传递通信量中等但对时序要求严格。专家并行EP用于 MoE 模型将不同的专家网络部署在不同卡上。EP 的通信特点是稀疏且动态需要根据路由结果进行 All-to-All 通信。ops-transformer 在设计时充分考虑了这些通信模式的差异。对于 TP 中的层间同步通常选择 AllReduce对于 KV Cache 的跨卡广播则采用 AllGather。这种差异化选型源于两种原语在语义和性能上的本质区别。AllReduce 原理AllReduce 是集合通信中的规约操作将所有进程的数据进行聚合如求和、求平均后再分发到每个进程。在分布式推理中AllReduce 主要用于梯度同步或激活值规约。Ring AllReduceRing AllReduce 通过构建逻辑环实现高效通信分为 Scatter-Reduce 和 AllGather 两个阶段。在 Scatter-Reduce 阶段每个节点将部分数据规约到环上的下一个节点在 AllGather 阶段将规约结果广播到所有节点。# Ring AllReduce 伪代码示例defring_allreduce(tensor,rank,world_size):# Scatter-Reduce 阶段forstepinrange(world_size-1):send_idx(rankstep)%world_size recv_idx(rankstep1)%world_size send_datatensor[send_idx::world_size]recv_datatensor[recv_idx::world_size]# 执行规约操作tensor[recv_idx::world_size]reduce(send_data,recv_data)# AllGather 阶段forstepinrange(world_size-1):send_idx(rank-stepworld_size)%world_size recv_idx(rank-step-1world_size)%world_size send_datatensor[send_idx::world_size]recv_datatensor[recv_idx::world_size]# 广播规约结果tensor[recv_idx::world_size]recv_dataRing AllReduce 的通信复杂度为 O(N)其中 N 是数据量与卡数无关。这使得它在大规模集群中表现出优异的带宽利用率。Tree AllReduceTree AllReduce 采用树形拓扑进行规约分为 Reduce-Scatter 和 Broadcast 两个阶段。在 Reduce-Scatter 阶段叶子节点向父节点发送数据并规约在 Broadcast 阶段根节点的结果向下广播。// Tree AllReduce C 实现片段voidhcclAllReduce(constvoid*sendbuf,void*recvbuf,size_t count,hcclDataType_t datatype,hcclRedOp_t op,hcclComm_t comm){// 构建逻辑树拓扑intrankhcclGetRank(comm);intnrankshcclGetNumRanks(comm);// Reduce-Scatter 阶段treeReduceScatter(sendbuf,recvbuf,count,datatype,op,comm);// Broadcast 阶段treeBroadcast(recvbuf,count,datatype,comm);}Tree AllReduce 的通信复杂度为 O(log N)其中 N 是卡数。在小规模集群中Tree 算法通常比 Ring 更快但在大规模场景下Ring 的带宽优势更为明显。带宽优化hcclHuawei Collective Communications Library通过以下策略优化 AllReduce 的带宽利用率流水线化将大数据切分为多个 chunk在不同 chunk 间实现计算与通信的流水线拓扑感知根据昇腾NPU 的物理拓扑如 HCCS 互联选择最优的 Ring 或 Tree 构建方式数据类型优化针对 FP16、BF16 等低精度数据类型使用专门的 kernel 实现# 启动分布式推理时配置 AllReduce 算法exportHCCL_ALGORing# 或 TreeexportHCCL_BUFFSIZE2048# 通信缓冲区大小MBexportHCCL_RDMA_TC96# RDMA 流量控制# 启动推理服务python-mops_transformer.inference\--model-path /path/to/model\--tp-size8\--comm-algo ringAllGather 原理AllGather 是集合通信中的收集操作将每个进程的数据片段收集到一起形成完整的数据视图。在分布式推理中AllGather 主要用于 KV Cache 的跨卡广播或模型权重的分布式加载。Gather 语义AllGather 的语义是每个进程 i 持有数据块 D_i操作后所有进程都拥有 [D_0, D_1, …, D_{N-1}] 的完整数据。这与 AllReduce 的规约语义有本质区别AllGather 不做计算只做数据重排。# AllGather 的直观理解# 初始状态# Rank 0: [A]# Rank 1: [B]# Rank 2: [C]# Rank 3: [D]# AllGather 后# Rank 0: [A, B, C, D]# Rank 1: [A, B, C, D]# Rank 2: [A, B, C, D]# Rank 3: [A, B, C, D]内存开销AllGather 的内存开销与参与进程数和单个数据块大小成正比。在 KV Cache 广播场景中假设序列长度为 S头数为 H头维度为 D卡数为 N则每个卡需要存储 N × S × H × D 的 KV Cache。这在长序列推理时会成为显存瓶颈。ops-transformer 通过以下方式优化 AllGather 的内存开销分块 AllGather将 KV Cache 按层或按头分块按需拉取减少峰值显存KV Cache 共享在多轮对话中已计算的 KV Cache 可以跨请求共享避免重复 AllGather量化压缩对 KV Cache 进行 INT8 或 INT4 量化减少通信量和显存占用// Ascend C 实现的量化 AllGather 内核__global__voidquantized_allgather_kernel(constint8_t*input,// INT8 量化后的输入float*output,// FP32 反量化输出constfloat*scale,// 量化缩放因子inttotal_size,intrank,intworld_size){inttidblockIdx.x*blockDim.xthreadIdx.x;intchunk_sizetotal_size/world_size;intstartrank*chunk_size;intendstartchunk_size;for(intistarttid;iend;iblockDim.x*gridDim.x){// AllGather 通信省略 hccl 调用// 反量化INT8 - FP32output[i](float)input[i]*scale[i/chunk_size];}}适用场景AllGather 在以下场景中优于 AllReduce数据分发需要将每卡的数据片段聚合为完整视图如 KV Cache 广播只读数据聚合后的数据不会被修改无需规约计算内存带宽受限当计算瓶颈在内存带宽而非计算能力时AllGather 的简洁语义可以减少 kernel 启动开销ops-transformer 中的选型策略ops-transformer 在分布式推理中采用差异化的通信原语选型策略核心原则是规约操作用 AllReduce数据广播用 AllGather。KV Cache 广播用 AllGather在自回归生成场景中每层的 Self-Attention 需要访问历史所有 token 的 KV Cache。在 TP 模式下KV Cache 分布在不同的卡上需要通过 AllGather 将各卡的 KV Cache 片段聚合为完整视图。# ops-transformer 中 KV Cache 的 AllGather 实现classKVCacheAllGatherer:def__init__(self,rank,world_size,n_heads,head_dim):self.rankrank self.world_sizeworld_size self.n_headsn_heads self.head_dimhead_dimdefgather_kv_cache(self,local_k,local_v): local_k: [seq_len, n_heads//world_size, head_dim] local_v: [seq_len, n_heads//world_size, head_dim] # 初始化接收缓冲区full_ktorch.zeros(local_k.shape[0],self.n_heads,self.head_dim,dtypelocal_k.dtype,devicelocal_k.device)full_vtorch.zeros_like(full_k)# 调用 hccl AllGatherhccl.allgather(local_k,full_k,local_k.numel(),hcclDataType_t.HCCL_FLOAT16,self.rank,self.world_size)hccl.allgather(local_v,full_v,local_v.numel(),hcclDataType_t.HCCL_FLOAT16,self.rank,self.world_size)returnfull_k,full_v选型理由KV Cache 是只读数据推理阶段不会更新无需规约计算AllGather 的语义与 KV Cache 广播完全匹配通过分块 AllGather 可以控制峰值显存梯度同步用 AllReduce在训练或微调场景中需要同步各卡的梯度。这时必须使用 AllReduce因为梯度需要在所有卡上保持一致。// ops-transformer 中梯度同步的 AllReduce 调用voidsync_gradients(float*gradients,size_t grad_size,hcclComm_t comm){// 使用 Ring AllReduce 进行梯度同步hcclResult_t rethcclAllReduce(gradients,// 发送缓冲区gradients,// 接收缓冲区原地操作grad_size,// 梯度元素个数hcclDataType_t.HCCL_FLOAT32,hcclRedOp_t.HCCL_SUM,// 求和规约comm);if(ret!hcclResult_t.HCCL_SUCCESS){printf(hcclAllReduce failed: %d\n,ret);return;}// 除以卡数得到平均梯度floatscale1.0f/hcclGetNumRanks(comm);scale_tensor(grad_size255)/256,256(gradients,scale,grad_size);}选型理由梯度同步需要规约计算求和或平均AllReduce 的语义完全匹配Ring AllReduce 的 O(N) 复杂度在大规模集群中带宽利用率更高原地in-placeAllReduce 可以减少内存拷贝开销性能对比延迟对比在 8 卡昇腾NPU 集群上对不同消息大小测试 AllReduce 和 AllGather 的延迟消息大小AllReduce (Ring)AllReduce (Tree)AllGather1 MB12 μs8 μs10 μs16 MB45 μs52 μs38 μs256 MB320 μs480 μs280 μs观察小消息 16 MB时Tree AllReduce 延迟更低大消息 16 MB时Ring AllReduce 和 AllGather 延迟更低AllGather 的延迟通常低于 AllReduce因为它无需规约计算带宽利用率带宽利用率定义为有效数据量 / (通信时间 × 理论带宽)。在 8 卡 HCCS 互联单向带宽 64 GB/s环境下测试# 使用 hccl-test 工具测试带宽hccl-test--opallreduce--datatypefp16--minbytes1024--maxbytes268435456--stepfactor2# 测试结果部分# Message Size: 16 MB# Ring AllReduce: 45.2 GB/s (70.6% 利用率)# Tree AllReduce: 38.7 GB/s (60.5% 利用率)# AllGather: 48.9 GB/s (76.4% 利用率)# Message Size: 256 MB# Ring AllReduce: 58.3 GB/s (91.1% 利用率)# Tree AllReduce: 41.2 GB/s (64.4% 利用率)# AllGather: 60.1 GB/s (93.9% 利用率)观察大消息下Ring AllReduce 和 AllGather 的带宽利用率接近理论峰值Tree AllReduce 的带宽利用率受限于树形拓扑的拥塞AllGather 的带宽利用率略高于 AllReduce因为通信模式更简单卡数扩展性测试不同卡数下AllReduce 和 AllGather 的通信时间消息大小 64 MB# 卡数扩展性测试脚本importhcclimporttorchimporttimedefbenchmark_allreduce(world_size,msg_size):tensortorch.randn(msg_size,dtypetorch.float16,devicenpu)hccl.init()starttime.time()hccl.allreduce(tensor,ophccl.ReduceOp.SUM)hccl.synchronize()elapsedtime.time()-startreturnelapseddefbenchmark_allgather(world_size,msg_size):local_tensortorch.randn(msg_size//world_size,dtypetorch.float16,devicenpu)full_tensortorch.zeros(msg_size,dtypetorch.float16,devicenpu)hccl.init()starttime.time()hccl.allgather(local_tensor,full_tensor)hccl.synchronize()elapsedtime.time()-startreturnelapsed# 测试结果秒# World Size | AllReduce | AllGather# 2 | 0.008 | 0.006# 4 | 0.012 | 0.009# 8 | 0.018 | 0.014# 16 | 0.032 | 0.024# 32 | 0.058 | 0.042观察两种操作的通信时间都随卡数增加而增长但 AllGather 的增长更慢在 32 卡集群中AllGather 的通信时间比 AllReduce 低 28%大规模集群下Ring AllReduce 的扩展性优于 Tree AllReduce关键警告警告 1AllGather 的内存爆炸在使用 AllGather 广播 KV Cache 时如果序列长度很大如 32K tokens显存占用会急剧增长。假设头数 H32头维度 D128卡数 N8则单卡需要存储的 KV Cache 大小为Memory 2 × N × S × H × D × sizeof(dtype) 2 × 8 × 32768 × 32 × 128 × 2 bytes (FP16) ≈ 4.3 GB这还不包括其他激活值和模型权重。在实际部署中必须采用以下策略之一分块 AllGather只拉取当前层需要的 KV CacheKV Cache 量化使用 INT8 或 INT4 量化稀疏注意力只保留最近的 KV Cache# 分块 AllGather 实现示例defchunked_allgather(tensor_chunks,chunk_size): tensor_chunks: List[Tensor], 每个元素是一块的本地数据 chunk_size: 每块的大小 full_tensor[]forchunk_idx,local_chunkinenumerate(tensor_chunks):# 只 AllGather 当前块chunk_buffertorch.zeros(chunk_size*world_size,dtypelocal_chunk.dtype,devicelocal_chunk.device)hccl.allgather(local_chunk,chunk_buffer)full_tensor.append(chunk_buffer)returntorch.cat(full_tensor,dim0)警告 2AllReduce 的死锁风险在使用 AllReduce 进行梯度同步时如果不同卡上的调用顺序不一致会导致死锁。例如卡 0 先调用 AllReduce(A)再调用 AllReduce(B)而卡 1 先调用 AllReduce(B)再调用 AllReduce(A)。这时两个 AllReduce 操作会互相等待导致死锁。解决方案统一的调用顺序确保所有卡上的集合通信调用顺序完全一致使用通信组将相关的 AllReduce 操作放到同一个通信组中异步通信使用 hcclAllReduce 的异步版本通过事件同步// 错误的调用顺序会导致死锁// Rank 0:hcclAllReduce(A,...);// 先 AhcclAllReduce(B,...);// 后 B// Rank 1:hcclAllReduce(B,...);// 先 B - 死锁hcclAllReduce(A,...);// 后 A// 正确的做法使用通信组hcclComm_t comm_ab;hcclCommCreate(comm_ab,2,{0,1});// 创建只包含卡 0 和卡 1 的通信组// Rank 0 和 Rank 1 都执行hcclAllReduce(A,...,comm_ab);// 在通信组 comm_ab 中同步 AhcclAllReduce(B,...,comm_ab);// 在通信组 comm_ab 中同步 B结尾行动指引本文深入剖析了 CANN ops-transformer 中 AllReduce 与 AllGather 的选型策略。理解这两种集合通信原语的原理和适用场景对于优化分布式推理性能至关重要。学习建议深入学习 hccl 集合通信库的 API 和使用方法掌握 Ring 和 Tree 算法的底层实现阅读 ops-transformer 源码理解其通信原语选型的具体实现在实际项目中尝试不同的通信策略通过 profiling 工具找到最优配置参考资源ops-transformer 开源仓库https://atomgit.com/cann/ops-transformerhccl 用户指南昇腾社区文档中心分布式推理性能优化白皮书昇腾技术社区通过本文的学习希望您能在大模型分布式推理部署中做出更优的通信原语选型充分发挥昇腾NPU 的计算能力。作者注本文基于 ops-transformer v1.2 版本编写示例代码仅供参考实际使用时请根据具体版本调整 API 调用方式。