Database Connection
Set up SQLite database integration with async operations. Create database models, establish connections, and implement CRUD operations for your blog API.
🎯 What You'll Learn
- Set up a SQLite database with Python's built-in sqlite3 module
- Create database models with relationships
- Implement async database operations
- Build CRUD functions for users and posts
Welcome to database integration! In this lesson, you'll connect your Blog API to a SQLite database and implement robust data persistence with async operations.
🎯 What We're Building
Your database layer will provide:
- 🗃️ SQLite integration for lightweight, serverless storage
- 👥 User management with secure data handling
- 📝 Blog post storage with rich metadata
- 🔗 Relational data with foreign key constraints
- ⚡ Async operations for better performance
📊 Database Design
Users Table
CREATE TABLE users (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    username TEXT UNIQUE NOT NULL,
    email TEXT UNIQUE NOT NULL,
    hashed_password TEXT NOT NULL,
    full_name TEXT,
    role TEXT DEFAULT 'reader',
    is_active BOOLEAN DEFAULT 1,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Posts Table
CREATE TABLE posts (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    title TEXT NOT NULL,
    content TEXT NOT NULL,
    author_id INTEGER NOT NULL,
    tags TEXT, -- JSON array as string
    published BOOLEAN DEFAULT 0,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (author_id) REFERENCES users (id)
);
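One caveat about the FOREIGN KEY clause: in most SQLite builds, foreign keys are not enforced unless `PRAGMA foreign_keys = ON` is issued on each connection. The sketch below assumes an in-memory database and shortened versions of the two tables; the `create_schema` helper and `orphan_rejected` variable are hypothetical names used only for illustration.

```python
import sqlite3

# Shortened versions of the tables above (assumption: fewer columns for brevity)
SCHEMA = """
CREATE TABLE IF NOT EXISTS users (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    username TEXT UNIQUE NOT NULL,
    email TEXT UNIQUE NOT NULL,
    hashed_password TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS posts (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    title TEXT NOT NULL,
    content TEXT NOT NULL,
    author_id INTEGER NOT NULL,
    FOREIGN KEY (author_id) REFERENCES users (id)
);
"""


def create_schema(connection: sqlite3.Connection) -> None:
    """Turn on foreign key enforcement, then create both tables."""
    connection.execute("PRAGMA foreign_keys = ON")
    connection.executescript(SCHEMA)


connection = sqlite3.connect(":memory:")
create_schema(connection)

# No user with id 999 exists, so this insert should be rejected
try:
    connection.execute(
        "INSERT INTO posts (title, content, author_id) VALUES (?, ?, ?)",
        ("Orphan post", "No such author", 999),
    )
    orphan_rejected = False
except sqlite3.IntegrityError:
    orphan_rejected = True
```

Without the PRAGMA line, the same insert would typically succeed and leave a post pointing at a user that does not exist.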
🏗️ SQLite Fundamentals
Why SQLite?
- Zero configuration - no server setup required
- File-based - entire database in a single file
- ACID compliant - reliable transactions
- Lightweight - perfect for development and small apps
- Python built-in - no external dependencies
Basic Connection Pattern
import sqlite3
# Connect to database
connection = sqlite3.connect('database.db')
connection.row_factory = sqlite3.Row # Enable dict-like access
# Execute queries
cursor = connection.cursor()
cursor.execute('SELECT * FROM users')
rows = cursor.fetchall()
# Always close connections
connection.close()
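If an exception is raised between `connect()` and `close()`, the pattern above leaves the connection open. A minimal sketch of one way to guard against that, assuming an in-memory database and a throwaway `users` table: `contextlib.closing` closes the connection on exit, while `with connection:` only manages the transaction (it commits or rolls back but does not close).

```python
import sqlite3
from contextlib import closing

# closing() guarantees connection.close() runs, even if an error occurs
with closing(sqlite3.connect(":memory:")) as connection:
    connection.row_factory = sqlite3.Row  # rows support access by column name
    connection.execute(
        "CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)"
    )

    # "with connection" commits on success and rolls back on an exception
    with connection:
        connection.execute(
            "INSERT INTO users (username) VALUES (?)", ("admin",)
        )

    row = connection.execute("SELECT * FROM users").fetchone()
    username_by_name = row["username"]  # dict-like access
    username_by_index = row[1]          # tuple-like access still works
```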
🔄 CRUD Operations
Create (INSERT)
async def create_user(self, user_data: dict) -> Optional[int]:
    # Requires: import sqlite3 and from typing import Optional
    try:
        cursor = self.connection.cursor()
        # user_data['password'] must already be hashed; never store plaintext
        cursor.execute('''
            INSERT INTO users (username, email, hashed_password)
            VALUES (?, ?, ?)
        ''', (user_data['username'], user_data['email'], user_data['password']))
        self.connection.commit()
        return cursor.lastrowid  # Returns the new record's ID
    except sqlite3.IntegrityError:
        return None  # Handle unique constraint violations
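To see the unique-constraint handling in action, here is a standalone sketch of the same idea. It assumes a plain function instead of a method, an in-memory database, and a placeholder string where a real password hash would go.

```python
import sqlite3
from typing import Optional


def create_user(connection: sqlite3.Connection, username: str,
                email: str, hashed_password: str) -> Optional[int]:
    """Insert a user and return the new id, or None on a duplicate."""
    try:
        with connection:  # commit on success, roll back on error
            cursor = connection.execute(
                "INSERT INTO users (username, email, hashed_password) "
                "VALUES (?, ?, ?)",
                (username, email, hashed_password),
            )
        return cursor.lastrowid
    except sqlite3.IntegrityError:
        return None


connection = sqlite3.connect(":memory:")
connection.execute("""
    CREATE TABLE users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        username TEXT UNIQUE NOT NULL,
        email TEXT UNIQUE NOT NULL,
        hashed_password TEXT NOT NULL
    )
""")

first_id = create_user(connection, "alice", "alice@example.com", "<hash>")
# Same username again: the UNIQUE constraint rejects it
duplicate_id = create_user(connection, "alice", "other@example.com", "<hash>")
```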
Read (SELECT)
async def get_user_by_username(self, username: str) -> Optional[dict]:
    cursor = self.connection.cursor()
    cursor.execute('SELECT * FROM users WHERE username = ?', (username,))
    row = cursor.fetchone()
    return dict(row) if row else None
Update (UPDATE)
async def update_user(self, user_id: int, updates: dict) -> bool:
    cursor = self.connection.cursor()
    cursor.execute('''
        UPDATE users
        SET full_name = ?, email = ?
        WHERE id = ?
    ''', (updates['full_name'], updates['email'], user_id))
    self.connection.commit()
    return cursor.rowcount > 0  # True if any rows were updated
Delete (DELETE)
async def delete_user(self, user_id: int) -> bool:
    cursor = self.connection.cursor()
    cursor.execute('DELETE FROM users WHERE id = ?', (user_id,))
    self.connection.commit()
    return cursor.rowcount > 0
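The update shown earlier expects both `full_name` and `email` on every call. One possible variation, sketched here under assumptions, builds the SET clause from whichever fields are supplied. Column names cannot be bound with `?`, so the sketch only accepts names from a fixed whitelist; `ALLOWED_COLUMNS` and the standalone function form are hypothetical.

```python
import sqlite3

# Hypothetical whitelist: only these columns may be updated
ALLOWED_COLUMNS = ("full_name", "email")


def update_user(connection: sqlite3.Connection, user_id: int,
                updates: dict) -> bool:
    """Update only the whitelisted fields present in `updates`."""
    columns = [name for name in ALLOWED_COLUMNS if name in updates]
    if not columns:
        return False
    # Column names come from the whitelist; values are still bound with ?
    assignments = ", ".join(f"{name} = ?" for name in columns)
    values = [updates[name] for name in columns] + [user_id]
    with connection:
        cursor = connection.execute(
            f"UPDATE users SET {assignments} WHERE id = ?", values
        )
    return cursor.rowcount > 0  # False when no row has that id


connection = sqlite3.connect(":memory:")
connection.execute(
    "CREATE TABLE users (id INTEGER PRIMARY KEY, full_name TEXT, "
    "email TEXT, role TEXT)"
)
connection.execute(
    "INSERT INTO users (id, full_name, email, role) "
    "VALUES (1, 'Old Name', 'old@example.com', 'reader')"
)
connection.commit()

changed = update_user(connection, 1, {"full_name": "New Name", "role": "admin"})
missing = update_user(connection, 42, {"full_name": "Nobody"})
full_name, role = connection.execute(
    "SELECT full_name, role FROM users WHERE id = 1"
).fetchone()
```

Because `role` is not in the whitelist, the attempt to change it is silently ignored, which keeps callers from escalating their own privileges through this function.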
🔗 Handling Relationships
JOIN Queries for Related Data
async def get_posts_with_authors(self) -> List[dict]:
    cursor = self.connection.cursor()
    cursor.execute('''
        SELECT
            p.id, p.title, p.content, p.published,
            u.username, u.email, u.full_name
        FROM posts p
        JOIN users u ON p.author_id = u.id
        ORDER BY p.created_at DESC
    ''')
    posts = []
    for row in cursor.fetchall():
        row_dict = dict(row)
        # Restructure data to include nested author
        row_dict['author'] = {
            'username': row_dict.pop('username'),
            'email': row_dict.pop('email'),
            'full_name': row_dict.pop('full_name')
        }
        posts.append(row_dict)
    return posts
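If the JOIN also needs the author's `id`, both tables contribute a column named `id`, and the two collide once the row is turned into a dict. A small sketch of one way around this, assuming trimmed-down tables and made-up sample rows, is to alias the columns in the SELECT list:

```python
import sqlite3

connection = sqlite3.connect(":memory:")
connection.row_factory = sqlite3.Row
connection.executescript("""
CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT, email TEXT);
CREATE TABLE posts (
    id INTEGER PRIMARY KEY,
    title TEXT,
    author_id INTEGER REFERENCES users (id)
);
INSERT INTO users (id, username, email)
    VALUES (7, 'alice', 'alice@example.com');
INSERT INTO posts (id, title, author_id) VALUES (1, 'Hello SQLite', 7);
""")

# Aliases give each id column its own, unambiguous name
row = connection.execute("""
    SELECT p.id AS post_id, p.title, u.id AS author_id, u.username
    FROM posts p
    JOIN users u ON p.author_id = u.id
""").fetchone()

post = {
    "id": row["post_id"],
    "title": row["title"],
    "author": {"id": row["author_id"], "username": row["username"]},
}
```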
🛡️ Error Handling
Common Database Errors
try:
    cursor.execute('INSERT INTO users ...')
    self.connection.commit()
except sqlite3.IntegrityError as e:
    # Handle unique constraint violations
    print(f"Integrity error: {e}")
    return None
except sqlite3.OperationalError as e:
    # Handle SQL syntax errors, missing tables
    print(f"Operational error: {e}")
    raise
except Exception as e:
    # Handle unexpected errors
    print(f"Unexpected error: {e}")
    raise
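To make the two error classes concrete, the following sketch triggers each one on purpose. It assumes an in-memory database; `classify_error` is a hypothetical helper that reports which branch was taken instead of printing.

```python
import sqlite3


def classify_error(connection: sqlite3.Connection, sql: str,
                   params: tuple = ()) -> str:
    """Run one statement and report which kind of error, if any, occurred."""
    try:
        with connection:
            connection.execute(sql, params)
        return "ok"
    except sqlite3.IntegrityError:
        return "integrity"    # e.g. UNIQUE or NOT NULL violation
    except sqlite3.OperationalError:
        return "operational"  # e.g. missing table or SQL syntax error


connection = sqlite3.connect(":memory:")
connection.execute(
    "CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT UNIQUE NOT NULL)"
)

insert_sql = "INSERT INTO users (username) VALUES (?)"
first = classify_error(connection, insert_sql, ("admin",))
duplicate = classify_error(connection, insert_sql, ("admin",))
no_table = classify_error(
    connection, "INSERT INTO missing_table (x) VALUES (?)", (1,)
)
```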
Transaction Safety
def safe_transaction(self, operations):
    try:
        cursor = self.connection.cursor()
        for operation in operations:
            cursor.execute(operation['sql'], operation['params'])
        self.connection.commit()
        return True
    except Exception as e:
        self.connection.rollback()  # Undo all changes
        print(f"Transaction failed: {e}")
        return False
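The same all-or-nothing behavior can be had from the connection's own context manager, which commits when the block succeeds and rolls back when it raises. A minimal sketch, assuming an in-memory database and operations passed as `(sql, params)` tuples rather than dicts:

```python
import sqlite3


def safe_transaction(connection: sqlite3.Connection, operations) -> bool:
    """Run all operations in one transaction; undo everything on failure."""
    try:
        with connection:  # commit on success, rollback on exception
            for sql, params in operations:
                connection.execute(sql, params)
        return True
    except sqlite3.Error:
        return False


connection = sqlite3.connect(":memory:")
connection.execute(
    "CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT UNIQUE NOT NULL)"
)

# The second insert violates the UNIQUE constraint, so neither row is kept
succeeded = safe_transaction(connection, [
    ("INSERT INTO users (username) VALUES (?)", ("alice",)),
    ("INSERT INTO users (username) VALUES (?)", ("alice",)),
])
user_count = connection.execute("SELECT COUNT(*) FROM users").fetchone()[0]
```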
🚀 Async Considerations
Why Async Database Operations?
- Non-blocking - the API keeps serving other requests while a query waits on the database
- Scalability - handle more concurrent requests
- Better UX - responsive applications
Simple Async Wrapper
import asyncio
import sqlite3

class AsyncDatabaseManager:
    def __init__(self, db_file: str = 'database.db'):
        self.db_file = db_file
        self.connection = None

    async def connect(self):
        # In production, use aiosqlite for true async
        await asyncio.sleep(0)  # Yields control once; the sqlite3 call below still blocks
        self.connection = sqlite3.connect(self.db_file)

    async def execute_query(self, sql: str, params: tuple = ()):
        await asyncio.sleep(0)  # Yields control once; the query itself still blocks
        cursor = self.connection.cursor()
        cursor.execute(sql, params)
        return cursor.fetchall()
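The wrapper above keeps async signatures, but each sqlite3 call still runs on the event loop thread. One standard-library way to move the blocking work off the loop is `asyncio.to_thread` (Python 3.9+). The sketch below is an illustration under assumptions, not the lesson's required design: it opens a fresh connection inside each worker call, because a sqlite3 connection may by default only be used from the thread that created it. The helper names and the temporary database file are hypothetical.

```python
import asyncio
import os
import sqlite3
import tempfile


def _run_query(db_file: str, sql: str, params: tuple = ()) -> list:
    """Blocking helper: runs entirely inside a worker thread."""
    connection = sqlite3.connect(db_file)
    connection.row_factory = sqlite3.Row
    try:
        with connection:  # commit on success, rollback on error
            rows = connection.execute(sql, params).fetchall()
        return [dict(row) for row in rows]
    finally:
        connection.close()


async def execute_query(db_file: str, sql: str, params: tuple = ()) -> list:
    # The event loop stays free while the worker thread talks to SQLite
    return await asyncio.to_thread(_run_query, db_file, sql, params)


async def main(db_file: str) -> list:
    await execute_query(
        db_file, "CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)"
    )
    await execute_query(
        db_file, "INSERT INTO users (username) VALUES (?)", ("admin",)
    )
    return await execute_query(db_file, "SELECT username FROM users")


with tempfile.TemporaryDirectory() as tmp_dir:
    rows = asyncio.run(main(os.path.join(tmp_dir, "demo.db")))
```

Opening a connection per call is simple but not free; a dedicated library such as aiosqlite, as the comment in the wrapper suggests, is the more usual choice for real workloads.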
💾 JSON Data Storage
Storing Arrays as JSON
import json
# Store tags as JSON string
tags_list = ['python', 'fastapi', 'tutorial']
tags_json = json.dumps(tags_list)
# content and author_id are NOT NULL in the posts schema, so include them
cursor.execute('INSERT INTO posts (title, content, author_id, tags) VALUES (?, ?, ?, ?)',
               (title, content, author_id, tags_json))
# Retrieve and parse JSON
cursor.execute('SELECT tags FROM posts WHERE id = ?', (post_id,))
row = cursor.fetchone()
tags_list = json.loads(row['tags']) if row['tags'] else []
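A runnable round trip of the tag handling, including a post whose `tags` column is NULL, might look like the sketch below. The `encode_tags` and `decode_tags` helpers are hypothetical names, and the table is trimmed to the columns needed here.

```python
import json
import sqlite3
from typing import List, Optional


def encode_tags(tags: Optional[List[str]]) -> str:
    """Serialize a list of tags to the JSON text stored in the column."""
    return json.dumps(tags or [])


def decode_tags(raw: Optional[str]) -> List[str]:
    """Parse the stored JSON text; NULL or empty text becomes []."""
    return json.loads(raw) if raw else []


connection = sqlite3.connect(":memory:")
connection.row_factory = sqlite3.Row
connection.execute(
    "CREATE TABLE posts (id INTEGER PRIMARY KEY, title TEXT NOT NULL, tags TEXT)"
)
connection.execute(
    "INSERT INTO posts (id, title, tags) VALUES (?, ?, ?)",
    (1, "Tagged", encode_tags(["python", "fastapi"])),
)
connection.execute(
    "INSERT INTO posts (id, title, tags) VALUES (?, ?, ?)",
    (2, "Untagged", None),  # NULL tags column
)

tags_by_post = {
    row["id"]: decode_tags(row["tags"])
    for row in connection.execute("SELECT id, tags FROM posts")
}
```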
🎯 Your Implementation Tasks
1. Database Manager Class
- Connection management with proper error handling
- Table creation with appropriate schemas
- Async method signatures for future scalability
2. User Operations
- Create users with unique constraint handling
- Retrieve users by username and ID
- Password security (store hashed passwords only)
3. Post Operations
- Create posts with author relationships
- Retrieve posts with JOIN queries for author data
- JSON tag storage for flexible metadata
4. Sample Data
- Admin user for system management
- Author user for content creation
- Sample posts with various states (published/draft)
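For the "store hashed passwords only" task, the lesson does not name a hashing library, so the following is only a sketch using the standard library's PBKDF2 implementation. The iteration count, the `salt$digest` storage format, and both function names are assumptions; a project may prefer a dedicated password-hashing library instead.

```python
import hashlib
import hmac
import os

ITERATIONS = 100_000  # assumption: tune for your hardware and policy


def hash_password(password: str) -> str:
    """Return 'salt$digest' (both hex) for storage in hashed_password."""
    salt = os.urandom(16)  # fresh random salt per password
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), salt, ITERATIONS
    )
    return f"{salt.hex()}${digest.hex()}"


def verify_password(password: str, stored: str) -> bool:
    """Recompute the digest with the stored salt and compare safely."""
    salt_hex, digest_hex = stored.split("$")
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), bytes.fromhex(salt_hex), ITERATIONS
    )
    # compare_digest avoids leaking information through timing
    return hmac.compare_digest(digest.hex(), digest_hex)


stored_hash = hash_password("admin-secret")
correct = verify_password("admin-secret", stored_hash)
wrong = verify_password("not-the-password", stored_hash)
```

The value of `stored_hash` is what would go into the `hashed_password` column when seeding the admin and author users.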
💡 Pro Tips
Use Parameter Binding
# ✅ Safe - prevents SQL injection
cursor.execute('SELECT * FROM users WHERE username = ?', (username,))
# ❌ Dangerous - vulnerable to SQL injection
cursor.execute(f'SELECT * FROM users WHERE username = "{username}"')
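To see why the difference matters, the sketch below feeds the same malicious input to both styles. It assumes an in-memory table with two made-up users and uses single-quoted SQL strings in the unsafe query.

```python
import sqlite3

connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")
connection.executemany(
    "INSERT INTO users (username) VALUES (?)", [("admin",), ("author",)]
)

# Attacker-controlled input that closes the quote and adds a true condition
malicious = "nobody' OR '1'='1"

# Unsafe: the input becomes part of the SQL text itself
unsafe_rows = connection.execute(
    f"SELECT * FROM users WHERE username = '{malicious}'"
).fetchall()

# Safe: the input is bound as a plain value and matches no username
safe_rows = connection.execute(
    "SELECT * FROM users WHERE username = ?", (malicious,)
).fetchall()
```

The interpolated query returns every row in the table, while the bound query returns none.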
Handle Connection Lifecycle
class DatabaseManager:
    async def __aenter__(self):
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

# Usage (inside an async function)
async with DatabaseManager() as db:
    user = await db.get_user_by_username('admin')
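The snippet above leaves `connect()` and `close()` undefined. A self-contained sketch of how the pieces might fit together follows; it assumes an in-memory database by default and shows only the lifecycle methods, not the CRUD methods.

```python
import asyncio
import sqlite3
from typing import Optional


class DatabaseManager:
    def __init__(self, db_file: str = ":memory:"):
        self.db_file = db_file
        self.connection: Optional[sqlite3.Connection] = None

    async def connect(self) -> None:
        self.connection = sqlite3.connect(self.db_file)
        self.connection.row_factory = sqlite3.Row

    async def close(self) -> None:
        # Safe to call more than once
        if self.connection is not None:
            self.connection.close()
            self.connection = None

    async def __aenter__(self) -> "DatabaseManager":
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        # Runs even when the body of the async with block raises
        await self.close()


async def main():
    async with DatabaseManager() as db:
        was_open = db.connection is not None
    # After the block exits, the connection has been closed and cleared
    return was_open, db.connection is None


was_open, is_closed = asyncio.run(main())
```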
Optimize Queries
# ✅ Efficient - single query with JOIN
cursor.execute('SELECT p.*, u.username FROM posts p JOIN users u ON p.author_id = u.id')
# ❌ Inefficient - N+1 query problem
for post in posts:
    author = get_user_by_id(post.author_id)  # Separate query for each post
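One way to check how many statements a code path really issues is `Connection.set_trace_callback`, which calls a function for each SQL statement SQLite executes. The sketch below assumes an in-memory database with three made-up posts and compares the two approaches.

```python
import sqlite3

connection = sqlite3.connect(":memory:")
connection.executescript("""
CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT);
CREATE TABLE posts (id INTEGER PRIMARY KEY, title TEXT, author_id INTEGER);
INSERT INTO users (id, username) VALUES (1, 'admin'), (2, 'author');
INSERT INTO posts (title, author_id) VALUES ('A', 1), ('B', 2), ('C', 1);
""")

# Record every statement executed from here on
statements = []
connection.set_trace_callback(statements.append)

# N+1 pattern: one query for the posts, then one per post for its author
posts = connection.execute("SELECT id, title, author_id FROM posts").fetchall()
for _post_id, _title, author_id in posts:
    connection.execute(
        "SELECT username FROM users WHERE id = ?", (author_id,)
    ).fetchone()
n_plus_one_count = len(statements)

# JOIN pattern: a single query returns posts and authors together
statements.clear()
joined = connection.execute(
    "SELECT p.title, u.username FROM posts p JOIN users u ON p.author_id = u.id"
).fetchall()
join_count = len(statements)
```

With three posts, the loop issues one statement per post on top of the initial query, while the JOIN needs only one.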
🔍 Testing Your Database
Your implementation should:
- ✅ Connect successfully to SQLite
- ✅ Create tables without errors
- ✅ Handle unique constraints gracefully
- ✅ Store and retrieve users and posts
- ✅ Include author data in post queries
- ✅ Parse JSON tags correctly
🚀 Next Steps
After completing this database layer, you'll:
- 🔐 Add authentication with JWT tokens
- 🔒 Implement authorization with role-based access
- 📁 Handle file uploads with validation
- ❌ Create custom exceptions for better error handling
Ready to build your data persistence layer? Let's code! 📊
💡 Hint
Start by implementing the connect() method to establish a SQLite connection. Use cursor.execute() for SQL commands and self.connection.commit() to save changes. Remember to handle exceptions for database constraints like unique usernames.