哈希算法题目:设计一个基于哈希的在线选举系统
字数 664 2025-11-02 11:43:41

哈希算法题目:设计一个基于哈希的在线选举系统

题目描述

你需要设计一个在线选举系统,用于实时统计候选人的得票情况。系统需要支持以下操作:

  • vote(candidate, timestamp):在给定的时间戳为某个候选人投票
  • query(top_k, timestamp):查询到某个时间戳为止,得票数前k名的候选人列表

要求实现一个基于哈希的高效解决方案,能够处理大量投票数据。

解题思路分析

这个问题的核心在于如何高效地记录投票数据并快速查询top k结果。我们需要考虑:

  1. 投票数据的时间顺序性(按时间戳递增)
  2. 需要支持任意时间点的top k查询
  3. 系统需要处理高频的投票操作

解决方案详解

步骤1:数据结构设计

我们需要设计合适的数据结构来存储和查询投票数据:

from collections import defaultdict
import bisect
import heapq

class OnlineElectionSystem:
    def __init__(self):
        # 存储每个候选人的总票数
        self.candidate_votes = defaultdict(int)
        
        # 按时间戳存储每个时刻的候选人票数快照
        self.timestamp_snapshots = []
        self.vote_records = []
        
        # 使用最大堆来维护top k候选人
        self.max_heap = []
        
    def vote(self, candidate: str, timestamp: int) -> None:
        """记录投票"""
        self.candidate_votes[candidate] += 1
        
        # 记录当前时间戳的票数快照
        current_votes = self.candidate_votes.copy()
        self.timestamp_snapshots.append((timestamp, current_votes))
        self.vote_records.append((timestamp, candidate))

步骤2:时间戳处理优化

由于时间戳是递增的,我们可以使用二分查找来快速定位特定时间点的数据:

def _find_closest_timestamp(self, timestamp: int) -> dict:
    """使用二分查找找到小于等于给定时间戳的最新快照"""
    # 找到第一个时间戳大于target的位置
    index = bisect.bisect_right(self.timestamp_snapshots, 
                               (timestamp, {})) - 1
    
    if index < 0:
        return {}
    
    return self.timestamp_snapshots[index][1]

步骤3:Top K查询实现

使用最大堆来高效获取前k名候选人:

def query(self, top_k: int, timestamp: int) -> list:
    """查询指定时间戳的前k名候选人"""
    if top_k <= 0:
        return []
    
    # 获取指定时间点的票数数据
    votes_at_time = self._find_closest_timestamp(timestamp)
    if not votes_at_time:
        return []
    
    # 使用最大堆获取top k
    heap = []
    for candidate, votes in votes_at_time.items():
        # 使用负数票数构建最大堆(Python的heapq是最小堆)
        heapq.heappush(heap, (-votes, candidate))
    
    result = []
    for _ in range(min(top_k, len(heap))):
        votes, candidate = heapq.heappop(heap)
        result.append(candidate)
    
    return result

步骤4:性能优化版本

对于高频投票场景,我们可以进一步优化:

class OptimizedOnlineElectionSystem:
    def __init__(self):
        self.candidate_votes = defaultdict(int)
        self.timestamps = []  # 存储所有时间戳
        self.snapshots = []   # 存储对应时间戳的快照
        
    def vote(self, candidate: str, timestamp: int) -> None:
        self.candidate_votes[candidate] += 1
        
        # 只在票数变化或达到一定间隔时创建快照,节省空间
        if not self.timestamps or timestamp - self.timestamps[-1] >= 60:  # 每分钟一个快照
            self.timestamps.append(timestamp)
            self.snapshots.append(self.candidate_votes.copy())
    
    def query_optimized(self, top_k: int, timestamp: int) -> list:
        """优化版的查询方法"""
        # 二分查找找到合适的时间点
        index = bisect.bisect_right(self.timestamps, timestamp) - 1
        if index < 0:
            return []
        
        votes = self.snapshots[index]
        
        # 使用快速选择或堆排序获取top k
        candidates = list(votes.items())
        candidates.sort(key=lambda x: (-x[1], x[0]))  # 按票数降序,票数相同时按字母序
        
        return [candidate for candidate, _ in candidates[:top_k]]

步骤5:处理边界情况

def query_with_tie_breaking(self, top_k: int, timestamp: int) -> list:
    """处理票数相同的情况(按候选人名字典序)"""
    votes_at_time = self._find_closest_timestamp(timestamp)
    if not votes_at_time:
        return []
    
    # 先按票数排序,票数相同的按名字典序
    candidates = []
    for candidate, votes in votes_at_time.items():
        candidates.append((-votes, candidate))  # 使用负数实现降序
    
    candidates.sort()  # 先按票数(负数)升序,再按名字典序
    
    result = []
    for i in range(min(top_k, len(candidates))):
        result.append(candidates[i][1])
    
    return result

步骤6:完整实现示例

class CompleteOnlineElectionSystem:
    def __init__(self):
        self.candidate_votes = defaultdict(int)
        self.timestamps = []
        self.snapshots = []
        self.last_snapshot_time = 0
    
    def vote(self, candidate: str, timestamp: int) -> None:
        self.candidate_votes[candidate] += 1
        
        # 创建时间点快照(可根据实际需求调整频率)
        if timestamp - self.last_snapshot_time >= 1 or not self.timestamps:
            self.timestamps.append(timestamp)
            self.snapshots.append(self.candidate_votes.copy())
            self.last_snapshot_time = timestamp
    
    def query(self, top_k: int, timestamp: int) -> list:
        if top_k <= 0 or not self.timestamps:
            return []
        
        # 二分查找
        left, right = 0, len(self.timestamps) - 1
        while left <= right:
            mid = (left + right) // 2
            if self.timestamps[mid] <= timestamp:
                left = mid + 1
            else:
                right = mid - 1
        
        if right < 0:
            return []
        
        # 获取对应时间点的票数
        votes = self.snapshots[right]
        
        # 获取top k
        sorted_candidates = sorted(votes.items(), 
                                 key=lambda x: (-x[1], x[0]))
        
        return [candidate for candidate, _ in sorted_candidates[:top_k]]

关键要点总结

  1. 哈希表的作用:使用字典快速记录和更新候选人的票数
  2. 时间戳处理:利用二分查找在有序时间戳数组中快速定位
  3. Top K算法:结合排序或堆操作高效获取前k名
  4. 空间优化:通过合理的快照策略平衡空间和时间复杂度
  5. 边界处理:考虑票数相同、时间戳超出范围等特殊情况

这个解决方案的时间复杂度为:投票操作O(1),查询操作O(n log k),其中n是候选人数量,k是查询的top k值。

哈希算法题目:设计一个基于哈希的在线选举系统 题目描述 你需要设计一个在线选举系统,用于实时统计候选人的得票情况。系统需要支持以下操作: vote(candidate, timestamp) :在给定的时间戳为某个候选人投票 query(top_k, timestamp) :查询到某个时间戳为止,得票数前k名的候选人列表 要求实现一个基于哈希的高效解决方案,能够处理大量投票数据。 解题思路分析 这个问题的核心在于如何高效地记录投票数据并快速查询top k结果。我们需要考虑: 投票数据的时间顺序性(按时间戳递增) 需要支持任意时间点的top k查询 系统需要处理高频的投票操作 解决方案详解 步骤1:数据结构设计 我们需要设计合适的数据结构来存储和查询投票数据: 步骤2:时间戳处理优化 由于时间戳是递增的,我们可以使用二分查找来快速定位特定时间点的数据: 步骤3:Top K查询实现 使用最大堆来高效获取前k名候选人: 步骤4:性能优化版本 对于高频投票场景,我们可以进一步优化: 步骤5:处理边界情况 步骤6:完整实现示例 关键要点总结 哈希表的作用 :使用字典快速记录和更新候选人的票数 时间戳处理 :利用二分查找在有序时间戳数组中快速定位 Top K算法 :结合排序或堆操作高效获取前k名 空间优化 :通过合理的快照策略平衡空间和时间复杂度 边界处理 :考虑票数相同、时间戳超出范围等特殊情况 这个解决方案的时间复杂度为:投票操作O(1),查询操作O(n log k),其中n是候选人数量,k是查询的top k值。