哈希算法题目:设计一个基于哈希的在线选举系统
字数 664 2025-11-02 11:43:41
哈希算法题目:设计一个基于哈希的在线选举系统
题目描述
你需要设计一个在线选举系统,用于实时统计候选人的得票情况。系统需要支持以下操作:
vote(candidate, timestamp):在给定的时间戳为某个候选人投票query(top_k, timestamp):查询到某个时间戳为止,得票数前k名的候选人列表
要求实现一个基于哈希的高效解决方案,能够处理大量投票数据。
解题思路分析
这个问题的核心在于如何高效地记录投票数据并快速查询top k结果。我们需要考虑:
- 投票数据的时间顺序性(按时间戳递增)
- 需要支持任意时间点的top k查询
- 系统需要处理高频的投票操作
解决方案详解
步骤1:数据结构设计
我们需要设计合适的数据结构来存储和查询投票数据:
from collections import defaultdict
import bisect
import heapq
class OnlineElectionSystem:
def __init__(self):
# 存储每个候选人的总票数
self.candidate_votes = defaultdict(int)
# 按时间戳存储每个时刻的候选人票数快照
self.timestamp_snapshots = []
self.vote_records = []
# 使用最大堆来维护top k候选人
self.max_heap = []
def vote(self, candidate: str, timestamp: int) -> None:
"""记录投票"""
self.candidate_votes[candidate] += 1
# 记录当前时间戳的票数快照
current_votes = self.candidate_votes.copy()
self.timestamp_snapshots.append((timestamp, current_votes))
self.vote_records.append((timestamp, candidate))
步骤2:时间戳处理优化
由于时间戳是递增的,我们可以使用二分查找来快速定位特定时间点的数据:
def _find_closest_timestamp(self, timestamp: int) -> dict:
"""使用二分查找找到小于等于给定时间戳的最新快照"""
# 找到第一个时间戳大于target的位置
index = bisect.bisect_right(self.timestamp_snapshots,
(timestamp, {})) - 1
if index < 0:
return {}
return self.timestamp_snapshots[index][1]
步骤3:Top K查询实现
使用最大堆来高效获取前k名候选人:
def query(self, top_k: int, timestamp: int) -> list:
"""查询指定时间戳的前k名候选人"""
if top_k <= 0:
return []
# 获取指定时间点的票数数据
votes_at_time = self._find_closest_timestamp(timestamp)
if not votes_at_time:
return []
# 使用最大堆获取top k
heap = []
for candidate, votes in votes_at_time.items():
# 使用负数票数构建最大堆(Python的heapq是最小堆)
heapq.heappush(heap, (-votes, candidate))
result = []
for _ in range(min(top_k, len(heap))):
votes, candidate = heapq.heappop(heap)
result.append(candidate)
return result
步骤4:性能优化版本
对于高频投票场景,我们可以进一步优化:
class OptimizedOnlineElectionSystem:
def __init__(self):
self.candidate_votes = defaultdict(int)
self.timestamps = [] # 存储所有时间戳
self.snapshots = [] # 存储对应时间戳的快照
def vote(self, candidate: str, timestamp: int) -> None:
self.candidate_votes[candidate] += 1
# 只在票数变化或达到一定间隔时创建快照,节省空间
if not self.timestamps or timestamp - self.timestamps[-1] >= 60: # 每分钟一个快照
self.timestamps.append(timestamp)
self.snapshots.append(self.candidate_votes.copy())
def query_optimized(self, top_k: int, timestamp: int) -> list:
"""优化版的查询方法"""
# 二分查找找到合适的时间点
index = bisect.bisect_right(self.timestamps, timestamp) - 1
if index < 0:
return []
votes = self.snapshots[index]
# 使用快速选择或堆排序获取top k
candidates = list(votes.items())
candidates.sort(key=lambda x: (-x[1], x[0])) # 按票数降序,票数相同时按字母序
return [candidate for candidate, _ in candidates[:top_k]]
步骤5:处理边界情况
def query_with_tie_breaking(self, top_k: int, timestamp: int) -> list:
"""处理票数相同的情况(按候选人名字典序)"""
votes_at_time = self._find_closest_timestamp(timestamp)
if not votes_at_time:
return []
# 先按票数排序,票数相同的按名字典序
candidates = []
for candidate, votes in votes_at_time.items():
candidates.append((-votes, candidate)) # 使用负数实现降序
candidates.sort() # 先按票数(负数)升序,再按名字典序
result = []
for i in range(min(top_k, len(candidates))):
result.append(candidates[i][1])
return result
步骤6:完整实现示例
class CompleteOnlineElectionSystem:
def __init__(self):
self.candidate_votes = defaultdict(int)
self.timestamps = []
self.snapshots = []
self.last_snapshot_time = 0
def vote(self, candidate: str, timestamp: int) -> None:
self.candidate_votes[candidate] += 1
# 创建时间点快照(可根据实际需求调整频率)
if timestamp - self.last_snapshot_time >= 1 or not self.timestamps:
self.timestamps.append(timestamp)
self.snapshots.append(self.candidate_votes.copy())
self.last_snapshot_time = timestamp
def query(self, top_k: int, timestamp: int) -> list:
if top_k <= 0 or not self.timestamps:
return []
# 二分查找
left, right = 0, len(self.timestamps) - 1
while left <= right:
mid = (left + right) // 2
if self.timestamps[mid] <= timestamp:
left = mid + 1
else:
right = mid - 1
if right < 0:
return []
# 获取对应时间点的票数
votes = self.snapshots[right]
# 获取top k
sorted_candidates = sorted(votes.items(),
key=lambda x: (-x[1], x[0]))
return [candidate for candidate, _ in sorted_candidates[:top_k]]
关键要点总结
- 哈希表的作用:使用字典快速记录和更新候选人的票数
- 时间戳处理:利用二分查找在有序时间戳数组中快速定位
- Top K算法:结合排序或堆操作高效获取前k名
- 空间优化:通过合理的快照策略平衡空间和时间复杂度
- 边界处理:考虑票数相同、时间戳超出范围等特殊情况
这个解决方案的时间复杂度为:投票操作O(1),查询操作O(n log k),其中n是候选人数量,k是查询的top k值。