一致性哈希在分布式数据库分片中的应用
字数 404 2025-12-04 13:21:07
一致性哈希在分布式数据库分片中的应用
题目描述
设计一个基于一致性哈希的分布式数据库分片系统,需要解决以下问题:
- 将数据均匀分布到多个数据库节点
- 支持节点的动态添加和移除,尽量减少数据迁移量
- 处理节点负载不均的问题(热点节点)
解题过程
第一步:理解基本一致性哈希原理
- 将哈希空间组织成一个虚拟的环(0到2^32-1)
- 每个节点通过哈希函数映射到环上的某个位置
- 每个数据键通过同样的哈希函数映射到环上
- 数据存储在顺时针方向找到的第一个节点上
第二步:基础实现
import hashlib
class ConsistentHashing:
def __init__(self, nodes=None, virtual_replicas=3):
self.virtual_replicas = virtual_replicas # 虚拟节点倍数
self.ring = {} # 哈希环:位置 -> 节点
self.sorted_keys = [] # 排序的哈希位置
if nodes:
for node in nodes:
self.add_node(node)
def _hash(self, key):
"""计算32位哈希值"""
return int(hashlib.md5(key.encode()).hexdigest(), 16) % (2**32)
第三步:添加虚拟节点解决负载均衡
def add_node(self, node):
"""添加物理节点,同时创建多个虚拟节点"""
for i in range(self.virtual_replicas):
virtual_node = f"{node}#{i}"
position = self._hash(virtual_node)
# 处理哈希冲突(位置重复)
while position in self.ring:
position = (position + 1) % (2**32)
self.ring[position] = node
self.sorted_keys.append(position)
self.sorted_keys.sort()
def remove_node(self, node):
"""移除节点及其所有虚拟节点"""
keys_to_remove = []
for position, node_name in self.ring.items():
if node_name == node:
keys_to_remove.append(position)
for key in keys_to_remove:
del self.ring[key]
self.sorted_keys.remove(key)
第四步:数据分片定位
def get_node(self, key):
"""根据键找到对应的节点"""
if not self.ring:
return None
key_hash = self._hash(key)
# 二分查找找到第一个大于等于key_hash的位置
import bisect
idx = bisect.bisect_left(self.sorted_keys, key_hash)
# 如果超过最大位置,回到环开头
if idx == len(self.sorted_keys):
idx = 0
return self.ring[self.sorted_keys[idx]]
第五步:处理数据迁移
def get_migration_keys(self, new_node, existing_keys):
"""计算添加新节点时需要迁移的键"""
migration_map = {}
for key in existing_keys:
current_node = self.get_node(key)
# 临时添加新节点,检查键是否应该迁移
self.add_node(new_node)
new_assigned_node = self.get_node(key)
self.remove_node(new_node) # 恢复原状
if new_assigned_node == new_node and new_assigned_node != current_node:
if new_node not in migration_map:
migration_map[new_node] = []
migration_map[new_node].append(key)
return migration_map
第六步:优化负载均衡
def rebalance_nodes(self, nodes, target_data_distribution):
"""重新平衡节点负载"""
# 计算当前数据分布
current_dist = self.get_data_distribution(nodes)
# 如果某个节点负载过高,增加其虚拟节点数量
for node in nodes:
current_load = current_dist.get(node, 0)
target_load = target_data_distribution.get(node, 0)
if current_load > target_load * 1.2: # 负载超过20%
# 为该节点增加虚拟节点
for i in range(self.virtual_replicas, self.virtual_replicas + 2):
self.add_virtual_node(node, i)
def get_data_distribution(self, sample_keys):
"""获取当前数据分布情况"""
distribution = {}
for key in sample_keys:
node = self.get_node(key)
distribution[node] = distribution.get(node, 0) + 1
return distribution
第七步:完整系统实现
class DistributedDatabase:
def __init__(self):
self.consistent_hash = ConsistentHashing()
self.data_shards = {} # 节点 -> 数据分片
self.node_status = {} # 节点状态
def add_database_node(self, node_name, capacity):
"""添加数据库节点"""
self.consistent_hash.add_node(node_name)
self.data_shards[node_name] = {}
self.node_status[node_name] = {
'capacity': capacity,
'current_load': 0,
'status': 'active'
}
def store_data(self, key, value):
"""存储数据到对应的分片"""
node = self.consistent_hash.get_node(key)
if node and self.node_status[node]['status'] == 'active':
self.data_shards[node][key] = value
self.node_status[node]['current_load'] += 1
return True
return False
def handle_node_failure(self, failed_node):
"""处理节点故障"""
self.node_status[failed_node]['status'] = 'failed'
# 将故障节点的数据迁移到其他节点
failed_data = self.data_shards[failed_node]
for key, value in failed_data.items():
new_node = self.consistent_hash.get_node(key)
if new_node != failed_node:
self.data_shards[new_node][key] = value
关键要点总结
- 虚拟节点确保数据均匀分布
- 节点变动时只影响相邻数据,迁移量最小
- 通过调整虚拟节点数量实现负载均衡
- 支持节点的动态添加、移除和故障恢复
这种设计能够有效解决分布式数据库的分片问题,提供良好的可扩展性和容错能力。