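MORI Communication Library Analysis (Part 1): The RDMA Data Send Path over Symmetric Memory

Preface

In high-performance computing, symmetric memory is a key abstraction for simplifying multi-process and multi-GPU communication. AMD's open-source MORI library builds on IBGDA (InfiniBand GPUDirect Async) and uses a symmetric heap together with virtual memory management (VMM) to implement efficient GPU-to-GPU RDMA transfers. This article briefly walks through the send path taken when device code executes shmem_put.

Host-side initialization: establishing the symmetric mapping

Registering a symmetric object: RegisterSymmMemObj

```cpp
// Simplified pseudocode of the registration steps.
// 1. Allocate the CPU-side object
SymmMemObj* cpuMemObj = new SymmMemObj();
cpuMemObj->localPtr = localPtr;

// 2. Exchange local base addresses via Allgather
cpuMemObj->peerPtrs = calloc(worldSize, sizeof(uintptr_t));
bootNet.Allgather(&localPtr, cpuMemObj->peerPtrs, sizeof(uintptr_t));
// peerPtrs[pe] is now the local heap base address (a virtual address) of remote process pe

// 3. Handle P2P transport (same node)
cpuMemObj->p2pPeerPtrs = calloc(worldSize, sizeof(uintptr_t));
// Exchange IPC handles and open them to obtain directly accessible mapped addresses
hipIpcGetMemHandle(&handle, localPtr);
bootNet.Allgather(&handle, ipcMemHandles, sizeof(hipIpcMemHandle_t));
for each peer i:
  if (CanUseP2P(i))
    hipIpcOpenMemHandle(&cpuMemObj->p2pPeerPtrs[i], ipcMemHandles[i], ...);
  // For non-RDMA transports (P2P/SDMA), overwrite peerPtrs with the locally accessible address
  if (transportType != RDMA)
    cpuMemObj->peerPtrs[i] = cpuMemObj->p2pPeerPtrs[i];

// 4. RDMA registration (only when there is an RDMA peer)
if (anyRdmaPeer) {
  RdmaMemoryRegion mr = rdmaContext->RegisterRdmaMemoryRegion(localPtr, size);
  cpuMemObj->lkey = mr.lkey;
  cpuMemObj->peerRkeys[rank] = mr.rkey;
}
bootNet.Allgather(&cpuMemObj->peerRkeys[rank], cpuMemObj->peerRkeys, sizeof(uint32_t));
```

cpuMemObj and its pointer arrays are then copied to the GPU with hipMemcpy and exposed to device kernels through globalGpuStates->heapObj (ConfigureHeapInfoForGpu).

Key point: in RDMA mode peerPtrs holds the remote virtual address, while in P2P mode it holds the address mapped into the local process. Device code nevertheless computes the target address uniformly as peerPtrs[pe] + offset and does not need to distinguish the underlying transport.

Device-side send flow

PutNbi APIs

```cpp
// PutNbi APIs - Address-based only
__device__ __attribute__((visibility("default"))) int mori_shmem_putmem_nbi_thread(
    void* dest, const void* source, size_t bytes, int pe, int qpId) {
  mori::shmem::ShmemPutMemNbiThread(dest, source, bytes, pe, qpId);
  return 0;
}

// PutNbi APIs - Warp Scope (Address-based only)
__device__ __attribute__((visibility("default"))) int mori_shmem_putmem_nbi_warp(
    void* dest, const void* source, size_t bytes, int pe, int qpId) {
  mori::shmem::ShmemPutMemNbiWarp(dest, source, bytes, pe, qpId);
  return 0;
}

// PutNbi APIs - Block Scope (Address-based only)
__device__ __attribute__((visibility("default"))) int mori_shmem_putmem_nbi_block(
    void* dest, const void* source, size_t bytes, int pe, int qpId) {
  mori::shmem::ShmemPutMemNbiBlock(dest, source, bytes, pe, qpId);
  return 0;
}
```

OpenSHMEM Style PutNbi APIs

```cpp
#define DEFINE_SHMEM_PUT_MEM_NBI_ADDR_API_TEMPLATE(Scope) \
  inline __device__ void ShmemPutMemNbi##Scope(void* dest, const void* source, size_t bytes, \
                                               int pe, int qpId = 0) { \
    DISPATCH_TRANSPORT_TYPE(ShmemPutMemNbi##Scope##Kernel, pe, dest, source, bytes, pe, qpId); \
  }

DEFINE_SHMEM_PUT_MEM_NBI_ADDR_API_TEMPLATE(Thread)
DEFINE_SHMEM_PUT_MEM_NBI_ADDR_API_TEMPLATE(Warp)
DEFINE_SHMEM_PUT_MEM_NBI_ADDR_API_TEMPLATE(Block)
```

DISPATCH_TRANSPORT_TYPE

```cpp
#define DISPATCH_TRANSPORT_TYPE(func, pe, ...) \
  GpuStates* globalGpuStates = GetGlobalGpuStatesPtr(); \
  application::TransportType transportType = globalGpuStates->transportTypes[pe]; \
  if (transportType == application::TransportType::RDMA) { \
    func<application::TransportType::RDMA>(__VA_ARGS__); \
  } else if (transportType == application::TransportType::P2P) { \
    func<application::TransportType::P2P>(__VA_ARGS__); \
  } else if (transportType == application::TransportType::SDMA) { \
    func<application::TransportType::SDMA>(__VA_ARGS__); \
  } else { \
    assert(false); \
  }
```

ShmemPutMemNbiThread calls ShmemPutMemNbiThreadKernel. In a multi-node cluster, transfers within a node take the P2P path and transfers across nodes go over RDMA. How are the two told apart? Routing is dynamic, keyed by the target PE (processing element, i.e. process) and a transport type (TransportType) negotiated in advance. The whole decision amounts to building a table at initialization and doing a table lookup plus a branch at run time. In Context::InitializePossibleTransports, MORI probes the physical connectivity between the current node and every other PE and tags each PE with a TransportType. DISPATCH_TRANSPORT_TYPE then dispatches to the matching function according to globalGpuStates->transportTypes[pe].

Next we analyze the data transfer on the RDMA path, taking ShmemPutMemNbiThreadKernel<application::TransportType::RDMA> as the example.

```cpp
template <>
inline __device__ void ShmemPutMemNbiThreadKernel<application::TransportType::RDMA>(
    const application::SymmMemObjPtr dest, size_t destOffset,
    const application::SymmMemObjPtr source, size_t sourceOffset, size_t bytes, int pe,
    int qpId) {
  bool need_turn{true};
  uint64_t turns = __ballot(need_turn);  // lanes that have not sent yet
  while (turns) {
    // Pick the lowest pending lane and broadcast its target PE
    uint8_t lane = __ffsll((unsigned long long)turns) - 1;
    int pe_turn = __shfl(pe, lane);
    // All lanes targeting that same PE take their turn together
    if (pe_turn == pe) {
      DISPATCH_PROVIDER_TYPE_COMPILE_TIME(ShmemPutMemNbiThreadKernelImpl, dest, destOffset,
                                          source, sourceOffset, bytes, pe, qpId);
      need_turn = false;
    }
    turns = __ballot(need_turn);
  }
}
```

ShmemPutMemNbiThreadKernelImpl

ShmemPutMemNbiThreadKernelImpl is the core template function behind MORI's device-side non-blocking put. It sends the data of a symmetric memory region to a remote PE over RDMA and has the following key properties:

- It supports both the static heap and the chunked VMM heap.
- It supports several NIC providers (MLX5/BNXT/PSD).
- It reserves WQE slots cooperatively at warp level, which reduces atomic contention.
- It applies flow control automatically: when the send queue is full, it actively reclaims completion events.

Initialization and loop control

```cpp
bool needsChunking = globalGpuStates->useVMMHeap;
size_t currentOffset = 0;
size_t remaining = bytes;
while (true) {
  // Check whether this thread still has data left
  bool has_remaining = (remaining > 0);
  uint64_t activemask = __ballot(has_remaining);
  if (activemask == 0) break;
  ...
}
```

__ballot collects a vote from every thread in the warp and yields a mask of the threads that still have data to send. If the mask is 0, all threads are done and the loop exits. Only threads in the mask take part in the current round; the others skip it.

Computing the transfer size and keys

There are two cases, selected by needsChunking.

Static heap mode:

- lkey = source->lkey (one key for the whole heap)
- srcAddr = source->localPtr + sourceOffset + currentOffset
- raddr = dest->peerPtrs[pe] + destOffset + currentOffset
- rkey = dest->peerRkeys[pe]
- transfer_size = remaining (everything that is left is sent at once)

VMM heap mode:

- Query the source chunk: VmmQueryLocalKey(srcAddr, remaining, lkey, src_chunk_size)
- Query the destination chunk: VmmQueryRemoteAddr(dstAddr, pe, remaining, raddr, rkey, dst_chunk_size)
- transfer_size = min(src_chunk_size, dst_chunk_size, remaining)

Note: VmmQuery* returns the remaining size of the chunk that contains the given address, together with the corresponding key.

Key point: in VMM mode each loop iteration transfers data that lies within a single chunk, so the lkey and rkey are guaranteed to be valid for the whole transfer.

For BNXT NICs, the number of PSNs (packet sequence numbers) consumed by each thread must also be computed:

```cpp
if constexpr (PrvdType == BNXT) {
  // One PSN per MTU-sized packet, rounded up
  psnCnt = (transfer_size + wq->mtuSize - 1) / wq->mtuSize;
  my_psn_excl = WarpActivePsnPrefix(psnCnt, activemask, warp_total_psn);
}
```

WarpActivePsnPrefix computes the total number of PSNs consumed by the active threads that precede the current one (an exclusive prefix sum), which is later used to assign each thread its PSN offset.

Warp-level WQE slot reservation

1. Determine the number of WQEs to reserve, num_wqes.
   - MLX5/PSD: one WQE per active thread, so num_wqes = num_active_lanes.
   - BNXT: each thread needs psnCnt WQEs (because each WQE may carry several MTUs of data), so num_wqes = warp_total_psn.
2. The leader thread (the last active thread) performs the atomic reservation.
3. Broadcast the starting slot: warp_sq_counter = __shfl(warp_sq_counter, leader_phys_lane_id).
4. Compute each thread's private WQE index.
   - MLX5: my_sq_counter = warp_sq_counter + my_logical_lane_id
   - BNXT: my_sq_counter = warp_sq_counter + my_logical_lane_id, while my_msntbl_counter and my_psn_counter carry their own offsets.

The atomic reservation in step 2:

```cpp
if (is_leader) {
  if constexpr (MLX5)
    warp_sq_counter = atomicAdd(&wq->postIdx, num_active_lanes);
  else if constexpr (BNXT) {
    // Advance msn and psn together
    atomic_add_packed_msn_and_psn(&wq->msnPack, num_active_lanes, warp_total_psn, ...);
    warp_sq_counter = warp_msntbl_counter;  // the MSN is the SQ index
    atomicMax(&wq->postIdx, warp_sq_counter + num_active_lanes);
  }
}
```

Design idea: the reservations of many threads are merged into a single atomic operation, which lowers contention.

Send queue flow control

```cpp
while (true) {
  uint64_t db_touched = wq->dbTouchIdx;  // submitted to hardware, possibly not completed
  uint64_t db_done = wq->doneIdx;        // confirmed complete
  uint64_t active = db_touched - db_done;
  uint64_t free = wq->sqWqeNum - active;
  uint64_t need_until_end = warp_sq_counter + num_wqes - db_touched;
  if (free > need_until_end) break;
  // Not enough room in the queue: actively wait for completions
  ShmemQuietThreadKernelImpl<PrvdType>(pe, qpId);
}
```

- dbTouchIdx: the highest WQE index for which the doorbell has been rung, i.e. work that has been submitted to the NIC but may not have completed.
- doneIdx: the highest index that has completed (its CQE has been reclaimed).

If the free slots cannot hold the current reservation, ShmemQuietThreadKernelImpl is called to reclaim completion events from the CQ and advance doneIdx. The loop repeats until the condition is met.

Filling the WQE and updating the doorbell

The WQE is written by core::PostWrite. Depending on the NIC type, this function writes the WQE into SQ memory and returns a doorbell value that encodes the state the hardware must be notified of.

```cpp
uint64_t dbr_val;
if constexpr (MLX5) {
  dbr_val = PostWrite(wq, my_sq_counter, my_sq_counter, my_sq_counter, is_leader, qpn,
                      srcAddr, lkey, raddr, rkey, transfer_size);
} else if constexpr (BNXT) {
  dbr_val = PostWrite(wq, my_sq_counter, my_msntbl_counter, my_psn_counter, is_leader, qpn,
                      srcAddr, lkey, raddr, rkey, transfer_size);
} else if constexpr (PSD) {
  dbr_val = PostWrite(wq, my_sq_counter, my_sq_counter, my_sq_counter, is_leader, qpn,
                      srcAddr, lkey, raddr, rkey, transfer_size);
}
```

Different NICs need different indices (MSN/PSN). is_leader decides whether the "completion notification" flag is set in the WQE: usually only the last WQE needs to generate a CQE, which reduces completion overhead.

Memory fences and doorbell submission

```cpp
__threadfence_system();  // make all WQE writes globally visible
if (is_leader) {
  // Wait until dbTouchIdx catches up with the start of this reservation,
  // i.e. every earlier WQE has already been submitted
  while (wq->dbTouchIdx != warp_sq_counter) {
  }
  // Update the doorbell record read by the NIC
  UpdateSendDbrRecord(wq->dbrRecAddr, warp_sq_counter + num_wqes);
  __threadfence_system();
  // Ring the doorbell
  RingDoorbell(wq->dbrAddr, dbr_val);
  __threadfence_system();
  // Update needConsIdx and publish the new dbTouchIdx
  atomicAdd(&cq->needConsIdx, 1);
  atomicStore(&wq->dbTouchIdx, warp_sq_counter + num_wqes);
}
```

- UpdateSendDbrRecord updates the NIC-visible doorbell record in memory, telling the hardware the new WQE range.
- RingDoorbell writes the PCIe doorbell register, which triggers the hardware to DMA-read the WQEs and execute them.
- The dbTouchIdx update lets other threads know that these slots have been submitted.

Special optimizations and details

Multi-thread cooperative reservation: only the leader thread performs the atomic operation, and the other threads obtain their indices through __shfl, so the threads of a warp do not all contend for the same atomic variable. This takes full advantage of low-latency communication within a warp.

Atomic lock for flow control and completion reclamation: ShmemQuietThreadKernelImpl tries to acquire the CQ lock (pollCqLock). Only the thread that gets the lock runs PollCq; the others either spin or return (in the non-final quiet case). This serializes CQ processing.

QueryRemoteAddr

ConcurrentPutImmThreadKernelPureAddr

```cpp
// New API: Using pure addresses
__global__ void ConcurrentPutImmThreadKernelPureAddr(int myPe, uint32_t* localBuff) {
  constexpr int sendPe = 0;
  constexpr int recvPe = 1;
  uint32_t val = 42;
  int globalTid = blockIdx.x * blockDim.x + threadIdx.x;
  if (myPe == sendPe) {
    // Calculate destination address
    uint32_t* dest = localBuff + globalTid;
    // Use pure address-based API
    ShmemPutSizeImmNbiThread(dest, &val, sizeof(uint32_t), recvPe);
    __threadfence_system();
    if (blockIdx.x == 0) {
      ShmemQuietThread();
    }
  } else {
    // Wait for data to arrive
    while (atomicAdd(localBuff + globalTid, 0) != val) {
    }
  }
}
```

We now look at how, in RDMA mode, the program translates the local dest address into a remote address.

ShmemPutSizeImmNbiThread

```cpp
#define SHMEM_PUT_SIZE_IMM_NBI_ADDR_API(Scope) \
  inline __device__ void ShmemPutSizeImmNbi##Scope(void* dest, void* val, size_t bytes, int pe, \
                                                   int qpId = 0) { \
    DISPATCH_TRANSPORT_TYPE(ShmemPutSizeImmNbi##Scope##Kernel, pe, dest, val, bytes, pe, qpId); \
  }

SHMEM_PUT_SIZE_IMM_NBI_ADDR_API(Thread)
SHMEM_PUT_SIZE_IMM_NBI_ADDR_API(Warp)
```

Taking ShmemPutSizeImmNbiThreadKernel as the example, it calls ShmemPutSizeImmNbiThreadKernelAddrImpl internally, which contains:

```cpp
// Convert addresses to remote addresses (supports both Static Heap and VMM Heap)
uintptr_t raddr;
uint32_t rkey;
QueryRemoteAddr(dest, pe, raddr, rkey);
```

QueryRemoteAddr

```cpp
// Calculate offset within the symmetric heap
size_t offset = localAddrInt - globalGpuStates->heapBaseAddr;
application::SymmMemObj* heapObj = globalGpuStates->heapObj;
if (globalGpuStates->useVMMHeap) {
  // VMM Heap: need to get chunk-specific rkey
  VmmLookupRemote(localAddrInt, pe, out_raddr, out_rkey);
} else {
  // Static Heap: direct rkey access
  out_raddr = heapObj->peerPtrs[pe] + offset;
  out_rkey = heapObj->peerRkeys[pe];
}
```

In the static-heap case, out_raddr = peerPtrs[pe] + offset. VmmLookupRemote handles the VMM case in a similar way.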
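The static-heap address translation can be tried out on the host with a small sketch. This is only an illustration under stated assumptions: `HeapView`, `RemoteTarget` and `TranslateStatic` are hypothetical names invented for this example, not MORI types or functions. They mirror just the three fields the article reads (`heapBaseAddr`, `peerPtrs`, `peerRkeys`), and the bounds checks are an addition of this sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical host-side stand-in for the heap fields discussed in the article.
// Not MORI's real SymmMemObj/GpuStates layout.
struct HeapView {
  uintptr_t heapBaseAddr;           // base address of this PE's symmetric heap
  std::vector<uintptr_t> peerPtrs;  // peerPtrs[pe]: heap base address to use for PE `pe`
  std::vector<uint32_t> peerRkeys;  // peerRkeys[pe]: rkey of PE `pe`'s registered heap
};

// Result of the translation: where to write on the remote side, and with which key.
struct RemoteTarget {
  uintptr_t raddr;
  uint32_t rkey;
};

// Static-heap translation: keep the offset inside the heap, swap the base address.
inline RemoteTarget TranslateStatic(const HeapView& heap, uintptr_t localAddr, int pe) {
  // Sanity checks added for this sketch only.
  assert(localAddr >= heap.heapBaseAddr);
  assert(pe >= 0 && static_cast<size_t>(pe) < heap.peerPtrs.size());
  assert(heap.peerPtrs.size() == heap.peerRkeys.size());

  const uintptr_t offset = localAddr - heap.heapBaseAddr;
  return RemoteTarget{heap.peerPtrs[pe] + offset, heap.peerRkeys[pe]};
}
```

Because every PE allocates symmetric objects at the same offset within its heap, the same offset is meaningful on the remote side. That is why a plain local pointer is enough for the address-based API, and why the P2P case (where `peerPtrs[pe]` holds a locally mapped address instead) can reuse the same arithmetic.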
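The warp-level slot reservation described earlier (one atomic add by the leader, a broadcast of the base index, then a per-lane offset) can also be modeled on the host. The sketch below is a simulation, not device code: `std::atomic::fetch_add` stands in for `atomicAdd`, a plain loop stands in for the `__shfl` broadcast, and `ReserveWarpSlots`, `LogicalLaneId` and `PopCount` are hypothetical helper names. It models only the MLX5/PSD case of one WQE per active lane and assumes a warp of at most 64 lanes.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <vector>

// Number of set bits in a mask.
inline int PopCount(uint64_t mask) {
  int n = 0;
  while (mask) {
    mask &= mask - 1;  // clear the lowest set bit
    ++n;
  }
  return n;
}

// Rank of `lane` among the active lanes: the count of active lanes below it.
inline int LogicalLaneId(uint64_t activemask, int lane) {
  const uint64_t below = activemask & ((uint64_t{1} << lane) - 1);
  return PopCount(below);
}

// Simulates one warp reserving send-queue slots with a single atomic operation.
// Returns the slot index for each lane, or -1 for inactive lanes.
inline std::vector<int64_t> ReserveWarpSlots(std::atomic<uint64_t>& postIdx,
                                             uint64_t activemask, int warpSize = 64) {
  assert(warpSize > 0 && warpSize <= 64);
  if (warpSize < 64) activemask &= (uint64_t{1} << warpSize) - 1;

  std::vector<int64_t> slots(warpSize, -1);
  const int numActive = PopCount(activemask);
  if (numActive == 0) return slots;

  // The leader performs the only atomic: reserve numActive consecutive slots.
  const uint64_t base = postIdx.fetch_add(static_cast<uint64_t>(numActive));

  // Every active lane receives the base (the broadcast) and adds its own rank.
  for (int lane = 0; lane < warpSize; ++lane) {
    if ((activemask >> lane) & 1) {
      slots[lane] = static_cast<int64_t>(base) + LogicalLaneId(activemask, lane);
    }
  }
  return slots;
}
```

With `postIdx` at 10 and lanes 0, 1 and 3 active, the three lanes receive slots 10, 11 and 12 and `postIdx` advances to 13 after a single atomic operation, instead of three competing ones.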