FastAPI学习笔记:二、ORM ORMObject-RelationalMapping对象关系映射是一种编程技术用于在面向对象编程语言和关系型数据库之间建立映射。它允许开发者通过操作对象的方式与数据库进行交互而无需直接编写复杂的SQL语句。优势减少重复的 SQL 代码代码更简洁易读自动处理数据库连接和事务自动防止 SQL 注入攻击ORM 分类表排名ORM 工具特点适应场景1SQLAlchemy ORM功能最强、最灵活、企业级各类 API、微服务、数据应用2Django ORM封装好、上手快Django 项目、管理后台3Tortoise ORM全异步异步 Web 服务、高并发 API1、ORM 使用流程安装依赖打开终端输入命令pip install sqlalchemy[asyncio] aiomysqlsqlalchemy[asyncio]SQLAlchemy 的异步支持版本适配 FastAPI 异步场景aiomysqlMySQL 的异步数据库驱动和异步 ORM 配套使用1. 创建异步引擎、建库、建表创建异步引擎from sqlalchemy.ext.asyncio import create_async_engine ​ ASYNC_DATABASE_URL mysqlaiomysql://root:123456localhost:3306/fastAPI_first?charsetutf8 ​ # 创建异步引擎 async_engine create_async_engine( ASYNC_DATABASE_URL, echoTrue, # 可选输出SQL日志 pool_size10, # 设置连接池中保持的持久连接数 max_overflow20 # 设置连接池允许创建的额外连接数 )(1)create_async_engine是什么它是 SQLAlchemy 提供的异步数据库引擎创建方法专门用于 FastAPI 这类异步 Web 项目配合aiomysql驱动实现异步数据库操作。(2)数据库 URL 解析mysqlaiomysql://root:123456localhostmysqlaiomysql指定使用 MySQL 数据库 异步aiomysql驱动root:123456数据库账号密码localhost:3306数据库地址和端口fastAPI_first目标数据库名charsetutf8指定字符集3参数参数作用场景echoTrue控制台打印所有执行的 SQL 语句开发调试用生产环境建议关闭pool_size10连接池保持的持久连接数高并发场景建议根据业务量调大max_overflow20连接池允许的临时额外连接数峰值流量时可临时扩容用完自动回收2. 建库建表首先自己在数据库手动创建一个名为fastAPI_first的库接着from datetime import datetime from fastapi import FastAPI from sqlalchemy import func, Float, DateTime, String from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column ​ from sqlalchemy.ext.asyncio import create_async_engine ​ ASYNC_DATABASE_URL mysqlaiomysql://root:1234localhost:3306/fastAPI_first?charsetutf8 #记得更换为自己的数据库账号密码 ​ app FastAPI() ​ # 1、 创建异步引擎 async_engine create_async_engine( ASYNC_DATABASE_URL, echoTrue, # 可选输出SQL日志 pool_size10, # 设置连接池中保持的持久连接数 ) ​ ​ # ---------------------------------------------------------------------- ​ # 2. 定义模型类 基类 表对应的模型类 # 基类创建时间、更新时间书籍表id、书名、作者、价格、出版社 ​ class Base(DeclarativeBase): create_time: Mapped[datetime] mapped_column(DateTime, insert_defaultfunc.now(), defaultfunc.now, comment创建时间) update_time: Mapped[datetime] mapped_column(DateTime, insert_defaultfunc.now(), defaultfunc.now, onupdatefunc.now()) # 定义公共基类 Base # 定义了所有表的公共字段create_time创建时间、update_time更新时间 # 配置了时间自动管理 # insert_defaultfunc.now()插入数据时自动填充当前时间 # onupdatefunc.now()数据更新时自动刷新时间 # 其他表模型继承 Base 后会自动带上这两个字段避免重复代码 ​ ​ class Book(Base): __tablename__ book ​ id: Mapped[int] mapped_column(primary_keyTrue, comment书籍id) bookname: Mapped[str] mapped_column(String(255), comment书名) author: Mapped[str] mapped_column(String(255), comment作者) price: Mapped[float] mapped_column(Float, comment价格) publisher: Mapped[str] mapped_column(String(255), comment出版社) ​ ​ # __tablename__ book声明该类对应数据库中的 book 表 # 定义了书籍表的业务字段id主键、书名、作者、价格、出版社 # 继承了 Base 的 create_time 和 update_time 字段 # 用 comment 给每个字段添加了数据库注释方便维护 # ---------------------------------------------------------------------- ​ # 3. 建表定义函数建表 → FastAPI 启动的时候调用建表的函数 async def create_tables(): # 获取异步引擎创建事务 - 建表 async with async_engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) # Base 模型类的元数据创建 ​ ​ # async with async_engine.begin()开启一个数据库事务保证建表操作的原子性 # conn.run_sync(Base.metadata.create_all) # Base.metadata 是所有 ORM 模型比如写的 Book 表的元数据集合 # create_all 会根据这些元数据自动生成并执行 CREATE TABLE SQL 语句在数据库中创建所有定义好的表 # run_sync 是异步 SQLAlchemy 的写法用来在异步环境中执行同步的建表操作 ​ app.on_event(startup) async def startup_event(): await create_tables() ​ ​ # app.on_event(startup) 是 FastAPI 的启动事件钩子 # 作用在服务启动时自动调用 create_tables() 建表 # 效果你一运行 FastAPI 服务它就会自动检查并创建所有模型对应的表不用手动执行 SQL app.get(/) async def root(): return {message: Hello World}run_sync(Base.metadata.create_all)是关键它会把 ORM 模型同步转换为数据库表结构自动执行建表语句。接着启动FastAPI程序然后登录数据库查看表是否成功创建。注意因为这里的on_event 在 FastAPI 的新版本中已经被弃用了。应该使用 lifespan 事件处理器来替代。3. 操作数据CRUDORM 的核心能力就是用 Python 代码替代原生 SQL完成数据操作查询用select()语法筛选数据无需写SELECT新增创建模型对象添加到会话并提交修改查询到对象后修改属性提交会话删除查询到对象后从会话删除并提交2、路由匹配中使用 ORM我们有一个搜索的功能用户点击搜索的时候完成的是数据库查询的工作。那么当我们点击注册的功能时需要在数据库表中新出入一条数据。那么在一个项目当时都是通过一个接口来实现数据库的增删改查的。接下来就用到了路由匹配中使用ORM了核心创建依赖项使用 Depends 注入到处理函数创建方式如下# 创建异步会话工厂 AsyncSessionLocal async_sessionmaker( bindasync_engine, # 绑定数据库引擎 class_AsyncSession, # 指定会话类 expire_on_commitFalse # 会话对象不过期不重新查询数据库 ) # 依赖项用于获取数据库会话 async def get_database(): async with AsyncSessionLocal() as session: try: yield session # 返回数据库会话给路由处理函数 await session.commit() # 无异常提交事务 except Exception: await session.rollback() # 有异常则回滚 raise finally: await session.close() # 关闭会话 app.get(/book/books) async def get_book_list( db: AsyncSession Depends(get_database)#依赖注入数据库对象 ): # 查询所有书籍 result await db.execute(select(Book)) # Book 模型类 user result.scalars().all()#查询所有 return user直接查询book表中的数据from datetime import datetime from contextlib import asynccontextmanager from fastapi import FastAPI, Depends from sqlalchemy import DateTime, func, String, Float, select from sqlalchemy.ext.asyncio import create_async_engine,async_sessionmaker,AsyncSession from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column # 创建 FastAPI 应用实例使用 lifespan 管理应用生命周期 asynccontextmanager async def lifespan(app: FastAPI): 应用生命周期管理器 - yield 之前应用启动时执行如数据库初始化、连接池创建等 - yield 之后应用关闭时执行如关闭数据库连接、清理资源等 # 启动时创建数据库表 await create_tables() yield # 如果有关闭时需要执行的清理操作可以在这里添加 app FastAPI(lifespanlifespan) # 1、创建异步引擎 ASYNC_DATABASE_URL mysqlaiomysql://root:123456localhost:3306/fastapi_first?charsetutf8 async_engine create_async_engine( ASYNC_DATABASE_URL, echoTrue, # 可选输出SQL日志 pool_size10, # 设置连接池活跃的连接数 max_overflow20 # 允许额外的连接数 ) # 2、定义模型类 基类 表对应的模型类 # 基类创建时间、更新时间书籍表id、书名、作者、价格、出版社 class Base(DeclarativeBase): # 创建时间字段插入时自动设置为当前时间默认值为当前时间 create_time: Mapped[datetime] mapped_column(DateTime, insert_defaultfunc.now(), defaultfunc.now, comment创建时间) # 更新时间字段插入和更新时自动设置为当前时间 update_time: Mapped[datetime] mapped_column(DateTime, insert_defaultfunc.now(), defaultfunc.now, onupdatefunc.now(), comment更新时间) class Book(Base): 书籍表模型类 __tablename__ book # 数据库表名 id: Mapped[int] mapped_column(primary_keyTrue, comment书籍id) # 主键 bookname: Mapped[str] mapped_column(String(255), comment书名) # 书名 author: Mapped[str] mapped_column(String(255), comment作者) # 作者 price: Mapped[float] mapped_column(Float, comment价格) # 价格 publisher: Mapped[str] mapped_column(String(255), comment出版社) # 出版社 # 3、建表定义函数建表-FastAPI启动时调用建表的函数 async def create_tables(): 创建数据库表 使用异步引擎创建所有继承自 Base 的模型类对应的表 # 获取异步引擎创建事务-建表 async with async_engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) # Base 模型类的元数据创建 app.get(/) async def root(): 根路径接口 return {message: Hello World} # 需求查询功能的接口查询图书-依赖注入创建依赖项获取数据库会话 Depends 注入路由处理函数 # 创建异步会话工厂 AsyncSessionLocal async_sessionmaker( bind async_engine, # 绑定上面的数据库引擎 class_AsyncSession, # 指定会话类 expire_on_commitFalse # 提交后不关闭会话不会重新查询数据库 ) # 创建依赖项获取数据库会话 async def get_database(): async with AsyncSessionLocal() as session: try: yield session # 返回数据会话给路由处理函数 await session.commit() # 提交事务 except Exception as e: await session.rollback() # 有异常则回滚事务 raise e finally: await session.close() # 关闭会话 app.get(/book/books) async def get_book_list(db: AsyncSession Depends(get_database)): 就是这里为什么传递的依赖是get_database而不是get_database()函数呢 ✅ 不加括号传递函数本身让 FastAPI 在合适的时机调用 ❌ 加括号立即执行函数传递的是返回值这会导致错误 这是 FastAPI 依赖注入的核心机制通过传递函数引用框架可以控制何时调用、如何管理生命周期比如处理 yield 前后的逻辑。 # 查询 result await db.execute(select(Book)) book result.scalars().all() return book这里是空的因为我们还没有往数据库表中插入数据在PyCharm中连接数据库然后使用图形化工具插入一条数据然后重新进入到接口文档刷新再次发起请求进行查询3、ORM数据库操作先插入几条书籍的数据INSERT INTO fastapi_first.book (bookname, author, price, publisher, create_time, update_time) VALUES (活着, 余华, 39.0, 作家出版社, NOW(), NOW()), (平凡的世界, 路遥, 78.0, 北京十月文艺出版社, NOW(), NOW()), (百年孤独, 加西亚·马尔克斯, 56.8, 南海出版公司, NOW(), NOW()), (解忧杂货店, 东野圭吾, 42.0, 南海出版公司, NOW(), NOW());(1)查询核心语句await db.execute( select(模型类) )返回一个 ORM 对象获取所有数据scalars().all()app.get(/book/get_books) async def get_book_list(db: AsyncSessionDepends(get_database)): result await db.execute(select(Book)) book result.scalars().all() return book获取单条数据scalars().first()get(模型类, 主键值)app.get(/book/get_book) async def get_book(db: AsyncSessionDepends(get_database)): # result await db.execute(select(Book))#获取DEM对象 # bookresult.scalars().first()#获取对象第一条数据 book await db.get(Book, 1)#根据主键来获取单条数据 return book更改这个方法get_book_listapp.get(/book/books) async def get_book_list(db: AsyncSession Depends(get_database)): # 查询所有图书 result await db.execute(select(Book)) books result.scalars().all() return books查询全部数据、查询第一条数据app.get(/book/books) async def get_book_list(db: AsyncSession Depends(get_database)): # 查询指定的图书 result await db.execute(select(Book)) # books result.scalars().all() # 获取所有结果 books result.scalars().first() return books查询指定的图书app.get(/book/books) async def get_book_list(db: AsyncSession Depends(get_database)): # 查询指定的图书 # result await db.execute(select(Book)) # books result.scalars().all() # 获取所有结果 # books result.scalars().first() book await db.get(Book,4) return book查询条件select(Book).where(条件, 条件2, ...)条件比较判断; ; ; ; 等模糊查询like()与非查询; |; ~包含查询in_()比较判断比较判断; ; ; ; 等# 需求路径参数 书籍id app.get(/book/get_book/{book_id}) async def get_book_list(book_id,db: AsyncSession Depends(get_database)): result await db.execute(select(Book).where(Book.id book_id)) book result.scalar_one_or_none() return book # 需求条件 价格大于等于200 app.get(/book/get_book_price/{price}) async def get_book_price(price,db: AsyncSession Depends(get_database)): result await db.execute(select(Book).where(Book.price price)) books result.scalars().all() return books模糊查询模糊查询like()%零个、一个或多个字符_一个单个字符# 需求作者以 曹 开头 % _ app.get(/book/get_book_by_author) async def get_book_list(db: AsyncSession Depends(get_database)): result await db.execute(select(Book).where(Book.author.like(曹%))) book result.scalars().all() return book app.get(/book/get_book_by_author) async def get_book_list(db: AsyncSession Depends(get_database)): result await db.execute(select(Book).where(Book.author.like(曹_))) # 这个只能查询两个字并且姓曹的人 book result.scalars().all() return book与非查询与非查询与|或~非# 需求作者以 曹 开头 % _ app.get(/book/get_book_by_author) async def get_book_list(db: AsyncSession Depends(get_database)): # result await db.execute(select(Book).where(Book.author.like(曹%))) result await db.execute(select(Book).where(Book.author.like(曹%) (Book.price100))) result await db.execute(select(Book).where(Book.author.like(曹%) | (Book.price100))) result await db.execute(select(Book).where(~Book.author.like(曹%))) book result.scalars().all() return book包含查询in_()app.get(/book/get_book_by_author) async def get_book_list(db: AsyncSession Depends(get_database)): # result await db.execute(select(Book).where(Book.author.like(曹%))) # result await db.execute(select(Book).where(Book.author.like(曹%) (Book.price100))) # result await db.execute(select(Book).where(Book.author.like(曹%) | (Book.price100))) # result await db.execute(select(Book).where(~Book.author.like(曹%))) # 需求书籍id列表数据库里面的id如果在 id列表里面就返回 id_list [1,2,3] result await db.execute(select(Book).where(Book.id.in_(id_list))) book result.scalars().all() return book聚合查询聚合计算func.方法(模型类.属性)count统计行数量avg求平均值max求最大值min求最小值sum求和app.get(/book/count) async def get_count(db: AsyncSession Depends(get_database)): # 聚合查询 select(func.方法名(模型类.属性)) # result await db.execute(select(func.count(Book.id))) # result await db.execute(select(func.max(Book.price))) # result await db.execute(select(func.sum(Book.price))) result await db.execute(select(func.avg(Book.price))) count result.scalar() # 用来提取一个数值-标量值 return count分页查询分页查询select().offset().limit()offset跳过的记录数limit返回的记录数app.get(/book/get_books) async def get_book_list( page: int 1, # 需要查询哪一页 page_size: int 3, # 每页展示多少条数据 db: AsyncSession Depends(get_database) ): # 跳过的多少条数据 skip (page-1) * page_size # 查询【第 page 页】的那一页数据offset 跳过的记录数limit 每页的记录数 stmt select(Book).offset(skip).limit(page_size) result await db.execute(stmt) books result.scalars().all() return {books: books}查询总结核心思路select() → db.execute() → 从 ORM 对象获取数据 → 响应结果db.get(模型类, 主键值)从 ORM 对象获取数据的方式获取所有数据 scalars().all()获取单条数据 scalars().first(): 提取第一个数据scalar_one_or_none(): 提取一个或 nullscalar(): 提取标量值配合聚合查询使用(2)新增步骤定义 ORM 对象 → 添加对象到事务add(添加) → commit 提交到数据库from pydantic import BaseModel # 导入基类 # 需求用户输入图书信息(id、书名、作者、价格、出版社)-新增 # 用书输入-参数-请求体 # 定义需要新增的数据类 BookBase继承BaseModel是 Pydantic 请求体模型用来接收、校验前端传参相当于 Java 里的 DTO。 Book继承DeclarativeBase是 SQLAlchemy ORM 模型对应数据库表相当于 Java 里的 Entity。 新增时先把请求体数据转成 ORM 对象再通过 ORM 方法写入数据库。 class BookBase(BaseModel): id: int bookname: str author: str price: float publisher: str app.post(/book/add_book) async def add_book(book: BookBase, db: AsyncSession Depends(get_database)):#获取数据库连接 # ORM对象-add-commit # 获取 book 参数创建图书对象__dict__ 返回 book 对象的属性字典 book_obj Book(**book.__dict__) # 先把book通过__dict__转化为字典然后**解包对字典进行展开 db.add(book_obj) await db.commit() return book(3)更新步骤查询 get → 属性重新赋值 → commit 提交到数据库from pydantic import BaseModel # 导入基类 from fastapi import FastAPI,Depends,HTTPException # 需求修改图书信息先查再改 # 设计思路路径参数书籍id作用是查找请求体参数作用是新数据书名、作者、价格、出版社 class BookUpdate(BaseModel): # bookname: str # author: str # price: float # publisher: str #可选字段 bookname: Optional[str] None author: Optional[str] None price: Optional[float] None publisher: Optional[str] None app.put(/book/update_book/{book_id}) # 通过主键id来找同时也需要准备一个请求体参数BookUpdate async def update_book(book_id: int, data: BookUpdate, db: AsyncSession Depends(get_database)): # 1. 查询 book await db.get(Book, book_id) # 如果未找到抛出异常 if book is None: raise HTTPException(status_code404, detailBook not found) # 2. 修改属性重新赋值 # book.bookname data.bookname # book.author data.author # book.price data.price # book.publisher data.publisher # 2. 只更新前端传了的字段没传的不修改 update_data data.dict(exclude_unsetTrue) # 只保留前端真正传过来的字段忽略没传的字段 for key, value in update_data.items(): setattr(book, key, value) # 3. 提交 await db.commit() return book(4)删除步骤查询 get → delete 删除 → commit 提交到数据库app.delete(/book/delete_book/{book_id}) async def delete_book(book_id: int, db: AsyncSession Depends(get_database)): # 先查再删 提交 db_book await db.get(Book, book_id) # 如果查不到抛出异常 if db_book is None: raise HTTPException(status_code404, detailBook not found) # 查到谁就删除谁 await db.delete(db_book) await db.commit() return {message: Book deleted}总结安装sqlalchemy[asyncio]、aiomysql两个包提供异步 ORM 与 MySQL 异步驱动。建表①create_async_engine创建异步连接引擎② 继承DeclarativeBase编写公共基类与业务表模型③run_sync(Base.metadata.create_all)配合启动事件自动建表操作数据①Depends注入数据库会话② 查询用select()、新增用add()、更新直接赋值、删除用delete()操作后 commit 提交事务