哈希算法题目:设计一个基于哈希的分布式队列系统(支持优先级和延迟消息)
字数 479 2025-11-18 21:15:37
哈希算法题目:设计一个基于哈希的分布式队列系统(支持优先级和延迟消息)
题目描述:
设计一个支持优先级和延迟消息的分布式队列系统。该系统需要实现以下功能:
- 支持普通消息、优先级消息(数值越大优先级越高)和延迟消息(指定未来某个时间点才能被消费)
- 多消费者并发消费时保证消息的顺序性
- 高吞吐量下的可靠消息投递
解题步骤:
1. 系统架构设计
- 采用多队列架构:普通队列、优先级队列、延迟队列
- 每个队列使用哈希分片存储,键为队列ID,值为消息链表
- 引入时间轮(Time Wheel)管理延迟消息
2. 数据结构设计
class Message:
def __init__(self, id, content, priority=0, delay_until=0):
self.id = id
self.content = content
self.priority = priority # 默认0为普通消息
self.delay_until = delay_until # 延迟到的时间戳
class DistributedPriorityQueue:
def __init__(self, num_queues=100):
self.queues = {} # 哈希分片存储队列
self.num_queues = num_queues
self.time_wheel = TimeWheel() # 时间轮用于延迟消息
def _get_queue_id(self, message_id):
return hash(message_id) % self.num_queues
3. 消息投递实现
def enqueue(self, message):
if message.delay_until > 0:
# 延迟消息进入时间轮
self.time_wheel.add_message(message)
elif message.priority > 0:
# 优先级消息进入优先级队列
queue_id = self._get_queue_id(message.id)
if queue_id not in self.queues:
self.queues[queue_id] = PriorityQueue()
self.queues[queue_id].put((-message.priority, message)) # 负数实现大优先
else:
# 普通消息
queue_id = self._get_queue_id(message.id)
if queue_id not in self.queues:
self.queues[queue_id] = deque()
self.queues[queue_id].append(message)
4. 时间轮设计
class TimeWheel:
def __init__(self, slots=60):
self.slots = [dict() for _ in range(slots)] # 60个时间槽
self.current_slot = 0
self.timer = threading.Timer(1.0, self.advance) # 每秒前进一格
def add_message(self, message):
# 计算目标时间槽
delay = message.delay_until - time.time()
target_slot = (self.current_slot + int(delay)) % len(self.slots)
slot_key = hash(message.id)
self.slots[target_slot][slot_key] = message
def advance(self):
# 处理当前槽的所有到期消息
current_messages = self.slots[self.current_slot]
for msg_key, message in current_messages.items():
queue_id = hash(message.id) % self.num_queues
if message.priority > 0:
self.queues[queue_id].put((-message.priority, message))
else:
self.queues[queue_id].append(message)
self.slots[self.current_slot].clear()
self.current_slot = (self.current_slot + 1) % len(self.slots)
5. 消息消费实现
def dequeue(self, consumer_id):
# 按优先级顺序检查队列:高优先级 > 普通消息 > 低优先级
for queue_id in range(self.num_queues):
if queue_id in self.queues:
queue = self.queues[queue_id]
if isinstance(queue, PriorityQueue) and not queue.empty():
return queue.get()[1] # 返回优先级最高的消息
elif isinstance(queue, deque) and queue:
return queue.popleft() # 普通队列FIFO
return None
6. 并发控制
- 为每个队列分配独立的锁
- 使用读写锁优化读多写少的场景
- 消费者组管理,确保每个消息只被一个消费者处理
7. 哈希分片策略优化
- 一致性哈希确保扩容时最小化数据迁移
- 虚拟节点实现负载均衡
- 基于消息ID的哈希保证相同消息始终路由到同一队列
这个设计通过哈希分片、优先级队列和时间轮的结合,实现了支持优先级和延迟消息的分布式队列系统,能够在高并发场景下保证消息的顺序性和可靠性。