哈希算法题目:设计一个基于哈希的分布式锁服务
字数 609 2025-11-16 05:56:42

哈希算法题目:设计一个基于哈希的分布式锁服务

我将为您详细讲解这个题目的完整解题思路和实现方案。

题目描述

设计一个基于哈希表的分布式锁服务,要求支持多个客户端并发获取和释放锁,确保在分布式环境下的互斥访问。锁需要具备以下特性:

  • 互斥性:同一时刻只有一个客户端能持有锁
  • 避免死锁:锁必须有超时机制,防止客户端崩溃导致锁永远无法释放
  • 容错性:服务需要具备高可用性

解题过程

第一步:基础锁数据结构设计

首先,我们需要设计锁的基本数据结构:

class DistributedLock:
    def __init__(self):
        # 使用哈希表存储锁信息:lock_name -> {owner, timestamp, ttl}
        self.locks = {}
        self.default_ttl = 30  # 默认30秒超时
    
    def acquire_lock(self, lock_name, client_id, ttl=None):
        """获取锁"""
        pass
    
    def release_lock(self, lock_name, client_id):
        """释放锁"""
        pass
    
    def is_locked(self, lock_name):
        """检查锁是否被占用"""
        pass

第二步:实现基本的锁获取逻辑

现在实现锁获取的核心逻辑:

import time
import threading

class DistributedLock:
    def __init__(self):
        self.locks = {}
        self.default_ttl = 30
        self.lock = threading.Lock()  # 用于线程安全
    
    def acquire_lock(self, lock_name, client_id, ttl=None):
        """获取分布式锁"""
        if ttl is None:
            ttl = self.default_ttl
        
        current_time = time.time()
        expire_time = current_time + ttl
        
        with self.lock:
            # 检查锁是否存在且未过期
            if lock_name in self.locks:
                lock_info = self.locks[lock_name]
                # 如果锁已过期,可以重新获取
                if lock_info['expire_time'] < current_time:
                    # 锁已过期,可以获取
                    self.locks[lock_name] = {
                        'owner': client_id,
                        'acquire_time': current_time,
                        'expire_time': expire_time,
                        'ttl': ttl
                    }
                    return True
                else:
                    # 锁已被其他客户端持有
                    return False
            else:
                # 锁不存在,可以获取
                self.locks[lock_name] = {
                    'owner': client_id,
                    'acquire_time': current_time,
                    'expire_time': expire_time,
                    'ttl': ttl
                }
                return True

第三步:实现锁释放和检查功能

    def release_lock(self, lock_name, client_id):
        """释放锁 - 只有锁的持有者才能释放"""
        with self.lock:
            if lock_name in self.locks:
                lock_info = self.locks[lock_name]
                # 只有锁的持有者才能释放锁
                if lock_info['owner'] == client_id:
                    del self.locks[lock_name]
                    return True
            return False
    
    def is_locked(self, lock_name):
        """检查锁是否被占用(包括检查是否过期)"""
        with self.lock:
            current_time = time.time()
            if lock_name in self.locks:
                lock_info = self.locks[lock_name]
                if lock_info['expire_time'] >= current_time:
                    return True
                else:
                    # 锁已过期,自动清理
                    del self.locks[lock_name]
            return False
    
    def get_lock_owner(self, lock_name):
        """获取锁的当前持有者"""
        with self.lock:
            if self.is_locked(lock_name):
                return self.locks[lock_name]['owner']
            return None

第四步:添加锁续期功能

在实际应用中,客户端可能需要延长锁的持有时间:

    def renew_lock(self, lock_name, client_id, new_ttl=None):
        """续期锁"""
        if new_ttl is None:
            new_ttl = self.default_ttl
        
        current_time = time.time()
        new_expire_time = current_time + new_ttl
        
        with self.lock:
            if (lock_name in self.locks and 
                self.locks[lock_name]['owner'] == client_id and
                self.locks[lock_name]['expire_time'] >= current_time):
                
                self.locks[lock_name]['expire_time'] = new_expire_time
                self.locks[lock_name]['ttl'] = new_ttl
                return True
            return False

第五步:实现非阻塞的尝试获取锁

    def try_acquire_lock(self, lock_name, client_id, ttl=None, max_retries=3, retry_interval=1):
        """尝试获取锁,支持重试机制"""
        for attempt in range(max_retries):
            if self.acquire_lock(lock_name, client_id, ttl):
                return True
            if attempt < max_retries - 1:  # 不是最后一次重试
                time.sleep(retry_interval)
        return False

第六步:添加锁监控和自动清理

为了防止内存泄漏,我们需要定期清理过期的锁:

    def cleanup_expired_locks(self):
        """清理所有过期的锁"""
        current_time = time.time()
        expired_locks = []
        
        with self.lock:
            for lock_name, lock_info in list(self.locks.items()):
                if lock_info['expire_time'] < current_time:
                    expired_locks.append(lock_name)
                    del self.locks[lock_name]
        
        return expired_locks
    
    def start_cleanup_daemon(self, interval=60):
        """启动后台清理线程"""
        def cleanup_worker():
            while True:
                time.sleep(interval)
                expired_count = len(self.cleanup_expired_locks())
                if expired_count > 0:
                    print(f"Cleaned up {expired_count} expired locks")
        
        import threading
        cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
        cleanup_thread.start()

第七步:完整的分布式锁服务实现

将以上所有功能整合:

import time
import threading
from typing import Optional, Dict, Any

class DistributedLockService:
    """
    基于哈希表的分布式锁服务
    """
    
    def __init__(self, default_ttl: int = 30, cleanup_interval: int = 60):
        self.locks: Dict[str, Dict[str, Any]] = {}
        self.default_ttl = default_ttl
        self.cleanup_interval = cleanup_interval
        self.lock = threading.Lock()
        self.running = True
        
        # 启动后台清理线程
        self._start_cleanup_daemon()
    
    def _start_cleanup_daemon(self):
        """启动后台清理守护线程"""
        def cleanup_worker():
            while self.running:
                time.sleep(self.cleanup_interval)
                self._cleanup_expired_locks()
        
        cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
        cleanup_thread.start()
    
    def _cleanup_expired_locks(self):
        """内部方法:清理过期锁"""
        current_time = time.time()
        with self.lock:
            expired_keys = [
                lock_name for lock_name, lock_info in self.locks.items()
                if lock_info['expire_time'] < current_time
            ]
            for key in expired_keys:
                del self.locks[key]
    
    def acquire(self, lock_name: str, client_id: str, ttl: Optional[int] = None) -> bool:
        """获取分布式锁"""
        if ttl is None:
            ttl = self.default_ttl
        
        current_time = time.time()
        expire_time = current_time + ttl
        
        with self.lock:
            # 检查锁是否可获取
            if lock_name in self.locks:
                lock_info = self.locks[lock_name]
                if lock_info['expire_time'] >= current_time:
                    return False  # 锁被其他客户端持有且未过期
            
            # 获取锁
            self.locks[lock_name] = {
                'owner': client_id,
                'acquire_time': current_time,
                'expire_time': expire_time,
                'ttl': ttl
            }
            return True
    
    def release(self, lock_name: str, client_id: str) -> bool:
        """释放锁"""
        with self.lock:
            if (lock_name in self.locks and 
                self.locks[lock_name]['owner'] == client_id):
                del self.locks[lock_name]
                return True
            return False
    
    def try_acquire(self, lock_name: str, client_id: str, 
                   ttl: Optional[int] = None, max_retries: int = 3, 
                   retry_interval: float = 1.0) -> bool:
        """尝试获取锁,支持重试"""
        for attempt in range(max_retries):
            if self.acquire(lock_name, client_id, ttl):
                return True
            if attempt < max_retries - 1:
                time.sleep(retry_interval)
        return False
    
    def renew(self, lock_name: str, client_id: str, new_ttl: Optional[int] = None) -> bool:
        """续期锁"""
        if new_ttl is None:
            new_ttl = self.default_ttl
        
        current_time = time.time()
        new_expire_time = current_time + new_ttl
        
        with self.lock:
            if (lock_name in self.locks and 
                self.locks[lock_name]['owner'] == client_id and
                self.locks[lock_name]['expire_time'] >= current_time):
                
                self.locks[lock_name]['expire_time'] = new_expire_time
                self.locks[lock_name]['ttl'] = new_ttl
                return True
            return False
    
    def get_lock_info(self, lock_name: str) -> Optional[Dict[str, Any]]:
        """获取锁的详细信息"""
        with self.lock:
            if lock_name in self.locks:
                return self.locks[lock_name].copy()
            return None
    
    def shutdown(self):
        """关闭服务"""
        self.running = False

第八步:使用示例

# 使用示例
def example_usage():
    lock_service = DistributedLockService(default_ttl=10)
    
    # 客户端A尝试获取锁
    client_a = "client_001"
    if lock_service.acquire("resource_1", client_a):
        print("Client A acquired the lock")
        
        # 执行业务逻辑
        time.sleep(2)
        
        # 释放锁
        lock_service.release("resource_1", client_a)
        print("Client A released the lock")
    else:
        print("Client A failed to acquire the lock")
    
    # 客户端B尝试获取同一个锁
    client_b = "client_002"
    success = lock_service.try_acquire("resource_1", client_b, max_retries=5)
    if success:
        print("Client B acquired the lock after retrying")
        lock_service.release("resource_1", client_b)
    else:
        print("Client B failed to acquire the lock")

# 运行示例
if __name__ == "__main__":
    example_usage()

关键设计要点

  1. 哈希表选择:使用字典作为底层存储,提供O(1)时间复杂度的锁操作
  2. 线程安全:使用互斥锁确保并发安全
  3. 超时机制:防止死锁,自动清理过期锁
  4. 所有权验证:只有锁的持有者才能释放锁
  5. 后台清理:定期清理过期锁,防止内存泄漏
  6. 重试机制:提供友好的尝试获取接口

这个设计提供了一个功能完整的基于哈希表的分布式锁服务,可以在单机多线程环境下可靠工作。在实际的分布式环境中,可以考虑使用Redis等分布式存储来替代内存中的哈希表。

哈希算法题目:设计一个基于哈希的分布式锁服务 我将为您详细讲解这个题目的完整解题思路和实现方案。 题目描述 设计一个基于哈希表的分布式锁服务,要求支持多个客户端并发获取和释放锁,确保在分布式环境下的互斥访问。锁需要具备以下特性: 互斥性:同一时刻只有一个客户端能持有锁 避免死锁:锁必须有超时机制,防止客户端崩溃导致锁永远无法释放 容错性:服务需要具备高可用性 解题过程 第一步:基础锁数据结构设计 首先,我们需要设计锁的基本数据结构: 第二步:实现基本的锁获取逻辑 现在实现锁获取的核心逻辑: 第三步:实现锁释放和检查功能 第四步:添加锁续期功能 在实际应用中,客户端可能需要延长锁的持有时间: 第五步:实现非阻塞的尝试获取锁 第六步:添加锁监控和自动清理 为了防止内存泄漏,我们需要定期清理过期的锁: 第七步:完整的分布式锁服务实现 将以上所有功能整合: 第八步:使用示例 关键设计要点 哈希表选择 :使用字典作为底层存储,提供O(1)时间复杂度的锁操作 线程安全 :使用互斥锁确保并发安全 超时机制 :防止死锁,自动清理过期锁 所有权验证 :只有锁的持有者才能释放锁 后台清理 :定期清理过期锁,防止内存泄漏 重试机制 :提供友好的尝试获取接口 这个设计提供了一个功能完整的基于哈希表的分布式锁服务,可以在单机多线程环境下可靠工作。在实际的分布式环境中,可以考虑使用Redis等分布式存储来替代内存中的哈希表。