哈希算法题目:设计一个基于哈希的分布式锁服务
字数 609 2025-11-16 05:56:42
哈希算法题目:设计一个基于哈希的分布式锁服务
我将为您详细讲解这个题目的完整解题思路和实现方案。
题目描述
设计一个基于哈希表的分布式锁服务,要求支持多个客户端并发获取和释放锁,确保在分布式环境下的互斥访问。锁需要具备以下特性:
- 互斥性:同一时刻只有一个客户端能持有锁
- 避免死锁:锁必须有超时机制,防止客户端崩溃导致锁永远无法释放
- 容错性:服务需要具备高可用性
解题过程
第一步:基础锁数据结构设计
首先,我们需要设计锁的基本数据结构:
class DistributedLock:
    """Skeleton of the lock service: the public interface only, no logic yet."""

    def __init__(self):
        # Default lease length, in seconds.
        self.default_ttl = 30
        # Hash table of lock records: lock_name -> {owner, timestamp, ttl}
        self.locks = {}

    def acquire_lock(self, lock_name, client_id, ttl=None):
        """Acquire *lock_name* on behalf of *client_id* (not implemented yet)."""
        return None

    def release_lock(self, lock_name, client_id):
        """Release *lock_name* held by *client_id* (not implemented yet)."""
        return None

    def is_locked(self, lock_name):
        """Report whether *lock_name* is currently held (not implemented yet)."""
        return None
第二步:实现基本的锁获取逻辑
现在实现锁获取的核心逻辑:
import time
import threading
class DistributedLock:
    """In-memory lock table guarded by a threading.Lock (step 2: acquire only)."""

    def __init__(self):
        # lock_name -> {'owner', 'acquire_time', 'expire_time', 'ttl'}
        self.locks = {}
        self.default_ttl = 30
        # Guards self.locks against concurrent access from multiple threads.
        self.lock = threading.Lock()

    def acquire_lock(self, lock_name, client_id, ttl=None):
        """Try once to acquire *lock_name* for *client_id*.

        Args:
            lock_name: key of the lock in the hash table.
            client_id: identifier recorded as the lock's owner.
            ttl: lease length in seconds; ``self.default_ttl`` when None.

        Returns:
            True if the lock was absent or expired and now belongs to
            *client_id*; False if an unexpired lease exists (including one
            already held by *client_id* -- the lock is not reentrant).
        """
        if ttl is None:
            ttl = self.default_ttl
        with self.lock:
            # Read the clock inside the critical section so the expiry check
            # and the new lease both use the moment the lock is granted, not a
            # timestamp that went stale while waiting for self.lock.
            current_time = time.time()
            lock_info = self.locks.get(lock_name)
            if lock_info is not None and lock_info['expire_time'] >= current_time:
                # A valid lease is still in force.
                return False
            # Absent or expired: both cases grant the lock the same way.
            self.locks[lock_name] = {
                'owner': client_id,
                'acquire_time': current_time,
                'expire_time': current_time + ttl,
                'ttl': ttl,
            }
            return True
第三步:实现锁释放和检查功能
def release_lock(self, lock_name, client_id):
    """Delete *lock_name* if, and only if, *client_id* is its recorded owner.

    Returns True when the entry was removed, and False when the lock is
    absent or owned by another client. The lease expiry is not consulted.
    """
    with self.lock:
        if lock_name not in self.locks:
            return False
        if self.locks[lock_name]['owner'] != client_id:
            # Ownership check: a client may only free its own lock.
            return False
        del self.locks[lock_name]
        return True
def is_locked(self, lock_name):
    """Return True while *lock_name* has an unexpired lease.

    A record whose lease has already run out is deleted as a side effect
    and reported as unlocked.
    """
    with self.lock:
        now = time.time()
        entry = self.locks.get(lock_name)
        if entry is None:
            return False
        if entry['expire_time'] < now:
            # Stale lease: purge it while the table lock is held.
            del self.locks[lock_name]
            return False
        return True
def get_lock_owner(self, lock_name):
    """Return the client_id holding *lock_name*, or None if it is free.

    An expired record counts as free and is deleted, matching is_locked().

    The expiry check is done inline rather than by calling self.is_locked():
    self.lock is a non-reentrant threading.Lock, so calling another method
    that acquires it while already holding it would deadlock this thread.
    """
    with self.lock:
        lock_info = self.locks.get(lock_name)
        if lock_info is None:
            return None
        if lock_info['expire_time'] < time.time():
            # Lease ran out: purge the stale record and report no owner.
            del self.locks[lock_name]
            return None
        return lock_info['owner']
第四步:添加锁续期功能
在实际应用中,客户端可能需要延长锁的持有时间:
def renew_lock(self, lock_name, client_id, new_ttl=None):
    """Extend the lease on *lock_name* for its current owner.

    The new expiry is *new_ttl* seconds from now (``self.default_ttl`` when
    omitted). Only the owner of a still-valid lease may renew. Otherwise the
    record is left untouched and False is returned.
    """
    ttl = self.default_ttl if new_ttl is None else new_ttl
    now = time.time()
    deadline = now + ttl
    with self.lock:
        entry = self.locks.get(lock_name)
        if entry is None or entry['owner'] != client_id or entry['expire_time'] < now:
            return False
        entry['expire_time'] = deadline
        entry['ttl'] = ttl
        return True
第五步:实现带有限次重试的尝试获取锁(每次失败后会休眠 retry_interval 秒,因此调用会阻塞有限时间,并非完全非阻塞)
def try_acquire_lock(self, lock_name, client_id, ttl=None, max_retries=3, retry_interval=1):
    """Call acquire_lock up to *max_retries* times, pausing between attempts.

    After each failed attempt except the last, sleeps *retry_interval*
    seconds. Returns True on the first success. Returns False once the
    attempts are used up, or immediately when *max_retries* is not positive.
    """
    for remaining in range(max_retries, 0, -1):
        if self.acquire_lock(lock_name, client_id, ttl):
            return True
        if remaining > 1:
            # More attempts follow: back off before trying again.
            time.sleep(retry_interval)
    return False
第六步:添加锁监控和自动清理
为了防止内存泄漏,我们需要定期清理过期的锁:
def cleanup_expired_locks(self):
    """Purge every lock whose lease expired before this call started.

    Returns:
        list: names of the purged locks, in the table's iteration order.
    """
    cutoff = time.time()
    with self.lock:
        stale = [name for name, info in self.locks.items() if info['expire_time'] < cutoff]
        for name in stale:
            del self.locks[name]
    return stale
def start_cleanup_daemon(self, interval=60):
    """Spawn a daemon thread that sweeps expired locks every *interval* seconds.

    The thread loops forever and prints a short message whenever a sweep
    removes at least one lock. It is marked daemon, so it does not keep the
    interpreter alive.
    """
    def _sweep_forever():
        while True:
            time.sleep(interval)
            removed = self.cleanup_expired_locks()
            if removed:
                print(f"Cleaned up {len(removed)} expired locks")

    import threading
    threading.Thread(target=_sweep_forever, daemon=True).start()
第七步:完整的分布式锁服务实现
将以上所有功能整合:
import time
import threading
from typing import Optional, Dict, Any
class DistributedLockService:
    """
    Hash-table-backed lock service (single process, in memory).

    ``self.locks`` maps each lock name to a record with the keys ``owner``,
    ``acquire_time``, ``expire_time`` and ``ttl``. The timestamps come from
    ``time.time()``. A record whose ``expire_time`` is in the past is treated
    as free. Every access to ``self.locks`` happens under ``self.lock``. A
    daemon thread purges expired records every ``cleanup_interval`` seconds
    until ``shutdown()`` is called.
    """

    def __init__(self, default_ttl: int = 30, cleanup_interval: int = 60):
        self.locks: Dict[str, Dict[str, Any]] = {}
        self.default_ttl = default_ttl
        self.cleanup_interval = cleanup_interval
        # Non-reentrant: a method holding it must not call another locking method.
        self.lock = threading.Lock()
        self.running = True
        # Set by shutdown() to wake the cleanup thread immediately, instead of
        # letting it finish a full cleanup_interval sleep.
        self._stop_event = threading.Event()
        self._cleanup_thread: Optional[threading.Thread] = None
        # Start the background cleanup thread.
        self._start_cleanup_daemon()

    def _start_cleanup_daemon(self):
        """Start the daemon thread that periodically purges expired locks."""
        def cleanup_worker():
            # Event.wait() returns True as soon as shutdown() sets the event.
            # It returns False after a normal timeout, which triggers one sweep.
            while self.running and not self._stop_event.wait(self.cleanup_interval):
                self._cleanup_expired_locks()

        self._cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
        self._cleanup_thread.start()

    def _cleanup_expired_locks(self):
        """Delete every record whose lease expired before this call."""
        current_time = time.time()
        with self.lock:
            expired_keys = [
                lock_name for lock_name, lock_info in self.locks.items()
                if lock_info['expire_time'] < current_time
            ]
            for key in expired_keys:
                del self.locks[key]

    def acquire(self, lock_name: str, client_id: str, ttl: Optional[int] = None) -> bool:
        """Try once to acquire *lock_name* for *client_id*.

        Args:
            lock_name: key of the lock.
            client_id: identifier recorded as the owner.
            ttl: lease length in seconds; ``default_ttl`` when None.

        Returns:
            True if the lock was absent or expired and now belongs to
            *client_id*; False if an unexpired lease exists (even one held by
            *client_id* itself -- the lock is not reentrant).
        """
        if ttl is None:
            ttl = self.default_ttl
        with self.lock:
            # Read the clock after entering the critical section so the lease
            # starts when the lock is actually granted.
            current_time = time.time()
            lock_info = self.locks.get(lock_name)
            if lock_info is not None and lock_info['expire_time'] >= current_time:
                return False  # held by a client and not yet expired
            self.locks[lock_name] = {
                'owner': client_id,
                'acquire_time': current_time,
                'expire_time': current_time + ttl,
                'ttl': ttl,
            }
            return True

    def release(self, lock_name: str, client_id: str) -> bool:
        """Release *lock_name* if *client_id* is its recorded owner.

        Returns True if the record was deleted, and False if it is absent or
        owned by another client. The lease expiry is not checked.
        """
        with self.lock:
            lock_info = self.locks.get(lock_name)
            if lock_info is not None and lock_info['owner'] == client_id:
                del self.locks[lock_name]
                return True
            return False

    def try_acquire(self, lock_name: str, client_id: str,
                    ttl: Optional[int] = None, max_retries: int = 3,
                    retry_interval: float = 1.0) -> bool:
        """Call acquire() up to *max_retries* times.

        Sleeps *retry_interval* seconds between attempts, so this call blocks
        for a bounded time. Returns True on the first success, False otherwise.
        """
        for attempt in range(max_retries):
            if self.acquire(lock_name, client_id, ttl):
                return True
            if attempt < max_retries - 1:
                time.sleep(retry_interval)
        return False

    def renew(self, lock_name: str, client_id: str, new_ttl: Optional[int] = None) -> bool:
        """Extend the lease of *lock_name* to *new_ttl* seconds from now.

        Only the owner of an unexpired lease can renew. *new_ttl* defaults to
        ``default_ttl``. Returns True on success and False otherwise.
        """
        if new_ttl is None:
            new_ttl = self.default_ttl
        with self.lock:
            current_time = time.time()
            lock_info = self.locks.get(lock_name)
            if (lock_info is None
                    or lock_info['owner'] != client_id
                    or lock_info['expire_time'] < current_time):
                return False
            lock_info['expire_time'] = current_time + new_ttl
            lock_info['ttl'] = new_ttl
            return True

    def get_lock_info(self, lock_name: str) -> Optional[Dict[str, Any]]:
        """Return a copy of the record for *lock_name*, or None.

        Returns None when no record exists, and also when the record's lease
        has expired. This matches acquire(), which treats expired locks as
        free, so callers never see a stale owner.
        """
        with self.lock:
            lock_info = self.locks.get(lock_name)
            if lock_info is None or lock_info['expire_time'] < time.time():
                return None
            return lock_info.copy()

    def shutdown(self, timeout: Optional[float] = None):
        """Stop the background cleanup thread.

        Args:
            timeout: maximum number of seconds to wait for the thread to exit;
                None waits until it has finished.
        """
        self.running = False
        self._stop_event.set()
        thread = self._cleanup_thread
        # Joining the current thread would raise RuntimeError; skip in that case.
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout)
第八步:使用示例
# Usage example
def example_usage():
    """Demonstrate acquire/release and retried acquisition on one resource.

    Client A takes and releases ``resource_1``. Client B then acquires the
    same lock through ``try_acquire``. The service is shut down at the end so
    its background cleanup thread does not outlive the demo.
    """
    lock_service = DistributedLockService(default_ttl=10)
    try:
        # Client A tries to acquire the lock.
        client_a = "client_001"
        if lock_service.acquire("resource_1", client_a):
            print("Client A acquired the lock")
            try:
                # Run the business logic while holding the lock.
                time.sleep(2)
            finally:
                # Always release, even if the business logic raises.
                lock_service.release("resource_1", client_a)
            print("Client A released the lock")
        else:
            print("Client A failed to acquire the lock")

        # Client B tries to acquire the same lock.
        client_b = "client_002"
        success = lock_service.try_acquire("resource_1", client_b, max_retries=5)
        if success:
            print("Client B acquired the lock after retrying")
            lock_service.release("resource_1", client_b)
        else:
            print("Client B failed to acquire the lock")
    finally:
        # Stop the cleanup thread started by the service constructor.
        lock_service.shutdown()
# Run the example when this file is executed directly as a script.
if __name__ == "__main__":
    example_usage()
关键设计要点
- 哈希表选择:使用字典作为底层存储,获取、释放、续期等单个锁操作的平均时间复杂度为O(1);批量清理过期锁需要遍历整个表,时间复杂度为O(n)
- 线程安全:使用互斥锁(threading.Lock)保护对哈希表的所有读写。该锁不可重入,因此持有它的方法不能再调用其他同样会加锁的方法,否则同一线程会在自己已持有的锁上死锁
- 超时机制:防止死锁,自动清理过期锁
- 所有权验证:只有锁的持有者才能释放锁
- 后台清理:定期清理过期锁,防止内存泄漏
- 重试机制:提供友好的尝试获取接口
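这个设计提供了一个基于哈希表的锁服务,可以在单机多线程环境下可靠工作。需要注意的是,锁状态只保存在单个进程的内存哈希表中,因此它只能保证同一进程内各线程之间的互斥。它无法跨进程或跨机器协调,也不满足题目中的容错性(高可用)要求:进程一旦退出,所有锁状态都会丢失。在实际的分布式环境中,可以考虑使用Redis(例如带NX和过期时间参数的SET命令)或ZooKeeper、etcd等分布式存储/协调服务来替代内存中的哈希表,并继续用唯一的client_id校验所有权。此外,基于超时的锁有一个固有风险:持有者若停顿超过TTL,锁可能已被他人获取而自己并不知情。因此关键操作还应配合及时续期或防护令牌(fencing token)等机制。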