哈希算法题目:设计一个基于哈希的分布式唯一ID生成器(支持趋势递增和分片标识)
字数 561 2025-11-03 20:30:43

哈希算法题目:设计一个基于哈希的分布式唯一ID生成器(支持趋势递增和分片标识)

题目描述:设计一个分布式唯一ID生成器,需要满足以下要求:

  1. 全局唯一性:在分布式系统中生成的ID必须全局唯一
  2. 趋势递增:ID总体上随时间递增,有利于数据库索引性能
  3. 支持分片标识:ID中包含分片信息,便于数据分片路由
  4. 高可用性:能够应对节点故障
  5. 高性能:支持高并发生成

解题过程:

步骤1:分析需求

  • 传统UUID虽然唯一但无序,不利于数据库索引
  • 数据库自增ID无法在分布式环境中保证唯一性
  • 需要设计一个结合时间戳、分片信息和序列号的方案

步骤2:设计ID结构
采用64位ID结构,分为4个部分:

| 符号位(1bit) | 时间戳(41bit) | 分片ID(10bit) | 序列号(12bit) |
  • 符号位:固定为0,保证ID为正数
  • 时间戳:毫秒级时间戳,可用约69年
  • 分片ID:支持1024个分片
  • 序列号:每毫秒每分片可生成4096个ID

步骤3:实现基础生成器

import time
import threading

class DistributedIDGenerator:
    def __init__(self, shard_id):
        self.shard_id = shard_id  # 分片标识
        self.sequence = 0  # 序列号
        self.last_timestamp = -1  # 上次生成时间戳
        self.lock = threading.Lock()
        
        # 定义各部分的位数
        self.TIMESTAMP_BITS = 41
        self.SHARD_BITS = 10
        self.SEQUENCE_BITS = 12
        
        # 定义起始时间戳(可自定义)
        self.EPOCH = 1609459200000  # 2021-01-01 00:00:00
    
    def _get_timestamp(self):
        """获取当前时间戳(毫秒)"""
        return int(time.time() * 1000)
    
    def _wait_next_millis(self, last_timestamp):
        """等待下一毫秒"""
        timestamp = self._get_timestamp()
        while timestamp <= last_timestamp:
            timestamp = self._get_timestamp()
        return timestamp

步骤4:实现核心生成逻辑

    def generate_id(self):
        """生成唯一ID"""
        with self.lock:
            timestamp = self._get_timestamp()
            
            # 处理时钟回拨
            if timestamp < self.last_timestamp:
                raise Exception("时钟回拨异常")
            
            # 如果是同一毫秒
            if timestamp == self.last_timestamp:
                self.sequence = (self.sequence + 1) & ((1 << self.SEQUENCE_BITS) - 1)
                # 序列号用完,等待下一毫秒
                if self.sequence == 0:
                    timestamp = self._wait_next_millis(self.last_timestamp)
            else:
                self.sequence = 0  # 新毫秒重置序列号
            
            self.last_timestamp = timestamp
            
            # 计算相对时间戳
            timestamp_offset = timestamp - self.EPOCH
            
            # 组合各部分生成最终ID
            id_value = (timestamp_offset << (self.SHARD_BITS + self.SEQUENCE_BITS))
            id_value |= (self.shard_id << self.SEQUENCE_BITS)
            id_value |= self.sequence
            
            return id_value

步骤5:添加ID解析功能

    def parse_id(self, id_value):
        """解析ID的各个组成部分"""
        sequence_mask = (1 << self.SEQUENCE_BITS) - 1
        shard_mask = (1 << self.SHARD_BITS) - 1
        timestamp_mask = (1 << self.TIMESTAMP_BITS) - 1
        
        sequence = id_value & sequence_mask
        shard_id = (id_value >> self.SEQUENCE_BITS) & shard_mask
        timestamp_offset = (id_value >> (self.SEQUENCE_BITS + self.SHARD_BITS)) & timestamp_mask
        absolute_timestamp = timestamp_offset + self.EPOCH
        
        return {
            'timestamp': absolute_timestamp,
            'shard_id': shard_id,
            'sequence': sequence
        }

步骤6:处理分布式环境问题

class DistributedIDService:
    def __init__(self, zk_hosts=None):
        self.generators = {}  # 分片ID到生成器的映射
        self.zk_client = None  # ZooKeeper客户端,用于协调
        
        if zk_hosts:
            self._init_coordination(zk_hosts)
    
    def _init_coordination(self, zk_hosts):
        """初始化分布式协调"""
        # 使用ZooKeeper或etcd进行分片分配和故障转移
        # 确保每个分片ID只被一个节点使用
        pass
    
    def get_generator(self, shard_id):
        """获取指定分片的ID生成器"""
        if shard_id not in self.generators:
            self.generators[shard_id] = DistributedIDGenerator(shard_id)
        return self.generators[shard_id]

步骤7:优化和扩展考虑

  1. 时钟同步:使用NTP保证各节点时间同步
  2. 分片分配:动态分片分配,支持水平扩展
  3. 故障转移:主从备份,确保高可用
  4. 批量生成:预生成ID池,提高性能

步骤8:测试验证

def test_id_generator():
    # 测试同一分片ID生成
    generator = DistributedIDGenerator(shard_id=1)
    
    ids = set()
    for i in range(1000):
        new_id = generator.generate_id()
        # 验证唯一性
        assert new_id not in ids
        ids.add(new_id)
        
        # 解析验证
        parts = generator.parse_id(new_id)
        assert parts['shard_id'] == 1
    
    print("测试通过:生成的1000个ID全部唯一")

# 运行测试
test_id_generator()

这个设计方案结合了时间戳、分片标识和序列号,既保证了全局唯一性,又支持趋势递增和分片路由,适用于大规模分布式系统。

哈希算法题目:设计一个基于哈希的分布式唯一ID生成器(支持趋势递增和分片标识) 题目描述:设计一个分布式唯一ID生成器,需要满足以下要求: 全局唯一性:在分布式系统中生成的ID必须全局唯一 趋势递增:ID总体上随时间递增,有利于数据库索引性能 支持分片标识:ID中包含分片信息,便于数据分片路由 高可用性:能够应对节点故障 高性能:支持高并发生成 解题过程: 步骤1:分析需求 传统UUID虽然唯一但无序,不利于数据库索引 数据库自增ID无法在分布式环境中保证唯一性 需要设计一个结合时间戳、分片信息和序列号的方案 步骤2:设计ID结构 采用64位ID结构,分为4个部分: 符号位:固定为0,保证ID为正数 时间戳:毫秒级时间戳,可用约69年 分片ID:支持1024个分片 序列号:每毫秒每分片可生成4096个ID 步骤3:实现基础生成器 步骤4:实现核心生成逻辑 步骤5:添加ID解析功能 步骤6:处理分布式环境问题 步骤7:优化和扩展考虑 时钟同步:使用NTP保证各节点时间同步 分片分配:动态分片分配,支持水平扩展 故障转移:主从备份,确保高可用 批量生成:预生成ID池,提高性能 步骤8:测试验证 这个设计方案结合了时间戳、分片标识和序列号,既保证了全局唯一性,又支持趋势递增和分片路由,适用于大规模分布式系统。