哈希算法题目:设计一个基于哈希的分布式唯一ID生成器(雪花算法变种)
字数 534 2025-11-03 08:34:44

哈希算法题目:设计一个基于哈希的分布式唯一ID生成器(雪花算法变种)

题目描述
设计一个分布式唯一ID生成器,要求生成的ID全局唯一、趋势递增,且能适应高并发场景。系统需要在多台机器上运行,每台机器有唯一的机器ID。ID结构为64位,包含时间戳、机器ID和序列号。需要解决时钟回拨问题。

解题过程

步骤1:理解雪花算法基础结构
雪花算法的标准ID结构(64位):

  • 符号位(1位):固定为0,保证ID为正数
  • 时间戳(41位):记录ID生成的时间(毫秒级)
  • 机器ID(10位):最多支持1024台机器
  • 序列号(12位):每毫秒最多生成4096个ID

我们需要设计一个变种,解决时钟回拨问题。

步骤2:设计基础ID生成逻辑

class SnowflakeIDGenerator:
    def __init__(self, machine_id):
        self.machine_id = machine_id  # 机器ID(0-1023)
        self.sequence = 0  # 序列号
        self.last_timestamp = -1  # 上次生成ID的时间戳
        
        # 位分配
        self.timestamp_bits = 41
        self.machine_id_bits = 10
        self.sequence_bits = 12
        
        # 最大值计算
        self.max_sequence = (1 << self.sequence_bits) - 1
        self.max_machine_id = (1 << self.machine_id_bits) - 1
        
        # 位偏移量
        self.timestamp_shift = self.sequence_bits + self.machine_id_bits
        self.machine_id_shift = self.sequence_bits

步骤3:实现基础ID生成方法

def generate_id(self):
    current_timestamp = self.get_current_timestamp()
    
    if current_timestamp < self.last_timestamp:
        # 时钟回拨,需要特殊处理
        return self.handle_clock_backward(current_timestamp)
    
    if current_timestamp == self.last_timestamp:
        # 同一毫秒内,递增序列号
        self.sequence = (self.sequence + 1) & self.max_sequence
        if self.sequence == 0:
            # 序列号用尽,等待下一毫秒
            current_timestamp = self.wait_next_millis(self.last_timestamp)
    else:
        # 新的毫秒,重置序列号
        self.sequence = 0
    
    self.last_timestamp = current_timestamp
    
    # 组合ID
    id = (current_timestamp << self.timestamp_shift) | \
         (self.machine_id << self.machine_id_shift) | \
         self.sequence
    return id

步骤4:解决时钟回拨问题
时钟回拨是指系统时间突然倒退,可能导致ID重复。我们采用以下策略:

def handle_clock_backward(self, current_timestamp):
    # 计算回拨的时间差
    backward_offset = self.last_timestamp - current_timestamp
    
    if backward_offset <= 5:  # 回拨5毫秒以内,等待
        time.sleep(backward_offset / 1000.0)
        return self.generate_id()
    else:
        # 回拨超过5毫秒,使用备用方案
        return self.generate_backup_id(current_timestamp)

def generate_backup_id(self, current_timestamp):
    # 方案1:使用扩展位记录回拨信息
    # 在序列号高位设置回拨标志位
    backup_flag = 1  # 回拨标志
    backup_sequence = (backup_flag << (self.sequence_bits - 1)) | self.sequence
    
    id = (current_timestamp << self.timestamp_shift) | \
         (self.machine_id << self.machine_id_shift) | \
         backup_sequence
    return id

步骤5:添加时间戳优化
为了延长系统使用寿命,我们可以使用相对时间戳:

def __init__(self, machine_id, epoch=1609459200000):  # 2021-01-01 00:00:00
    self.epoch = epoch  # 自定义纪元时间
    # 其他初始化代码...

def get_current_timestamp(self):
    # 返回相对于纪元的毫秒数
    return int(time.time() * 1000) - self.epoch

步骤6:实现分布式协调
对于机器ID的分配,可以使用ZooKeeper或Redis进行协调:

def acquire_machine_id(self, zk_hosts):
    # 使用ZooKeeper分配机器ID
    from kazoo.client import KazooClient
    
    zk = KazooClient(hosts=zk_hosts)
    zk.start()
    
    # 创建临时顺序节点
    path = zk.create("/snowflake/machine_", ephemeral=True, sequence=True)
    machine_id = int(path.split('_')[-1]) % (self.max_machine_id + 1)
    
    return machine_id

步骤7:完整实现与测试

import time
import threading

class DistributedSnowflake:
    def __init__(self, machine_id, epoch=1609459200000):
        self.machine_id = machine_id
        self.epoch = epoch
        self.sequence = 0
        self.last_timestamp = -1
        self.lock = threading.Lock()
        
        # 位配置
        self.timestamp_bits = 41
        self.machine_id_bits = 10
        self.sequence_bits = 12
        
        self.max_sequence = (1 << self.sequence_bits) - 1
        self.max_machine_id = (1 << self.machine_id_bits) - 1
        
        self.timestamp_shift = self.sequence_bits + self.machine_id_bits
        self.machine_id_shift = self.sequence_bits
    
    def generate_id(self):
        with self.lock:
            current_timestamp = self.get_current_timestamp()
            
            if current_timestamp < self.last_timestamp:
                return self.handle_clock_backward(current_timestamp)
            
            if current_timestamp == self.last_timestamp:
                self.sequence = (self.sequence + 1) & self.max_sequence
                if self.sequence == 0:
                    current_timestamp = self.wait_next_millis(self.last_timestamp)
            else:
                self.sequence = 0
            
            self.last_timestamp = current_timestamp
            
            return (current_timestamp << self.timestamp_shift) | \
                   (self.machine_id << self.machine_id_shift) | \
                   self.sequence
    
    def get_current_timestamp(self):
        return int(time.time() * 1000) - self.epoch
    
    def wait_next_millis(self, last_timestamp):
        timestamp = self.get_current_timestamp()
        while timestamp <= last_timestamp:
            time.sleep(0.001)
            timestamp = self.get_current_timestamp()
        return timestamp
    
    def handle_clock_backward(self, current_timestamp):
        backward_offset = self.last_timestamp - current_timestamp
        if backward_offset <= 5:
            time.sleep(backward_offset / 1000.0)
            return self.generate_id()
        else:
            # 抛出异常或使用备用方案
            raise Exception(f"Clock moved backwards. Refusing to generate id for {backward_offset} milliseconds")

# 测试代码
def test_snowflake():
    generator = DistributedSnowflake(machine_id=1)
    
    # 生成10个ID测试
    ids = set()
    for i in range(10):
        id = generator.generate_id()
        print(f"Generated ID: {id} (binary: {bin(id)})")
        ids.add(id)
    
    print(f"Generated {len(ids)} unique IDs")
    
    # 解析ID结构
    sample_id = generator.generate_id()
    timestamp = (sample_id >> generator.timestamp_shift) + generator.epoch
    machine_id = (sample_id >> generator.machine_id_shift) & generator.max_machine_id
    sequence = sample_id & generator.max_sequence
    
    print(f"Parsed - Timestamp: {timestamp}, Machine ID: {machine_id}, Sequence: {sequence}")

test_snowflake()

这个实现解决了分布式环境下的唯一ID生成问题,通过时间戳、机器ID和序列号的组合保证全局唯一性,并提供了时钟回拨的处理机制。

哈希算法题目:设计一个基于哈希的分布式唯一ID生成器(雪花算法变种) 题目描述 设计一个分布式唯一ID生成器,要求生成的ID全局唯一、趋势递增,且能适应高并发场景。系统需要在多台机器上运行,每台机器有唯一的机器ID。ID结构为64位,包含时间戳、机器ID和序列号。需要解决时钟回拨问题。 解题过程 步骤1:理解雪花算法基础结构 雪花算法的标准ID结构(64位): 符号位(1位):固定为0,保证ID为正数 时间戳(41位):记录ID生成的时间(毫秒级) 机器ID(10位):最多支持1024台机器 序列号(12位):每毫秒最多生成4096个ID 我们需要设计一个变种,解决时钟回拨问题。 步骤2:设计基础ID生成逻辑 步骤3:实现基础ID生成方法 步骤4:解决时钟回拨问题 时钟回拨是指系统时间突然倒退,可能导致ID重复。我们采用以下策略: 步骤5:添加时间戳优化 为了延长系统使用寿命,我们可以使用相对时间戳: 步骤6:实现分布式协调 对于机器ID的分配,可以使用ZooKeeper或Redis进行协调: 步骤7:完整实现与测试 这个实现解决了分布式环境下的唯一ID生成问题,通过时间戳、机器ID和序列号的组合保证全局唯一性,并提供了时钟回拨的处理机制。