Build a Blog API with Authentication • Lesson 5

File Upload Magic

Handle file uploads for images and documents with validation, storage, and security. Learn to process multiple files and handle different formats.

🎯 What You'll Learn

  • Implement secure file upload functionality
  • Validate file types, sizes, and content
  • Store files with proper naming and organization
  • Handle image processing and thumbnails

Welcome to secure file handling! In this lesson, you'll implement robust file upload functionality with validation, storage, and security measures to protect your blog API.

🎯 What We're Building

Your file upload system will provide:

  • 📤 Secure file uploads with validation
  • 🖼️ Image processing and thumbnail generation
  • 📁 Organized file storage with proper naming
  • 🔒 Security measures against malicious files
  • 📊 File management with metadata tracking

šŸ” Security First

File Upload Vulnerabilities

File uploads are a common attack vector:

  • Malicious executables disguised as images
  • Script injections in uploaded files
  • Path traversal attacks (../../etc/passwd)
  • Resource exhaustion with huge files
  • MIME type spoofing to bypass filters

Defense Strategies

# ✅ Multi-layer validation
import os

MAX_FILE_SIZE = 10 * 1024 * 1024  # 10MB

def validate_file(file_content, filename, mime_type):
    # 1. Extension whitelist
    allowed_extensions = {'.jpg', '.png', '.pdf'}
    if os.path.splitext(filename)[1].lower() not in allowed_extensions:
        return False
    
    # 2. MIME type validation
    allowed_mimes = {'image/jpeg', 'image/png', 'application/pdf'}
    if mime_type not in allowed_mimes:
        return False
    
    # 3. File signature validation (magic bytes, covered below)
    if not validate_file_signature(file_content):
        return False
    
    # 4. Size limits
    if len(file_content) > MAX_FILE_SIZE:
        return False
    
    return True

šŸ“ File Storage Architecture

1. Secure Filename Generation

import os
import re
import uuid

def generate_secure_filename(original_name, user_id):
    # Never trust user-provided filenames
    name, ext = os.path.splitext(original_name)
    
    # Generate unique identifier
    unique_id = str(uuid.uuid4())
    
    # Create secure filename
    secure_name = f"{user_id}_{unique_id}_{safe_name(name)}{ext.lower()}"
    
    # Remove dangerous characters
    return re.sub(r'[^a-zA-Z0-9._-]', '', secure_name)

def safe_name(name):
    # Keep only alphanumeric characters from the original name
    safe = re.sub(r'[^a-zA-Z0-9]', '_', name)
    return safe[:20]  # Limit length

2. Organized Storage Structure

uploads/
├── 2024/
│   ├── 01/  # Month
│   │   ├── images/
│   │   │   ├── user_123_uuid_photo.jpg
│   │   │   └── user_456_uuid_document.pdf
│   │   └── thumbnails/
│   │       └── user_123_uuid_photo_thumb.jpg
│   └── 02/
└── temp/  # For processing
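A small helper can create this dated layout on demand. This is a sketch; `build_storage_path` is a hypothetical name, and a real version might also split by user or shard by hash prefix:

```python
import os
from datetime import datetime, timezone

def build_storage_path(base_dir, category="images"):
    """Build (and create) a dated subdirectory like uploads/2024/01/images/."""
    now = datetime.now(timezone.utc)
    path = os.path.join(base_dir, f"{now.year:04d}", f"{now.month:02d}", category)
    os.makedirs(path, exist_ok=True)  # Idempotent: safe if it already exists
    return path
```

Calling `build_storage_path("uploads")` once per upload keeps directories from growing unboundedly, since each month gets its own folder.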

3. Database Schema

CREATE TABLE files (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id INTEGER NOT NULL,
    filename TEXT NOT NULL,           -- Secure filename
    original_name TEXT,               -- User's original filename  
    file_path TEXT NOT NULL,          -- Full path to file
    file_size INTEGER,                -- Size in bytes
    content_type TEXT,                -- MIME type
    file_hash TEXT,                   -- SHA-256 hash
    metadata TEXT,                    -- JSON metadata
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (user_id) REFERENCES users (id)
);
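With this schema in place, persisting a record is a single INSERT. A minimal sketch using Python's built-in sqlite3 (connection handling is simplified; the upload flow later in this lesson would call something shaped like this):

```python
import sqlite3

def save_file_metadata(conn, record):
    """Insert one row into the files table and return the new file id."""
    cur = conn.execute(
        """INSERT INTO files
               (user_id, filename, original_name, file_path,
                file_size, content_type, file_hash)
           VALUES (?, ?, ?, ?, ?, ?, ?)""",
        (record['user_id'], record['filename'], record['original_name'],
         record['file_path'], record['file_size'], record['content_type'],
         record['file_hash']),
    )
    conn.commit()
    # AUTOINCREMENT primary key of the row we just inserted
    return cur.lastrowid
```

Parameterized placeholders (`?`) matter here: filenames are user-influenced, so never interpolate them into SQL strings.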

šŸ›”ļø File Validation Techniques

1. File Signature Validation

def validate_file_signature(file_content):
    """Check file headers/magic bytes"""
    signatures = {
        b'\xff\xd8\xff': 'jpeg',
        b'\x89PNG\r\n\x1a\n': 'png', 
        b'%PDF': 'pdf',
        b'GIF87a': 'gif',
        b'GIF89a': 'gif'
    }
    
    for signature, file_type in signatures.items():
        if file_content.startswith(signature):
            return file_type
    
    return None
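Detecting the type isn't enough on its own: a spoofed `.jpg` whose bytes are actually an executable should be rejected even though both checks individually "pass". A cross-check sketch (the signature table mirrors the one above; `signature_matches_extension` is a hypothetical helper):

```python
import os

# Magic-byte signatures, same as in validate_file_signature above
SIGNATURES = {
    b'\xff\xd8\xff': 'jpeg',
    b'\x89PNG\r\n\x1a\n': 'png',
    b'%PDF': 'pdf',
    b'GIF87a': 'gif',
    b'GIF89a': 'gif',
}

def signature_matches_extension(file_content, filename):
    """Return True only when the magic bytes agree with the claimed extension."""
    ext_to_type = {'.jpg': 'jpeg', '.jpeg': 'jpeg', '.png': 'png',
                   '.pdf': 'pdf', '.gif': 'gif'}
    claimed = ext_to_type.get(os.path.splitext(filename)[1].lower())
    detected = next((t for sig, t in SIGNATURES.items()
                     if file_content.startswith(sig)), None)
    return detected is not None and detected == claimed
```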

2. Content Scanning

def scan_file_content(file_content):
    """Scan for suspicious content"""
    suspicious_patterns = [
        b'<script',           # JavaScript
        b'<?php',             # PHP code
        b'exec(',             # System commands
        b'eval(',             # Code evaluation
        b'<iframe',           # Embedded frames
    ]
    
    content_lower = file_content.lower()
    for pattern in suspicious_patterns:
        if pattern in content_lower:
            return False, f"Suspicious content detected: {pattern.decode()}"
    
    return True, "Clean"

3. File Size and Resource Limits

class FileUploadConfig:
    def __init__(self):
        self.max_file_size = 10 * 1024 * 1024      # 10MB
        self.max_files_per_user = 100               # Per user limit
        self.max_total_size_per_user = 100 * 1024 * 1024  # 100MB total
        self.allowed_extensions = {'.jpg', '.png', '.pdf'}
        
        # Rate limiting
        self.max_uploads_per_hour = 20
        self.max_uploads_per_day = 100
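The rate-limiting fields above still need an enforcement point. One minimal in-process approach is a sliding window of timestamps per user; this is a sketch, and production systems usually back it with Redis so limits survive restarts and apply across workers:

```python
import time
from collections import defaultdict, deque

class UploadRateLimiter:
    """Sliding-window upload counter (in-memory sketch)."""

    def __init__(self, max_per_hour=20):
        self.max_per_hour = max_per_hour
        self._events = defaultdict(deque)  # user_id -> recent upload timestamps

    def allow(self, user_id, now=None):
        now = time.time() if now is None else now
        window = self._events[user_id]
        # Drop timestamps older than one hour
        while window and now - window[0] > 3600:
            window.popleft()
        if len(window) >= self.max_per_hour:
            return False  # Over the hourly limit
        window.append(now)
        return True
```

A daily limit works the same way with a 86400-second window, or can be derived from the `files.created_at` column instead.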

šŸ–¼ļø Image Processing

1. Thumbnail Generation

from PIL import Image
import io

def create_thumbnail(image_content, max_size=(150, 150)):
    """Create thumbnail while preserving aspect ratio"""
    try:
        # Open image
        image = Image.open(io.BytesIO(image_content))
        
        # Convert to RGB if necessary
        if image.mode in ('RGBA', 'LA', 'P'):
            image = image.convert('RGB')
        
        # Create thumbnail
        image.thumbnail(max_size, Image.Resampling.LANCZOS)
        
        # Save to bytes
        output = io.BytesIO()
        image.save(output, format='JPEG', quality=85, optimize=True)
        return output.getvalue()
        
    except Exception as e:
        print(f"Thumbnail creation failed: {e}")
        return None

2. Image Optimization

def optimize_image(image_content, quality=85):
    """Optimize image for web"""
    try:
        image = Image.open(io.BytesIO(image_content))
        
        # Resize if too large
        max_dimensions = (1920, 1920)
        if image.size[0] > max_dimensions[0] or image.size[1] > max_dimensions[1]:
            image.thumbnail(max_dimensions, Image.Resampling.LANCZOS)
        
        # Save optimized
        output = io.BytesIO()
        image.save(output, format='JPEG', quality=quality, optimize=True)
        return output.getvalue()
        
    except Exception as e:
        print(f"Image optimization failed: {e}")
        return image_content  # Return original if optimization fails

📊 File Management

1. Duplicate Detection

import hashlib

def detect_duplicate(file_content, user_id):
    """Check if user already uploaded this file"""
    file_hash = hashlib.sha256(file_content).hexdigest()
    
    # Check database for existing file with same hash and user
    existing_file = db.query(
        "SELECT id, filename FROM files WHERE file_hash = ? AND user_id = ?",
        (file_hash, user_id)
    ).fetchone()
    
    return existing_file

2. Storage Cleanup

async def cleanup_orphaned_files():
    """Remove files not referenced in database"""
    # Get all files in storage
    storage_files = set()
    for root, dirs, files in os.walk(UPLOAD_FOLDER):
        for file in files:
            storage_files.add(os.path.join(root, file))
    
    # Get all files in database
    db_files = {row['file_path'] for row in db.query("SELECT file_path FROM files")}
    
    # Remove orphaned files
    orphaned = storage_files - db_files
    for file_path in orphaned:
        try:
            os.remove(file_path)
            print(f"Removed orphaned file: {file_path}")
        except Exception as e:
            print(f"Failed to remove {file_path}: {e}")

3. User Quota Management

async def check_user_quota(user_id, new_file_size):
    """Check if user can upload more files"""
    # Get user's current usage
    user_stats = db.query(
        "SELECT COUNT(*) as file_count, SUM(file_size) as total_size FROM files WHERE user_id = ?",
        (user_id,)
    ).fetchone()
    
    file_count = user_stats['file_count']
    total_size = user_stats['total_size'] or 0
    
    # Check limits
    if file_count >= MAX_FILES_PER_USER:
        return False, "File count limit exceeded"
    
    if total_size + new_file_size > MAX_TOTAL_SIZE_PER_USER:
        return False, "Storage quota exceeded"
    
    return True, "OK"

🔄 Upload Flow

1. Complete Upload Process

async def upload_file(file_content, filename, content_type, user_id):
    """Complete file upload flow"""
    
    # Step 1: Initial validation
    if not validate_file_type(filename, content_type):
        return error("Invalid file type")
    
    if not validate_file_size(len(file_content)):
        return error("File too large")
    
    # Step 2: Security checks
    if not validate_file_signature(file_content):
        return error("Invalid file format")
    
    is_clean, message = scan_file_content(file_content)
    if not is_clean:
        return error(f"Security scan failed: {message}")
    
    # Step 3: Check user limits
    can_upload, quota_message = await check_user_quota(user_id, len(file_content))
    if not can_upload:
        return error(quota_message)
    
    # Step 4: Check for duplicates
    duplicate = detect_duplicate(file_content, user_id)
    if duplicate:
        return success({"file": duplicate, "message": "File already exists"})
    
    # Step 5: Process file
    thumbnail = None
    if content_type.startswith('image/'):
        file_content = optimize_image(file_content)
        thumbnail = create_thumbnail(file_content)
    
    # Step 6: Generate secure filename
    secure_filename = generate_secure_filename(filename, user_id)
    
    # Step 7: Save to storage (and the thumbnail, if one was generated)
    file_path = save_to_storage(file_content, secure_filename)
    if thumbnail:
        save_to_storage(thumbnail, f"thumb_{secure_filename}")
    
    # Step 8: Save metadata to database
    file_record = {
        'user_id': user_id,
        'filename': secure_filename,
        'original_name': filename,
        'file_path': file_path,
        'file_size': len(file_content),
        'content_type': content_type,
        'file_hash': hashlib.sha256(file_content).hexdigest()
    }
    
    file_id = await save_file_metadata(file_record)
    
    return success({
        "file": {
            "id": file_id,
            "filename": secure_filename,
            "url": f"/files/{file_id}",
            "size": len(file_content)
        }
    })
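The flow above calls `save_to_storage`, which hasn't been defined yet. A minimal sketch (a fuller version would route files into the dated directory layout shown earlier; writing to a temp name and renaming keeps readers from ever seeing a half-written file):

```python
import os

def save_to_storage(file_content, secure_filename, base_dir="uploads"):
    """Write bytes to disk atomically and return the stored path."""
    os.makedirs(base_dir, exist_ok=True)
    file_path = os.path.join(base_dir, secure_filename)
    # Write to a temporary name first, then rename into place
    tmp_path = file_path + ".part"
    with open(tmp_path, "wb") as f:
        f.write(file_content)
    os.replace(tmp_path, file_path)  # Atomic on the same filesystem
    return file_path
```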

2. FastAPI Integration

from fastapi import FastAPI, UploadFile, File, Depends, HTTPException
from fastapi.responses import FileResponse

@app.post("/upload")
async def upload_file(
    file: UploadFile = File(...),
    current_user: dict = Depends(get_current_user)
):
    # Read file content
    file_content = await file.read()
    
    # Upload with validation
    result = await file_upload_service.upload_file(
        file_content, 
        file.filename, 
        file.content_type,
        current_user['id']
    )
    
    if not result['success']:
        raise HTTPException(400, result['error'])
    
    return result['file']

@app.get("/files/{file_id}")
async def get_file(
    file_id: int,
    current_user: dict = Depends(get_current_user)
):
    # Get file info
    file_info = await storage_manager.get_file_info(file_id)
    
    if not file_info:
        raise HTTPException(404, "File not found")
    
    # Check permissions
    if file_info['user_id'] != current_user['id'] and current_user['role'] != 'admin':
        raise HTTPException(403, "Access denied")
    
    # Serve file
    return FileResponse(
        file_info['file_path'],
        filename=file_info['original_name']
    )

💡 Best Practices

1. Virus Scanning

# Integrate with antivirus
import subprocess

def scan_with_clamav(file_path):
    try:
        result = subprocess.run(
            ['clamscan', '--no-summary', file_path],
            capture_output=True, text=True
        )
        return result.returncode == 0  # 0 = clean
    except Exception:
        return False  # Assume infected if scan fails

2. CDN Integration

# Upload to cloud storage
import boto3

def upload_to_s3(file_content, filename, content_type):
    s3 = boto3.client('s3')
    s3.put_object(
        Bucket='my-blog-uploads',
        Key=filename,
        Body=file_content,
        ContentType=content_type
    )
    return f"https://cdn.myblog.com/{filename}"

3. Progressive Upload

// Client-side chunked upload (must be async to use await)
async function uploadLargeFile(file) {
    const chunkSize = 1024 * 1024; // 1MB chunks
    const chunks = Math.ceil(file.size / chunkSize);
    
    for (let i = 0; i < chunks; i++) {
        const start = i * chunkSize;
        const end = Math.min(start + chunkSize, file.size);
        const chunk = file.slice(start, end);
        
        await uploadChunk(chunk, i, chunks);
    }
}
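On the server side, each chunk needs to be appended and the file finalized after the last one arrives. A simplified sketch (`append_chunk` is a hypothetical helper; it assumes chunks arrive in order and ignores concurrent or retried uploads, which a real endpoint must handle):

```python
import os

def append_chunk(upload_id, chunk, index, total, temp_dir="temp"):
    """Append one chunk to a per-upload temp file.

    Returns the complete bytes once the final chunk arrives, else None.
    """
    os.makedirs(temp_dir, exist_ok=True)
    part_path = os.path.join(temp_dir, f"{upload_id}.part")
    with open(part_path, "ab") as f:  # Append mode: chunks accumulate
        f.write(chunk)
    if index == total - 1:
        # Last chunk: read the assembled file and clean up
        with open(part_path, "rb") as f:
            data = f.read()
        os.remove(part_path)
        return data
    return None
```

The assembled bytes would then go through the same `upload_file` validation flow as a single-shot upload.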

🎯 Your Implementation Tasks

  1. Create FileUploadConfig with security settings
  2. Build FileProcessor with validation and naming
  3. Implement FileStorageManager with database integration
  4. Create FileUploadService with complete upload flow
  5. Test thoroughly with various file types and edge cases

🚀 Advanced Features

After mastering basic uploads, consider:

  • Image resizing and format conversion
  • Video thumbnail generation
  • Document preview generation
  • Bulk operations and zip handling
  • Upload progress tracking
  • File versioning and history

Ready to handle files like a pro? Let's build secure upload magic! 📁

💡 Hint

Start with FileUploadConfig to define allowed file types and size limits. Use UUID for secure filename generation and validate both file extensions and MIME types for security.