1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
| from fastapi import FastAPI import pika import threading import time from pika.exceptions import AMQPError
# ASGI application object; the route decorator below registers against it.
app = FastAPI()

# Connection settings and the names of the queues/exchange that make up the
# dead-letter topology declared by connect_rabbitmq().
RABBITMQ_CONFIG = dict(
    # Broker endpoint and login.
    host="localhost",
    port=5672,
    username="guest",
    password="guest",
    # Queue that receives published messages.
    normal_queue="normal_queue",
    # Queue, exchange and routing key used for dead-lettered messages.
    dead_letter_queue="dead_letter_queue",
    dead_letter_exchange="dead_letter_exchange",
    dead_letter_routing_key="dead_letter_key",
)
def connect_rabbitmq():
    """Open a RabbitMQ connection and declare the dead-letter topology.

    Declares, in order: the durable direct dead-letter exchange, the durable
    dead-letter queue bound to it, and the durable normal queue whose
    arguments point at that exchange. All names and credentials are read
    from ``RABBITMQ_CONFIG``.

    Returns:
        tuple: ``(connection, channel)``. The caller owns the connection and
        is responsible for closing it.

    Raises:
        Exception: whatever pika raises while connecting or declaring is
            propagated unchanged. If the failure happens after the
            connection was opened, the connection is closed first so it is
            not leaked.
    """
    credentials = pika.PlainCredentials(
        RABBITMQ_CONFIG["username"],
        RABBITMQ_CONFIG["password"],
    )
    parameters = pika.ConnectionParameters(
        host=RABBITMQ_CONFIG["host"],
        port=RABBITMQ_CONFIG["port"],
        credentials=credentials,
    )
    connection = pika.BlockingConnection(parameters)

    try:
        channel = connection.channel()

        # Dead-letter side: exchange -> queue, bound by the routing key.
        channel.exchange_declare(
            exchange=RABBITMQ_CONFIG["dead_letter_exchange"],
            exchange_type="direct",
            durable=True,
        )
        channel.queue_declare(
            queue=RABBITMQ_CONFIG["dead_letter_queue"],
            durable=True,
        )
        channel.queue_bind(
            queue=RABBITMQ_CONFIG["dead_letter_queue"],
            exchange=RABBITMQ_CONFIG["dead_letter_exchange"],
            routing_key=RABBITMQ_CONFIG["dead_letter_routing_key"],
        )

        # Normal queue: per the RabbitMQ queue-argument docs, x-message-ttl
        # is in milliseconds and x-max-length caps the number of messages.
        arguments = {
            "x-dead-letter-exchange": RABBITMQ_CONFIG["dead_letter_exchange"],
            "x-dead-letter-routing-key": RABBITMQ_CONFIG["dead_letter_routing_key"],
            "x-message-ttl": 10000,
            "x-max-length": 5,
        }
        channel.queue_declare(
            queue=RABBITMQ_CONFIG["normal_queue"],
            durable=True,
            arguments=arguments,
        )
    except Exception:
        # A failed declaration would otherwise leave the freshly opened
        # connection dangling, because the caller never receives it.
        if connection.is_open:
            try:
                connection.close()
            except AMQPError:
                # Best-effort cleanup; the original error is the one that matters.
                pass
        raise

    return connection, channel
def normal_consumer():
    """Consume the normal queue forever, reconnecting after AMQP errors.

    Each received message body is decoded and printed. When an
    ``AMQPError`` is raised while connecting or consuming, the error is
    printed, the failed connection is closed, and a new attempt is made
    after a 5 second pause.

    The retry is a loop rather than a recursive call: recursing on every
    failure grows the call stack without bound (eventually raising
    ``RecursionError``) and keeps every previous connection referenced by
    its stack frame.

    Returns:
        None. Only returns if ``start_consuming()`` itself returns normally.
    """

    def callback(ch, method, properties, body):
        # NOTE(review): basic_consume below does not pass auto_ack and this
        # callback never acks, so deliveries presumably stay unacknowledged
        # on the broker -- confirm whether that is intended.
        print(f"Normal queue received: {body.decode()}")

    while True:
        connection = None
        try:
            connection, channel = connect_rabbitmq()
            channel.basic_consume(
                queue=RABBITMQ_CONFIG["normal_queue"],
                on_message_callback=callback,
            )
            print("Normal consumer started...")
            channel.start_consuming()
            return
        except AMQPError as e:
            print(f"Normal consumer error: {e}")
            # Release the broken connection before retrying.
            if connection is not None and connection.is_open:
                try:
                    connection.close()
                except AMQPError:
                    # Best-effort cleanup; a new connection is opened anyway.
                    pass
        time.sleep(5)
def dead_letter_consumer():
    """Consume the dead-letter queue forever, reconnecting after AMQP errors.

    Each received message body is decoded, printed, and acknowledged. When
    an ``AMQPError`` is raised while connecting or consuming, the error is
    printed, the failed connection is closed, and a new attempt is made
    after a 5 second pause.

    The retry is a loop rather than a recursive call: recursing on every
    failure grows the call stack without bound (eventually raising
    ``RecursionError``) and keeps every previous connection referenced by
    its stack frame.

    Returns:
        None. Only returns if ``start_consuming()`` itself returns normally.
    """

    def callback(ch, method, properties, body):
        print(f"Dead letter queue received: {body.decode()}")
        # Explicit ack so the broker can drop the dead-lettered message.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    while True:
        connection = None
        try:
            connection, channel = connect_rabbitmq()
            channel.basic_consume(
                queue=RABBITMQ_CONFIG["dead_letter_queue"],
                on_message_callback=callback,
            )
            print("Dead letter consumer started...")
            channel.start_consuming()
            return
        except AMQPError as e:
            print(f"Dead letter consumer error: {e}")
            # Release the broken connection before retrying.
            if connection is not None and connection.is_open:
                try:
                    connection.close()
                except AMQPError:
                    # Best-effort cleanup; a new connection is opened anyway.
                    pass
        time.sleep(5)
# Start both queue consumers in background daemon threads as soon as the
# module is imported; the normal-queue consumer is started first.
for _consumer_target in (normal_consumer, dead_letter_consumer):
    _worker = threading.Thread(target=_consumer_target, daemon=True)
    _worker.start()
@app.post("/send-dlq-message")
async def send_dlq_message(message: str):
    """Publish *message* to the normal queue as a persistent message.

    The message goes to the default exchange with the normal queue's name
    as routing key. That queue is declared in ``connect_rabbitmq`` with a
    TTL, a length cap and a dead-letter exchange, which is how messages
    sent here can end up in the dead-letter queue.

    Args:
        message: Text payload; encoded with ``str.encode()`` before publishing.

    Returns:
        dict: ``{"status": "success", ...}`` when the publish succeeded, or
        ``{"status": "error", ...}`` carrying the ``AMQPError`` text.
    """
    # NOTE(review): connect_rabbitmq() uses pika.BlockingConnection, so this
    # coroutine blocks while it connects and publishes -- confirm whether
    # that is acceptable for the expected load.
    try:
        connection, channel = connect_rabbitmq()
        try:
            channel.basic_publish(
                exchange="",
                routing_key=RABBITMQ_CONFIG["normal_queue"],
                body=message.encode(),
                properties=pika.BasicProperties(
                    delivery_mode=2,  # 2 = persistent delivery mode in AMQP 0-9-1
                ),
            )
        finally:
            # Close the per-request connection even when the publish fails;
            # previously it was only closed on the success path.
            if connection.is_open:
                connection.close()
        return {"status": "success", "message": "Message sent to normal queue"}
    except AMQPError as e:
        return {"status": "error", "message": f"Failed to send message: {str(e)}"}


if __name__ == "__main__":
    import os

    import uvicorn

    # Build the "<module>:app" import string from this file's name.
    # splitext() removes only the extension, whereas replace(".py", "")
    # would also strip ".py" from the middle of a file name.
    app_module_name = os.path.splitext(os.path.basename(__file__))[0]
    print(app_module_name)
    uvicorn.run(f"{app_module_name}:app", host="127.0.0.1", reload=True)
|