Heapq Module: Efficient Priority Queue Operations
TL;DR
The heapq module provides heap queue (priority queue) operations using a binary heap implemented as a list, where heappush() and heappop() maintain the heap invariant with O(log n) complexity.
Interesting!
Python’s heapq implements a min-heap (smallest element first), but you can simulate a max-heap by negating values when pushing and popping. The heap is stored in a regular list where for any index i, the children are at indices 2i+1 and 2i+2!
Basic Heap Operations
python code snippet start
import heapq
# Create and manipulate heaps: a walk through the core heapq operations on a
# plain list used as a min-heap.
print("=== Basic Heap Operations ===")

# Start with an empty heap (an empty list is already a valid heap).
heap = []

# Push elements onto the heap one at a time.
data = [4, 1, 7, 3, 8, 5]
for item in data:
    heapq.heappush(heap, item)
print(f"After pushing {data}: {heap}")

# Pop the smallest element.
smallest = heapq.heappop(heap)
print(f"Popped smallest: {smallest}")
print(f"Heap after pop: {heap}")

# Peek at the smallest without removing it. The guard matters because
# heap[0] raises IndexError on an empty list.
if heap:
    print(f"Smallest element: {heap[0]}")

# Push and pop in one operation: the new value is pushed first, so it can be
# the value that comes straight back out.
result = heapq.heappushpop(heap, 2)
print(f"Pushed 2, popped: {result}")
print(f"Heap after pushpop: {heap}")

# Replace the smallest element: the old minimum is popped first, then the new
# value is pushed, so the returned value may be larger than the one added.
result = heapq.heapreplace(heap, 9)
print(f"Replaced smallest with 9, old value: {result}")
print(f"Heap after replace: {heap}")
python code snippet end
Converting Lists to Heaps
python code snippet start
import heapq
# Transform existing lists into heaps
print("=== Heapify Operations ===")
# Convert a list to a heap in-place: heapify() rearranges the elements of the
# existing list object instead of returning a new one.
numbers = [9, 5, 6, 2, 3, 7, 1, 4, 8]
print(f"Original list: {numbers}")
heapq.heapify(numbers)
# The list is now heap-ordered, which is not the same as sorted; only
# numbers[0] is guaranteed to be the minimum.
print(f"After heapify: {numbers}")
# Verify heap property
def is_valid_heap(heap):
    """Return True if *heap* satisfies the min-heap invariant.

    The invariant holds when no element is greater than either of its
    children, where the children of index ``i`` live at ``2*i + 1`` and
    ``2*i + 2``. Empty and single-element lists are trivially valid.
    """
    # Every index c >= 1 has exactly one parent at (c - 1) // 2, so visiting
    # each child once covers the same parent/child pairs as checking both
    # children of every index.
    return all(
        not heap[(child - 1) // 2] > heap[child]
        for child in range(1, len(heap))
    )
# Check the list heapified earlier in this snippet against the invariant.
print(f"Is valid heap: {is_valid_heap(numbers)}")
# Create heap from existing data
data = [15, 10, 25, 5, 30, 8, 20]
# Heapify a copy so the original ordering stays available for comparison.
heap_copy = data.copy()
heapq.heapify(heap_copy)
print(f"Original: {data}")
print(f"Heapified: {heap_copy}")
python code snippet end
Priority Queue Implementation
python code snippet start
import heapq
from dataclasses import dataclass, field
from typing import Any
# Simple priority queue using tuples
print("=== Priority Queue with Tuples ===")


def basic_priority_queue():
    """Print a fixed set of tasks in priority order.

    Tasks are ``(priority, description)`` tuples; a lower number means a
    higher priority. Tuples compare element by element, so tasks with the
    same priority come out ordered by their description string.
    """
    # Lower numbers = higher priority
    tasks = [
        (3, "Low priority task"),
        (1, "High priority task"),
        (2, "Medium priority task"),
        (1, "Another high priority task"),
    ]

    # Build the queue in one step; the pop order is the same as pushing the
    # tasks one at a time because tuple ordering is total here.
    pq = list(tasks)
    heapq.heapify(pq)

    print("Processing tasks by priority:")
    while pq:
        priority, task = heapq.heappop(pq)
        print(f"Priority {priority}: {task}")


basic_priority_queue()
# Advanced priority queue class
def _current_timestamp():
    """Return the current time in seconds since the epoch."""
    # Imported locally because this snippet does not import time at the top.
    import time

    return time.time()


@dataclass
class Task:
    """A unit of work that heapq can order directly.

    Attributes are documented inline. Only ``<`` is customised, which is the
    comparison heapq relies on.
    """

    # Lower number = higher priority.
    priority: int
    # Human-readable label; not used for ordering.
    description: str
    # Creation time, captured when the instance is built unless supplied.
    created_at: float = field(default_factory=_current_timestamp)

    def __lt__(self, other):
        """Order by priority first, then by creation time (older first)."""
        return (self.priority, self.created_at) < (other.priority, other.created_at)
class PriorityQueue:
    """Priority queue that returns items with the smallest priority first.

    Entries are stored as ``(priority, insertion_index, item)`` tuples. The
    insertion index makes every entry unique, so items with equal priority
    come out in insertion order and the items themselves are never compared.
    """

    def __init__(self):
        self._queue = []
        self._index = 0  # Tie-breaker for same priorities

    def push(self, item, priority):
        """Add *item* with the given *priority* (lower value pops first)."""
        # Use negative priority for max-heap behavior if needed
        heapq.heappush(self._queue, (priority, self._index, item))
        self._index += 1

    def pop(self):
        """Remove and return the highest-priority item.

        Raises:
            IndexError: if the queue is empty.
        """
        if self.is_empty():
            raise IndexError("pop from empty priority queue")
        return heapq.heappop(self._queue)[-1]  # Return just the item

    def peek(self):
        """Return the highest-priority item without removing it.

        Raises:
            IndexError: if the queue is empty.
        """
        if self.is_empty():
            raise IndexError("peek at empty priority queue")
        return self._queue[0][-1]

    def is_empty(self):
        """Return True when the queue holds no items."""
        return not self._queue

    def size(self):
        """Return the number of items in the queue."""
        return len(self._queue)
# Usage example: drain a PriorityQueue and show the processing order.
print("\n=== Advanced Priority Queue ===")
pq = PriorityQueue()

# Add items (lower number = higher priority)
pq.push("Emergency fix", 1)
pq.push("Code review", 3)
pq.push("Bug fix", 2)
pq.push("Documentation", 4)
pq.push("Critical bug", 1)

print("Processing tasks:")
while not pq.is_empty():
    task = pq.pop()
    print(f"Processing: {task}")
python code snippet end
Finding Largest and Smallest Elements
python code snippet start
import heapq
import random
# Efficient finding of n largest/smallest elements
print("=== Finding Extremes ===")
# Generate test data: 20 distinct integers drawn from 1..100, so the output
# differs from run to run.
data = random.sample(range(1, 101), 20)
print(f"Test data: {data}")
# Find n largest elements
n = 5
largest = heapq.nlargest(n, data)
print(f"{n} largest elements: {largest}")
# Find n smallest elements
smallest = heapq.nsmallest(n, data)
print(f"{n} smallest elements: {smallest}")
# Working with complex objects
class Student:
    """A student record holding a name and a grade."""

    def __init__(self, name, grade):
        self.name = name
        self.grade = grade

    def __repr__(self):
        """Return a constructor-like representation for readable output."""
        return f"Student('{self.name}', {self.grade})"
# Sample records; Student defines no ordering, so the calls below select by
# grade through an explicit key function.
students = [
Student("Alice", 92),
Student("Bob", 78),
Student("Charlie", 85),
Student("Diana", 96),
Student("Eve", 89),
Student("Frank", 73)
]
# Find top students by grade
top_students = heapq.nlargest(3, students, key=lambda s: s.grade)
print(f"Top 3 students: {top_students}")
# Find students who need help
help_needed = heapq.nsmallest(2, students, key=lambda s: s.grade)
print(f"Students needing help: {help_needed}")
# Performance comparison
import time
def performance_comparison():
    """Compare heapq.nlargest with sorted() for finding the 10 largest values.

    Draws 50,000 distinct random integers, times both approaches once, and
    prints the timings.

    Returns:
        A ``(heap_result, sorted_result)`` tuple with the 10 largest values
        as found by each approach.
    """
    large_data = random.sample(range(1, 100000), 50000)

    # perf_counter is the clock intended for measuring short durations;
    # time.time() can have coarse resolution and may jump.
    # Using heapq.nlargest
    start = time.perf_counter()
    heap_result = heapq.nlargest(10, large_data)
    heap_time = time.perf_counter() - start

    # Using sorted
    start = time.perf_counter()
    sorted_result = sorted(large_data, reverse=True)[:10]
    sorted_time = time.perf_counter() - start

    print("\nPerformance comparison (finding 10 largest from 50K items):")
    print(f"heapq.nlargest: {heap_time:.4f} seconds")
    print(f"sorted + slice: {sorted_time:.4f} seconds")
    # Guard the ratio: an elapsed time of zero would raise ZeroDivisionError.
    if heap_time > 0:
        print(f"Speedup: {sorted_time/heap_time:.2f}x")
    else:
        print("Speedup: n/a (elapsed time below timer resolution)")
    return heap_result, sorted_result
# Run the comparison once and confirm both approaches pick the same values.
heap_res, sorted_res = performance_comparison()
print(f"Results identical: {heap_res == sorted_res}")
python code snippet end
Heap Algorithms and Applications
python code snippet start
import heapq
from collections import defaultdict
# Heap sort implementation
print("=== Heap Sort ===")


def heap_sort(arr):
    """Return a new list with the items of *arr* in ascending order.

    The input is not modified. Any iterable of mutually comparable items is
    accepted. The result is not guaranteed to be stable for equal items.
    """
    # Work on a copy so the caller's data is left untouched.
    data = list(arr)
    heapq.heapify(data)
    # Popping the minimum n times yields the items in ascending order.
    return [heapq.heappop(data) for _ in range(len(data))]
def heap_sort_inplace(arr):
    """Sort the list *arr* in ascending order in place and return it.

    This is a classic heapsort: the list is first arranged as a max-heap,
    then the largest remaining item is repeatedly swapped to the end of the
    unsorted region. No second list is allocated, and the items only need to
    support ``<``.

    Returns:
        The same list object that was passed in, now sorted.
    """

    def sift_down(root, end):
        # Restore the max-heap property for the subtree rooted at `root`,
        # considering only indices below `end`.
        while True:
            child = 2 * root + 1
            if child >= end:
                return
            # Pick the larger of the two children.
            if child + 1 < end and arr[child] < arr[child + 1]:
                child += 1
            if not arr[root] < arr[child]:
                return
            arr[root], arr[child] = arr[child], arr[root]
            root = child

    n = len(arr)

    # Build a max heap, starting from the last index that has a child.
    for root in range(n // 2 - 1, -1, -1):
        sift_down(root, n)

    # Move the current maximum behind the shrinking heap region.
    for end in range(n - 1, 0, -1):
        arr[0], arr[end] = arr[end], arr[0]
        sift_down(0, end)

    return arr
# Test heap sort: run both implementations on the same sample input.
test_data = [64, 34, 25, 12, 22, 11, 90, 5]
print(f"Original: {test_data}")
print(f"Heap sorted: {heap_sort(test_data)}")
print(f"In-place heap sorted: {heap_sort_inplace(test_data)}")
# Merge multiple sorted sequences
print("\n=== Merging Sorted Sequences ===")


def merge_sorted_sequences(*sequences):
    """Merge already-sorted inputs into one sorted list.

    Each argument must be sorted in ascending order. Any iterable works,
    including generators, because the inputs are consumed through iterators
    rather than by index. Equal values come out in argument order.

    Returns:
        A new list containing every item from every input, ascending.
    """
    exhausted = object()  # Unique marker: cannot collide with a real value
    iterators = [iter(sequence) for sequence in sequences]

    # Seed the heap with the first item of each non-empty input. Entries are
    # (value, sequence_index); the index breaks ties between equal values.
    heap = []
    for seq_idx, iterator in enumerate(iterators):
        first = next(iterator, exhausted)
        if first is not exhausted:
            heap.append((first, seq_idx))
    heapq.heapify(heap)

    result = []
    while heap:
        value, seq_idx = heap[0]
        result.append(value)
        # Swap the consumed entry for the next item from the same input, or
        # drop it when that input has run out.
        following = next(iterators[seq_idx], exhausted)
        if following is exhausted:
            heapq.heappop(heap)
        else:
            heapq.heapreplace(heap, (following, seq_idx))
    return result
# Test merging: three interleaved sorted lists.
seq1 = [1, 4, 7, 10]
seq2 = [2, 5, 8, 11]
seq3 = [3, 6, 9, 12]
merged = merge_sorted_sequences(seq1, seq2, seq3)
print(f"Sequence 1: {seq1}")
print(f"Sequence 2: {seq2}")
print(f"Sequence 3: {seq3}")
print(f"Merged: {merged}")
# K-way merge using heapq.merge (built-in)
# heapq.merge returns an iterator, so it is wrapped in list() to compare.
builtin_merged = list(heapq.merge(seq1, seq2, seq3))
print(f"Built-in merge: {builtin_merged}")
print(f"Results identical: {merged == builtin_merged}")
python code snippet end
Real-World Applications
python code snippet start
import heapq
import time
from collections import namedtuple
from datetime import datetime, timedelta
# Job scheduling system
print("=== Job Scheduling System ===")
# Field order defines the heap order: priority, then scheduled time, then
# name and duration as final tie-breakers.
Job = namedtuple('Job', ['priority', 'scheduled_time', 'name', 'duration'])


class JobScheduler:
    """Job scheduler backed by a heap of ``Job`` tuples.

    A lower priority number runs first. A job becomes ready once its
    scheduled time is not later than the current wall-clock time.
    """

    def __init__(self):
        self._jobs = []
        # Kept from the original interface; no method here reads or updates it.
        self._job_counter = 0

    def schedule_job(self, name, priority, duration, delay=0):
        """Queue a job to become ready *delay* seconds from now."""
        scheduled_time = time.time() + delay
        heapq.heappush(self._jobs, Job(priority, scheduled_time, name, duration))
        print(f"Scheduled: {name} (priority {priority}, delay {delay}s)")

    def run_ready_jobs(self):
        """Run every job whose scheduled time has passed.

        Ready jobs are reported in priority order; jobs that are not ready
        yet stay queued.

        Returns:
            The number of jobs that were run.
        """
        current_time = time.time()

        # The heap is ordered by priority, not by time, so every entry has to
        # be inspected. One pass plus a single heapify does that without
        # popping and re-pushing each job.
        ready_jobs = []
        waiting_jobs = []
        for job in self._jobs:
            if job.scheduled_time <= current_time:
                ready_jobs.append(job)
            else:
                waiting_jobs.append(job)
        heapq.heapify(waiting_jobs)
        self._jobs[:] = waiting_jobs  # Keep the same list object

        # Execute ready jobs in priority order
        for job in sorted(ready_jobs):
            print(f"Executing: {job.name} (duration: {job.duration}s)")
        return len(ready_jobs)

    def pending_jobs(self):
        """Return the number of jobs still queued."""
        return len(self._jobs)
# Usage example
scheduler = JobScheduler()
# Arguments are (name, priority, duration, delay); lower priority runs first.
scheduler.schedule_job("Critical system update", 1, 30, delay=0)
scheduler.schedule_job("Backup database", 3, 60, delay=2)
scheduler.schedule_job("Send notifications", 2, 5, delay=1)
scheduler.schedule_job("Generate reports", 4, 120, delay=3)
print("\nRunning ready jobs...")
# This runs immediately after scheduling, so only jobs whose delay has
# already elapsed are expected to execute; the rest remain pending.
ready_count = scheduler.run_ready_jobs()
print(f"Executed {ready_count} jobs, {scheduler.pending_jobs()} pending")
# Dijkstra's shortest path algorithm
print("\n=== Shortest Path Algorithm ===")


class Graph:
    """Directed, weighted graph with a shortest-path search.

    ``vertices`` maps each source vertex to a list of
    ``(neighbor, weight)`` pairs.
    """

    def __init__(self):
        # Imported locally because this snippet's imports do not include it.
        from collections import defaultdict

        self.vertices = defaultdict(list)

    def add_edge(self, from_vertex, to_vertex, weight):
        """Add a one-way edge from *from_vertex* to *to_vertex*."""
        self.vertices[from_vertex].append((to_vertex, weight))

    def dijkstra(self, start, end):
        """Find the shortest path from *start* to *end*.

        Edge weights are assumed to be non-negative, as Dijkstra's algorithm
        requires. Vertices must be hashable and mutually comparable, because
        they break ties between equal distances in the heap.

        Returns:
            ``(distance, path)`` where *path* is the list of vertices from
            *start* to *end*, or ``(float('inf'), [])`` if *end* cannot be
            reached.
        """
        best = {start: 0}   # Shortest distance found so far per vertex
        previous = {}       # Predecessor on that shortest route
        visited = set()
        pq = [(0, start)]   # Priority queue: (distance, vertex)

        while pq:
            current_dist, current_vertex = heapq.heappop(pq)
            if current_vertex in visited:
                continue  # Stale entry: a shorter route was already settled
            visited.add(current_vertex)

            if current_vertex == end:
                # Walk the predecessor links back to the start.
                path = [current_vertex]
                while path[-1] in previous:
                    path.append(previous[path[-1]])
                path.reverse()
                return current_dist, path

            # .get() avoids inserting empty entries into the defaultdict for
            # vertices that have no outgoing edges.
            for neighbor, weight in self.vertices.get(current_vertex, ()):
                if neighbor in visited:
                    continue
                new_dist = current_dist + weight
                if new_dist < best.get(neighbor, float('inf')):
                    best[neighbor] = new_dist
                    previous[neighbor] = current_vertex
                    heapq.heappush(pq, (new_dist, neighbor))

        return float('inf'), []  # No path found
# Create sample graph
graph = Graph()
# Each call adds a single one-way edge: (from, to, weight).
graph.add_edge('A', 'B', 4)
graph.add_edge('A', 'C', 2)
graph.add_edge('B', 'C', 1)
graph.add_edge('B', 'D', 5)
graph.add_edge('C', 'D', 8)
graph.add_edge('C', 'E', 10)
graph.add_edge('D', 'E', 2)
# Find shortest path
distance, path = graph.dijkstra('A', 'E')
print(f"Shortest path from A to E: {' -> '.join(path)}")
print(f"Total distance: {distance}")
# Event simulation system
print("\n=== Event Simulation ===")
Event = namedtuple('Event', ['time', 'type', 'data'])


class EventSimulator:
    """Discrete event simulator using a heap queue.

    Events are processed in time order; events scheduled for the same time
    are processed in the order they were scheduled.
    """

    def __init__(self):
        # Heap of (time, sequence, Event). The sequence number keeps entries
        # unique so that event type and data are never compared.
        self._events = []
        self._current_time = 0
        self._sequence = 0

    def schedule_event(self, event_time, event_type, data=None):
        """Schedule an event of *event_type* at *event_time*."""
        event = Event(event_time, event_type, data)
        heapq.heappush(self._events, (event_time, self._sequence, event))
        self._sequence += 1

    def run_simulation(self, max_time=100):
        """Process every queued event with ``time <= max_time``.

        Events scheduled while the simulation runs are included. Later
        events stay queued, so the simulation can be resumed with a larger
        *max_time*.
        """
        # Look at the earliest event before popping it, so an event past
        # max_time is left in the queue rather than discarded.
        while self._events and self._events[0][0] <= max_time:
            _, _, event = heapq.heappop(self._events)
            self._current_time = event.time
            self._process_event(event)

    def _process_event(self, event):
        """Report *event* and schedule any follow-up event it triggers."""
        print(f"Time {event.time}: {event.type}")
        # Example event processing
        if event.type == "customer_arrival":
            # Schedule service completion
            service_time = self._current_time + 5  # 5 time units
            self.schedule_event(service_time, "service_complete", event.data)
        elif event.type == "service_complete":
            print(f" Customer {event.data} service completed")
# Run simulation
sim = EventSimulator()
# Arguments are (time, event type, data); each arrival schedules its own
# "service_complete" event when it is processed.
sim.schedule_event(10, "customer_arrival", "Customer1")
sim.schedule_event(15, "customer_arrival", "Customer2")
sim.schedule_event(25, "customer_arrival", "Customer3")
print("Running event simulation:")
sim.run_simulation(50)
python code snippet end
Advanced Heap Techniques
python code snippet start
import heapq
from typing import List, Tuple
import bisect
# Max heap implementation
print("=== Max Heap Implementation ===")


class MaxHeap:
    """Max heap built on heapq's min-heap by storing negated values.

    Because values are negated on the way in and out, items must support
    unary minus (numbers, for example).
    """

    def __init__(self, initial_data=None):
        """Create the heap, optionally seeded with *initial_data*."""
        if initial_data:
            self._heap = [-x for x in initial_data]
            heapq.heapify(self._heap)
        else:
            self._heap = []

    def push(self, item):
        """Add *item* to the heap."""
        heapq.heappush(self._heap, -item)

    def pop(self):
        """Remove and return the largest item.

        Raises:
            IndexError: if the heap is empty.
        """
        if not self._heap:
            raise IndexError("pop from empty heap")
        return -heapq.heappop(self._heap)

    def peek(self):
        """Return the largest item without removing it.

        Raises:
            IndexError: if the heap is empty.
        """
        if not self._heap:
            raise IndexError("peek at empty heap")
        return -self._heap[0]

    def __len__(self):
        return len(self._heap)

    def __bool__(self):
        return bool(self._heap)
# Test max heap
max_heap = MaxHeap([3, 1, 6, 5, 2, 4])
print("Max heap operations:")
print(f"Largest: {max_heap.peek()}")
# Two pops in a row remove the two largest values.
print(f"Pop largest: {max_heap.pop()}")
print(f"Pop largest: {max_heap.pop()}")
max_heap.push(10)
max_heap.push(7)
print(f"After adding 10 and 7, largest: {max_heap.peek()}")
# Median finder using two heaps
print("\n=== Running Median Calculator ===")


class MedianFinder:
    """Track the median of a stream of numbers using two heaps.

    The smaller half of the values lives in a max heap (stored negated) and
    the larger half in a min heap. The two sizes never differ by more than
    one, so the median is always available at the top of one or both heaps.
    """

    def __init__(self):
        # Max heap for smaller half (negate values)
        self._small = []
        # Min heap for larger half
        self._large = []

    def add_number(self, num):
        """Add *num* to the stream."""
        # Add to appropriate heap: values up to the current top of the
        # smaller half belong there, everything else goes to the larger half.
        if not self._small or num <= -self._small[0]:
            heapq.heappush(self._small, -num)
        else:
            heapq.heappush(self._large, num)

        # Balance heaps (size difference <= 1) by moving one top value across.
        if len(self._small) > len(self._large) + 1:
            val = -heapq.heappop(self._small)
            heapq.heappush(self._large, val)
        elif len(self._large) > len(self._small) + 1:
            val = heapq.heappop(self._large)
            heapq.heappush(self._small, -val)

    def find_median(self):
        """Return the current median.

        Returns:
            ``None`` if no numbers have been added; the middle value for an
            odd count; the mean of the two middle values (a float) for an
            even count.
        """
        if len(self._small) == len(self._large):
            if not self._small:  # No numbers added yet
                return None
            return (-self._small[0] + self._large[0]) / 2.0
        if len(self._small) > len(self._large):
            return -self._small[0]
        return self._large[0]
# Test median finder: report the running median after each insertion.
median_finder = MedianFinder()
numbers = [5, 15, 1, 3, 8, 7, 9, 2, 6]
print("Adding numbers and finding median:")
for num in numbers:
    median_finder.add_number(num)
    median = median_finder.find_median()
    print(f"After adding {num}: median = {median}")
# K closest points to origin
print("\n=== K Closest Points ===")


def k_closest_points(points: List[Tuple[int, int]], k: int) -> List[Tuple[int, int]]:
    """Return the *k* points closest to the origin.

    Distances are compared as squared Euclidean distance, which preserves
    the ordering without needing a square root. If fewer than *k* points are
    given, all of them are returned. The result is in heap order, not sorted
    by distance. When points tie at the cut-off distance, the one seen first
    is kept.
    """
    # Nothing can be selected; also avoids reading heap[0] on an empty heap.
    if k <= 0:
        return []

    # Use max heap (negated distances) to keep track of k closest points:
    # heap[0] is always the farthest of the points currently kept.
    heap = []
    for x, y in points:
        distance_squared = x * x + y * y
        if len(heap) < k:
            heapq.heappush(heap, (-distance_squared, x, y))
        elif distance_squared < -heap[0][0]:
            # Closer than the farthest kept point, so swap it in.
            heapq.heapreplace(heap, (-distance_squared, x, y))
    return [(x, y) for _, x, y in heap]
# Test k closest points
points = [(1, 3), (3, 4), (2, -1), (-2, 2), (5, 3), (3, 1)]
k = 3
closest = k_closest_points(points, k)
print(f"Points: {points}")
print(f"{k} closest points to origin: {closest}")
# Top K frequent elements
print("\n=== Top K Frequent Elements ===")


def top_k_frequent(nums: List[int], k: int) -> List[int]:
    """Return the *k* most frequent values in *nums*.

    If there are fewer than *k* distinct values, all of them are returned.
    The result is in heap order, not sorted by frequency. When frequencies
    tie at the cut-off, the value seen first is kept.
    """
    from collections import Counter

    # Nothing can be selected; also avoids reading heap[0] on an empty heap.
    if k <= 0:
        return []

    # Count frequencies
    count = Counter(nums)

    # Use min heap to keep track of top k: heap[0] is always the least
    # frequent of the values currently kept.
    heap = []
    for num, freq in count.items():
        if len(heap) < k:
            heapq.heappush(heap, (freq, num))
        elif freq > heap[0][0]:
            heapq.heapreplace(heap, (freq, num))
    return [num for _, num in heap]
# Test top k frequent
nums = [1, 1, 1, 2, 2, 3, 3, 3, 3, 4]
k = 2
frequent = top_k_frequent(nums, k)
print(f"Numbers: {nums}")
print(f"Top {k} frequent: {frequent}")
python code snippet end
Performance and Best Practices
python code snippet start
import heapq
import time
import random
from collections import deque
# Performance comparisons
print("=== Performance Analysis ===")


def benchmark_operations():
    """Time heapify, repeated heappush, and repeated heappop at three sizes.

    For each size a list of random integers is generated and each operation
    is timed once; the timings are printed. Nothing is returned.
    """
    # Test data sizes
    sizes = [1000, 10000, 100000]
    for size in sizes:
        print(f"\nBenchmarking with {size} elements:")

        # Generate random data
        data = [random.randint(1, size * 10) for _ in range(size)]

        # perf_counter is the clock intended for measuring short durations.
        # Heapify performance
        heap_data = data.copy()
        start = time.perf_counter()
        heapq.heapify(heap_data)
        heapify_time = time.perf_counter() - start

        # Push operations
        empty_heap = []
        start = time.perf_counter()
        for item in data:
            heapq.heappush(empty_heap, item)
        push_time = time.perf_counter() - start

        # Pop operations (drains the heap built by heapify above)
        start = time.perf_counter()
        while heap_data:
            heapq.heappop(heap_data)
        pop_time = time.perf_counter() - start

        print(f" Heapify: {heapify_time:.4f}s")
        print(f" Push all: {push_time:.4f}s")
        print(f" Pop all: {pop_time:.4f}s")
# Memory efficiency tips
print("\n=== Memory Efficiency Tips ===")


def memory_efficient_heap_operations():
    """Demonstrate two efficient ways of using a heap.

    First compares building a heap by repeated pushes against a single
    heapify call. Then selects the 10 largest values from a generator of one
    million random integers while holding only 10 values at a time. Results
    are printed; nothing is returned.
    """
    # Use heapify instead of repeated pushes
    data = list(range(10000, 0, -1))  # Reverse sorted

    # Inefficient: repeated pushes
    start = time.perf_counter()
    heap1 = []
    for item in data:
        heapq.heappush(heap1, item)
    time1 = time.perf_counter() - start

    # Efficient: heapify
    start = time.perf_counter()
    heap2 = data.copy()
    heapq.heapify(heap2)
    time2 = time.perf_counter() - start

    print(f"Repeated pushes: {time1:.4f}s")
    print(f"Heapify: {time2:.4f}s")
    # Guard the ratio: an elapsed time of zero would raise ZeroDivisionError.
    if time2 > 0:
        print(f"Speedup: {time1/time2:.2f}x")
    else:
        print("Speedup: n/a (elapsed time below timer resolution)")

    # Memory-efficient k-largest without storing all
    def efficient_k_largest(iterable, k):
        """Return the k largest items of *iterable*, in heap order."""
        if k <= 0:
            return []
        # Min heap of the best k seen so far; heap[0] is the smallest of them.
        heap = []
        for item in iterable:
            if len(heap) < k:
                heapq.heappush(heap, item)
            elif item > heap[0]:
                heapq.heapreplace(heap, item)
        return heap

    # Test with large data: a generator, so the values are never all held in
    # memory at once.
    large_data = (random.randint(1, 1000000) for _ in range(1000000))
    k_largest = efficient_k_largest(large_data, 10)
    print(f"10 largest from 1M elements: {sorted(k_largest, reverse=True)}")
# Best practices
class HeapBestPractices:
    """Collection of heap best practices."""

    class _ReversedKey:
        """Wrap a sort key so that larger keys compare as smaller.

        This gives descending order for any comparable key type, not only
        for keys that support unary minus.
        """

        __slots__ = ("value",)

        def __init__(self, value):
            self.value = value

        def __eq__(self, other):
            return self.value == other.value

        def __lt__(self, other):
            return other.value < self.value

    @staticmethod
    def stable_heap_sort(arr, key=None, reverse=False):
        """Return the items of *arr* sorted with a heap, keeping ties stable.

        Args:
            arr: Iterable of items to sort.
            key: Optional function mapping an item to its sort key; the item
                itself is used when omitted.
            reverse: Sort in descending key order when true.

        Returns:
            A new list. Items with equal keys keep their original relative
            order in both ascending and descending mode.
        """

        def sort_key(item):
            key_val = item if key is None else key(item)
            if reverse:
                return HeapBestPractices._ReversedKey(key_val)
            return key_val

        # Create tuples with (key_value, original_index, item). The index is
        # unique, so it settles ties and the items are never compared.
        indexed_items = [(sort_key(item), i, item) for i, item in enumerate(arr)]
        heapq.heapify(indexed_items)
        return [heapq.heappop(indexed_items)[-1] for _ in range(len(indexed_items))]

    @staticmethod
    def heap_based_priority_queue_with_update():
        """Return a new, empty priority queue that supports priority updates."""

        class UpdatablePriorityQueue:
            """Priority queue with lazy deletion.

            Updating or removing an item does not search the heap. The old
            entry is marked as removed and skipped when it reaches the top.
            """

            def __init__(self):
                self._heap = []
                self._entry_finder = {}  # mapping of items to entries
                self._counter = 0  # unique sequence count
                # Placeholder for removed items: a unique object, so no item
                # a caller adds can ever be mistaken for it.
                self.REMOVED = object()

            def add_item(self, item, priority=0):
                """Add a new item or update priority of existing item."""
                if item in self._entry_finder:
                    self.remove_item(item)
                # A list rather than a tuple, so remove_item can mark it.
                entry = [priority, self._counter, item]
                self._entry_finder[item] = entry
                heapq.heappush(self._heap, entry)
                self._counter += 1

            def remove_item(self, item):
                """Mark an existing item as REMOVED.

                Raises:
                    KeyError: if *item* is not in the queue.
                """
                entry = self._entry_finder.pop(item)
                entry[-1] = self.REMOVED

            def pop_item(self):
                """Remove and return the item with the smallest priority value.

                Raises:
                    KeyError: if the queue holds no live items.
                """
                while self._heap:
                    _priority, _count, item = heapq.heappop(self._heap)
                    if item is not self.REMOVED:
                        del self._entry_finder[item]
                        return item
                raise KeyError('pop from empty priority queue')

            def is_empty(self):
                """Return True when no live items remain."""
                return not self._entry_finder

        return UpdatablePriorityQueue()

    @staticmethod
    def sliding_window_maximum(arr, k):
        """Return the maximum of each length-*k* window over *arr*.

        Returns an empty list when *arr* is empty, *k* is not positive, or
        *k* exceeds the length of *arr*.
        """
        if not arr or k <= 0:
            return []

        # Use deque to maintain elements in decreasing order
        from collections import deque

        dq = deque()  # Indices into arr; arr[dq[0]] is the window maximum
        result = []
        for i, value in enumerate(arr):
            # Remove elements outside current window
            while dq and dq[0] <= i - k:
                dq.popleft()
            # Remove smaller elements from back; they can never be a maximum
            # again while the newer, larger value is in the window.
            while dq and arr[dq[-1]] <= value:
                dq.pop()
            dq.append(i)
            # Add to result when window is full
            if i >= k - 1:
                result.append(arr[dq[0]])
        return result
# Test best practices
print("\n=== Best Practices Examples ===")

# Stable sort: students with equal grades should keep their listed order.
students = [
    ("Alice", 85),
    ("Bob", 90),
    ("Charlie", 85),
    ("Diana", 90),
]
stable_sorted = HeapBestPractices.stable_heap_sort(
    students,
    key=lambda x: x[1],
    reverse=True,
)
print("Stable sort by grade (descending):")
for student in stable_sorted:
    print(f" {student}")

# Updatable priority queue
pq = HeapBestPractices.heap_based_priority_queue_with_update()
pq.add_item("task1", 3)
pq.add_item("task2", 1)
pq.add_item("task3", 2)
pq.add_item("task2", 4)  # Update priority
print("\nUpdatable priority queue:")
while not pq.is_empty():
    task = pq.pop_item()
    print(f" Processing: {task}")

# Sliding window maximum
arr = [1, 3, -1, -3, 5, 3, 6, 7]
k = 3
maximums = HeapBestPractices.sliding_window_maximum(arr, k)
print(f"\nSliding window maximums (k={k}):")
print(f"Array: {arr}")
print(f"Maximums: {maximums}")

# Run benchmarks
benchmark_operations()
memory_efficient_heap_operations()
python code snippet end
The heapq module provides efficient implementations of fundamental algorithms and data structures, making it essential for performance-critical applications involving priority queues, sorting, and optimization problems. Use heapq together with statistics for data analysis and with timeit for performance testing.
Reference: Python Heapq Module Documentation