CANN/GE LLM-DataDist Python 功能指南 功能介绍【免费下载链接】geGEGraph Engine是面向昇腾的图编译器和执行器提供了计算图优化、多流并行、内存复用和模型下沉等技术手段加速模型执行效率减少模型内存占用。 GE 提供对 PyTorch、TensorFlow 前端的友好接入能力并同时支持 onnx、pb 等主流模型格式的解析与编译。项目地址: https://gitcode.com/cann/ge链路管理功能介绍NN模型执行时调用的HCCL集合通信接口是双边通信即需要两边同时发起建链而在P-D分离方案中简化建链操作由Client单侧发起建链。由于动态扩缩的部分大多数是Decode侧因此将P定义为Server端D定义为Client端建链过程实现由D向P发起建链的流程。主要提供的是link_clusters和unlink_clusters两个接口都是由D侧进行调用建链行为是点对点的。link_clusters用于节点之间的建链unlink_clusters用于节点之间的断链使用场景建链操作是PD之间进行KV Cache传输的前提所以使能KV Cache传输功能前需要先建链。集群可靠性场景下当P或者D集群节点出现异常时在不影响整个集群可用性前提下通过断链下线对应故障节点。通过建链和断链动态调整PD集群配比。根据闲忙动态的增加或减少对应的机器节点。增加节点需要建链减少节点需要断链。功能示例此处代码示例为1P1D之间建链的伪代码流程。拉起P和D侧脚本脚本中调用LLM-DataDist的初始化接口。P侧需要设置侦听的Device IP和port。# P侧脚本 from llm_datadist import LLMDataDist, LLMRole, LLMStatusCode, LLMClusterInfo # llm datadist初始化 llm_datadist LLMDataDist(LLMRole.Prompt, cluster_id0) llm_config LLMConfig() llm_config.listen_ip_info 192.168.1.1:26000# local_ip port llm_config.device_id 0 # 此处的device_id需要和local_ip匹配 llm_options llm_config.generate_options() llm_datadist.init(llm_options)# D侧脚本 from llm_datadist import LLMDataDist, LLMRole, LLMStatusCode, LLMClusterInfo # llm datadist初始化 llm_datadist LLMDataDist(LLMRole.DECODER, cluster_id0) llm_config LLMConfig() llm_config.device_id 0 llm_options llm_config.generate_options() llm_datadist.init(llm_options)在D侧脚本中调用link_clusters发起建链操作当业务退出时在D侧调用unlink_clusters进行断链。# 生成cluster info信息用于建链 cluster LLMClusterInfo() cluster.remote_cluster_id 1 # 此处的remote_cluster_id需要和P侧创建的LLMDataDist对应 cluster.append_local_ip_info(192.168.2.1, 26000) # local_ip_info的IP是本机需要建链的Device IP地址 cluster.append_remote_ip_info(192.168.1.1, 26000) # remote_ip_info的IP是想和对端建链的Device IP地址 # 调用link_clusters进行建链 # ret是接口的返回值rets表示每个cluster建链的结果 ret, rets llm_datadist.link_clusters([cluster], timeout5000) # 判断建链结果 if ret ! LLMStatusCode.LLM_SUCCESS: raiseException(link failed.) for cluster_i in range(len(rets)): link_ret rets[cluster_i] if link_ret ! LLMStatusCode.LLM_SUCCESS: print(f{cluster_i} link failed.)在D侧可以调用check_link_status快速检查链路传输数据是否正常。try: llm_datadist.check_link_status(remote_cluster_id1) except LLMException as ex: print(fcheck_link_status exception:{ex.status_code}) raise ex业务结束D侧进行断链P和D都调用llm_datadist的finalize释放资源。# P侧脚本 # 调用llm_datadist申请KV Cache # 执行业务推理 # ... # 业务退出需等D侧断链后调用 llm_datadist.finalize()# D侧脚本 # pull_cache、模型推理 # ... # 业务退出调用unlink_clusters进行断链 ret, rets llm_datadist.unlink_clusters([cluster], timeout5000) if ret ! LLMStatusCode.LLM_SUCCESS: raiseRuntimeError(f[unlink_cluster] failed, ret{ret}) llm_datadist.finalize()当新增节点或者已下线节点再上线时需要执行一遍上述使用流程。当下线节点时正常情况下D侧需要主动调用unlink_clusters接口__如果D侧无法调用unlink_clusters接口则需要P侧调用unlink_clusters接口。通过节点上线调用link_clusters接口节点下线调用unlink_clusters接口来灵活的进行分布式集群的动态扩缩容。异常处理当D侧出现异常导致无法调用unlink_clusters时需要由P侧调用unlink_clusters进行资源清理否则无法再次进行建链。unlink_clusters接口提供了强制断链的能力该能力适用于链路故障时普通断链操作会耗时比较久。使用强制断链接口设置forceTrue需要两侧都发起调用只会清理本端的链接。# 强制断链 ret, rets llm_datadist.unlink_clusters([cluster], timeout5000, forceTrue) if ret ! LLMStatusCode.LLM_SUCCESS: raise RuntimeError(f[unlink_clusters] failed, ret{ret})KV Cache管理功能介绍在LLM-DataDist初始化时会预申请一块指定大小的内存池由ge.flowGraphMemMaxSize配置项决定其大小后续的KV Cache的内存申请及释放都在预申请的内存上进行相比每次申请一块内存可以节省耗时。KV Cache管理涉及的主要接口及功能如下表1KV Cache管理的主要接口及功能|接口名称|功能| |--|--| |allocate_cache|分配cache。cache分配成功后会同时被cache_id与cache_keys引用只有当这些引用都解除后cache所占用的资源才会实际释放。cache_keys会在LLMRole为PROMPT时传入。| |deallocate_cache|释放cache。如果该cache在分配内存时关联了cache_keys则实际的释放会延后到所有的cache_keys都已经释放cache_keys的引用则可以通过以下2种方式解除Decode调用pull_cache接口成功后解除。Prompt调用remove_cache_key接口时解除。| |remove_cache_key|移除cache_key。移除cache_key后该cache将无法再被pull_cache拉取。仅当LLMRole为PROMPT时可调用LLMRole为DECODER时因为allocate_cache申请时不需要传入cache_key所以不需要调用。| |pull_cache|根据CacheKey从对应的Prompt节点拉取KV到本地KV Cache仅当LLMRole为DECODER时可调用。该CacheKey需要和allocate_cache的CacheKey保持一致。| |copy_cache|拷贝KV Cache。当期望pull_cache和其他使用KV Cache的操作流水时可以额外申请一块中转cache。当其他流程在使用KV Cache时可以先将下一次的KV pull到中转cache待其他流程使用完KV Cache后拷贝到指定的位置从而将pull_cache的耗时隐藏减少总耗时。公共前缀场景在新请求推理前可以将公共前缀拷贝到新的内存中和当前请求的KV合并推理。| |allocate_blocks_cache|PagedAttention场景下分配多个blocks的Cache。| |pull_blocks|PA场景下KV的拉取接口。和pull_cache的差异是pull_blocks是按照block_index拉取的对应位置的KV Cache。| |copy_blocks|PA场景下KV的拷贝接口。和copy_cache的差异是copy_blocks是按照block_index拷贝的对应位置的KV Cache。在使用场景上也存在差异copy_blocks主要是针对当多个回答需要共用相同blockblock没填满时新增的token需要拷贝到新的block上继续迭代。| |swap_blocks|PA场景下对应block_index上KV内存的换入换出主要面向用户需要自行管理KV内存的场景。| |transfer_cache_async|由Prompt节点发起将cache的数据传输到Decode。| |push_blocks|PA场景下KV的推送接口从对应的Prompt节点推送KV到远端KV Cache。| |push_cache|从对应的Prompt节点推送KV到远端KV Cache。|使用场景主要用于分布式集群间的KV Cache传输。功能示例一般Cache传输场景本示例介绍一般Cache传输场景下接口的使用主要涉及KV Cache的申请、释放、传输。如下将根据业务角色给出伪代码示例。P侧和D侧根据链路管理章节的示例完成LLMDataDist的初始化和建链操作。在P侧给每个请求申请对应大小的KV Cache内存以torch为例将KV Cache转换为torch tensor进行全量模型推理。import torchair import torch import torch_npu # 从初始化完的llm_datadist中获取kv_cache_manager kv_cache_manager llm_datadist.kv_cache_manager # 根据模型中KV Cache的shape以及总个数创建CacheDesc。此处shape只是示例实际填写网络中的kv cache shape。 cache_desc CacheDesc(num_tensors4, shape[4, 4, 8], data_typeDataType.DT_FLOAT16) # 根据初始化llm_datadist时的cluster_id创建对应请求的cache_key当P侧是多batch模型时需要创建batch数量的cache_key batch_num 8 kv_cache kv_cache_manager.allocate_cache(cache_desc, [CacheKey(prompt_cluster_id0, req_idi, model_id0) for i in range(batch_num)]) # 将申请好的KV Cache转换为torch tensor kv_tensor_addrs kv_cache.per_device_tensor_addrs[0] kv_tensors torchair.llm_datadist.create_npu_tensors(kv_cache.cache_desc.shape, torch.float16, kv_tensor_addrs) # 将转换的kv_tensors传给模型推理计算产生KV Cache将模型输出传输给增量推理模型作为输入在D侧申请一份用于模型执行的KV Cache内存。# 从初始化完的llm_datadist中获取kv_cache_manager kv_cache_manager llm_datadist.kv_cache_manager # 根据模型中KV Cache的shape以及总个数创建CacheDesc cache_desc CacheDesc(num_tensors4, shape[4, 4, 8], data_typeDataType.DT_FLOAT16) # 调用allocate_cache接口申请对应请求的KV Cache内存 kv_cache kv_cache_manager.allocate_cache(cache_desc)将KV Cache从P侧传输到D侧有以下两种方式在D侧调用pull_cache接口拉取对应请求的KV Cache到申请的内存中。# 创建和P侧申请cache时相同的cache_key用于拉取对应的KV Cache cache_key CacheKey(prompt_cluster_id0, req_id1, model_id0) kv_cache_manager.pull_cache(cache_key, kv_cache, batch_index1) # 拉到batch index为1的位置上在P侧调用transfer_cache_async接口将数据传输至Decode。from llm_datadist import LayerSynchronizer, TransferConfig class LayerSynchronizerImpl(LayerSynchronizer): def __init__(self, events): self._events events def synchronize_layer(self, layer_index: int, timeout_in_millis: Optional[int]) - bool: self._events[layer_index].wait() return True events [torch.npu.Event() for _ in range(cache_desc.num_tensors // 2)] # 执行模型模型在各层计算完成后调用events[layer_index].record()记录完成状态 # 模型执行由用户实现 # user_model.Predict(kv_tensors, events) # 模型下发完成后调用transfer_cache_async传输数据此处需要填写Decode已申请的KV Cache各层tensor的内存地址 transfer_config TransferConfig(DECODER_CLUSTER_ID, decoder_kv_cache_addrs) cache_task kv_cache_manager.transfer_cache_async(kv_cache, LayerSynchronizerImpl(events), [transfer_config]) # 同步等待传输结果 cache_task.synchronize()以torch为例将KV Cache转换为torch tensor进行增量模型推理。# 将申请好的KV Cache转换为框架中的KV Cache类型不同框架中都需要提供根据KV Cache地址创建对应类型的KV Cache的接口。此处以PyTorch为例 # 转换操作和pull操作顺序不分先后 kv_tensor_addrs kv_cache.per_device_tensor_addrs[0] kv_tensors torchair.llm_datadist.create_npu_tensors(kv_cache.cache_desc.shape, torch.float16, kv_tensor_addrs) # 将转换后的tensor拆分为框架需要的KV配对方式可以自定义组合KV mid len(kv_tensors) // 2 k_tensors kv_tensors[: mid] v_tensors kv_tensors[mid:] kv_cache_tensors list(zip(k_tensors, v_tensors)) # 将转换的kv_tensors传给模型进行迭代推理 # 等待请求增量推理完成根据业务中cache的使用时机自行释放对应请求的KV Cache内存。# P侧当batch中所有请求的KV Cache都被拉走后调用deallocate_cache才会真正释放内存。如果D侧未拉走KV Cache则还需要调用remove_cache_key。 kv_cache_manager.remove_cache_key(cache_key_0) kv_cache_manager.remove_cache_key(cache_key_2) kv_cache_manager.deallocate_cache(kv_cache)# D侧由于申请时不需要cache_key所以释放时只需要调用deallocate接口。 kv_cache_manager.deallocate_cache(kv_cache)业务退出时P侧和D侧根据链路管理章节的示例进行断链然后调用finalize接口释放资源。功能示例Blocks Cache传输场景本示例介绍Blocks Cache将Cache使用块状形式管理传输场景下接口的使用主要涉及KV Cache的申请、释放、传输。如下将根据业务角色给出伪代码示例。P侧和D侧根据链路管理的示例完成LLMDataDist的初始化和建链操作。在P侧和D侧模型的每层按照计算好的num_blocks数量调用AllocateCache申请KV Cache。Blocks Cache场景下不同请求对创建的num_blocks大小的KV Cache进行复用上层框架自行管理业务结束后释放申请的内存。# P侧 # 从初始化完的llm_datadist中获取kv_cache_manager kv_cache_manager llm_datadist.kv_cache_manager # 根据模型中KV Cache的shape以及总个数创建CacheDesc。PA场景的kv cache shape通常为[num_blocks, block_size,...,...] num_blocks 10 block_mem_size 128 cache_desc CacheDesc(num_tensors4, shape[num_blocks, block_mem_size], data_typeDataType.DT_FLOAT16) # 根据初始化llm_datadist时的cluster_id创建对应请求的BlocksCacheKey cache_key BlocksCacheKey(prompt_cluster_id0, model_id0) # 调用allocate_blocks_cache接口申请KV Cache内存 kv_cache kv_cache_manager.allocate_blocks_cache(cache_desc, cache_key) # 将申请好的KV Cache转换为框架中的KV Cache类型不同框架中需要提供根据KV Cache的地址创建对应类型的KV Cache的接口。此处以PyTorch为例 kv_tensor_addrs kv_cache.per_device_tensor_addrs[0] kv_tensors torchair.llm_datadist.create_npu_tensors(kv_cache.cache_desc.shape, torch.float16, kv_tensor_addrs)# D侧 kv_cache_manager llm_datadist.kv_cache_manager num_blocks 10 block_mem_size 128 cache_desc CacheDesc(num_tensors4, shape[10, 128], data_typeDataType.DT_FLOAT16) kv_cache kv_cache_manager.allocate_blocks_cache(cache_desc) kv_tensor_addrs kv_cache.per_device_tensor_addrs[0] kv_tensors torchair.llm_datadist.create_npu_tensors(kv_cache.cache_desc.shape, torch.float16, kv_tensor_addrs)P侧有新请求进来后推理框架会给每个请求分配好对应的block_index。模型推理完之后该请求对应的KV Cache就在对应的block_index所在的内存上将模型输出和请求对应的block_table传输给D侧推理模型作为输入。D侧有新请求进来后推理框架也会给每个请求分配好对应的block_index然后调用pull_blocks接口根据P侧的block_index和D侧的block_index的对应关系将KV Cache传输到指定位置。此时有两种方式在D侧调用pull_blocks接口拉取KV Cache。# D侧根据P侧传过来的信息添加新请求并申请对应的block_table # D侧根据传过来请求的src_block_table和新申请的dst_block_table拉取KV到对应block cache_key BlocksCacheKey(prompt_cluster_id0, model_id0) # P侧allocate_blocks_cache时的入参 kv_cache_manager.pull_blocks(cache_key, cache, [0, 1], [2, 3]) # 将P侧0, 1 block位置上的数据拉到D侧2, 3 block位置上在P侧调用transfer_cache_async接口时将数据传输至D侧。# 实现LayerSynchronizerImpl通过torch Event获取各层计算结束状态本例中通过Event机制实现 class LayerSynchronizerImpl(LayerSynchronizer): def __init__(self, events): self._events events def synchronize_layer(self, layer_index: int, timeout_in_millis: Optional[int]) - bool: self._events[layer_index].wait() return True events [torch.npu.Event() for _ in range(cache_desc.num_tensors // 2)] # 执行模型,模型在各层计算完成后调用events[layer_index].record()记录完成状态 # 该函数由用户实现 user_model.Predict(kv_cache_tensors, events) # 模型下发完成后调用transfer_cache_async传输数据此处需要填写Decode申请出的KV Cache各层tensor的内存地址 transfer_config TransferConfig(DECODER_CLUSTER_ID, decoder_kv_cache_addrs) cache_task kv_cache_manager.transfer_cache_async(kv_cache, LayerSynchronizerImpl(events), [transfer_config], [0, 1], [2, 3]) # 同步等待传输结果 cache_task.synchronize()业务结束后P侧和D侧调用deallocate_cache释放已申请的KV Cache内存。# 等待D侧拉取完对应请求的KV Cache # 根据业务中cache的使用时机自行释放对应请求的KV Cache。PA场景无需释放对应的cache_key kv_cache_manager.deallocate_cache(kv_cache)业务退出时P侧和D侧根据集群断链的示例进行断链和llm_datadist的finalize。异常处理错误码LLM_DEVICE_OUT_OF_MEMORY表示Device申请KV Cache内存失败。需要检查初始化时设置的ge.flowGraphMemMaxSize大小以及申请KV Cache的大小查看是否有请求KV Cache拉取之后没有释放内存。错误码LLM_KV_CACHE_NOT_EXIST表示对端KV Cache不存在需要检查对端进程是否异常或者对应KV Cache的请求有没有推理完成。该错误不影响其他请求流程确认流程后可以重试。错误码LLM_WAIT_PROCESS_TIMEOUT表示pull KV超时说明链路出现问题需要重新断链建链再尝试。【免费下载链接】geGEGraph Engine是面向昇腾的图编译器和执行器提供了计算图优化、多流并行、内存复用和模型下沉等技术手段加速模型执行效率减少模型内存占用。 GE 提供对 PyTorch、TensorFlow 前端的友好接入能力并同时支持 onnx、pb 等主流模型格式的解析与编译。项目地址: https://gitcode.com/cann/ge创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考