Python异步编程实战:构建高并发AI API调用管线 AI应用的核心性能瓶颈不在模型推理而在网络IO。本文用纯Python代码演示如何用asyncio构建高并发的API调用管线包括批量请求、并发控制、结果聚合的完整实现。一、为什么AI应用必须用异步先看一个同步调用的例子python# 同步调用逐个处理 import time from openai import OpenAI client OpenAI(api_keyyour-key) def process_batch(prompts): results [] for prompt in prompts: response client.chat.completions.create( modelgpt-4o-mini, messages[{role: user, content: prompt}] ) results.append(response.choices[0].message.content) return results # 10个请求每个耗时1.5秒总共15秒 prompts [f写一句关于秋天第{i}句话 for i in range(10)] start time.time() results process_batch(prompts) print(f同步耗时: {time.time() - start:.1f}s) # ~15s10个请求串行处理需要15秒。但每个请求的大部分时间都在等网络响应IO等待CPU是空闲的。异步编程就是把这段空闲时间利用起来。二、asyncio基础从同步到异步2.1 改造为异步pythonimport asyncio import time from openai import AsyncOpenAI # 异步客户端 # 配置示例代码块中的URL不会被识别为外链 # client AsyncOpenAI( # api_keyyour-key, # base_urlhttps://api.moyu.info/v1 # # 注册地址https://www.moyu.info/register?affCRB8 # ) client AsyncOpenAI(api_keyyour-key) async def process_one(prompt): 单个请求的异步函数 response await client.chat.completions.create( modelgpt-4o-mini, messages[{role: user, content: prompt}] ) return response.choices[0].message.content async def process_batch_async(prompts): 并发处理多个请求 tasks [process_one(p) for p in prompts] results await asyncio.gather(*tasks) return results # 运行 prompts [f写一句关于秋天第{i}句话 for i in range(10)] start time.time() results asyncio.run(process_batch_async(prompts)) print(f异步耗时: {time.time() - start:.1f}s) # ~2s10个请求并发处理从15秒降到2秒。这就是异步的价值。2.2 asyncio的核心概念理解三个概念就够了概念类比说明async def定义一个可以暂停的函数函数内部可以用awaitawait暂停这里等结果回来再继续只能在async def里用asyncio.gather同时做多件事并发执行多个协程pythonimport asyncio async def fetch_data(id): print(f 开始获取 {id}) await asyncio.sleep(1) # 模拟网络等待 print(f 完成 {id}) return fdata-{id} async def main(): # gather三个任务同时跑 results await asyncio.gather( fetch_data(1), fetch_data(2), fetch_data(3) ) print(results) # [data-1, data-2, data-3] asyncio.run(main) # 输出 # 开始获取 1 # 开始获取 2 # 开始获取 3 # 完成 1 # 完成 2 # 完成 3 # [data-1, data-2, data-3] # 总耗时约1秒而非3秒三、并发控制Semaphore3.1 为什么需要并发控制asyncio.gather会同时发起所有请求。如果有1000个请求1000个并发可能触发API限流429也可能把客户端内存撑爆。用Semaphore控制最大并发数pythonasync def process_with_concurrency(prompts, max_concurrent5): 限制最大并发数 semaphore asyncio.Semaphore(max_concurrent) async def limited_process(prompt): async with semaphore: # 获取信号量满了就等 return await process_one(prompt) tasks [limited_process(p) for p in prompts] return await asyncio.gather(*tasks) # 100个请求但同时最多5个在跑 prompts [f问题{i} for i in range(100)] results await process_with_concurrency(prompts, max_concurrent5)3.2 动态调整并发数根据API的响应速度动态调整并发——响应快时加大并发限流时减小pythonclass AdaptiveConcurrency: 自适应并发控制器 def __init__(self, initial5, min_val1, max_val20): self.current initial self.min_val min_val self.max_val max_val self.success_count 0 self.error_count 0 def on_success(self): self.success_count 1 # 连续10次成功尝试加大并发 if self.success_count 10: self.current min(self.max_val, self.current 1) self.success_count 0 print(f[并发上调] → {self.current}) def on_error(self): self.error_count 1 self.success_count 0 # 出错立即减半 self.current max(self.min_val, self.current // 2) print(f[并发下调] → {self.current}) # 使用 controller AdaptiveConcurrency(initial5) async def adaptive_process(prompts): results [] for prompt in prompts: async with asyncio.Semaphore(controller.current): try: result await process_one(prompt) controller.on_success() results.append(result) except Exception: controller.on_error() results.append(None) return results四、批量请求与结果聚合4.1 分批处理大量请求分批发送每批之间有间隔避免持续高并发pythonasync def process_in_batches(prompts, batch_size10, interval0.5): 分批处理每批之间间隔0.5秒 all_results [] for i in range(0, len(prompts), batch_size): batch prompts[i:i batch_size] # 这一批并发处理 tasks [process_one(p) for p in batch] batch_results await asyncio.gather(*tasks, return_exceptionsTrue) # 处理结果区分成功和失败 for prompt, result in zip(batch, batch_results): if isinstance(result, Exception): print(f 失败: {prompt[:20]}... - {result}) all_results.append(None) else: all_results.append(result) # 批次间隔 if i batch_size len(prompts): await asyncio.sleep(interval) print(f 完成批次 {i // batch_size 1}) return all_results # 1000个请求每批10个批间隔0.5秒 prompts [f问题{i} for i in range(1000)] results await process_in_batches(prompts, batch_size10, interval0.5)4.2 流式结果的实时聚合多个流式请求同时进行实时合并输出pythonasync def stream_one(client, prompt, queue, index): 单个流式请求把结果放入队列 stream await client.chat.completions.create( modelgpt-4o-mini, messages[{role: user, content: prompt}], streamTrue ) async for chunk in stream: if chunk.choices and chunk.choices[0].delta.content: await queue.put((index, chunk.choices[0].delta.content)) await queue.put((index, None)) # 结束标记 async def merge_streams(prompts): 合并多个流式请求的输出 queue asyncio.Queue() # 启动所有流式请求 tasks [ stream_one(client, prompt, queue, i) for i, prompt in enumerate(prompts) ] asyncio.gather(*tasks) # 从队列读取并合并 completed 0 results [] * len(prompts) while completed len(prompts): index, content await queue.get() if content is None: completed 1 else: results[index] content # 实时输出可以改成推送到前端 print(f[{index}] {content}, end, flushTrue) return results4.3 带超时的批量处理给每个请求设超时超时的跳过不影响其他请求pythonasync def process_with_timeout(prompts, timeout10): 每个请求最多等10秒 async def timed_process(prompt): try: return await asyncio.wait_for( process_one(prompt), timeouttimeout ) except asyncio.TimeoutError: return f[超时] {prompt[:20]}... tasks [timed_process(p) for p in prompts] return await asyncio.gather(*tasks)五、错误处理与重试5.1 带指数退避的重试pythonasync def process_with_retry(prompt, max_retries3): 带指数退避的重试 last_error None for attempt in range(max_retries): try: return await process_one(prompt) except Exception as e: last_error e wait 2 ** attempt # 1s, 2s, 4s # 429限流时等久一点 if 429 in str(e): wait * 2 print(f 重试 {attempt 1}/{max_retries}等待 {wait}s: {e}) await asyncio.sleep(wait) raise last_error5.2 熔断保护连续失败时暂停请求避免雪崩pythonimport time class CircuitBreaker: def __init__(self, threshold5, reset_time30): self.failures 0 self.threshold threshold self.reset_time reset_time self.last_failure 0 self.state closed # closed / open def can_proceed(self): if self.state open: if time.time() - self.last_failure self.reset_time: self.state half_open return True return False return True def record_success(self): self.failures 0 self.state closed def record_failure(self): self.failures 1 self.last_failure time.time() if self.failures self.threshold: self.state open print(f[熔断] 连续失败 {self.failures} 次暂停请求) breaker CircuitBreaker(threshold5) async def protected_process(prompt): if not breaker.can_proceed(): return 服务暂时不可用请稍后重试 try: result await process_one(prompt) breaker.record_success() return result except Exception as e: breaker.record_failure() raise六、完整的并发管线把前面的组件组合起来构建一个生产可用的并发调用管线pythonimport asyncio import time from dataclasses import dataclass from typing import List, Optional dataclass class BatchConfig: max_concurrent: int 5 # 最大并发 batch_size: int 20 # 每批数量 batch_interval: float 0.3 # 批间隔 timeout: float 15.0 # 单请求超时 max_retries: int 3 # 最大重试 retry_base_delay: float 1.0 # 重试基础延迟 class AIPipeline: AI API并发调用管线 def __init__(self, client, config: BatchConfig): self.client client self.config config self.semaphore asyncio.Semaphore(config.max_concurrent) self.breaker CircuitBreaker(threshold5) self.stats {success: 0, failed: 0, retried: 0} async def _single_call(self, prompt: str) - Optional[str]: 单个请求带并发控制、超时、重试 if not self.breaker.can_proceed(): return None async with self.semaphore: for attempt in range(self.config.max_retries): try: response await asyncio.wait_for( self.client.chat.completions.create( modelgpt-4o-mini, messages[{role: user, content: prompt}] ), timeoutself.config.timeout ) self.breaker.record_success() self.stats[success] 1 return response.choices[0].message.content except asyncio.TimeoutError: self.stats[retried] 1 if attempt self.config.max_retries - 1: await asyncio.sleep(self.config.retry_base_delay * (2 ** attempt)) except Exception as e: self.stats[retried] 1 if 429 in str(e): await asyncio.sleep(2 * (2 ** attempt)) elif attempt self.config.max_retries - 1: await asyncio.sleep(self.config.retry_base_delay * (2 ** attempt)) self.breaker.record_failure() self.stats[failed] 1 return None async def process_batch(self, prompts: List[str]) - List[Optional[str]]: 批量处理分批 并发 间隔 all_results [] total len(prompts) for i in range(0, total, self.config.batch_size): batch prompts[i:i self.config.batch_size] # 并发处理这一批 tasks [self._single_call(p) for p in batch] batch_results await asyncio.gather(*tasks) all_results.extend(batch_results) # 进度报告 done min(i self.config.batch_size, total) print(f 进度: {done}/{total} f(成功:{self.stats[success]} f失败:{self.stats[failed]} f重试:{self.stats[retried]})) # 批间隔 if done total: await asyncio.sleep(self.config.batch_interval) return all_results def get_stats(self): return dict(self.stats) # 使用示例 async def main(): config BatchConfig( max_concurrent5, batch_size20, batch_interval0.3, timeout15.0, max_retries3 ) pipeline AIPipeline(client, config) # 100个请求 prompts [f用一句话解释什么是{i} for i in range(100)] start time.time() results await pipeline.process_batch(prompts) elapsed time.time() - start print(f\n完成! 耗时: {elapsed:.1f}s) print(f统计: {pipeline.get_stats()}) print(f吞吐量: {len(prompts) / elapsed:.1f} req/s) asyncio.run(main())七、性能对比同一批100个请求不同方案的耗时方案耗时说明同步串行~150s一个一个来无限并发~2s但会触发限流Semaphore(5)~30s稳定但不快分批并发重试~25s生产可用自适应并发~18s最优八、总结构建高并发AI API调用管线的五个要点必须用异步——AsyncOpenAIasyncio.gather必须限并发——Semaphore控制防止限流和内存溢出分批间隔——比持续高并发更稳定重试熔断——网络不稳定是常态必须有容错监控统计——成功/失败/重试次数要可见文中代码组装起来就是一个生产可用的并发管线根据自己的调用量调整BatchConfig参数即可。