哈希算法题目:设计一个基于哈希的分布式队列系统(支持优先级和延迟消息)
字数 479 2025-11-18 21:15:37

哈希算法题目:设计一个基于哈希的分布式队列系统(支持优先级和延迟消息)

题目描述
设计一个支持优先级和延迟消息的分布式队列系统。该系统需要实现以下功能:

  1. 支持普通消息、优先级消息(数值越大优先级越高)和延迟消息(指定未来某个时间点才能被消费)
  2. 多消费者并发消费时保证消息的顺序性
  3. 高吞吐量下的可靠消息投递

解题步骤

1. 系统架构设计

  • 采用多队列架构:普通队列、优先级队列、延迟队列
  • 每个队列使用哈希分片存储,键为队列ID,值为消息链表
  • 引入时间轮(Time Wheel)管理延迟消息

2. 数据结构设计

class Message:
    def __init__(self, id, content, priority=0, delay_until=0):
        self.id = id
        self.content = content
        self.priority = priority  # 默认0为普通消息
        self.delay_until = delay_until  # 延迟到的时间戳

class DistributedPriorityQueue:
    def __init__(self, num_queues=100):
        self.queues = {}  # 哈希分片存储队列
        self.num_queues = num_queues
        self.time_wheel = TimeWheel()  # 时间轮用于延迟消息
        
    def _get_queue_id(self, message_id):
        return hash(message_id) % self.num_queues

3. 消息投递实现

def enqueue(self, message):
    if message.delay_until > 0:
        # 延迟消息进入时间轮
        self.time_wheel.add_message(message)
    elif message.priority > 0:
        # 优先级消息进入优先级队列
        queue_id = self._get_queue_id(message.id)
        if queue_id not in self.queues:
            self.queues[queue_id] = PriorityQueue()
        self.queues[queue_id].put((-message.priority, message))  # 负数实现大优先
    else:
        # 普通消息
        queue_id = self._get_queue_id(message.id)
        if queue_id not in self.queues:
            self.queues[queue_id] = deque()
        self.queues[queue_id].append(message)

4. 时间轮设计

class TimeWheel:
    def __init__(self, slots=60):
        self.slots = [dict() for _ in range(slots)]  # 60个时间槽
        self.current_slot = 0
        self.timer = threading.Timer(1.0, self.advance)  # 每秒前进一格
        
    def add_message(self, message):
        # 计算目标时间槽
        delay = message.delay_until - time.time()
        target_slot = (self.current_slot + int(delay)) % len(self.slots)
        slot_key = hash(message.id)
        self.slots[target_slot][slot_key] = message
        
    def advance(self):
        # 处理当前槽的所有到期消息
        current_messages = self.slots[self.current_slot]
        for msg_key, message in current_messages.items():
            queue_id = hash(message.id) % self.num_queues
            if message.priority > 0:
                self.queues[queue_id].put((-message.priority, message))
            else:
                self.queues[queue_id].append(message)
        self.slots[self.current_slot].clear()
        self.current_slot = (self.current_slot + 1) % len(self.slots)

5. 消息消费实现

def dequeue(self, consumer_id):
    # 按优先级顺序检查队列:高优先级 > 普通消息 > 低优先级
    for queue_id in range(self.num_queues):
        if queue_id in self.queues:
            queue = self.queues[queue_id]
            if isinstance(queue, PriorityQueue) and not queue.empty():
                return queue.get()[1]  # 返回优先级最高的消息
            elif isinstance(queue, deque) and queue:
                return queue.popleft()  # 普通队列FIFO
    return None

6. 并发控制

  • 为每个队列分配独立的锁
  • 使用读写锁优化读多写少的场景
  • 消费者组管理,确保每个消息只被一个消费者处理

7. 哈希分片策略优化

  • 一致性哈希确保扩容时最小化数据迁移
  • 虚拟节点实现负载均衡
  • 基于消息ID的哈希保证相同消息始终路由到同一队列

这个设计通过哈希分片、优先级队列和时间轮的结合,实现了支持优先级和延迟消息的分布式队列系统,能够在高并发场景下保证消息的顺序性和可靠性。

哈希算法题目:设计一个基于哈希的分布式队列系统(支持优先级和延迟消息) 题目描述 : 设计一个支持优先级和延迟消息的分布式队列系统。该系统需要实现以下功能: 支持普通消息、优先级消息(数值越大优先级越高)和延迟消息(指定未来某个时间点才能被消费) 多消费者并发消费时保证消息的顺序性 高吞吐量下的可靠消息投递 解题步骤 : 1. 系统架构设计 采用多队列架构:普通队列、优先级队列、延迟队列 每个队列使用哈希分片存储,键为队列ID,值为消息链表 引入时间轮(Time Wheel)管理延迟消息 2. 数据结构设计 3. 消息投递实现 4. 时间轮设计 5. 消息消费实现 6. 并发控制 为每个队列分配独立的锁 使用读写锁优化读多写少的场景 消费者组管理,确保每个消息只被一个消费者处理 7. 哈希分片策略优化 一致性哈希确保扩容时最小化数据迁移 虚拟节点实现负载均衡 基于消息ID的哈希保证相同消息始终路由到同一队列 这个设计通过哈希分片、优先级队列和时间轮的结合,实现了支持优先级和延迟消息的分布式队列系统,能够在高并发场景下保证消息的顺序性和可靠性。