Gzip Module: Efficient File Compression and Decompression
TL;DR
The gzip module compresses and decompresses files in the gzip format through an interface that mirrors standard file objects. How much you save depends on the data: repetitive text and logs often shrink by 60-90%, while already-compressed formats gain little.
Interesting!
Gzip can shrink highly repetitive text by 90% or more and typically reduces plain text and log data by 60-80% - a 100 MB log file can often drop to 10-20 MB - dramatically cutting storage and transfer time. Already-compressed data such as JPEG images or video, by contrast, sees almost no benefit.
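The gap is easy to see for yourself. The minimal sketch below (illustrative data only) compresses a block of repetitive log-style text and an equally sized block of random bytes to show how far the ratios diverge.
python code snippet start
import gzip
import os

# Repetitive text compresses dramatically; random bytes barely compress at all
repetitive = b"2024-01-01 INFO request handled in 12ms\n" * 10_000
random_bytes = os.urandom(len(repetitive))

for label, payload in [("repetitive log lines", repetitive),
                       ("random bytes", random_bytes)]:
    compressed = gzip.compress(payload)
    print(f"{label}: {len(payload)} -> {len(compressed)} bytes "
          f"({len(compressed) / len(payload):.1%} of original)")
python code snippet end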
Basic File Operations
Reading Compressed Files
python code snippet start
import gzip

# Read compressed text file
with gzip.open('data.txt.gz', 'rt', encoding='utf-8') as f:
    content = f.read()
    print(content)

# Read compressed binary file
with gzip.open('data.bin.gz', 'rb') as f:
    binary_data = f.read()
    print(f"Read {len(binary_data)} bytes")

# Read line by line (memory efficient)
with gzip.open('large_file.txt.gz', 'rt') as f:
    for line_num, line in enumerate(f, 1):
        print(f"Line {line_num}: {line.strip()}")
        if line_num >= 5:  # Just show first 5 lines
            break
python code snippet end
Writing Compressed Files
python code snippet start
import gzip
import os

# Write text to compressed file
data = "This is some text data that will be compressed.\n" * 1000
with gzip.open('output.txt.gz', 'wt', encoding='utf-8') as f:
    f.write(data)
print(f"Original size: {len(data.encode())} bytes")
print(f"Compressed size: {os.path.getsize('output.txt.gz')} bytes")

# Write binary data
binary_data = b"Binary data to compress" * 100
with gzip.open('binary.gz', 'wb') as f:
    f.write(binary_data)
python code snippet end
In-Memory Compression
compress() and decompress()
python code snippet start
import gzip
# Compress data in memory (repeated so the payload is large enough to shrink;
# a single short sentence would actually grow because of the gzip header overhead)
original_data = b"This text will be compressed in memory without creating files. " * 20
compressed = gzip.compress(original_data)
print(f"Original: {len(original_data)} bytes")
print(f"Compressed: {len(compressed)} bytes")
print(f"Compression ratio: {len(compressed)/len(original_data):.2%}")

# Decompress data
decompressed = gzip.decompress(compressed)
print(f"Decompressed starts with: {decompressed[:40].decode()}")
print(f"Data matches: {original_data == decompressed}")
python code snippet end
Compression Levels
python code snippet start
import gzip

# Test different compression levels (0-9)
test_data = b"Lorem ipsum dolor sit amet, consectetur adipiscing elit. " * 100
for level in range(0, 10):
    compressed = gzip.compress(test_data, compresslevel=level)
    ratio = len(compressed) / len(test_data)
    print(f"Level {level}: {len(compressed)} bytes ({ratio:.2%})")

# Level 0: No compression (fastest)
# Level 1: Fastest compression
# Level 6: Common speed/size trade-off (zlib's default)
# Level 9: Maximum compression (slowest; the default for gzip.compress and gzip.open)
python code snippet end
Log File Processing
Rotating Log Compression
python code snippet start
import gzip
import os
import time


class LogCompressor:
    def __init__(self, log_dir: str):
        self.log_dir = log_dir

    def compress_old_logs(self, days_old: int = 7):
        """Compress log files older than specified days"""
        cutoff_time = time.time() - (days_old * 24 * 60 * 60)
        for filename in os.listdir(self.log_dir):
            if filename.endswith('.log') and not filename.endswith('.gz'):
                filepath = os.path.join(self.log_dir, filename)
                # Check if file is old enough
                if os.path.getmtime(filepath) < cutoff_time:
                    self.compress_file(filepath)

    def compress_file(self, filepath: str):
        """Compress a single log file"""
        compressed_path = filepath + '.gz'
        with open(filepath, 'rb') as f_in:
            with gzip.open(compressed_path, 'wb') as f_out:
                f_out.writelines(f_in)
        # Remove original after successful compression
        os.remove(filepath)
        print(f"Compressed: {filepath} -> {compressed_path}")


# Usage
# compressor = LogCompressor('/var/log/myapp')
# compressor.compress_old_logs(days_old=7)
python code snippet end
Log Analysis from Compressed Files
python code snippet start
import gzip
import re
from collections import Counter


def analyze_compressed_logs(log_files: list[str]) -> dict:
    """Analyze log patterns from compressed files"""
    error_patterns = Counter()
    total_lines = 0
    error_regex = re.compile(r'ERROR.*?:(.*?)(?:\n|$)')

    for log_file in log_files:
        with gzip.open(log_file, 'rt', encoding='utf-8', errors='ignore') as f:
            for line in f:
                total_lines += 1
                # Extract error messages
                match = error_regex.search(line)
                if match:
                    error_type = match.group(1).strip()
                    error_patterns[error_type] += 1

    return {
        'total_lines': total_lines,
        'error_patterns': dict(error_patterns.most_common(10))
    }


# Usage
# log_files = ['app.log.gz', 'app.log.1.gz', 'app.log.2.gz']
# analysis = analyze_compressed_logs(log_files)
# print(f"Analyzed {analysis['total_lines']} lines")
# print("Top errors:", analysis['error_patterns'])
python code snippet end
Web Data Processing
HTTP Response Compression
python code snippet start
import gzip
import json


def compress_json_response(data: dict) -> bytes:
    """Compress JSON data for HTTP response"""
    json_str = json.dumps(data, separators=(',', ':'))  # Compact JSON
    json_bytes = json_str.encode('utf-8')
    # Compress with high compression for network transfer
    compressed = gzip.compress(json_bytes, compresslevel=9)
    print(f"JSON size: {len(json_bytes)} bytes")
    print(f"Compressed: {len(compressed)} bytes")
    print(f"Savings: {(1 - len(compressed)/len(json_bytes)):.1%}")
    return compressed


def decompress_json_response(compressed_data: bytes) -> dict:
    """Decompress and parse JSON response"""
    json_bytes = gzip.decompress(compressed_data)
    json_str = json_bytes.decode('utf-8')
    return json.loads(json_str)


# Example usage
large_data = {
    'users': [{'id': i, 'name': f'User {i}', 'email': f'user{i}@example.com'}
              for i in range(1000)],
    'metadata': {'timestamp': '2024-01-01T00:00:00Z', 'version': '1.0'}
}
compressed = compress_json_response(large_data)
decompressed = decompress_json_response(compressed)
print(f"Data integrity: {large_data == decompressed}")
python code snippet end
File Backup and Archiving
Backup with Compression
python code snippet start
import gzip
import shutil
from datetime import datetime
from pathlib import Path


class BackupManager:
    def __init__(self, backup_dir: str):
        self.backup_dir = Path(backup_dir)
        self.backup_dir.mkdir(exist_ok=True)

    def backup_file(self, source_path: str, compress: bool = True) -> str:
        """Create compressed backup of a file"""
        source = Path(source_path)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        if compress:
            backup_name = f"{source.stem}_{timestamp}.gz"
            backup_path = self.backup_dir / backup_name
            with open(source, 'rb') as f_in:
                with gzip.open(backup_path, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
        else:
            backup_name = f"{source.stem}_{timestamp}{source.suffix}"
            backup_path = self.backup_dir / backup_name
            shutil.copy2(source, backup_path)
        return str(backup_path)

    def restore_backup(self, backup_path: str, target_path: str):
        """Restore file from compressed backup"""
        backup = Path(backup_path)
        if backup.suffix == '.gz':
            with gzip.open(backup, 'rb') as f_in:
                with open(target_path, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
        else:
            shutil.copy2(backup, target_path)


# Usage
# backup_mgr = BackupManager('/backups')
# backup_path = backup_mgr.backup_file('/important/data.txt')
# backup_mgr.restore_backup(backup_path, '/restored/data.txt')
python code snippet end
Performance Considerations
Streaming Large Files
python code snippet start
import gzip


def process_large_compressed_file(filename: str, chunk_size: int = 8192):
    """Process a large compressed file without loading it entirely into memory"""
    line_count = 0
    word_count = 0
    with gzip.open(filename, 'rt') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            line_count += chunk.count('\n')
            # Note: a word split across a chunk boundary is counted as two words,
            # so the word count is an approximation
            word_count += len(chunk.split())
    return line_count, word_count


# Process multi-GB compressed files efficiently
# lines, words = process_large_compressed_file('huge_dataset.txt.gz')
python code snippet end
Gzip is a workhorse for efficient data storage and transfer - reach for it whenever you need to save space or reduce bandwidth. It pairs naturally with JSON data compression, CSV file processing, and log file management, and it works seamlessly with pathlib-style file path operations.
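As a quick illustration of that interplay, here is a minimal sketch that reads a compressed CSV through the csv module, passing a pathlib.Path to gzip.open. The file name and column names are hypothetical.
python code snippet start
import csv
import gzip
from pathlib import Path

# Hypothetical compressed CSV file used for illustration
csv_path = Path('exports') / 'orders.csv.gz'

# gzip.open accepts Path objects; in text mode it returns a file object
# that csv.DictReader can consume directly
with gzip.open(csv_path, 'rt', encoding='utf-8', newline='') as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(row['order_id'], row['total'])  # assumes these columns exist
python code snippet end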
Reference: Python Gzip Documentation