RabbitMQ结合FastAPI的使用案例

一、简单消息发送与消费案例

1. 安装依赖
1
pip install fastapi uvicorn pika
2. 代码实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
from fastapi import FastAPI
import pika
import threading
import time
from pika.exceptions import AMQPError

# FastAPI application instance; the @app.post route in this script registers on it.
app = FastAPI()

# Connection settings for the RabbitMQ broker plus the name of the queue
# shared by the producer endpoint and the background consumer.
RABBITMQ_CONFIG = dict(
    host="localhost",
    port=5672,
    username="guest",
    password="guest",
    queue="simple_queue",
)

def connect_rabbitmq():
    """Open a blocking RabbitMQ connection and declare the configured queue.

    Host, port, credentials and the queue name are read from
    ``RABBITMQ_CONFIG``. The queue is declared as durable.

    Returns:
        A ``(connection, channel)`` tuple. The caller owns the connection
        and is responsible for closing it.

    Raises:
        Any exception raised by pika while connecting, opening the channel
        or declaring the queue is propagated to the caller.
    """
    credentials = pika.PlainCredentials(
        RABBITMQ_CONFIG["username"],
        RABBITMQ_CONFIG["password"],
    )
    parameters = pika.ConnectionParameters(
        host=RABBITMQ_CONFIG["host"],
        port=RABBITMQ_CONFIG["port"],
        credentials=credentials,
    )
    connection = pika.BlockingConnection(parameters)
    try:
        channel = connection.channel()
        channel.queue_declare(queue=RABBITMQ_CONFIG["queue"], durable=True)
    except Exception:
        # The caller never receives the connection on failure, so release it
        # here instead of leaking one socket per failed attempt.
        if connection.is_open:
            connection.close()
        raise
    return connection, channel

def message_consumer():
    """Consume messages from the configured queue (runs in the background).

    Each received message is printed and then acknowledged manually.
    When an ``AMQPError`` occurs, the error is printed and the consumer
    reconnects after a five second pause. The function returns only if
    ``start_consuming()`` returns normally.
    """

    def callback(ch, method, properties, body):
        print(f"Received message: {body.decode()}")
        # Manual acknowledgement: the broker may drop the message only now.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    # Retry in a loop rather than by recursion, so a long broker outage
    # cannot grow the call stack until RecursionError.
    while True:
        connection = None
        try:
            connection, channel = connect_rabbitmq()

            # Fair dispatch: at most one unacknowledged message at a time.
            channel.basic_qos(prefetch_count=1)
            channel.basic_consume(
                queue=RABBITMQ_CONFIG["queue"],
                on_message_callback=callback,
            )

            print("Consumer started, waiting for messages...")
            channel.start_consuming()
            return
        except AMQPError as e:
            print(f"RabbitMQ error: {e}")
        finally:
            # Release the connection of this attempt before retrying.
            if connection is not None and connection.is_open:
                try:
                    connection.close()
                except AMQPError as close_error:
                    print(f"RabbitMQ error while closing connection: {close_error}")
        time.sleep(5)  # retry interval

# Start the consumer in a daemon thread as soon as this module is imported.
# A daemon thread does not keep the process alive on shutdown.
threading.Thread(target=message_consumer, daemon=True).start()

@app.post("/send-message")
async def send_message(message: str):
    """Publish ``message`` to the configured queue as a persistent message.

    Args:
        message: Text to publish; it is UTF-8 encoded before sending.

    Returns:
        ``{"status": "success", ...}`` once the message is published, or
        ``{"status": "error", ...}`` describing the AMQP failure.
    """
    # NOTE(review): pika's BlockingConnection does blocking I/O inside this
    # coroutine; consider a plain ``def`` handler or an async AMQP client.
    try:
        connection, channel = connect_rabbitmq()
        try:
            channel.basic_publish(
                exchange="",
                routing_key=RABBITMQ_CONFIG["queue"],
                body=message.encode(),
                properties=pika.BasicProperties(
                    delivery_mode=2,  # persistent message
                ),
            )
        finally:
            # Close even when publishing fails so the connection is not leaked.
            if connection.is_open:
                connection.close()
    except AMQPError as e:
        return {"status": "error", "message": f"Failed to send message: {str(e)}"}
    return {"status": "success", "message": "Message sent"}

if __name__ == "__main__":
    import os

    import uvicorn

    # Derive the import name from this file's name. splitext removes only the
    # trailing extension, whereas str.replace(".py", "") would also strip a
    # ".py" that occurs in the middle of the file name.
    app_module_name = os.path.splitext(os.path.basename(__file__))[0]
    print(app_module_name)
    # uvicorn needs an import string (not the app object) when reload=True.
    uvicorn.run(f"{app_module_name}:app", host='127.0.0.1', reload=True)
3. 运行方式
1
uvicorn main:app --reload

发送消息:POST http://localhost:8000/send-message?message=Hello RabbitMQ

二、死信队列案例

1. 死信队列配置说明
  • 普通队列设置 x-dead-letter-exchange 和 x-dead-letter-routing-key 参数
  • 消息因过期、被拒绝或队列达到长度限制时进入死信队列
2. 代码实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
from fastapi import FastAPI
import pika
import threading
import time
from pika.exceptions import AMQPError

# FastAPI application instance; the @app.post route in this script registers on it.
app = FastAPI()

# Broker connection settings plus the names of the normal queue and of the
# dead-letter exchange, queue and routing key used by this example.
RABBITMQ_CONFIG = dict(
    host="localhost",
    port=5672,
    username="guest",
    password="guest",
    normal_queue="normal_queue",
    dead_letter_queue="dead_letter_queue",
    dead_letter_exchange="dead_letter_exchange",
    dead_letter_routing_key="dead_letter_key",
)

def connect_rabbitmq():
    """Open a blocking RabbitMQ connection and declare the DLQ topology.

    Settings and names are read from ``RABBITMQ_CONFIG``. The following are
    declared on the new channel:

    * a durable ``direct`` dead-letter exchange,
    * a durable dead-letter queue bound to that exchange with the
      dead-letter routing key,
    * a durable normal queue whose arguments point at the dead-letter
      exchange and routing key, with a 10 second message TTL and a maximum
      length of 5 messages.

    Returns:
        A ``(connection, channel)`` tuple. The caller owns the connection
        and is responsible for closing it.

    Raises:
        Any exception raised by pika while connecting or declaring the
        topology is propagated to the caller.
    """
    credentials = pika.PlainCredentials(
        RABBITMQ_CONFIG["username"],
        RABBITMQ_CONFIG["password"],
    )
    parameters = pika.ConnectionParameters(
        host=RABBITMQ_CONFIG["host"],
        port=RABBITMQ_CONFIG["port"],
        credentials=credentials,
    )
    connection = pika.BlockingConnection(parameters)
    try:
        channel = connection.channel()

        # Dead-letter exchange and the queue that collects dead letters.
        channel.exchange_declare(
            exchange=RABBITMQ_CONFIG["dead_letter_exchange"],
            exchange_type="direct",
            durable=True,
        )
        channel.queue_declare(
            queue=RABBITMQ_CONFIG["dead_letter_queue"],
            durable=True,
        )
        channel.queue_bind(
            queue=RABBITMQ_CONFIG["dead_letter_queue"],
            exchange=RABBITMQ_CONFIG["dead_letter_exchange"],
            routing_key=RABBITMQ_CONFIG["dead_letter_routing_key"],
        )

        # Normal queue, configured to forward dead letters to the exchange above.
        arguments = {
            "x-dead-letter-exchange": RABBITMQ_CONFIG["dead_letter_exchange"],
            "x-dead-letter-routing-key": RABBITMQ_CONFIG["dead_letter_routing_key"],
            "x-message-ttl": 10000,  # message time-to-live in milliseconds (10 s)
            "x-max-length": 5,  # maximum number of messages kept in the queue
        }
        channel.queue_declare(
            queue=RABBITMQ_CONFIG["normal_queue"],
            durable=True,
            arguments=arguments,
        )
    except Exception:
        # The caller never receives the connection on failure, so release it
        # here; otherwise every failed (and retried) attempt leaks a socket.
        if connection.is_open:
            connection.close()
        raise

    return connection, channel

def normal_consumer():
    """Consume the normal queue and dead-letter every message it receives.

    Each message is printed and then rejected with ``requeue=False``, which
    makes the broker route it to the queue's dead-letter exchange. When an
    ``AMQPError`` occurs, the error is printed and the consumer reconnects
    after a five second pause. The function returns only if
    ``start_consuming()`` returns normally.
    """

    def callback(ch, method, properties, body):
        print(f"Normal queue received: {body.decode()}")
        # Reject without requeueing so the message is dead-lettered. Leaving
        # the delivery unacknowledged would NOT dead-letter it: unacked
        # messages are requeued when the channel or connection closes.
        ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)

    # Retry in a loop rather than by recursion, so a long broker outage
    # cannot grow the call stack until RecursionError.
    while True:
        connection = None
        try:
            connection, channel = connect_rabbitmq()
            channel.basic_consume(
                queue=RABBITMQ_CONFIG["normal_queue"],
                on_message_callback=callback,
            )

            print("Normal consumer started...")
            channel.start_consuming()
            return
        except AMQPError as e:
            print(f"Normal consumer error: {e}")
        finally:
            # Release the connection of this attempt before retrying.
            if connection is not None and connection.is_open:
                try:
                    connection.close()
                except AMQPError as close_error:
                    print(f"Normal consumer error while closing connection: {close_error}")
        time.sleep(5)  # retry interval

def dead_letter_consumer():
    """Consume the dead-letter queue, printing and acknowledging each message.

    When an ``AMQPError`` occurs, the error is printed and the consumer
    reconnects after a five second pause. The function returns only if
    ``start_consuming()`` returns normally.
    """

    def callback(ch, method, properties, body):
        print(f"Dead letter queue received: {body.decode()}")
        ch.basic_ack(delivery_tag=method.delivery_tag)

    # Retry in a loop rather than by recursion, so a long broker outage
    # cannot grow the call stack until RecursionError.
    while True:
        connection = None
        try:
            connection, channel = connect_rabbitmq()
            channel.basic_consume(
                queue=RABBITMQ_CONFIG["dead_letter_queue"],
                on_message_callback=callback,
            )

            print("Dead letter consumer started...")
            channel.start_consuming()
            return
        except AMQPError as e:
            print(f"Dead letter consumer error: {e}")
        finally:
            # Release the connection of this attempt before retrying.
            if connection is not None and connection.is_open:
                try:
                    connection.close()
                except AMQPError as close_error:
                    print(f"Dead letter consumer error while closing connection: {close_error}")
        time.sleep(5)  # retry interval

# Start both consumers in daemon threads as soon as this module is imported.
# Daemon threads do not keep the process alive on shutdown.
threading.Thread(target=normal_consumer, daemon=True).start()
threading.Thread(target=dead_letter_consumer, daemon=True).start()

@app.post("/send-dlq-message")
async def send_dlq_message(message: str):
    """Publish ``message`` to the normal queue as a persistent message.

    The normal queue is declared with a message TTL, a length limit and a
    dead-letter exchange, so messages that expire, overflow or are rejected
    there end up in the dead-letter queue.

    Args:
        message: Text to publish; it is UTF-8 encoded before sending.

    Returns:
        ``{"status": "success", ...}`` once the message is published, or
        ``{"status": "error", ...}`` describing the AMQP failure.
    """
    # NOTE(review): pika's BlockingConnection does blocking I/O inside this
    # coroutine; consider a plain ``def`` handler or an async AMQP client.
    try:
        connection, channel = connect_rabbitmq()
        try:
            channel.basic_publish(
                exchange="",
                routing_key=RABBITMQ_CONFIG["normal_queue"],
                body=message.encode(),
                properties=pika.BasicProperties(
                    delivery_mode=2,  # persistent message
                ),
            )
        finally:
            # Close even when publishing fails so the connection is not leaked.
            if connection.is_open:
                connection.close()
    except AMQPError as e:
        return {"status": "error", "message": f"Failed to send message: {str(e)}"}
    return {"status": "success", "message": "Message sent to normal queue"}

if __name__ == "__main__":
    import os

    import uvicorn

    # Derive the import name from this file's name. splitext removes only the
    # trailing extension, whereas str.replace(".py", "") would also strip a
    # ".py" that occurs in the middle of the file name.
    app_module_name = os.path.splitext(os.path.basename(__file__))[0]
    print(app_module_name)
    # uvicorn needs an import string (not the app object) when reload=True.
    uvicorn.run(f"{app_module_name}:app", host='127.0.0.1', reload=True)
3. 死信触发场景
  1. 消息过期:10秒内未被消费自动进入死信队列
  2. 消息被拒绝:调用 basic_reject 或 basic_nack,且 requeue=False
  3. 队列溢出:超过 x-max-length 限制时,最早的消息进入死信队列
4. 运行方式
1
uvicorn main:app --reload

发送消息:POST http://localhost:8000/send-dlq-message?message=This will expire

注意事项

  1. 确保RabbitMQ服务已启动:rabbitmq-server
  2. 管理界面:http://localhost:15672(默认账号guest/guest)
  3. 生产环境需添加连接池管理和错误重试机制
  4. 死信队列常用于消息重试、延迟任务和异常消息处理