哈希算法题目:设计一个基于哈希的分布式唯一ID生成器(雪花算法变种)
字数 534 2025-11-03 08:34:44
哈希算法题目:设计一个基于哈希的分布式唯一ID生成器(雪花算法变种)
题目描述
设计一个分布式唯一ID生成器,要求生成的ID全局唯一、趋势递增,且能适应高并发场景。系统需要在多台机器上运行,每台机器有唯一的机器ID。ID结构为64位,包含时间戳、机器ID和序列号。需要解决时钟回拨问题。
解题过程
步骤1:理解雪花算法基础结构
雪花算法的标准ID结构(64位):
- 符号位(1位):固定为0,保证ID为正数
- 时间戳(41位):记录ID生成的时间(毫秒级)
- 机器ID(10位):最多支持1024台机器
- 序列号(12位):每毫秒最多生成4096个ID
我们需要设计一个变种,解决时钟回拨问题。
步骤2:设计基础ID生成逻辑
class SnowflakeIDGenerator:
def __init__(self, machine_id):
self.machine_id = machine_id # 机器ID(0-1023)
self.sequence = 0 # 序列号
self.last_timestamp = -1 # 上次生成ID的时间戳
# 位分配
self.timestamp_bits = 41
self.machine_id_bits = 10
self.sequence_bits = 12
# 最大值计算
self.max_sequence = (1 << self.sequence_bits) - 1
self.max_machine_id = (1 << self.machine_id_bits) - 1
# 位偏移量
self.timestamp_shift = self.sequence_bits + self.machine_id_bits
self.machine_id_shift = self.sequence_bits
步骤3:实现基础ID生成方法
def generate_id(self):
current_timestamp = self.get_current_timestamp()
if current_timestamp < self.last_timestamp:
# 时钟回拨,需要特殊处理
return self.handle_clock_backward(current_timestamp)
if current_timestamp == self.last_timestamp:
# 同一毫秒内,递增序列号
self.sequence = (self.sequence + 1) & self.max_sequence
if self.sequence == 0:
# 序列号用尽,等待下一毫秒
current_timestamp = self.wait_next_millis(self.last_timestamp)
else:
# 新的毫秒,重置序列号
self.sequence = 0
self.last_timestamp = current_timestamp
# 组合ID
id = (current_timestamp << self.timestamp_shift) | \
(self.machine_id << self.machine_id_shift) | \
self.sequence
return id
步骤4:解决时钟回拨问题
时钟回拨是指系统时间突然倒退,可能导致ID重复。我们采用以下策略:
def handle_clock_backward(self, current_timestamp):
# 计算回拨的时间差
backward_offset = self.last_timestamp - current_timestamp
if backward_offset <= 5: # 回拨5毫秒以内,等待
time.sleep(backward_offset / 1000.0)
return self.generate_id()
else:
# 回拨超过5毫秒,使用备用方案
return self.generate_backup_id(current_timestamp)
def generate_backup_id(self, current_timestamp):
# 方案1:使用扩展位记录回拨信息
# 在序列号高位设置回拨标志位
backup_flag = 1 # 回拨标志
backup_sequence = (backup_flag << (self.sequence_bits - 1)) | self.sequence
id = (current_timestamp << self.timestamp_shift) | \
(self.machine_id << self.machine_id_shift) | \
backup_sequence
return id
步骤5:添加时间戳优化
为了延长系统使用寿命,我们可以使用相对时间戳:
def __init__(self, machine_id, epoch=1609459200000): # 2021-01-01 00:00:00
self.epoch = epoch # 自定义纪元时间
# 其他初始化代码...
def get_current_timestamp(self):
# 返回相对于纪元的毫秒数
return int(time.time() * 1000) - self.epoch
步骤6:实现分布式协调
对于机器ID的分配,可以使用ZooKeeper或Redis进行协调:
def acquire_machine_id(self, zk_hosts):
# 使用ZooKeeper分配机器ID
from kazoo.client import KazooClient
zk = KazooClient(hosts=zk_hosts)
zk.start()
# 创建临时顺序节点
path = zk.create("/snowflake/machine_", ephemeral=True, sequence=True)
machine_id = int(path.split('_')[-1]) % (self.max_machine_id + 1)
return machine_id
步骤7:完整实现与测试
import time
import threading
class DistributedSnowflake:
def __init__(self, machine_id, epoch=1609459200000):
self.machine_id = machine_id
self.epoch = epoch
self.sequence = 0
self.last_timestamp = -1
self.lock = threading.Lock()
# 位配置
self.timestamp_bits = 41
self.machine_id_bits = 10
self.sequence_bits = 12
self.max_sequence = (1 << self.sequence_bits) - 1
self.max_machine_id = (1 << self.machine_id_bits) - 1
self.timestamp_shift = self.sequence_bits + self.machine_id_bits
self.machine_id_shift = self.sequence_bits
def generate_id(self):
with self.lock:
current_timestamp = self.get_current_timestamp()
if current_timestamp < self.last_timestamp:
return self.handle_clock_backward(current_timestamp)
if current_timestamp == self.last_timestamp:
self.sequence = (self.sequence + 1) & self.max_sequence
if self.sequence == 0:
current_timestamp = self.wait_next_millis(self.last_timestamp)
else:
self.sequence = 0
self.last_timestamp = current_timestamp
return (current_timestamp << self.timestamp_shift) | \
(self.machine_id << self.machine_id_shift) | \
self.sequence
def get_current_timestamp(self):
return int(time.time() * 1000) - self.epoch
def wait_next_millis(self, last_timestamp):
timestamp = self.get_current_timestamp()
while timestamp <= last_timestamp:
time.sleep(0.001)
timestamp = self.get_current_timestamp()
return timestamp
def handle_clock_backward(self, current_timestamp):
backward_offset = self.last_timestamp - current_timestamp
if backward_offset <= 5:
time.sleep(backward_offset / 1000.0)
return self.generate_id()
else:
# 抛出异常或使用备用方案
raise Exception(f"Clock moved backwards. Refusing to generate id for {backward_offset} milliseconds")
# 测试代码
def test_snowflake():
generator = DistributedSnowflake(machine_id=1)
# 生成10个ID测试
ids = set()
for i in range(10):
id = generator.generate_id()
print(f"Generated ID: {id} (binary: {bin(id)})")
ids.add(id)
print(f"Generated {len(ids)} unique IDs")
# 解析ID结构
sample_id = generator.generate_id()
timestamp = (sample_id >> generator.timestamp_shift) + generator.epoch
machine_id = (sample_id >> generator.machine_id_shift) & generator.max_machine_id
sequence = sample_id & generator.max_sequence
print(f"Parsed - Timestamp: {timestamp}, Machine ID: {machine_id}, Sequence: {sequence}")
test_snowflake()
这个实现解决了分布式环境下的唯一ID生成问题,通过时间戳、机器ID和序列号的组合保证全局唯一性,并提供了时钟回拨的处理机制。