ZeroMQ,是一个高性能的异步消息库或并发框架。将复杂的底层网络通信细节抽象化,提供了一系列灵活的消息模式,让构建复杂的分布式应用变得更简单。
与传统的中心化消息 Broker 不同,ZeroMQ 倡导一种更加去中心化 (brokerless) 或分布式的设计理念(当然,也可以基于它构建 Broker)。它的核心优势在于:
消息模式 (Messaging Patterns): ZeroMQ 没有让用户直接操作原始的 Socket 去头疼连接、发送、接收、错误处理、重试等细节。相反,它提供了几种经典的、开箱即用的消息模式。每种模式都内置了特定的通信逻辑和扩展性能力,只需要选择适合场景的模式即可。
Socket 的增强 (Sockets on Steroids): ZeroMQ 的 Socket 和传统的 Socket 不是一回事。它们是模式的端点。用户不需要关心底层的连接建立、断开、消息队列管理、错误处理等,ZeroMQ 在内部帮你搞定了。只需要 bind
或 connect
到指定的地址,然后 send
或 recv
消息。
高性能与可伸缩性: ZeroMQ 设计之初就考虑了性能。使用异步 I/O,智能的消息批处理和路由,避免许多传统消息队列的瓶颈。其去中心化的特性也意味着没有中心 Broker 的单点压力和故障风险(除非选择构建 Broker)。
Context (上下文):
context.socket(...)
)。Socket (套接字):
zmq.REQ
, zmq.PUB
等),这个类型决定 Socket 遵循哪种消息模式。send()
和 recv()
方法来发送和接收消息。ZeroMQ 的消息是字节串 (bytes),send_string()
/recv_string()
提供方便的字符串处理。消息可以由多个帧组成 (send_multipart()
/recv_multipart()
)。socket.send()
和 socket.recv()
是阻塞的。如果缓冲区满或没有消息,它们会暂停当前线程的执行。Poller (轮询器):
recv()
会阻塞住,则无法处理其他 Socket 的消息。zmq.Poller
就是用来解决这个问题的。可以向 Poller 注册多个关心的 Socket 和事件(比如 zmq.POLLIN
表示有消息可读)。poller.poll(timeout)
方法。这个方法会阻塞,但它同时监控所有注册的 Socket。一旦有 Socket 发生了关心的事件,poll()
就会返回,告知哪些 Socket 已经准备好了(比如可以调用 recv()
了)。zmq.asyncio
) 时,通常不需要直接使用 zmq.Poller
,因为异步框架(asyncio 事件循环)会负责底层的事件监控和调度。tcp://
:
tcp://host:port
(如 tcp://127.0.0.1:5555
或 tcp://*:5555
)。ipc://
:
tcp://
更快。ipc://pathname
(如 ipc:///tmp/my_socket
).inproc://
:
bind
的 inproc://
地址,在其他进程中是完全不可见且无法连接的。inproc://transport_name
(如 inproc://my_internal_channel
).# zmq_server.py - ZeroMQ 请求应答模式的服务器端
import zmq
import time
# 1. 创建 ZeroMQ Context 对象
# Context 是 ZeroMQ 运行时环境的管理者
context = zmq.Context()
# 2. 创建一个 REP (Reply) Socket
# REP Socket 用于接收请求并发送应答
socket = context.socket(zmq.REP)
# 3. 将 Socket 绑定到一个地址
# "tcp://*:5555" 表示使用 TCP 协议,绑定到所有可用网络接口的 5555 端口
# "*" 表示绑定到所有本地地址,方便客户端连接
bind_address = "tcp://localhost:5555"
socket.bind(bind_address)
print(f"ZeroMQ REP 服务器已启动,绑定在 {bind_address}")
print("等待接收请求...")
try:
# 服务器通常在一个无限循环中运行,不断接收和处理请求
while True:
# 4. 接收请求
# socket.recv_string() 会阻塞,直到收到一个字符串消息
message = socket.recv_string()
print(f"收到请求: '{message}'")
# 模拟处理请求的过程
time.sleep(1) # 假装服务器需要处理一下
# 5. 准备应答消息
reply_message = f"服务器收到你的消息: '{message}'"
# 6. 发送应答
# socket.send_string() 会发送一个字符串应答
# 在 REP-REQ 模式中,REP Socket 必须在发送应答前先接收一个请求
socket.send_string(reply_message)
print(f"发送应答: '{reply_message}'")
except KeyboardInterrupt:
print("\n检测到 Ctrl+C,正在关闭服务器...")
finally:
# 清理 ZeroMQ 资源
socket.close()
context.term()
print("服务器已安全关闭。")
# zmq_client.py - ZeroMQ 请求应答模式的客户端
import zmq
# 1. 创建 ZeroMQ Context 对象
context = zmq.Context()
# 2. 创建一个 REQ (Request) Socket
# REQ Socket 用于发送请求并接收应答
socket = context.socket(zmq.REQ)
# 3. 连接到服务器的地址
# "tcp://localhost:5555" 表示使用 TCP 协议,连接到本地的 5555 端口
# 如果服务器在另一台机器上,请将 'localhost' 替换为服务器的实际 IP 地址
connect_address = "tcp://localhost:5555"
socket.connect(connect_address)
print(f"ZeroMQ REQ 客户端已启动,连接到 {connect_address}")
print("你可以输入消息发送给服务器,输入 'quit' 退出。")
try:
# 客户端通常在一个循环中运行,允许发送多条消息
while True:
# 获取用户输入
user_input = input("请输入消息: ")
# 检查是否输入 'quit'
if user_input.lower() == 'quit':
break
# 4. 发送请求
# socket.send_string() 会发送用户输入的字符串消息
# 在 REQ-REP 模式中,REQ Socket 必须在发送请求后等待一个应答,不能连续发送请求
print(f"发送请求: '{user_input}'")
socket.send_string(user_input)
# 5. 接收应答
# socket.recv_string() 会阻塞,直到收到服务器的应答消息
reply_message = socket.recv_string()
print(f"收到应答: '{reply_message}'")
print("-" * 20) # 分隔线
except KeyboardInterrupt:
print("\n检测到 Ctrl+C,正在关闭客户端...")
finally:
# 清理 ZeroMQ 资源
socket.close()
context.term()
print("客户端已安全关闭。")
默认的 ZeroMQ Socket 是阻塞的。如果Python 应用是基于 asyncio
构建的,那么在一个协程中调用阻塞的 socket.recv()
或 socket.send()
会暂停整个事件循环,导致其他所有协程都无法运行,异步的优势荡然无存。
zmq.asyncio
子模块就是为了解决这个问题而生的。它提供了 ZeroMQ Socket 的异步版本,其 send()
和 recv()
方法变成了可等待的 (awaitable)。
使用 zmq.asyncio
:
zmq.asyncio
,通常取别名 azmq
:import zmq.asyncio as azmq
。context = azmq.Context()
。这个 Context 会自动感知并集成当前的 asyncio
事件循环。socket = context.socket(socket_type)
。从这个 Context 创建的 Socket 具有异步特性。await
调用异步 Socket 方法:await socket.send(...)
, await socket.recv(...)
。await
一个异步 Socket 操作时,如果该操作不能立即完成(比如没有收到消息),当前协程会暂停并让出控制权给 asyncio
事件循环,允许事件循环去执行其他准备好的协程。当 Socket 操作完成后,事件循环会通知并恢复该协程。异步 (Asyncio) Socket 示例 (简版 inproc 通信):
# inproc_asyncio_example.py - 在同一个进程内使用 inproc:// 传输协议
import asyncio
import zmq
import zmq.asyncio as azmq # 使用异步版本
# 定义 inproc 地址
INPROC_ADDRESS = "inproc://my_async_channel"
# 异步 REP Worker 协程 (运行在同一个进程内)
async def async_rep_worker(context: azmq.Context):
# 从传入的异步 Context 创建 REP Socket
socket = context.socket(zmq.REP)
# 绑定到 inproc 地址
socket.bind(INPROC_ADDRESS)
print(f"REP Worker (进程内) 已启动,绑定到 {INPROC_ADDRESS}")
try:
while True:
# 异步接收请求
message = await socket.recv_string()
print(f"REP Worker (进程内) 收到请求: '{message}'")
# 模拟处理
await asyncio.sleep(0.5)
reply = f"REP Worker (进程内) 收到并处理了: '{message}'"
# 异步发送应答
await socket.send_string(reply)
print(f"REP Worker (进程内) 发送应答: '{reply}'")
except asyncio.CancelledError:
print("\nREP Worker (进程内) 被取消,正在退出...")
finally:
socket.close()
print("REP Worker (进程内) Socket 已关闭。")
# 异步 REQ Client 协程 (运行在同一个进程内)
async def async_req_client(context: azmq.Context):
# 从传入的异步 Context 创建 REQ Socket
socket = context.socket(zmq.REQ)
# 连接到 inproc 地址 (注意:这个地址必须在同一个进程内已经被某个 socket 绑定了)
socket.connect(INPROC_ADDRESS)
print(f"REQ Client (进程内) 已启动,连接到 {INPROC_ADDRESS}")
try:
for i in range(3):
request = f"进程内请求 {i+1}"
print(f"REQ Client (进程内) 发送请求: '{request}'")
# 异步发送请求
await socket.send_string(request)
# 异步接收应答
reply = await socket.recv_string()
print(f"REQ Client (进程内) 收到应答: '{reply}'")
await asyncio.sleep(0.1) # 稍微等一下再发下一个请求
finally:
socket.close()
print("REQ Client (进程内) Socket 已关闭。")
# 主异步函数,启动和管理 Worker 和 Client 协程
async def main():
# 在主函数中创建唯一一个异步 Context
# 这个 Context 将用于创建所有需要通过 inproc 通信的 Socket
context = azmq.Context()
print("主程序: Asyncio Context 已创建")
# 使用 asyncio.create_task 启动 Worker 和 Client 协程
# 它们将在同一个事件循环、同一个进程内并发运行
worker_task = asyncio.create_task(async_rep_worker(context))
client_task = asyncio.create_task(async_req_client(context))
# 等待 Client 任务完成它的请求
await client_task
print("主程序: Client 任务完成。")
# Client 任务完成后,取消 Worker 任务以退出
worker_task.cancel()
try:
await worker_task # 等待 Worker 任务响应取消信号
except asyncio.CancelledError:
print("主程序: Worker 任务已被取消。")
# Context 的清理通常由 asyncio.run() 负责,或者可以手动 context.term()
# context.term() # 如果手动管理循环,需要 term
# 程序的入口点
if __name__ == "__main__":
print("--- 进程内 ZeroMQ (inproc) 示例启动 ---")
# 使用 asyncio.run 运行主异步函数
# 这将启动事件循环,并在同一个进程中调度 worker_task 和 client_task
asyncio.run(main())
print("--- 进程内 ZeroMQ (inproc) 示例结束 ---")
此文由 Mix Space 同步更新至 xLog
原始链接为 https://blog.kanes.top/posts/default/zeromq-practice