哈希算法题目:设计一个基于哈希的分布式实时游戏排行榜系统(支持分数更新、Top K 查询和分段范围查询)
字数 1503 2025-12-08 00:23:30
哈希算法题目:设计一个基于哈希的分布式实时游戏排行榜系统(支持分数更新、Top K 查询和分段范围查询)
题目描述
设计一个支持实时分数更新、查询Top K玩家以及查询任意分数段内玩家数量的分布式游戏排行榜系统。
系统需要支持高并发操作,并能扩展到多个服务器节点。玩家分数可能频繁变动,需要保证低延迟的查询和更新。
解题过程
步骤1:明确需求与挑战
- 核心功能:
- 更新玩家分数(增加/减少)。
- 查询前K名玩家(按分数降序,分数相同时按最新更新时间排序)。
- 查询分数在
[low, high]区间内的玩家数量。
- 分布式要求:数据分散在多个节点,需处理负载均衡与数据一致性。
- 性能要求:更新和查询都需高效(理想为O(log N)或O(1))。
步骤2:选择核心数据结构
单个节点内需要结合两种数据结构:
- 哈希表:存储玩家ID到分数的映射,支持O(1)的分数更新。
- 平衡树:存储分数到玩家数量的映射,支持O(log N)的Top K和范围查询。
在分布式环境下,常用跳表或Redis的有序集合(底层是跳表+哈希表)。
选择理由:
- 哈希表快速定位玩家当前分数。
- 平衡树(如红黑树、跳表)维护分数有序,支持:
- Top K查询:从高分到低分遍历。
- 范围查询:统计区间内的玩家数量。
步骤3:设计单节点数据结构
假设使用跳表,每个节点存储(分数, 该分数对应的玩家数量)。
同时用哈希表存储 <player_id, score>。
示例结构:
class RankingNode:
def __init__(self):
self.player_to_score = {} # 哈希表:{玩家ID: 分数}
self.score_to_count = {} # 哈希表:{分数: 该分数玩家数量}
self.sorted_scores = SortedList() # 有序列表存储分数(可用跳表实现)
步骤4:分数更新操作
- 从哈希表获取玩家旧分数(若无,则旧分数为
None)。 - 如果新分数与旧分数相同,无需操作。
- 否则:
- 若旧分数存在:
- 减少旧分数的玩家数量,若减到0则从有序结构中移除该分数。
- 更新哈希表:
player_to_score[玩家ID] = 新分数。 - 增加新分数的玩家数量,若该分数不存在则加入有序结构。
- 若旧分数存在:
时间复杂度:O(log N)(更新有序结构)。
步骤5:Top K查询
- 从有序结构(如跳表)中从高分到低分遍历。
- 对每个分数,获取该分数的所有玩家(需额外存储玩家ID列表,或从另一个哈希表
score_to_players获取)。 - 按分数降序、更新时间次序列出玩家,直到取满K个。
时间复杂度:O(K + log N)。
步骤6:分段范围查询
- 在有序结构中查找
low和high的位置。 - 遍历
[low, high]区间内的分数,累加每个分数的玩家数量。
时间复杂度:O(log N + M),M为区间内分数个数。
步骤7:分布式扩展
挑战:数据分布到多个节点,如何支持全局查询?
解决方案:
- 一致性哈希分配玩家到节点:每个节点负责一部分玩家。
- 全局聚合:
- Top K查询:每个节点本地取Top K,汇总到协调节点做归并排序,取全局Top K。
- 范围查询:每个节点返回区间内的玩家数量,协调节点求和。
优化:
- 使用并行查询减少延迟。
- 缓存全局Top K结果(适合更新不频繁的场景)。
步骤8:处理分数相同与排序
若分数相同,按最新更新时间排序(后更新的排名靠前)。
在哈希表中存储(score, timestamp),在有序结构中按分数降序、时间戳降序排序。
步骤9:总结与扩展
- 单节点核心:哈希表 + 有序结构(跳表/红黑树)。
- 分布式核心:一致性哈希分片 + 并行查询聚合。
- 可扩展性:节点可水平扩展,通过分片降低单点压力。
示例(单节点伪代码)
from sortedcontainers import SortedList
import time
class RealTimeRanking:
def __init__(self):
self.player_to_score = {} # {player_id: (score, timestamp)}
self.score_to_players = {} # {score: {player_id: timestamp}}
self.sorted_scores = SortedList() # 按分数降序
def update_score(self, player_id, new_score):
old_info = self.player_to_score.get(player_id)
if old_info:
old_score, _ = old_info
# 从旧分数中移除玩家
self.score_to_players[old_score].pop(player_id)
if not self.score_to_players[old_score]:
del self.score_to_players[old_score]
self.sorted_scores.remove(old_score)
# 更新新分数
ts = time.time()
self.player_to_score[player_id] = (new_score, ts)
if new_score not in self.score_to_players:
self.score_to_players[new_score] = {}
self.sorted_scores.add(new_score)
self.score_to_players[new_score][player_id] = ts
def get_top_k(self, k):
result = []
for score in reversed(self.sorted_scores): # 从高到低
players = self.score_to_players[score]
# 按时间戳降序排序
sorted_players = sorted(players.items(), key=lambda x: -x[1])
for player_id, ts in sorted_players:
result.append((player_id, score, ts))
if len(result) == k:
return result
return result
def range_count(self, low, high):
count = 0
for score in self.sorted_scores.irange(low, high): # 在区间内的分数
count += len(self.score_to_players[score])
return count
这个设计结合了哈希表的快速查找和有序结构的高效范围查询,并可通过一致性哈希扩展到分布式环境,适用于实时排行榜场景。