哈希算法题目:设计一个基于哈希的分布式配置中心
字数 570 2025-11-02 19:16:02
哈希算法题目:设计一个基于哈希的分布式配置中心
题目描述
设计一个分布式配置中心系统,用于管理大量应用的配置信息。系统需要支持以下操作:
- 设置配置(set_config):为指定应用和命名空间设置键值对配置
- 获取配置(get_config):获取指定应用和命名空间的配置值
- 监听配置变更(watch_config):监听特定配置的变化
- 获取历史版本(get_history):获取配置的修改历史
解题过程
第一步:分析系统需求
- 配置数据以键值对形式存储,键的格式为"应用名:命名空间:配置项"
- 需要支持高并发读写,保证配置的实时性
- 配置变更需要通知所有监听该配置的客户端
- 需要记录配置的修改历史,支持版本回滚
第二步:设计数据存储结构
# 配置项数据结构
config_data = {
"app:namespace:key": {
"value": "配置值",
"version": 123,
"timestamp": 1690000000,
"checksum": "md5哈希值"
}
}
# 监听器数据结构
watchers = {
"app:namespace:key": [
{"client_id": "client1", "callback_url": "http://..."},
{"client_id": "client2", "callback_url": "http://..."}
]
}
# 历史记录数据结构
history = {
"app:namespace:key": [
{"version": 122, "value": "旧值", "timestamp": 1689999999},
{"version": 123, "value": "新值", "timestamp": 1690000000}
]
}
第三步:设计哈希分片策略
使用一致性哈希将配置数据分布到多个存储节点:
class ConsistentHashing:
def __init__(self, nodes, virtual_nodes=150):
self.virtual_nodes = virtual_nodes
self.ring = {}
self.sorted_keys = []
for node in nodes:
self.add_node(node)
def add_node(self, node):
for i in range(self.virtual_nodes):
virtual_key = self.hash_function(f"{node}:{i}")
self.ring[virtual_key] = node
self.sorted_keys.append(virtual_key)
self.sorted_keys.sort()
def get_node(self, key):
if not self.ring:
return None
hash_key = self.hash_function(key)
# 在哈希环上查找第一个大于等于hash_key的节点
for ring_key in self.sorted_keys:
if hash_key <= ring_key:
return self.ring[ring_key]
# 如果没找到,返回环的第一个节点
return self.ring[self.sorted_keys[0]]
def hash_function(self, key):
# 使用MD5哈希函数
return int(hashlib.md5(key.encode()).hexdigest(), 16)
第四步:实现核心操作
class DistributedConfigCenter:
def __init__(self, storage_nodes):
self.consistent_hash = ConsistentHashing(storage_nodes)
self.config_stores = {node: {} for node in storage_nodes}
self.watchers = {}
self.history_stores = {}
def set_config(self, app, namespace, key, value):
config_key = f"{app}:{namespace}:{key}"
# 确定存储节点
node = self.consistent_hash.get_node(config_key)
# 生成新版本
old_config = self.config_stores[node].get(config_key, {})
new_version = old_config.get('version', 0) + 1
# 计算校验和
checksum = hashlib.md5(value.encode()).hexdigest()
# 存储新配置
new_config = {
'value': value,
'version': new_version,
'timestamp': time.time(),
'checksum': checksum
}
self.config_stores[node][config_key] = new_config
# 记录历史
if config_key not in self.history_stores:
self.history_stores[config_key] = []
self.history_stores[config_key].append({
'version': new_version,
'value': value,
'timestamp': time.time()
})
# 触发监听器
self._notify_watchers(config_key, new_config)
return new_version
def get_config(self, app, namespace, key):
config_key = f"{app}:{namespace}:{key}"
node = self.consistent_hash.get_node(config_key)
if config_key in self.config_stores[node]:
return self.config_stores[node][config_key]['value']
return None
def watch_config(self, app, namespace, key, client_id, callback_url):
config_key = f"{app}:{namespace}:{key}"
if config_key not in self.watchers:
self.watchers[config_key] = []
# 避免重复监听
for watcher in self.watchers[config_key]:
if watcher['client_id'] == client_id:
return False
self.watchers[config_key].append({
'client_id': client_id,
'callback_url': callback_url
})
return True
def get_history(self, app, namespace, key, limit=10):
config_key = f"{app}:{namespace}:{key}"
if config_key not in self.history_stores:
return []
history = self.history_stores[config_key]
return history[-limit:] # 返回最近的limit条记录
def _notify_watchers(self, config_key, new_config):
if config_key not in self.watchers:
return
for watcher in self.watchers[config_key]:
# 异步通知客户端(简化实现)
threading.Thread(
target=self._send_notification,
args=(watcher['callback_url'], config_key, new_config)
).start()
第五步:处理并发和一致性
import threading
class ConcurrentConfigStore:
def __init__(self):
self.locks = {}
self.global_lock = threading.Lock()
def _get_lock(self, config_key):
with self.global_lock:
if config_key not in self.locks:
self.locks[config_key] = threading.Lock()
return self.locks[config_key]
def atomic_set(self, config_key, new_config):
lock = self._get_lock(config_key)
with lock:
# 执行原子操作
node = self.consistent_hash.get_node(config_key)
self.config_stores[node][config_key] = new_config
第六步:优化和扩展
- 缓存优化:为频繁访问的配置添加本地缓存
- 批量操作:支持批量获取和设置配置
- 权限控制:添加基于应用的访问权限管理
- 配置验证:添加配置值的格式验证
- 监控告警:监控配置变更频率和系统性能
这个设计通过哈希分片实现了配置数据的分布式存储,通过版本控制和历史记录支持配置追溯,通过监听器机制实现配置变更的实时通知,是一个完整的分布式配置中心解决方案。