FastAPI 学习 路由路由时url地址和处理函数之间的映射关系app.get(/hello) async def read_hello(): return {msg: hello world}路径参数指向唯一的特定的资源 。方法getapp.get(/items/{item_id}) async def read_item(item_id: int): return {item_id: item_id}参数限制 Path(from fastapi import FastAPI,Path)Path 的一些参数description 对参数的描述lt 参数需要小于的值gt 参数需要大于的值max_length 字符串最长长度min_length 字符串最短长度app.get(/items/{item_id}) async def read_item(item_id: int Path(...,gt 0,lt 101,description范围为1到100)): return {item_id: item_id} app.get(/items/{name}) async def read_item(name: str Path(...,max_length100,min_length10)): return {name: f这是{name}}查询参数查询参数就是附加在网址末尾的条件指令用来精确指挥服务器帮你实现数据的搜索、过滤和翻页。app.get(/news/news_list) async def get_news_list(skip: int 0,limit:int 10): return {skip: skip, limit: limit}对于限制为 Query 其参数与Path 一样app.get(/news/news_list) async def get_news_list(skip: int Query(0, description跳过的记录记录数),limit:int 10): return {skip: skip, limit: limit}请求体参数(from pydantic import BaseModel)方法 Post请求体就是藏在网络请求内部的“数据包裹”专门用来安全地向服务器提交大量、复杂或私密的信息如账号密码、长文本和文件通过基础BaseModel来定义想要的值class User(BaseModel): username: str password: str app.post(/register) async def register_user(user: User): return {username: user.username, password: user.password}限制方法 Fieldfrom fastapi import FastAPI,Path,Request,Query from pydantic import BaseModel,Field class User(BaseModel): username: str Field(default 张三,min_length2,max_length10,description你的名字) password: str Field(min_length 3,max_length 12) app.post(/register) async def register_user(user: User): return {username: user.username, password: user.password}响应类型FastAPI 默认返回json格式指定响应类from fastapi.responses import HTMLResponse app.get(/html,response_classHTMLResponse) def read_html(): return h1这是1级标题h1 from fastapi.responses import FileResponse app.get(/files) async def get_files(): path ./files/test.txt return FileResponse(path path)自定义响应数据格式使用继承BaseModel进行包装class News(BaseModel): id :int name: str content: str app.post(/news/{id},response_modelNews) async def create_news(id:int): return { id:id, name:f这是第{id}本书, content:f这是一本好书 }中间件作用为每个请求添加统一的处理逻辑多个中间件执行逻辑为自下而上async def middleware(request: Request,call_next): print(中间件开始1) response await call_next(request) print(中间件结束1) return response app.middleware(http) async def middleware2(request: Request,call_next): print(中间件开始2) response await call_next(request) print(中间件结束2) return response app.middleware(http) async def middleware3(request: Request,call_next): print(中间件开始3) response await call_next(request) print(中间件结束3) return response依赖注入抽取可复用的组件实现代码复用解耦且可轻松替换依赖项进行测试from fastapi import Depends async def common_parameters( skip: int Query(0, ge0), limit: int Query(10, ge60), ): return {skip: skip, limit: limit} app.get(/news/news_list) async def get_news_list(commons Depends(common_parameters)): return commonsORM定义模型类基类 继承DeclarativeBase创建数据库表从链接池获取异步链接开启事务执行ORM操作FastAPI 应用是开始建表import datetime from fastapi import FastAPI,Path,Request,Query from pydantic import BaseModel,Field from starlette.responses import HTMLResponse # from sqlalchemy.ext.asyncio import create_async_engine ASYNC_DATABASE_URL postgresqlasyncpg://john:jxp317777localhost/Fast async_engine create_async_engine( ASYNC_DATABASE_URL, echo True, pool_size 10, max_overflow 5, ) # 定义模型类 from sqlalchemy.orm import DeclarativeBase,Mapped,mapped_column from sqlalchemy import DateTime,func,String,Float from datetime import datetime class Base(DeclarativeBase): create_time : Mapped[datetime] mapped_column(DateTime,insert_defaultfunc.now(),defaultfunc.now(),comment创建时间) update_time : Mapped[datetime] mapped_column(DateTime,insert_defaultfunc.now(),defaultfunc.now(),onupdatefunc.now(),comment更新时间) class Book(Base): __tablename__ book id:Mapped[int] mapped_column(primary_keyTrue,comment书籍id) bookname: Mapped[str] mapped_column(String(255),comment书名) author: Mapped[str] mapped_column(String(255),comment作者) price: Mapped[float] mapped_column(Float,comment价格) publisher:Mapped[str] mapped_column(String(255),comment出版社) # 建表 from contextlib import asynccontextmanager async def create_table(): async with async_engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) asynccontextmanager async def lifespan(app: FastAPI): await create_table() app.state.engine async_engine yield await async_engine.dispose() # 初始化 FastAPI 应用 app FastAPI( title我的简易 API, description这是一个非常简单的 FastAPI 基础框架, version1.0.0, lifespanlifespan )路由注入会话# 创造会话工厂 AsyncSessionLocal async_sessionmaker( bind async_engine,#绑定数据库引擎 class_ AsyncSession,#指定会话类 expire_on_commit False,#提交后会话不过期不会重新查询数据库 ) async def get_database(): async with AsyncSessionLocal() as session: try: yield session await session.commit() except Exception as e: await session.rollback() raise e finally: await session.close() # 初始化 FastAPI 应用 app FastAPI( title我的简易 API, description这是一个非常简单的 FastAPI 基础框架, version1.0.0, lifespanlifespan ) app.get(/book/books) async def get_books_list(db: AsyncSession Depends(get_database)): result await db.execute(select(Book)) book result.scalars().all() return bookORM的数据库操作查询async def get_books_list(db: AsyncSession Depends(get_database)): # result await db.execute(select(Book)) # book result.scalars().all() book await db.get(Book,5) return book条件查询async def get_books_list(book_id:int,db: AsyncSession Depends(get_database)): result await db.execute(select(Book).where(Book.id book_id)) book result.scalars().all() return book app.get(/book/search_book) async def search_book(db: AsyncSession Depends(get_database)): # result await db.execute(select(Book).where(Book.author.like(曹%)))#%可以代表1个或多个字符,_代表一个字符 # result await db.execute(select(Book).where(Book.author.like(曹%) Book.price 100)) id_list[1,2,4,6] result await db.execute(select(Book).where(Book.id.in_(id_list))) book result.scalars().all() return book聚合查询聚合计算func.方法模型类.属性async def get_book_count(db: AsyncSession Depends(get_database)): result await db.execute(select(Book.count(Book.id))) num result.scalars() return num分页查询这里指连续页面中每个页面需要的数据async def get_book_list( page:int 1, page_size:int 3, db: AsyncSession Depends(get_database) ): skip (page-1)*page_size stmt select(Book).offset(skip).limit(page_size) result await db.execute(stmt) books result.scalars().all() return books增加数据步骤定义ORM对象-添加对象到事物add对象- commit提交到数据库app.post(/book/add_book) async def add_book(book:BookBase,db: AsyncSession Depends(get_database)): book_obj Book(**book.__dict__) db.add(book_obj) await db.commit() return book_obj修改数据步骤查询get-属性重新赋值-commit提交到数据库app.put(/book/update_book/{book_id}) async def update_book(book_id:int,data:Bookdatabase,db: AsyncSession Depends(get_database)): db_book await db.get(Book,book_id) if db_book is None: raise HTTPException(status_code404,detailBook not found) else: db_book.bookname data.bookname db_book.author data.author db_book.price data.price db_book.publisher data.publisher await db.commit() return db_book删除数据步骤查询get-delete(查到的书)-commit提交到数据库app.delete(/book/delete_book/{book_id}) async def delete_book(book_id:int,db: AsyncSession Depends(get_database)): db_book await db.get(Book,book_id) if db_book is None: raise HTTPException(status_code404,detailBook not found) else: await db.delete(db_book) await db.commit() return {message:Book deleted}