哈希算法题目:设计一个基于哈希的分布式唯一ID生成器
字数 597 2025-11-03 00:20:06
哈希算法题目:设计一个基于哈希的分布式唯一ID生成器
题目描述
设计一个分布式唯一ID生成器,要求在多台机器上同时生成全局唯一的ID。ID需要满足以下条件:
- 全局唯一性:不同机器生成的ID不能重复
- 粗略有序性:ID应该大致按照时间顺序递增
- 高可用性:系统需要支持高并发请求
- 可扩展性:能够支持机器数量的动态增减
解题过程
第一步:分析需求与约束
- 唯一性是最核心要求,需要避免不同机器产生相同ID
- 有序性有助于数据库索引性能
- 系统需要支持每秒数万次的ID生成请求
- 考虑时钟回拨等异常情况
第二步:雪花算法(Snowflake)原理
采用Twitter的雪花算法思想,将64位ID划分为多个部分:
- 1位符号位(始终为0)
- 41位时间戳(毫秒级,可用约69年)
- 10位机器ID(最多支持1024台机器)
- 12位序列号(每毫秒最多生成4096个ID)
第三步:机器ID分配方案
使用哈希算法解决机器ID的分配问题:
class MachineIDAllocator:
def __init__(self, total_machines=1024):
self.total_machines = total_machines
self.machine_ids = set()
def assign_machine_id(self, machine_ip):
# 使用一致性哈希计算机器ID
hash_value = self.fnv1a_hash(machine_ip)
machine_id = hash_value % self.total_machines
# 处理哈希冲突
while machine_id in self.machine_ids:
machine_id = (machine_id + 1) % self.total_machines
self.machine_ids.add(machine_id)
return machine_id
def fnv1a_hash(self, data):
"""FNV-1a哈希算法,分布均匀"""
hash = 14695981039346656037
for byte in data.encode('utf-8'):
hash ^= byte
hash *= 1099511628211
return hash
第四步:ID生成器核心实现
import time
import threading
class DistributedIDGenerator:
def __init__(self, machine_id):
self.machine_id = machine_id
self.sequence = 0
self.last_timestamp = -1
self.sequence_bits = 12
self.machine_bits = 10
self.max_sequence = (1 << self.sequence_bits) - 1
self.lock = threading.Lock()
def next_id(self):
with self.lock:
timestamp = self.current_time_millis()
# 处理时钟回拨
if timestamp < self.last_timestamp:
raise Exception("时钟回拨异常")
# 同一毫秒内生成多个ID
if timestamp == self.last_timestamp:
self.sequence = (self.sequence + 1) & self.max_sequence
if self.sequence == 0: # 序列号用尽,等待下一毫秒
timestamp = self.wait_next_millis(self.last_timestamp)
else:
self.sequence = 0
self.last_timestamp = timestamp
# 组合各个部分生成最终ID
return ((timestamp << (self.machine_bits + self.sequence_bits)) |
(self.machine_id << self.sequence_bits) |
self.sequence)
def wait_next_millis(self, last_timestamp):
timestamp = self.current_time_millis()
while timestamp <= last_timestamp:
time.sleep(0.001)
timestamp = self.current_time_millis()
return timestamp
def current_time_millis(self):
return int(time.time() * 1000)
第五步:处理时钟回拨的增强方案
class EnhancedIDGenerator(DistributedIDGenerator):
def __init__(self, machine_id):
super().__init__(machine_id)
self.clock_offset = 0
def handle_clock_drift(self, current_time):
"""处理时钟回拨,使用历史最大时间戳"""
if current_time < self.last_timestamp:
# 小的回拨,等待时间追平
time_diff = self.last_timestamp - current_time
if time_diff < 1000: # 回拨小于1秒
time.sleep(time_diff / 1000.0)
return self.current_time_millis()
else:
# 大的回拨,需要人工干预或使用备用方案
raise Exception("检测到大的时钟回拨,需要人工处理")
return current_time
第六步:分布式协调与容错
class DistributedCoordinator:
def __init__(self, zookeeper_servers):
self.zk_servers = zookeeper_servers
self.allocated_ids = set()
def acquire_machine_id(self, machine_ip):
"""通过分布式协调服务获取机器ID"""
# 尝试连接ZooKeeper等协调服务
# 在持久节点中注册机器信息
# 使用临时节点处理机器故障
# 返回分配的机器ID
# 简化实现:使用基于IP的哈希分配
base_id = hash(machine_ip) % 1024
return self.find_available_id(base_id)
def find_available_id(self, base_id):
"""寻找可用的机器ID"""
for offset in range(1024):
candidate = (base_id + offset) % 1024
if candidate not in self.allocated_ids:
self.allocated_ids.add(candidate)
return candidate
raise Exception("没有可用的机器ID")
第七步:测试验证
def test_id_generator():
# 模拟多台机器同时生成ID
generators = []
for i in range(3):
allocator = MachineIDAllocator()
machine_id = allocator.assign_machine_id(f"192.168.1.{i+1}")
generator = DistributedIDGenerator(machine_id)
generators.append(generator)
# 并发测试
generated_ids = set()
for i in range(1000):
for gen in generators:
new_id = gen.next_id()
assert new_id not in generated_ids, "ID重复生成!"
generated_ids.add(new_id)
print(f"成功生成 {len(generated_ids)} 个唯一ID")
print("所有ID均唯一,测试通过")
总结
这个基于哈希的分布式唯一ID生成器通过以下方式满足需求:
- 使用机器ID哈希分配确保全局唯一性
- 时间戳前缀保证粗略有序性
- 序列号机制支持高并发
- 机器ID动态分配支持可扩展性
- 时钟回拨处理确保系统稳定性
该设计在Twitter、百度等公司的分布式系统中得到实际应用验证。