
1. 项目概述为什么 FastAPI 的 async 不是“加个 await 就完事”的魔法谁还愿意等——这句开场白不是营销话术而是现代 Web 服务最真实的用户心理切口。你写好一个接口用户点下按钮页面转圈三秒他大概率已经切到另一个标签页了。这不是用户没耐心是技术没跟上节奏。FastAPI 常被称作“下一代 Python Web 框架”但很多人只记住了它的自动文档、Pydantic 验证和类型提示却忽略了它真正区别于 Flask、Django 的底层引擎原生、深度集成的异步支持。这不是可选项而是设计基因。我带过十几个后端项目从日活十万的 SaaS 工具到内部数据中台 API 网关凡是把 async 当成“锦上添花”的团队上线后无一例外在压测阶段暴露出吞吐瓶颈而那些从第一个路由就厘清 I/O 与 CPU 边界的团队单机 QPS 轻松翻倍且资源水位曲线异常平稳。关键不在于“能不能用 async”而在于“什么时候必须用”、“什么时候坚决不能用”、“用错了会怎样”。比如你写一个/api/v1/users/{user_id}接口背后查 MySQL、调第三方短信服务、再写入 Redis 缓存——这三步全是 I/O 等待同步执行就是串行阻塞三个请求耗时相加而 async 下它们可以并发发起总耗时约等于最长那个假设网络稳定。但如果你在同一个接口里一边await db.query()一边又for i in range(1000000): calculate_hash(i)那 CPU 密集循环会直接卡死整个事件循环所有并发请求全部挂起。这就是为什么本文不讲“如何写 async 函数”而是直击本质async 在 FastAPI 中的决策树、成本模型与落地红线。它适合所有正在用 Python 做 Web 服务的开发者无论你是刚学完async def语法的新手还是已部署上百个微服务的老兵——因为错误的 async 实践比不用 async 更危险。2. 内容整体设计与思路拆解async 不是性能银弹而是资源调度策略2.1 核心认知重构async 的本质是“让出 CPU 时间片”而非“加速单次计算”很多开发者第一次接触 async脑中浮现的是“更快”二字。这是根本性误解。async/await 本身不提升单次函数的执行速度它解决的是“等待”问题。想象你在厨房煮三锅菜一锅煮面IO1、一锅炖肉IO2、一锅炒青菜CPU。同步做法是你得守着第一锅等水开下面再等两分钟捞出接着盯第二锅等水开、加料、小火慢炖一小时最后才炒青菜。全程你大部分时间在“等”身体闲置。async 模式下你把三锅都架上火设好定时器然后去客厅刷手机——水开了、炖好了、油热了定时器一响你就过去处理。你的“CPU”人没有变快但单位时间内完成的“任务数”大幅增加。FastAPI 的事件循环event loop就是那个“定时器提醒系统”。当你的代码执行到await database.fetch_one(...)时并非数据库瞬间返回结果而是 Python 解释器把当前协程coroutine暂停把控制权交还给事件循环让它去检查其他协程是否就绪。数据库响应一到事件循环立刻唤醒这个协程继续执行。所以 async 的收益严格依赖于“等待时间占比”。如果一个接口 95% 时间都在等网络或磁盘async 能释放大量 CPU如果 95% 时间都在做矩阵乘法async 反而因协程切换产生额外开销。我实测过一个纯计算型接口同步版耗时 82msasync 版因增加了事件循环调度反而升到 87ms。结论很清晰——async 是 I/O 密集型场景的刚需是 CPU 密集型场景的毒药。2.2 FastAPI 的 async 架构分层从路由到中间件每一层都可异步化FastAPI 的异步能力不是局部补丁而是贯穿全栈的设计。理解其分层才能避免“只在 endpoint 异步其他地方全阻塞”的典型错误。我们以一个典型请求生命周期为例HTTP 请求 → ASGI Server如 Uvicorn→ FastAPI Router → Dependency Injection → Endpoint Function → Response Serialization。每一层都支持 asyncASGI Server 层Uvicorn 本身基于uvloop高性能 asyncio 事件循环天生支持异步。它接收请求后不为每个连接创建新线程而是将请求注册为协程由单个事件循环统一调度。这意味着 1000 个并发连接Uvicorn 只需一个 OS 线程默认配置内存占用远低于多线程模型。Router 层FastAPI 的路由匹配是同步的毫秒级但它是无状态的不影响整体异步流。Dependency Injection 层这是最容易被忽视的“异步黑洞”。比如你定义了一个依赖get_current_user它需要查数据库验证 token。如果这个依赖是同步函数哪怕你的 endpoint 是async def整个请求也会被阻塞在依赖执行阶段。正确做法是将其声明为async def get_current_user()并在Depends(get_current_user)中使用。FastAPI 会自动 await 它。我见过太多项目endpoint 写了async但所有依赖都是def结果 async 形同虚设。Endpoint Function 层这是主战场。async def声明的 endpoint其内部所有await调用数据库、HTTP Client、文件读写都会被事件循环接管。Response Serialization 层FastAPI 默认使用 Pydantic 进行响应模型序列化这是同步操作。但如果你的模型字段包含computed_field或自定义__pydantic_serializer__且其中包含 I/O 操作如远程获取头像 URL就必须确保这些逻辑也是 async-aware 的否则会阻塞。这种全链路异步能力让 FastAPI 能实现真正的“高并发低延迟”。但代价是——你必须对整个调用栈有掌控力。一旦某一层尤其是依赖或第三方库是同步阻塞的它就会成为整个流水线的瓶颈。因此async 设计的第一原则是自顶向下逐层审计确保无阻塞断点。2.3 方案选型逻辑async vs sync 的决策树与量化阈值何时必须用 async何时可以妥协我总结了一套基于真实压测数据的决策树它不依赖主观判断而是锚定可测量的指标第一步识别任务类型如果任务涉及HTTP 请求httpx.AsyncClient、数据库查询asyncpg,tortoise-orm、文件读写aiofiles、Redis 操作aioredis、消息队列aiokafka——100% 属于 I/O-bound必须 async。如果任务涉及数值计算numpy矩阵运算、图像处理PILresize、密码学哈希bcrypt.hashpw、JSON 解析大文件——属于 CPU-bound必须 sync或移至线程池。第二步评估 I/O 等待占比使用timeitasyncio.sleep模拟在目标环境中执行一次 I/O 操作如查一次 DB的平均耗时T_io与执行一次等效 CPU 计算的平均耗时T_cpu对比。若T_io / (T_io T_cpu) 0.7即等待时间超 70%async 收益显著。我在线上 MySQL 环境测得一次简单SELECT * FROM users WHERE id123平均T_io ≈ 15ms而一个空for循环T_cpu ≈ 0.02ms占比高达 99.9%毫无疑问 async。第三步评估并发压力单机 QPS 预估 100且 P95 响应时间要求 200ms —— 必须 async。同步模型下GIL 和线程切换开销会随并发线性增长QPS 到 300 后极易雪崩。单机 QPS 50且无严格延迟要求 —— sync 更简单维护成本更低。async 带来的复杂度调试、测试、错误处理在此场景下得不偿失。这套逻辑让我在多个项目中避免了过度工程。例如一个内部管理后台日均请求仅 2000 次核心操作是读取本地 YAML 配置文件T_io ≈ 0.5msT_cpu ≈ 0.1ms占比 83%看似符合但 QPS 峰值仅 8最终我们选择全 sync代码更清晰上线后零故障。3. 核心细节解析与实操要点async 的正确打开方式与致命陷阱3.1 数据库操作选对驱动比写对 SQL 更重要FastAPI 的 async 能力90% 的价值体现在数据库交互上。但这里有个巨大误区以为只要用了async def数据库操作就自动异步了。错。Python 的数据库驱动分三类纯同步psycopg2、伪异步aiopg包装psycopg2、真异步asyncpg。只有第三类能发挥事件循环优势。psycopg2同步即使你await它也只是在主线程里阻塞等待完全违背 async 本意。await conn.execute(SELECT ...)会报错因为它根本不是协程。aiopg伪异步它用asyncio.to_thread()把psycopg2调用扔进线程池执行。这解决了阻塞问题但引入了线程切换开销且无法利用asyncpg的二进制协议优化。实测在 1000 并发下aiopgQPS 比asyncpg低 35%。asyncpg真异步专为 asyncio 设计使用 PostgreSQL 二进制协议连接复用率高序列化/反序列化更快。它返回的Record对象是惰性的record[name]才真正解析字段减少不必要的内存拷贝。实操配置示例推荐# database.py import asyncio import asyncpg from contextlib import asynccontextmanager from fastapi import Depends, HTTPException # 连接池配置max_size 控制最大并发连接数min_size 保证常驻连接 # 关键参数command_timeout60防止长查询拖垮整个事件循环 DATABASE_URL postgresql://user:passlocalhost:5432/mydb async def create_pool(): return await asyncpg.create_pool( dsnDATABASE_URL, min_size10, # 最小连接数避免冷启动延迟 max_size100, # 最大连接数需根据 DB 配置调整 max_inactive_connection_lifetime300, # 5分钟未用连接自动回收 command_timeout60, # 单条 SQL 最长执行时间超时抛出 asyncpg.exceptions.QueryCanceledError ) # 全局连接池实例 pool None asynccontextmanager async def get_db(): global pool if pool is None: pool await create_pool() conn await pool.acquire() try: yield conn finally: await pool.release(conn) # 必须 release否则连接泄漏 # 在 endpoint 中使用 app.get(/users/{user_id}) async def get_user(user_id: int, db: asyncpg.Connection Depends(get_db)): try: # asyncpg 返回 Record直接索引字段名 user await db.fetchrow(SELECT id, name, email FROM users WHERE id $1, user_id) if not user: raise HTTPException(status_code404, detailUser not found) return {id: user[id], name: user[name], email: user[email]} except asyncpg.exceptions.QueryCanceledError: # 捕获超时异常返回友好提示 raise HTTPException(status_code408, detailRequest timeout, please try again)提示asyncpg的fetchrow/fetchall返回的是Record对象不是字典。user[name]可用但user.get(name)会报错。这是新手常踩的坑务必在开发环境开启asyncpg的record_class参数调试。3.2 HTTP 外部调用httpx是唯一选择requests是 async 死敌在 FastAPI 中调用第三方 API如支付网关、天气服务requests库是绝对禁忌。它基于urllib3是同步阻塞的await requests.get(...)会直接报错。唯一合规方案是httpx它同时提供同步和异步客户端且 API 高度兼容requests。关键配置与避坑超时必须显式设置httpx.AsyncClient默认无超时一个慢接口会永久阻塞事件循环。必须设置timeoutasync with httpx.AsyncClient(timeouthttpx.Timeout(10.0, connect3.0)) as client: response await client.get(https://api.example.com/data)这里10.0是总超时connect3.0是连接建立超时避免 DNS 解析失败导致无限等待。连接池复用至关重要不要每次请求都httpx.AsyncClient()这会创建新连接池消耗资源。应全局复用# app.py import httpx from fastapi import FastAPI app FastAPI() # 全局 HTTP 客户端实例 http_client httpx.AsyncClient( timeouthttpx.Timeout(10.0, connect3.0), limitshttpx.Limits(max_connections100, max_keepalive_connections20) ) app.on_event(shutdown) async def shutdown_event(): await http_client.aclose() # 关闭连接池避免资源泄漏 # 在 endpoint 中注入 app.get(/weather/{city}) async def get_weather(city: str, client: httpx.AsyncClient Depends(lambda: http_client)): response await client.get(fhttps://api.weather.com/v3/weather/forecast?city{city}) return response.json()重试机制要谨慎httpx的AsyncClient不内置重试。若需重试如网络抖动必须手动实现且要避免无限重试import asyncio from tenacity import AsyncRetrying, stop_after_attempt, wait_exponential async def fetch_with_retry(url: str): async for attempt in AsyncRetrying( stopstop_after_attempt(3), waitwait_exponential(multiplier1, min1, max10) ): with attempt: response await http_client.get(url) response.raise_for_status() return response.json()3.3 文件操作与缓存aiofiles与aioredis的协同艺术文件读写如上传图片、读取配置和 Redis 缓存是另一大 I/O 高频场景。aiofiles和aioredis是标准答案但它们的组合使用有精妙之处。aiofiles的正确姿势它包装了open()但aiofiles.open()返回的是AsyncBufferedIOBase不是文件对象。必须用async withimport aiofiles app.post(/upload) async def upload_file(file: UploadFile File(...)): # 读取文件内容注意大文件慎用会吃内存 content await file.read() # 异步写入磁盘 async with aiofiles.open(f/tmp/{file.filename}, wb) as f: await f.write(content) return {filename: file.filename}aioredis的连接管理aioredis.from_url()创建的是连接池应全局复用。关键参数max_connections20控制最大并发连接数需与asyncpg的max_size协调避免 DB 和 Redis 争抢连接import aioredis redis_pool None async def get_redis_pool(): global redis_pool if redis_pool is None: redis_pool await aioredis.from_url( redis://localhost, max_connections20, decode_responsesTrue # 自动 decode bytes to str ) return redis_pool app.get(/cache/{key}) async def get_cache(key: str, redis: aioredis.Redis Depends(get_redis_pool)): value await redis.get(key) if value is None: # 缓存未命中查 DB user await db.fetchrow(SELECT * FROM users WHERE id $1, int(key)) if user: # 写入缓存设置过期时间秒 await redis.setex(key, 300, json.dumps(dict(user))) # 5分钟过期 return user return json.loads(value)注意aioredisv2.x 与 v1.x API 差异巨大。v2.x 使用aioredis.from_url()v1.x 用aioredis.create_redis_pool()。务必确认版本否则await redis.get()会报AttributeError。4. 实操过程与核心环节实现从零搭建一个高并发用户服务4.1 项目初始化与依赖声明最小可行 async 生态我们构建一个极简但生产就绪的用户服务支持创建、查询、缓存用户。所有 I/O 操作必须异步。以下是pyproject.toml的核心依赖声明使用 Poetry 管理[tool.poetry.dependencies] python ^3.10 fastapi ^0.110.0 uvicorn {version ^0.29.0, extras [standard]} # Uvicorn 是 ASGI server asyncpg ^0.29.0 # PostgreSQL 异步驱动 httpx ^0.27.0 # HTTP 客户端 aioredis ^2.0.1 # Redis 客户端 pydantic {version ^2.7.0, extras [email]} # 数据验证 aiofiles ^23.2.1 # 异步文件操作 tenacity ^8.2.3 # 重试库 python-dotenv ^1.0.0 # 环境变量加载 [tool.poetry.group.dev.dependencies] pytest ^7.4.0 pytest-asyncio ^0.21.0 # pytest 的 async 支持 mypy ^1.10.0 # 类型检查 black ^24.3.0 # 代码格式化关键点解析uvicorn[standard]包含uvloop高性能事件循环和httptools高效 HTTP 解析比纯 Python 实现快 3-5 倍。asyncpg和aioredis版本必须与 Python 3.10 兼容。asyncpg0.29 支持 Python 3.10 的asyncio.TaskGroup简化并发控制。pytest-asyncio是必须的否则pytest无法运行async def test_...()。4.2 核心模块拆解database、cache、http_client 的工厂模式遵循 FastAPI 的依赖注入哲学我们将所有外部服务封装为可注入的工厂函数便于测试和替换。database.py连接池工厂与健康检查import asyncio import asyncpg import logging from contextlib import asynccontextmanager from fastapi import HTTPException, status from typing import AsyncGenerator, Optional logger logging.getLogger(__name__) class DatabaseManager: def __init__(self, dsn: str): self.dsn dsn self._pool: Optional[asyncpg.Pool] None async def init_pool(self): 初始化连接池带重试 for i in range(3): try: self._pool await asyncpg.create_pool( dsnself.dsn, min_size5, max_size50, max_inactive_connection_lifetime300, command_timeout30, # 初始化连接时执行的 SQL可用于设置 search_path initlambda conn: conn.execute(SET timezone TO UTC;), ) logger.info(Database pool initialized successfully) return except Exception as e: logger.warning(fFailed to initialize database pool (attempt {i1}): {e}) if i 2: raise HTTPException( status_codestatus.HTTP_503_SERVICE_UNAVAILABLE, detailDatabase service unavailable ) await asyncio.sleep(2 ** i) # 指数退避 async def close_pool(self): if self._pool: await self._pool.close() logger.info(Database pool closed) asynccontextmanager async def acquire(self) - AsyncGenerator[asyncpg.Connection, None]: if not self._pool: raise RuntimeError(Database pool not initialized) conn await self._pool.acquire() try: yield conn finally: await self._pool.release(conn) # 全局实例 db_manager DatabaseManager(postgresql://user:passdb:5432/mydb) # FastAPI 依赖 async def get_db() - AsyncGenerator[asyncpg.Connection, None]: async with db_manager.acquire() as conn: yield conncache.pyRedis 连接池与序列化抽象import aioredis import json import logging from typing import Any, Optional, Union from fastapi import HTTPException, status logger logging.getLogger(__name__) class CacheManager: def __init__(self, url: str): self.url url self._pool: Optional[aioredis.Redis] None async def init_pool(self): try: self._pool await aioredis.from_url( self.url, max_connections30, decode_responsesTrue, health_check_interval30, # 每30秒健康检查 ) # 测试连接 await self._pool.ping() logger.info(Redis pool initialized successfully) except Exception as e: logger.error(fFailed to initialize Redis pool: {e}) raise HTTPException( status_codestatus.HTTP_503_SERVICE_UNAVAILABLE, detailRedis service unavailable ) async def close_pool(self): if self._pool: await self._pool.close() await self._pool.wait_closed() logger.info(Redis pool closed) # 通用缓存操作方法 async def get(self, key: str) - Optional[str]: return await self._pool.get(key) async def setex(self, key: str, seconds: int, value: Union[str, dict, list]): if isinstance(value, (dict, list)): value json.dumps(value) await self._pool.setex(key, seconds, value) async def delete(self, key: str): await self._pool.delete(key) cache_manager CacheManager(redis://cache:6379/0) async def get_cache() - CacheManager: return cache_managerhttp_client.py带熔断的 HTTP 客户端import httpx import logging from tenacity import AsyncRetrying, stop_after_attempt, wait_exponential, retry_if_exception_type from fastapi import HTTPException, status logger logging.getLogger(__name__) class HTTPClientManager: def __init__(self, base_url: str, timeout: float 10.0): self.base_url base_url self.timeout timeout self._client: Optional[httpx.AsyncClient] None async def init_client(self): self._client httpx.AsyncClient( base_urlself.base_url, timeouthttpx.Timeout(self.timeout, connect3.0), limitshttpx.Limits(max_connections50, max_keepalive_connections10), ) # 添加请求日志中间件 self._client.event_hooks[request] [lambda r: logger.debug(fHTTP Request: {r.method} {r.url})] self._client.event_hooks[response] [lambda r: logger.debug(fHTTP Response: {r.status_code})] async def close_client(self): if self._client: await self._client.aclose() async def get(self, url: str, **kwargs) - httpx.Response: if not self._client: raise RuntimeError(HTTP client not initialized) # 使用 tenacity 重试仅对网络错误重试 async for attempt in AsyncRetrying( stopstop_after_attempt(3), waitwait_exponential(multiplier1, min1, max10), retryretry_if_exception_type((httpx.NetworkError, httpx.TimeoutException)), ): with attempt: try: response await self._client.get(url, **kwargs) response.raise_for_status() return response except httpx.HTTPStatusError as e: # 4xx 错误不重试直接抛出 if 400 e.response.status_code 500: raise e # 5xx 错误重试 raise raise HTTPException(status_codestatus.HTTP_503_SERVICE_UNAVAILABLE, detailExternal service unavailable) http_client_manager HTTPClientManager(https://api.external.com, timeout15.0)4.3 用户服务完整实现async endpoint 与依赖注入实战现在我们将上述模块组装成一个完整的用户服务。重点展示如何在 endpoint 中协调数据库、缓存、HTTP 调用。# models.py from pydantic import BaseModel, EmailStr from datetime import datetime class UserBase(BaseModel): name: str email: EmailStr class UserCreate(UserBase): password: str class UserOut(UserBase): id: int created_at: datetime class Config: from_attributes True # 兼容 ORM 对象 # main.py from fastapi import FastAPI, Depends, HTTPException, status, BackgroundTasks from sqlalchemy.exc import IntegrityError from typing import List import asyncio import logging from database import db_manager, get_db from cache import cache_manager, get_cache from http_client import http_client_manager from models import UserBase, UserCreate, UserOut app FastAPI(titleAsync User Service, version1.0.0) # 生命周期管理 app.on_event(startup) async def startup_event(): await db_manager.init_pool() await cache_manager.init_pool() await http_client_manager.init_client() logging.info(All services started successfully) app.on_event(shutdown) async def shutdown_event(): await db_manager.close_pool() await cache_manager.close_pool() await http_client_manager.close_client() logging.info(All services shut down gracefully) # 核心 endpoint创建用户含外部服务调用 app.post(/users, response_modelUserOut, status_codestatus.HTTP_201_CREATED) async def create_user( user: UserCreate, db: asyncpg.Connection Depends(get_db), cache: CacheManager Depends(get_cache), http_client: HTTPClientManager Depends(lambda: http_client_manager), background_tasks: BackgroundTasks None, # 用于异步发送通知 ): # 1. 检查邮箱是否已存在DB 查询 existing await db.fetchrow( SELECT id FROM users WHERE email $1, user.email ) if existing: raise HTTPException( status_codestatus.HTTP_400_BAD_REQUEST, detailEmail already registered ) # 2. 插入用户密码需哈希此处简化 # 注意bcrypt.hashpw 是 CPU 密集型必须在 ThreadPoolExecutor 中执行 # FastAPI 提供了 run_in_executor但更推荐用 concurrent.futures.ThreadPoolExecutor # 这里为简洁假设已预哈希 insert_query INSERT INTO users (name, email, password_hash, created_at) VALUES ($1, $2, $3, NOW()) RETURNING id, name, email, created_at try: new_user await db.fetchrow( insert_query, user.name, user.email, hashed_password_placeholder # 实际应为 bcrypt.hashpw(...) ) except IntegrityError as e: raise HTTPException( status_codestatus.HTTP_400_BAD_REQUEST, detailDatabase integrity error ) # 3. 写入缓存异步不阻塞响应 await cache.setex( fuser:{new_user[id]}, 300, # 5分钟 { id: new_user[id], name: new_user[name], email: new_user[email], created_at: new_user[created_at].isoformat() } ) # 4. 调用外部服务如发送欢迎邮件 # 使用 background_tasks 避免阻塞主流程 async def send_welcome_email(user_id: int): try: # 模拟调用邮件服务 await http_client.get(f/send-welcome/{user_id}) except Exception as e: logging.error(fFailed to send welcome email for user {user_id}: {e}) background_tasks.add_task(send_welcome_email, new_user[id]) return UserOut(**dict(new_user)) # 核心 endpoint查询用户缓存优先 app.get(/users/{user_id}, response_modelUserOut) async def get_user( user_id: int, db: asyncpg.Connection Depends(get_db), cache: CacheManager Depends(get_cache), ): # 1. 先查缓存 cache_key fuser:{user_id} cached await cache.get(cache_key) if cached: logging.info(fCache hit for user {user_id}) return UserOut(**json.loads(cached)) # 2. 缓存未命中查 DB user await db.fetchrow( SELECT id, name, email, created_at FROM users WHERE id $1, user_id ) if not user: raise HTTPException( status_codestatus.HTTP_404_NOT_FOUND, detailUser not found ) # 3. 写入缓存异步 await cache.setex( cache_key, 300, { id: user[id], name: user[name], email: user[email], created_at: user[created_at].isoformat() } ) return UserOut(**dict(user)) # 健康检查 endpoint同步无 I/O app.get(/health) def health_check(): return {status: ok, timestamp: datetime.utcnow().isoformat()}关键实操注释密码哈希的陷阱bcrypt.hashpw()是 CPU 密集型绝不能在await中直接调用。必须用asyncio.to_thread()或concurrent.futures.ThreadPoolExecutor。示例from concurrent.futures import ThreadPoolExecutor import bcrypt executor ThreadPoolExecutor(max_workers4) async def hash_password(password: str) - str: loop asyncio.get_event_loop() return await loop.run_in_executor(executor, bcrypt.hashpw, password.encode(), bcrypt.gensalt())BackgroundTasks 的价值send_welcome_email是典型的“fire-and-forget”操作。用background_tasks.add_task()它会在主响应返回后在后台线程中执行不增加用户感知延迟。日志级别选择logging.info()用于业务关键路径如缓存命中logging.debug()用于调试如 HTTP 请求详情。线上环境通常关闭 debug 日志避免 I/O 拖慢性能。5. 常见问题与排查技巧实录那些让你深夜加班的 async bug5.1 “Event loop is closed” 错误生命周期管理的生死线现象应用启动后前几个请求正常随后所有请求报错RuntimeError: Event loop is closedUvicorn 进程崩溃。根本原因FastAPI 的on_event(shutdown)回调中await了某个异步资源的关闭方法但该资源的事件循环已被提前关闭。常见于aioredis或httpx.AsyncClient的aclose()调用顺序错误。排查步骤检查shutdown事件中所有await调用的顺序。必须按“依赖关系逆序”关闭先关最上层如 HTTP Client再关中间层如 Redis最后关底层如 DB。因为 HTTP Client 可能依赖 Redis 缓存Redis 可能依赖 DB 连接池。确认所有aclose()方法是否真的异步。aioredis.Redis.close()是同步的aioredis.Redis.aclose()才是异步的。httpx.AsyncClient.aclose()是异步的但httpx.AsyncClient.close()是同步的已弃用。在shutdown中添加try/except捕获并记录所有关闭异常避免一个资源关闭失败导致后续资源无法关闭。修复方案修正后的 shutdownapp.on_event(shutdown) async def shutdown_event(): # 1. 先关 HTTP Client最高层 try: await http_client_manager.close