
Gzip Module: Efficient File Compression and Decompression

TL;DR

The gzip module provides seamless compression and decompression of files using the gzip format, reducing file sizes by 60-90% while maintaining a simple interface compatible with standard file operations.

Interesting!

Gzip can shrink highly repetitive text by up to 90% and typically achieves a 60-80% reduction on ordinary text and other redundant data - a 100 MB log file can often be stored in 10-20 MB, dramatically cutting storage space and transfer time.
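
Curious what that looks like in practice? A minimal sketch (the log line and repeat count are made up for illustration; real savings depend on how repetitive the data is):

python code snippet start

import gzip

# Repetitive, log-like text compresses extremely well
sample = ("2024-01-01 12:00:00 INFO request handled in 12ms\n" * 50_000).encode()

compressed = gzip.compress(sample)
print(f"Original:   {len(sample):,} bytes")
print(f"Compressed: {len(compressed):,} bytes")
print(f"Reduction:  {1 - len(compressed) / len(sample):.1%}")

python code snippet end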

Basic File Operations

Reading Compressed Files

python code snippet start

import gzip

# Read compressed text file
with gzip.open('data.txt.gz', 'rt', encoding='utf-8') as f:
    content = f.read()
    print(content)

# Read compressed binary file
with gzip.open('data.bin.gz', 'rb') as f:
    binary_data = f.read()
    print(f"Read {len(binary_data)} bytes")

# Read line by line (memory efficient)
with gzip.open('large_file.txt.gz', 'rt') as f:
    for line_num, line in enumerate(f, 1):
        print(f"Line {line_num}: {line.strip()}")
        if line_num >= 5:  # Just show first 5 lines
            break

python code snippet end
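
If a file turns out not to be valid gzip data, reading it raises gzip.BadGzipFile (a subclass of OSError, available since Python 3.8). A small guard for directories that mix compressed and plain files - a sketch, with a hypothetical helper name:

python code snippet start

import gzip

def read_maybe_gzipped(path: str) -> str:
    """Read a text file whether or not it is gzip-compressed."""
    try:
        with gzip.open(path, 'rt', encoding='utf-8') as f:
            return f.read()
    except gzip.BadGzipFile:
        # Not actually gzip data - fall back to plain text
        with open(path, 'r', encoding='utf-8') as f:
            return f.read()

python code snippet end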

Writing Compressed Files

python code snippet start

import gzip
import os

# Write text to compressed file
data = "This is some text data that will be compressed.\n" * 1000

with gzip.open('output.txt.gz', 'wt', encoding='utf-8') as f:
    f.write(data)

print(f"Original size: {len(data.encode())} bytes")
print(f"Compressed size: {os.path.getsize('output.txt.gz')} bytes")

# Write binary data
binary_data = b"Binary data to compress" * 100

with gzip.open('binary.gz', 'wb') as f:
    f.write(binary_data)

python code snippet end

In-Memory Compression

compress() and decompress()

python code snippet start

import gzip

# Compress data in memory (use repetitive data so the example actually shrinks;
# tiny inputs can grow slightly because of gzip's header and checksum overhead)
original_data = b"This text will be compressed in memory without creating files. " * 50
compressed = gzip.compress(original_data)

print(f"Original: {len(original_data)} bytes")
print(f"Compressed: {len(compressed)} bytes")
print(f"Compression ratio: {len(compressed)/len(original_data):.2%}")

# Decompress data
decompressed = gzip.decompress(compressed)
print(f"Decompressed: {decompressed.decode()}")
print(f"Data matches: {original_data == decompressed}")

python code snippet end
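
For building compressed data incrementally in memory - for example when results arrive in pieces - gzip.GzipFile can wrap an io.BytesIO buffer. A minimal sketch:

python code snippet start

import gzip
import io

buffer = io.BytesIO()

# GzipFile compresses each write into the in-memory buffer
with gzip.GzipFile(fileobj=buffer, mode='wb') as gz:
    for i in range(3):
        gz.write(f"chunk {i}\n".encode('utf-8'))

compressed = buffer.getvalue()
print(f"Compressed stream: {len(compressed)} bytes")

# Round-trip: decompress the whole buffer at once
print(gzip.decompress(compressed).decode('utf-8'))

python code snippet end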

Compression Levels

python code snippet start

# Test different compression levels (0-9)
test_data = b"Lorem ipsum dolor sit amet, consectetur adipiscing elit. " * 100

for level in range(0, 10):
    compressed = gzip.compress(test_data, compresslevel=level)
    ratio = len(compressed) / len(test_data)
    print(f"Level {level}: {len(compressed)} bytes ({ratio:.2%})")

# Level 0: No compression (fastest)
# Level 1: Fastest compression
# Level 6: Default balance
# Level 9: Maximum compression (slowest)

python code snippet end
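
The speed difference between levels matters as much as the size difference. A rough timing sketch (numbers vary by machine and data):

python code snippet start

import gzip
import time

test_data = b"Lorem ipsum dolor sit amet, consectetur adipiscing elit. " * 10_000

for level in (1, 6, 9):
    start = time.perf_counter()
    compressed = gzip.compress(test_data, compresslevel=level)
    elapsed = time.perf_counter() - start
    print(f"Level {level}: {len(compressed):>6} bytes in {elapsed * 1000:.1f} ms")

python code snippet end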

Log File Processing

Rotating Log Compression

python code snippet start

import gzip
import os
import shutil
import time

class LogCompressor:
    def __init__(self, log_dir: str):
        self.log_dir = log_dir
    
    def compress_old_logs(self, days_old: int = 7):
        """Compress log files older than specified days"""
        cutoff_time = time.time() - (days_old * 24 * 60 * 60)
        
        for filename in os.listdir(self.log_dir):
            if filename.endswith('.log'):  # already-compressed .gz files won't match
                filepath = os.path.join(self.log_dir, filename)
                
                # Check if file is old enough
                if os.path.getmtime(filepath) < cutoff_time:
                    self.compress_file(filepath)
    
    def compress_file(self, filepath: str):
        """Compress a single log file"""
        compressed_path = filepath + '.gz'
        
        with open(filepath, 'rb') as f_in:
            with gzip.open(compressed_path, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)  # stream in chunks instead of line by line
        
        # Remove original after successful compression
        os.remove(filepath)
        print(f"Compressed: {filepath} -> {compressed_path}")

# Usage
# compressor = LogCompressor('/var/log/myapp')
# compressor.compress_old_logs(days_old=7)

python code snippet end

Log Analysis from Compressed Files

python code snippet start

import gzip
import re
from collections import Counter

def analyze_compressed_logs(log_files: list[str]) -> dict:
    """Analyze log patterns from compressed files"""
    error_patterns = Counter()
    total_lines = 0
    
    error_regex = re.compile(r'ERROR.*?:(.*?)(?:\n|$)')
    
    for log_file in log_files:
        with gzip.open(log_file, 'rt', encoding='utf-8', errors='ignore') as f:
            for line in f:
                total_lines += 1
                
                # Extract error messages
                match = error_regex.search(line)
                if match:
                    error_type = match.group(1).strip()
                    error_patterns[error_type] += 1
    
    return {
        'total_lines': total_lines,
        'error_patterns': dict(error_patterns.most_common(10))
    }

# Usage
# log_files = ['app.log.gz', 'app.log.1.gz', 'app.log.2.gz']
# analysis = analyze_compressed_logs(log_files)
# print(f"Analyzed {analysis['total_lines']} lines")
# print("Top errors:", analysis['error_patterns'])

python code snippet end

Web Data Processing

HTTP Response Compression

python code snippet start

import gzip
import json

def compress_json_response(data: dict) -> bytes:
    """Compress JSON data for HTTP response"""
    json_str = json.dumps(data, separators=(',', ':'))  # Compact JSON
    json_bytes = json_str.encode('utf-8')
    
    # Compress with high compression for network transfer
    compressed = gzip.compress(json_bytes, compresslevel=9)
    
    print(f"JSON size: {len(json_bytes)} bytes")
    print(f"Compressed: {len(compressed)} bytes")
    print(f"Savings: {(1 - len(compressed)/len(json_bytes)):.1%}")
    
    return compressed

def decompress_json_response(compressed_data: bytes) -> dict:
    """Decompress and parse JSON response"""
    json_bytes = gzip.decompress(compressed_data)
    json_str = json_bytes.decode('utf-8')
    return json.loads(json_str)

# Example usage
large_data = {
    'users': [{'id': i, 'name': f'User {i}', 'email': f'user{i}@example.com'} 
              for i in range(1000)],
    'metadata': {'timestamp': '2024-01-01T00:00:00Z', 'version': '1.0'}
}

compressed = compress_json_response(large_data)
decompressed = decompress_json_response(compressed)
print(f"Data integrity: {large_data == decompressed}")

python code snippet end
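
On the wire, the client has to be told that the payload is gzip-compressed. A minimal sketch using the standard library's http.server (the endpoint and the reuse of compress_json_response/large_data from above are illustrative):

python code snippet start

from http.server import BaseHTTPRequestHandler, HTTPServer

class CompressedJSONHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        body = compress_json_response(large_data)  # from the example above
        self.send_response(200)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Content-Encoding', 'gzip')  # client decompresses transparently
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

# HTTPServer(('localhost', 8000), CompressedJSONHandler).serve_forever()

python code snippet end

Browsers and HTTP client libraries decompress automatically when they see the Content-Encoding header; in production you would also check the request's Accept-Encoding header before compressing.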

File Backup and Archiving

Backup with Compression

python code snippet start

import gzip
import shutil
from datetime import datetime
from pathlib import Path

class BackupManager:
    def __init__(self, backup_dir: str):
        self.backup_dir = Path(backup_dir)
        self.backup_dir.mkdir(exist_ok=True)
    
    def backup_file(self, source_path: str, compress: bool = True) -> str:
        """Create compressed backup of a file"""
        source = Path(source_path)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        
        if compress:
            backup_name = f"{source.stem}_{timestamp}.gz"
            backup_path = self.backup_dir / backup_name
            
            with open(source, 'rb') as f_in:
                with gzip.open(backup_path, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
        else:
            backup_name = f"{source.stem}_{timestamp}{source.suffix}"
            backup_path = self.backup_dir / backup_name
            shutil.copy2(source, backup_path)
        
        return str(backup_path)
    
    def restore_backup(self, backup_path: str, target_path: str):
        """Restore file from compressed backup"""
        backup = Path(backup_path)
        
        if backup.suffix == '.gz':
            with gzip.open(backup, 'rb') as f_in:
                with open(target_path, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
        else:
            shutil.copy2(backup, target_path)

# Usage
# backup_mgr = BackupManager('/backups')
# backup_path = backup_mgr.backup_file('/important/data.txt')
# backup_mgr.restore_backup(backup_path, '/restored/data.txt')

python code snippet end
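
gzip compresses a single stream, so it backs up one file at a time. To archive a whole directory tree into one compressed file, the tarfile module combines tar archiving with gzip compression (paths below are illustrative):

python code snippet start

import tarfile

# Bundle an entire directory into a single gzip-compressed archive
with tarfile.open('backup_2024.tar.gz', 'w:gz') as tar:
    tar.add('project_dir', arcname='project_dir')

# Extract it later
with tarfile.open('backup_2024.tar.gz', 'r:gz') as tar:
    tar.extractall('restored')

python code snippet end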

Performance Considerations

Streaming Large Files

python code snippet start

import gzip

def process_large_compressed_file(filename: str, chunk_size: int = 8192):
    """Process a large compressed file without loading it entirely into memory"""
    line_count = 0
    word_count = 0
    leftover = ''  # partial word carried over from the previous chunk

    with gzip.open(filename, 'rt') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break

            line_count += chunk.count('\n')

            # Prepend the carried-over fragment so words split across
            # chunk boundaries are counted exactly once
            words = (leftover + chunk).split()
            if chunk[-1].isspace():
                leftover = ''
            else:
                leftover = words.pop() if words else ''
            word_count += len(words)

    if leftover:
        word_count += 1

    return line_count, word_count

# Process multi-GB compressed files efficiently
# lines, words = process_large_compressed_file('huge_dataset.txt.gz')

python code snippet end

Gzip is essential for efficient data storage and transfer - use it whenever you need to save space or reduce bandwidth. It pairs naturally with JSON data compression and CSV file processing, complements log file management, and works seamlessly with file path operations.

Reference: Python Gzip Documentation