异步编程新纪元:FastAPI与达梦数据库的异步完美融合

在当今高并发的互联网时代,异步编程已成为提升应用性能的关键技术。今天,我们将深入探讨如何将FastAPI与达梦数据库的异步驱动相结合,构建高性能的异步数据服务。这不仅是对传统同步模式的升级,更是对国产数据库技术栈的一次重要革新。

异步驱动的核心优势

相比同步版本,异步编程具有以下显著优势:

  1. 高并发处理:能够处理数千个并发连接而不会阻塞
  2. 资源高效:减少线程切换开销,提高CPU利用率
  3. 响应迅速:I/O等待期间可以处理其他请求
  4. 可扩展性强:更适合现代微服务和云原生架构

环境准备

1. 安装达梦异步数据库驱动

达梦数据库提供了专门的异步Python驱动程序dmAsync,需要从达梦官网下载并安装:

1
2
3
4
5
# 进入达梦数据库安装目录下的异步驱动文件夹
cd /opt/dmdbms/drivers/python/dmAsync

# 执行安装命令
python setup.py install

2. 安装SQLAlchemy及其他依赖

1
2
pip install fastapi uvicorn sqlalchemy pydantic
pip install sqlalchemy[asyncio] # 安装SQLAlchemy异步支持

核心代码实现

异步数据库连接配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, Integer, String, select, FetchedValue
from sqlalchemy.orm import declarative_base
import asyncio

# 创建异步数据库引擎
# 注意:使用dm+dmAsync协议,这是达梦异步驱动的特有格式
engine = create_async_engine(
"dm+dmAsync://SYSDBA:xxxxx@localhost:5236",
echo=True, # 开启SQL语句日志输出
pool_size=20, # 连接池大小
max_overflow=10, # 最大溢出连接数
pool_pre_ping=True # 连接池预检,确保连接有效
)

# 创建异步会话工厂
AsyncSessionLocal = sessionmaker(
autocommit=False,
autoflush=False,
bind=engine,
class_=AsyncSession, # 指定使用异步Session类
expire_on_commit=False # 提交后不使对象过期
)

# 异步依赖注入:获取数据库会话
async def get_db():
async with AsyncSessionLocal() as session:
try:
yield session
finally:
await session.close()

Base = declarative_base()

异步数据模型定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Item(Base):
__tablename__ = "items"

# 使用server_default处理自增列
id = Column(Integer, primary_key=True, autoincrement=True, server_default=FetchedValue())

# 明确设置字段长度和约束
name = Column(String(255), nullable=False, comment="商品名称")
description = Column(String(500), comment="商品描述")
price = Column(Integer, comment="商品价格")

# 添加时间戳字段(可选)
created_at = Column(String(20), server_default=FetchedValue(), comment="创建时间")
updated_at = Column(String(20), server_default=FetchedValue(), comment="更新时间")

Pydantic模型定义(支持异步验证)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from pydantic import BaseModel, ConfigDict, Field
from typing import List, Optional
from datetime import datetime

class ItemBase(BaseModel):
name: str = Field(..., min_length=1, max_length=255, description="商品名称")
description: Optional[str] = Field(None, max_length=500, description="商品描述")
price: int = Field(..., ge=0, description="商品价格")

class ItemCreate(ItemBase):
pass

class ItemUpdate(ItemBase):
name: Optional[str] = Field(None, min_length=1, max_length=255)
price: Optional[int] = Field(None, ge=0)

class ItemResponse(ItemBase):
id: int
created_at: Optional[str] = None
updated_at: Optional[str] = None
model_config = ConfigDict(from_attributes=True)

FastAPI异步应用实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from fastapi import FastAPI, Depends, HTTPException, Query, status
from contextlib import asynccontextmanager
import logging

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 使用生命周期管理器的现代方式
@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动时:创建数据库表
logger.info("Starting up... Creating database tables")
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield
# 关闭时:清理资源
logger.info("Shutting down... Closing database connections")
await engine.dispose()

app = FastAPI(
title="达梦异步API服务",
description="基于FastAPI和达梦异步驱动的RESTful API",
version="1.0.0",
lifespan=lifespan # 使用异步生命周期管理器
)

异步API端点实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
@app.post("/items/create_item", 
response_model=ItemResponse,
status_code=status.HTTP_201_CREATED,
summary="创建商品",
description="创建一个新的商品记录")
async def create_item(item: ItemCreate, db: AsyncSession = Depends(get_db)):
"""
创建新的商品

- **name**: 商品名称(必填)
- **description**: 商品描述(可选)
- **price**: 商品价格(必须大于等于0)
"""
try:
# 构建新商品对象
db_item = Item(
name=item.name,
description=item.description,
price=item.price
)

# 添加到会话
db.add(db_item)

# 提交事务
await db.commit()

# 刷新获取自增ID
await db.refresh(db_item)

logger.info(f"商品创建成功: ID={db_item.id}, Name={db_item.name}")
return db_item

except Exception as e:
await db.rollback()
logger.error(f"商品创建失败: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"创建商品失败: {str(e)}"
)

@app.get("/items/read_items",
response_model=List[ItemResponse],
summary="获取商品列表",
description="分页获取所有商品列表")
async def read_items(
skip: int = Query(0, ge=0, description="跳过记录数"),
limit: int = Query(100, ge=1, le=1000, description="每页记录数"),
db: AsyncSession = Depends(get_db)
):
"""
获取商品列表,支持分页

- **skip**: 跳过的记录数(默认0)
- **limit**: 每页记录数(默认100,最大1000)
"""
try:
# 执行异步查询
result = await db.execute(
select(Item)
.order_by(Item.id)
.offset(skip)
.limit(limit)
)
items = result.scalars().all()

# 获取总数(可选)
count_result = await db.execute(select(func.count()).select_from(Item))
total = count_result.scalar()

logger.info(f"获取商品列表成功: 总数={total}, 本次返回={len(items)}")
return items

except Exception as e:
logger.error(f"获取商品列表失败: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"读取商品列表失败: {str(e)}"
)

@app.get("/items/read_item/{item_id}",
response_model=ItemResponse,
summary="获取单个商品",
description="根据ID获取单个商品的详细信息")
async def read_item(
item_id: int = Query(..., ge=1, description="商品ID"),
db: AsyncSession = Depends(get_db)
):
"""
根据ID获取单个商品详情
"""
try:
# 使用异步get方法
item = await db.get(Item, item_id)
if item is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="商品不存在"
)

logger.info(f"获取商品成功: ID={item_id}")
return item

except HTTPException:
raise
except Exception as e:
logger.error(f"获取商品失败: ID={item_id}, 错误={str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"读取商品失败: {str(e)}"
)

@app.put("/items/update_item/{item_id}",
response_model=ItemResponse,
summary="更新商品",
description="根据ID更新商品信息")
async def update_item(
item_id: int,
item: ItemUpdate,
db: AsyncSession = Depends(get_db)
):
"""
更新商品信息

支持部分更新,只更新提供的字段
"""
try:
# 获取现有商品
db_item = await db.get(Item, item_id)
if db_item is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="商品不存在"
)

# 只更新提供的字段
update_data = item.dict(exclude_unset=True)
for field, value in update_data.items():
if value is not None:
setattr(db_item, field, value)

# 标记为已修改
db.add(db_item)
await db.commit()
await db.refresh(db_item)

logger.info(f"商品更新成功: ID={item_id}")
return db_item

except HTTPException:
raise
except Exception as e:
await db.rollback()
logger.error(f"商品更新失败: ID={item_id}, 错误={str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"更新商品失败: {str(e)}"
)

@app.delete("/items/delete_item/{item_id}",
summary="删除商品",
description="根据ID删除商品")
async def delete_item(
item_id: int,
db: AsyncSession = Depends(get_db)
):
"""
删除指定ID的商品
"""
try:
db_item = await db.get(Item, item_id)
if db_item is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="商品不存在"
)

await db.delete(db_item)
await db.commit()

logger.info(f"商品删除成功: ID={item_id}")
return {
"message": "商品删除成功",
"item_id": item_id,
"timestamp": datetime.now().isoformat()
}

except HTTPException:
raise
except Exception as e:
await db.rollback()
logger.error(f"商品删除失败: ID={item_id}, 错误={str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"删除商品失败: {str(e)}"
)

高级功能:批量操作与事务管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
from sqlalchemy import insert, update, delete
from fastapi import BackgroundTasks

@app.post("/items/batch_create",
summary="批量创建商品",
description="一次性创建多个商品")
async def batch_create_items(
items: List[ItemCreate],
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_db)
):
"""
批量创建商品,提高效率
"""
try:
db_items = [
Item(**item.dict())
for item in items
]

db.add_all(db_items)
await db.commit()

# 为每个新商品刷新
for db_item in db_items:
await db.refresh(db_item)

# 异步后处理(示例)
background_tasks.add_task(process_items_async, db_items)

logger.info(f"批量创建成功: 数量={len(db_items)}")
return db_items

except Exception as e:
await db.rollback()
logger.error(f"批量创建失败: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"批量创建失败: {str(e)}"
)

async def process_items_async(items: List[Item]):
"""异步后处理任务"""
# 这里可以添加索引更新、缓存刷新等异步操作
logger.info(f"开始异步处理 {len(items)} 个商品")
await asyncio.sleep(1) # 模拟异步操作
logger.info(f"异步处理完成")

# 使用事务装饰器的高级用法
from sqlalchemy.ext.asyncio import async_transaction

@app.post("/items/transaction_example")
async def transaction_example(db: AsyncSession = Depends(get_db)):
"""
复杂事务操作示例
"""
try:
# 开始事务
async with async_transaction(db):
# 操作1
item1 = Item(name="商品1", price=100)
db.add(item1)

# 操作2
item2 = Item(name="商品2", price=200)
db.add(item2)

# 如果所有操作成功,事务自动提交
# 如果出现异常,事务自动回滚

return {"message": "事务执行成功"}

except Exception as e:
logger.error(f"事务执行失败: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"事务执行失败: {str(e)}"
)

运行异步应用

1
2
3
4
5
6
# 启动异步FastAPI应用
# --workers 参数可以指定工作进程数
uvicorn main:app --reload --host 0.0.0.0 --port 8000 --workers 4

# 生产环境推荐使用
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4 --loop asyncio

性能测试与对比

同步 vs 异步性能对比

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 性能测试脚本示例
import asyncio
import aiohttp
import time
import httpx

async def async_test():
"""异步请求测试"""
async with httpx.AsyncClient() as client:
tasks = []
for i in range(100):
task = client.post(
"http://localhost:8000/items/create_item",
json={"name": f"测试商品{i}", "price": i * 10}
)
tasks.append(task)

start = time.time()
responses = await asyncio.gather(*tasks)
end = time.time()

print(f"异步100次请求耗时: {end - start:.2f}秒")

def sync_test():
"""同步请求测试"""
import requests
start = time.time()
for i in range(100):
requests.post(
"http://localhost:8000/items/create_item",
json={"name": f"测试商品{i}", "price": i * 10}
)
end = time.time()
print(f"同步100次请求耗时: {end - start:.2f}秒")

最佳实践建议

1. 连接池优化

1
2
3
4
5
6
7
8
engine = create_async_engine(
DATABASE_URL,
pool_size=20,
max_overflow=10,
pool_timeout=30,
pool_recycle=3600,
pool_pre_ping=True
)

2. 错误重试机制

1
2
3
4
5
6
7
8
9
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10)
)
async def safe_db_operation():
# 数据库操作
pass

3. 监控与日志

1
2
3
4
5
# 添加SQL查询日志
import logging

logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)

4. 健康检查端点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@app.get("/health")
async def health_check(db: AsyncSession = Depends(get_db)):
"""数据库健康检查"""
try:
# 执行简单查询检查连接
result = await db.execute(select(1))
return {
"status": "healthy",
"database": "connected",
"timestamp": datetime.now().isoformat()
}
except Exception as e:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail=f"数据库连接失败: {str(e)}"
)

常见问题与解决方案

问题1:连接超时

解决方案:调整连接池参数和超时设置

问题2:自增ID获取

解决方案:使用server_default=FetchedValue()配合refresh()

问题3:事务管理

解决方案:使用async_transaction上下文管理器

问题4:性能瓶颈

解决方案

  1. 启用查询缓存
  2. 使用批量操作
  3. 优化数据库索引

总结

通过本文的异步实现,我们展示了FastAPI与达梦数据库异步驱动的完美结合。这种架构不仅提升了系统的并发处理能力,还为构建高性能、高可用的现代应用提供了坚实的基础。

核心价值

  • ✅ 完全异步,支持高并发
  • ✅ 国产数据库兼容,符合信创要求
  • ✅ 现代化API设计,开发效率高
  • ✅ 完善的错误处理和事务管理
  • ✅ 易于扩展和维护

异步编程是现代Web开发的必然趋势,而FastAPI与达梦数据库的结合为国产技术栈的发展开辟了新的道路。无论你是构建微服务、数据中台还是企业级应用,这套技术组合都能提供强大的支持。

赶快尝试这个异步方案,体验前所未有的性能和开发效率吧!