哈希算法题目:设计一个基于哈希的分布式唯一ID生成器(支持趋势递增和分片标识)
字数 676 2025-11-03 18:00:43
哈希算法题目:设计一个基于哈希的分布式唯一ID生成器(支持趋势递增和分片标识)
题目描述
设计一个分布式唯一ID生成器,需要满足以下要求:
- 生成的ID在分布式环境下全局唯一
- ID呈趋势递增(便于数据库索引)
- 支持分片标识(能够从ID中解析出所属分片)
- 高性能,低延迟
解题过程
步骤1:分析需求
- 全局唯一性:必须确保不同节点、不同时间生成的ID不会重复
- 趋势递增:新生成的ID比旧ID大,有利于数据库B+树索引
- 分片标识:ID中需要包含分片信息,便于数据路由
- 高性能:需要支持高并发场景下的ID生成
步骤2:设计ID结构
采用64位ID结构,划分为4个部分:
| 符号位(1bit) | 时间戳(41bit) | 分片ID(10bit) | 序列号(12bit) |
- 符号位:固定为0,保证ID为正数
- 时间戳:毫秒级时间戳,可用约69年
- 分片ID:支持最多1024个分片
- 序列号:每毫秒内自增序列,支持每毫秒4096个ID
步骤3:实现核心逻辑
import time
import threading
class DistributedIDGenerator:
def __init__(self, shard_id):
self.shard_id = shard_id & 0x3FF # 10bit分片ID
self.sequence = 0
self.last_timestamp = -1
self.sequence_bits = 12
self.lock = threading.Lock()
# 时间戳偏移量(从2024-01-01开始)
self.epoch = 1704067200000
def _get_current_timestamp(self):
return int(time.time() * 1000)
def _wait_next_millisecond(self, last_timestamp):
timestamp = self._get_current_timestamp()
while timestamp <= last_timestamp:
timestamp = self._get_current_timestamp()
return timestamp
def generate_id(self):
with self.lock:
timestamp = self._get_current_timestamp()
# 处理时钟回拨
if timestamp < self.last_timestamp:
raise Exception("时钟回拨异常")
# 同一毫秒内生成多个ID
if timestamp == self.last_timestamp:
self.sequence = (self.sequence + 1) & ((1 << self.sequence_bits) - 1)
if self.sequence == 0: # 序列号用完,等待下一毫秒
timestamp = self._wait_next_millisecond(timestamp)
else:
self.sequence = 0
self.last_timestamp = timestamp
# 组装ID
timestamp_offset = timestamp - self.epoch
unique_id = (timestamp_offset << (10 + 12)) | (self.shard_id << 12) | self.sequence
return unique_id
def parse_id(self, id):
"""解析ID的各个组成部分"""
sequence = id & ((1 << 12) - 1)
shard_id = (id >> 12) & ((1 << 10) - 1)
timestamp = (id >> (10 + 12)) + self.epoch
return {
'timestamp': timestamp,
'shard_id': shard_id,
'sequence': sequence
}
步骤4:处理边界情况
- 时钟回拨问题:检测到当前时间小于上次生成时间时抛出异常
- 序列号耗尽:当同一毫秒内序列号用完时,等待到下一毫秒
- 分片ID溢出:使用位掩码确保分片ID在有效范围内
步骤5:测试验证
# 测试代码
def test_id_generator():
generator1 = DistributedIDGenerator(shard_id=1)
generator2 = DistributedIDGenerator(shard_id=2)
# 生成测试
id1 = generator1.generate_id()
id2 = generator2.generate_id()
print(f"生成的ID1: {id1}")
print(f"生成的ID2: {id2}")
# 解析测试
parsed1 = generator1.parse_id(id1)
parsed2 = generator2.parse_id(id2)
print(f"解析ID1: {parsed1}")
print(f"解析ID2: {parsed2}")
# 验证唯一性
assert id1 != id2, "ID应该唯一"
assert parsed1['shard_id'] == 1, "分片ID解析错误"
assert parsed2['shard_id'] == 2, "分片ID解析错误"
test_id_generator()
步骤6:性能优化考虑
- 预生成ID:可以预先生成一批ID缓存在内存中
- 批量生成:支持一次生成多个ID,减少锁竞争
- 时钟同步:使用NTP服务保持各节点时钟同步
步骤7:扩展思考
- 可以支持自定义epoch时间
- 可以调整各部分的位数分配以适应不同场景
- 可以添加机器ID字段支持更细粒度的分布式部署
这种设计结合了时间戳、分片标识和序列号,既保证了全局唯一性,又支持趋势递增和分片路由,适合分布式系统使用。