哈希算法题目:设计一个基于哈希的分布式唯一ID生成器(支持趋势递增和分片标识)
字数 561 2025-11-03 20:30:43
哈希算法题目:设计一个基于哈希的分布式唯一ID生成器(支持趋势递增和分片标识)
题目描述:设计一个分布式唯一ID生成器,需要满足以下要求:
- 全局唯一性:在分布式系统中生成的ID必须全局唯一
- 趋势递增:ID总体上随时间递增,有利于数据库索引性能
- 支持分片标识:ID中包含分片信息,便于数据分片路由
- 高可用性:能够应对节点故障
- 高性能:支持高并发生成
解题过程:
步骤1:分析需求
- 传统UUID虽然唯一但无序,不利于数据库索引
- 数据库自增ID无法在分布式环境中保证唯一性
- 需要设计一个结合时间戳、分片信息和序列号的方案
步骤2:设计ID结构
采用64位ID结构,分为4个部分:
| 符号位(1bit) | 时间戳(41bit) | 分片ID(10bit) | 序列号(12bit) |
- 符号位:固定为0,保证ID为正数
- 时间戳:毫秒级时间戳,可用约69年
- 分片ID:支持1024个分片
- 序列号:每毫秒每分片可生成4096个ID
步骤3:实现基础生成器
import time
import threading
class DistributedIDGenerator:
def __init__(self, shard_id):
self.shard_id = shard_id # 分片标识
self.sequence = 0 # 序列号
self.last_timestamp = -1 # 上次生成时间戳
self.lock = threading.Lock()
# 定义各部分的位数
self.TIMESTAMP_BITS = 41
self.SHARD_BITS = 10
self.SEQUENCE_BITS = 12
# 定义起始时间戳(可自定义)
self.EPOCH = 1609459200000 # 2021-01-01 00:00:00
def _get_timestamp(self):
"""获取当前时间戳(毫秒)"""
return int(time.time() * 1000)
def _wait_next_millis(self, last_timestamp):
"""等待下一毫秒"""
timestamp = self._get_timestamp()
while timestamp <= last_timestamp:
timestamp = self._get_timestamp()
return timestamp
步骤4:实现核心生成逻辑
def generate_id(self):
"""生成唯一ID"""
with self.lock:
timestamp = self._get_timestamp()
# 处理时钟回拨
if timestamp < self.last_timestamp:
raise Exception("时钟回拨异常")
# 如果是同一毫秒
if timestamp == self.last_timestamp:
self.sequence = (self.sequence + 1) & ((1 << self.SEQUENCE_BITS) - 1)
# 序列号用完,等待下一毫秒
if self.sequence == 0:
timestamp = self._wait_next_millis(self.last_timestamp)
else:
self.sequence = 0 # 新毫秒重置序列号
self.last_timestamp = timestamp
# 计算相对时间戳
timestamp_offset = timestamp - self.EPOCH
# 组合各部分生成最终ID
id_value = (timestamp_offset << (self.SHARD_BITS + self.SEQUENCE_BITS))
id_value |= (self.shard_id << self.SEQUENCE_BITS)
id_value |= self.sequence
return id_value
步骤5:添加ID解析功能
def parse_id(self, id_value):
"""解析ID的各个组成部分"""
sequence_mask = (1 << self.SEQUENCE_BITS) - 1
shard_mask = (1 << self.SHARD_BITS) - 1
timestamp_mask = (1 << self.TIMESTAMP_BITS) - 1
sequence = id_value & sequence_mask
shard_id = (id_value >> self.SEQUENCE_BITS) & shard_mask
timestamp_offset = (id_value >> (self.SEQUENCE_BITS + self.SHARD_BITS)) & timestamp_mask
absolute_timestamp = timestamp_offset + self.EPOCH
return {
'timestamp': absolute_timestamp,
'shard_id': shard_id,
'sequence': sequence
}
步骤6:处理分布式环境问题
class DistributedIDService:
def __init__(self, zk_hosts=None):
self.generators = {} # 分片ID到生成器的映射
self.zk_client = None # ZooKeeper客户端,用于协调
if zk_hosts:
self._init_coordination(zk_hosts)
def _init_coordination(self, zk_hosts):
"""初始化分布式协调"""
# 使用ZooKeeper或etcd进行分片分配和故障转移
# 确保每个分片ID只被一个节点使用
pass
def get_generator(self, shard_id):
"""获取指定分片的ID生成器"""
if shard_id not in self.generators:
self.generators[shard_id] = DistributedIDGenerator(shard_id)
return self.generators[shard_id]
步骤7:优化和扩展考虑
- 时钟同步:使用NTP保证各节点时间同步
- 分片分配:动态分片分配,支持水平扩展
- 故障转移:主从备份,确保高可用
- 批量生成:预生成ID池,提高性能
步骤8:测试验证
def test_id_generator():
# 测试同一分片ID生成
generator = DistributedIDGenerator(shard_id=1)
ids = set()
for i in range(1000):
new_id = generator.generate_id()
# 验证唯一性
assert new_id not in ids
ids.add(new_id)
# 解析验证
parts = generator.parse_id(new_id)
assert parts['shard_id'] == 1
print("测试通过:生成的1000个ID全部唯一")
# 运行测试
test_id_generator()
这个设计方案结合了时间戳、分片标识和序列号,既保证了全局唯一性,又支持趋势递增和分片路由,适用于大规模分布式系统。