排序算法之:多路归并排序(K-Way Merge Sort)的进阶应用:海量数据流的中位数查找
字数 909 2025-11-13 08:29:51
排序算法之:多路归并排序(K-Way Merge Sort)的进阶应用:海量数据流的中位数查找
题目描述
假设有一个持续不断的数据流,数据量可能达到数十亿级别。我们需要实时维护这个数据流的中位数(即排序后位于中间位置的数)。如果数据量为偶数,中位数取中间两个数的平均值。请设计一个高效算法,能够在数据不断到达时快速返回当前中位数。
解题过程
步骤1:问题分析
- 数据流特性:数据逐个到达,无法预知全部数据
- 内存限制:数据量可能超过内存容量,不能存储全部数据
- 实时性要求:每次新数据到达后需要快速更新中位数
- 中位数定义:对于n个数据,中位数是第⌈n/2⌉小的数(n为奇数时)或第n/2小和第n/2+1小的平均数(n为偶数时)
步骤2:核心思路
使用两个堆来维护数据流:
- 最大堆:存储较小的一半数据,堆顶是这半数据的最大值
- 最小堆:存储较大的一半数据,堆顶是这半数据的最小值
- 保持两个堆的大小相等(数据量为偶数时)或最大堆比最小堆多1个元素(数据量为奇数时)
步骤3:数据结构设计
import heapq
class MedianFinder:
def __init__(self):
# 最大堆(通过存储负数实现)
self.max_heap = []
# 最小堆
self.min_heap = []
def addNum(self, num: int) -> None:
# 详细实现见下一步
pass
def findMedian(self) -> float:
# 详细实现见下一步
pass
步骤4:添加数据的详细步骤
def addNum(self, num: int) -> None:
# 第一步:将新元素插入到最大堆
heapq.heappush(self.max_heap, -num)
# 第二步:平衡操作 - 确保最大堆的堆顶 <= 最小堆的堆顶
if self.max_heap and self.min_heap and (-self.max_heap[0] > self.min_heap[0]):
max_heap_top = -heapq.heappop(self.max_heap)
heapq.heappush(self.min_heap, max_heap_top)
# 第三步:调整堆的大小,保持平衡条件
# 最大堆大小应该等于最小堆大小,或者比最小堆多1
if len(self.max_heap) > len(self.min_heap) + 1:
max_heap_top = -heapq.heappop(self.max_heap)
heapq.heappush(self.min_heap, max_heap_top)
elif len(self.min_heap) > len(self.max_heap):
min_heap_top = heapq.heappop(self.min_heap)
heapq.heappush(self.max_heap, -min_heap_top)
步骤5:查找中位数的实现
def findMedian(self) -> float:
if len(self.max_heap) > len(self.min_heap):
# 奇数个元素,中位数在最大堆堆顶
return -self.max_heap[0]
else:
# 偶数个元素,中位数是两个堆顶的平均值
return (-self.max_heap[0] + self.min_heap[0]) / 2
步骤6:算法正确性证明
- 有序性保证:通过步骤4的平衡操作,确保最大堆的所有元素 ≤ 最小堆的所有元素
- 大小平衡:通过步骤4的大小调整,保证两个堆的大小满足:
- |最大堆大小 - 最小堆大小| ≤ 1
- 中位数位置:
- 当总数为奇数时,中位数在较大的那半个堆的堆顶
- 当总数为偶数时,中位数是两个堆顶的平均值
步骤7:时间复杂度分析
- addNum操作:O(log n),每个堆操作的时间复杂度
- findMedian操作:O(1),只是访问堆顶元素
- 空间复杂度:O(n),需要存储所有数据
步骤8:处理海量数据的优化
对于极端海量数据(如超过内存容量),可以采用以下策略:
- 分位数摘要:使用GK算法或Q-digest维护近似中位数
- 抽样方法:对数据流进行随机抽样,在样本上计算近似中位数
- 分段处理:将数据分成多个文件,使用外部排序和多路归并
步骤9:完整代码示例
import heapq
class MedianFinder:
def __init__(self):
self.max_heap = [] # 存储较小的一半(最大堆)
self.min_heap = [] # 存储较大的一半(最小堆)
def addNum(self, num: int) -> None:
heapq.heappush(self.max_heap, -num)
# 平衡堆顶元素
if (self.max_heap and self.min_heap and
-self.max_heap[0] > self.min_heap[0]):
max_top = -heapq.heappop(self.max_heap)
heapq.heappush(self.min_heap, max_top)
# 平衡堆大小
if len(self.max_heap) > len(self.min_heap) + 1:
max_top = -heapq.heappop(self.max_heap)
heapq.heappush(self.min_heap, max_top)
elif len(self.min_heap) > len(self.max_heap):
min_top = heapq.heappop(self.min_heap)
heapq.heappush(self.max_heap, -min_top)
def findMedian(self) -> float:
if len(self.max_heap) > len(self.min_heap):
return -self.max_heap[0]
else:
return (-self.max_heap[0] + self.min_heap[0]) / 2
# 使用示例
finder = MedianFinder()
for i in [1, 3, 2, 6, 4, 5]:
finder.addNum(i)
print(f"当前中位数: {finder.findMedian()}")
这个算法巧妙地利用了两个堆的性质,在数据流不断到达时高效维护中位数,是处理实时数据统计问题的经典解决方案。