哈希算法题目:设计一个基于哈希的分布式唯一ID生成器
字数 597 2025-11-03 00:20:06

哈希算法题目:设计一个基于哈希的分布式唯一ID生成器

题目描述
设计一个分布式唯一ID生成器,要求在多台机器上同时生成全局唯一的ID。ID需要满足以下条件:

  1. 全局唯一性:不同机器生成的ID不能重复
  2. 粗略有序性:ID应该大致按照时间顺序递增
  3. 高可用性:系统需要支持高并发请求
  4. 可扩展性:能够支持机器数量的动态增减

解题过程

第一步:分析需求与约束

  • 唯一性是最核心要求,需要避免不同机器产生相同ID
  • 有序性有助于数据库索引性能
  • 系统需要支持每秒数万次的ID生成请求
  • 考虑时钟回拨等异常情况

第二步:雪花算法(Snowflake)原理
采用Twitter的雪花算法思想,将64位ID划分为多个部分:

  • 1位符号位(始终为0)
  • 41位时间戳(毫秒级,可用约69年)
  • 10位机器ID(最多支持1024台机器)
  • 12位序列号(每毫秒最多生成4096个ID)

第三步:机器ID分配方案
使用哈希算法解决机器ID的分配问题:

class MachineIDAllocator:
    def __init__(self, total_machines=1024):
        self.total_machines = total_machines
        self.machine_ids = set()
    
    def assign_machine_id(self, machine_ip):
        # 使用一致性哈希计算机器ID
        hash_value = self.fnv1a_hash(machine_ip)
        machine_id = hash_value % self.total_machines
        
        # 处理哈希冲突
        while machine_id in self.machine_ids:
            machine_id = (machine_id + 1) % self.total_machines
        
        self.machine_ids.add(machine_id)
        return machine_id
    
    def fnv1a_hash(self, data):
        """FNV-1a哈希算法,分布均匀"""
        hash = 14695981039346656037
        for byte in data.encode('utf-8'):
            hash ^= byte
            hash *= 1099511628211
        return hash

第四步:ID生成器核心实现

import time
import threading

class DistributedIDGenerator:
    def __init__(self, machine_id):
        self.machine_id = machine_id
        self.sequence = 0
        self.last_timestamp = -1
        self.sequence_bits = 12
        self.machine_bits = 10
        self.max_sequence = (1 << self.sequence_bits) - 1
        self.lock = threading.Lock()
    
    def next_id(self):
        with self.lock:
            timestamp = self.current_time_millis()
            
            # 处理时钟回拨
            if timestamp < self.last_timestamp:
                raise Exception("时钟回拨异常")
            
            # 同一毫秒内生成多个ID
            if timestamp == self.last_timestamp:
                self.sequence = (self.sequence + 1) & self.max_sequence
                if self.sequence == 0:  # 序列号用尽,等待下一毫秒
                    timestamp = self.wait_next_millis(self.last_timestamp)
            else:
                self.sequence = 0
            
            self.last_timestamp = timestamp
            
            # 组合各个部分生成最终ID
            return ((timestamp << (self.machine_bits + self.sequence_bits)) |
                   (self.machine_id << self.sequence_bits) |
                   self.sequence)
    
    def wait_next_millis(self, last_timestamp):
        timestamp = self.current_time_millis()
        while timestamp <= last_timestamp:
            time.sleep(0.001)
            timestamp = self.current_time_millis()
        return timestamp
    
    def current_time_millis(self):
        return int(time.time() * 1000)

第五步:处理时钟回拨的增强方案

class EnhancedIDGenerator(DistributedIDGenerator):
    def __init__(self, machine_id):
        super().__init__(machine_id)
        self.clock_offset = 0
    
    def handle_clock_drift(self, current_time):
        """处理时钟回拨,使用历史最大时间戳"""
        if current_time < self.last_timestamp:
            # 小的回拨,等待时间追平
            time_diff = self.last_timestamp - current_time
            if time_diff < 1000:  # 回拨小于1秒
                time.sleep(time_diff / 1000.0)
                return self.current_time_millis()
            else:
                # 大的回拨,需要人工干预或使用备用方案
                raise Exception("检测到大的时钟回拨,需要人工处理")
        return current_time

第六步:分布式协调与容错

class DistributedCoordinator:
    def __init__(self, zookeeper_servers):
        self.zk_servers = zookeeper_servers
        self.allocated_ids = set()
    
    def acquire_machine_id(self, machine_ip):
        """通过分布式协调服务获取机器ID"""
        # 尝试连接ZooKeeper等协调服务
        # 在持久节点中注册机器信息
        # 使用临时节点处理机器故障
        # 返回分配的机器ID
        
        # 简化实现:使用基于IP的哈希分配
        base_id = hash(machine_ip) % 1024
        return self.find_available_id(base_id)
    
    def find_available_id(self, base_id):
        """寻找可用的机器ID"""
        for offset in range(1024):
            candidate = (base_id + offset) % 1024
            if candidate not in self.allocated_ids:
                self.allocated_ids.add(candidate)
                return candidate
        raise Exception("没有可用的机器ID")

第七步:测试验证

def test_id_generator():
    # 模拟多台机器同时生成ID
    generators = []
    for i in range(3):
        allocator = MachineIDAllocator()
        machine_id = allocator.assign_machine_id(f"192.168.1.{i+1}")
        generator = DistributedIDGenerator(machine_id)
        generators.append(generator)
    
    # 并发测试
    generated_ids = set()
    for i in range(1000):
        for gen in generators:
            new_id = gen.next_id()
            assert new_id not in generated_ids, "ID重复生成!"
            generated_ids.add(new_id)
    
    print(f"成功生成 {len(generated_ids)} 个唯一ID")
    print("所有ID均唯一,测试通过")

总结
这个基于哈希的分布式唯一ID生成器通过以下方式满足需求:

  1. 使用机器ID哈希分配确保全局唯一性
  2. 时间戳前缀保证粗略有序性
  3. 序列号机制支持高并发
  4. 机器ID动态分配支持可扩展性
  5. 时钟回拨处理确保系统稳定性

该设计在Twitter、百度等公司的分布式系统中得到实际应用验证。

哈希算法题目:设计一个基于哈希的分布式唯一ID生成器 题目描述 设计一个分布式唯一ID生成器,要求在多台机器上同时生成全局唯一的ID。ID需要满足以下条件: 全局唯一性:不同机器生成的ID不能重复 粗略有序性:ID应该大致按照时间顺序递增 高可用性:系统需要支持高并发请求 可扩展性:能够支持机器数量的动态增减 解题过程 第一步:分析需求与约束 唯一性是最核心要求,需要避免不同机器产生相同ID 有序性有助于数据库索引性能 系统需要支持每秒数万次的ID生成请求 考虑时钟回拨等异常情况 第二步:雪花算法(Snowflake)原理 采用Twitter的雪花算法思想,将64位ID划分为多个部分: 1位符号位(始终为0) 41位时间戳(毫秒级,可用约69年) 10位机器ID(最多支持1024台机器) 12位序列号(每毫秒最多生成4096个ID) 第三步:机器ID分配方案 使用哈希算法解决机器ID的分配问题: 第四步:ID生成器核心实现 第五步:处理时钟回拨的增强方案 第六步:分布式协调与容错 第七步:测试验证 总结 这个基于哈希的分布式唯一ID生成器通过以下方式满足需求: 使用机器ID哈希分配确保全局唯一性 时间戳前缀保证粗略有序性 序列号机制支持高并发 机器ID动态分配支持可扩展性 时钟回拨处理确保系统稳定性 该设计在Twitter、百度等公司的分布式系统中得到实际应用验证。