🚀 FastAPI + SQLAlchemy 异步事务完全指南|一篇搞定提交/回滚/异常处理

💡 收藏级教程:从原理到实践,手把手教你写出健壮的异步数据库事务代码


📌 开篇:为什么异步事务这么重要?

在现代高并发应用中,数据一致性是生命线。而事务(Transaction)正是保障一致性的核心机制。

当 FastAPI 遇上 SQLAlchemy 的异步模式,很多开发者会困惑:

  • commit()flush() 有什么区别?
  • ❓ 异常时如何正确回滚?
  • async with db.begin() 真的能自动管理事务吗?

今天,我们就用一个完整可运行的示例,彻底搞懂 FastAPI 中的异步事务!🎯


🗂️ 项目结构一览

1
2
3
4
5
6
project/
├── main.py # 路由与事务逻辑(核心⭐)
├── database.py # 数据库连接配置
├── models.py # SQLAlchemy 模型定义
├── schemas.py # Pydantic 数据校验
└── requirements.txt # 依赖清单

✅ 建议按此结构组织项目,职责清晰,便于维护


⚙️ 第一步:环境准备

1
2
3
4
5
6
# requirements.txt
fastapi
uvicorn
sqlalchemy[asyncio] # 关键!启用异步支持
aiosqlite # SQLite 的异步驱动
pydantic
1
pip install -r requirements.txt

🔔 注意:sqlalchemy[asyncio][asyncio] extras 不能少,否则异步功能无法使用!


🔗 第二步:数据库配置(database.py)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase

DATABASE_URL = "sqlite+aiosqlite:///./test.db"

# 🎯 创建异步引擎
engine = create_async_engine(DATABASE_URL, echo=True, future=True)

# 🎯 创建会话工厂
async_session_maker = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False # 避免提交后访问已过期对象
)

class Base(DeclarativeBase):
pass

# 🎯 初始化表(开发环境用)
async def init_db():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)

🔑 关键参数解析

参数 作用 建议
echo=True 打印 SQL 语句,便于调试 开发开启,生产关闭
future=True 启用 2.0 风格 API ✅ 始终开启
expire_on_commit=False 提交后不自动过期对象 ✅ 避免 Unexpected 错误

📦 第三步:模型与 Schema(models.py + schemas.py)

1
2
3
4
5
6
7
8
9
# models.py
from sqlalchemy import Column, Integer, String
from database import Base

class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, unique=True, index=True) # 🔒 唯一约束
email = Column(String, unique=True, index=True) # 🔒 唯一约束
1
2
3
4
5
6
7
8
9
10
11
12
13
# schemas.py
from pydantic import BaseModel

class UserCreate(BaseModel):
name: str
email: str

class UserResponse(BaseModel):
id: int
name: str
email: str
class Config:
from_attributes = True # ✅ SQLAlchemy 2.0 兼容写法

💡 from_attributes = True 替代了旧的 orm_mode = True,注意版本兼容性!


🎬 第四步:事务实战三部曲(main.py)

🌟 示例 1:基础提交事务

1
2
3
4
5
6
7
8
@app.post("/users/", response_model=UserResponse)
async def create_user(user: UserCreate, db: AsyncSession = Depends(get_db)):
db_user = User(name=user.name, email=user.email)
db.add(db_user)

await db.commit() # ✅ 提交事务
await db.refresh(db_user) # ✅ 刷新获取最新数据(如自增 ID)
return db_user

📝 记忆点:add → commit → refresh,三步走稳如老狗🐶


🔄 示例 2:手动回滚 + 异常处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@app.post("/users/rollback/")
async def create_user_with_rollback(user: UserCreate, db: AsyncSession = Depends(get_db)):
try:
db_user = User(name=user.name, email=user.email)
db.add(db_user)

await db.flush() # ⚠️ 提前刷写,可捕获唯一键冲突等约束错误

# 🎭 模拟业务异常
raise ValueError("模拟错误,触发回滚")

await db.commit() # 这行不会执行
return {"message": "用户创建成功"}

except Exception:
await db.rollback() # ✅ 关键!手动回滚
raise HTTPException(status_code=500, detail="事务已回滚")

🔍 flush() vs commit() 对比

方法 作用 是否结束事务 典型场景
flush() 将变更写入 DB,但不提交 ❌ 否 提前检测约束错误、获取自增 ID
commit() 提交事务,变更永久生效 ✅ 是 正常业务结束

💡 技巧:在 commit() 前用 flush() 做”预检”,可提前捕获唯一键冲突等错误,避免部分写入!


🎁 示例 3:上下文管理器自动事务(推荐⭐)

1
2
3
4
5
6
7
8
@app.post("/users/context/")
async def create_user_context(user: UserCreate, db: AsyncSession = Depends(get_db)):
async with db.begin(): # ✅ 自动 begin/commit/rollback
db_user = User(name=user.name, email=user.email)
db.add(db_user)
# 正常结束 → 自动 commit
# 抛出异常 → 自动 rollback
return {"message": "用户创建成功(自动事务)"}

✨ 优势:代码更简洁,永不忘记回滚,推荐作为默认写法!


🧠 事务使用 Checklist(收藏备用✅)

✅ 正确姿势

  • 使用 async with async_session_maker() 获取会话,确保自动关闭
  • 写操作后必须 await db.commit()
  • 异常捕获中必须 await db.rollback()
  • 优先使用 async with db.begin() 自动管理事务
  • 需要提前校验约束时,用 flush() 预刷

⚠️ 避坑指南

  • SQLite 并发能力弱,生产请用 PostgreSQL/MySQL
  • 事务中避免 HTTP 请求、文件 IO 等耗时操作,防止锁等待
  • refresh() 需在 commit() 之后调用,否则可能拿不到最新值
  • 不要复用已关闭的 Session,每次请求新建最安全

🎯 记忆口诀(30 秒记住核心)

1
2
3
4
5
🔹 会话获取用 async with
🔹 写操作后记得 commit
🔹 异常发生立即 rollback
🔹 能用 begin() 就别手动管
🔹 flush 预检,commit 落盘

🗣️ 默念三遍,事务不再翻车!


🧪 运行与测试

1
2
3
4
5
# 启动服务
uvicorn main:app --reload

# 访问文档
👉 http://127.0.0.1:8000/docs

测试建议:

  1. 调用 /users/ 正常创建用户 ✅
  2. 调用 /users/rollback/ 观察回滚效果 + 500 响应 ✅
  3. 调用 /users/context/ 体验自动事务的简洁 ✅
  4. 重复提交相同 name/email,观察唯一约束错误处理 ✅

📚 延伸学习资源


💬 互动时间

你在异步事务中踩过哪些坑?
欢迎在评论区分享你的经验👇
👍 点赞 + ⭐ 收藏,下次写事务不迷路!


小结卡片(截图保存📸)

1
2
3
4
5
6
7
8
9
┌─────────────────────────────┐
│ FastAPI 异步事务三要素 │
├─────────────────────────────┤
│ 1️⃣ 获取会话:async with │
│ 2️⃣ 提交变更:await commit │
│ 3️⃣ 异常回滚:await rollback│
│ │
│ 🎁 进阶:db.begin() 自动管 │
└─────────────────────────────┘

记住这张卡片,事务编写如有神助🚀