🐍 10 分钟上手 Kafka:用 Python 实现生产者与消费者
在微服务、日志收集、事件驱动架构中,Kafka 几乎成了标配。但很多开发者面对 Kafka 仍觉得“高深莫测”。
别担心!今天我们就用 不到 100 行 Python 代码,带你从零跑通 Kafka 的 创建 Topic、发送消息、消费消息 全流程,真正实现“跑起来再说”!
✅ 为什么选 confluent-kafka?
Kafka 的 Python 客户端主要有两个:
kafka-python:纯 Python 实现,安装简单但性能一般
confluent-kafka:基于 C 的 librdkafka 封装,官方推荐、性能极佳
本文采用 confluent-kafka,兼顾开发效率与生产可用性。
安装命令:
1
| pip install confluent-kafka
|
(如果你用 uv,也可以 uv pip install confluent-kafka)
🛠 前提准备
确保本地 Kafka 服务已启动。最简单方式是用 Docker:
如果你已有 Kafka(比如通过 Docker Compose 部署),跳过此步即可。
🧩 完整代码:生产 + 消费一键跑通
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| import json import time from confluent_kafka import Producer, Consumer from confluent_kafka.admin import AdminClient, NewTopic
KAFKA_BOOTSTRAP_SERVERS = "localhost:9092" TOPIC_NAME = "test-topic"
def create_topic(): admin = AdminClient({"bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS}) topic = NewTopic(TOPIC_NAME, num_partitions=1, replication_factor=1) fs = admin.create_topics([topic]) for _, f in fs.items(): try: f.result() print("✅ Topic 创建成功") except Exception as e: if "TOPIC_ALREADY_EXISTS" in str(e): print("✅ Topic 已存在") else: print("❌ 创建失败:", e)
def produce_messages(): p = Producer({"bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS}) messages = [ {"user_id": 1, "action": "login"}, {"user_id": 2, "action": "purchase"}, {"user_id": 3, "action": "logout"}, ] for msg in messages: p.produce(TOPIC_NAME, key=str(msg["user_id"]), value=json.dumps(msg)) print(f"📤 发送: {msg}") p.flush()
def consume_messages(): c = Consumer({ "bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS, "group.id": "demo-group", "auto.offset.reset": "earliest" }) c.subscribe([TOPIC_NAME]) print("👂 开始监听消息(10秒后自动退出)...") start = time.time() while time.time() - start < 10: msg = c.poll(1.0) if msg is None: continue if msg.error(): print("❌ 消费错误:", msg.error()) continue print(f"📥 收到: key={msg.key().decode()}, value={msg.value().decode()}") c.close()
if __name__ == "__main__": create_topic() produce_messages() consume_messages()
|
▶️ 运行 & 效果
执行脚本:
输出示例:
1 2 3 4 5 6 7 8
| ✅ Topic 已存在 📤 发送: {'user_id': 1, 'action': 'login'} 📤 发送: {'user_id': 2, 'action': 'purchase'} 📤 发送: {'user_id': 3, 'action': 'logout'} 👂 开始监听消息(10秒后自动退出)... 📥 收到: key=1, value={"user_id": 1, "action": "login"} 📥 收到: key=2, value={"user_id": 2, "action": "purchase"} 📥 收到: key=3, value={"user_id": 3, "action": "logout"}
|
完美!消息从生产到消费,一气呵成。
💡 小贴士
- 生产环境不要在代码里创建 Topic,应由运维统一管理。
group.id 是消费者组标识,同一组内消息只能被一个实例消费。
- 消息格式建议用 JSON,方便后续处理与兼容。
- 如果连接远程 Kafka,记得检查
advertised.listeners 配置是否允许外部访问。