A real chat application needs to store messages to provide chat history. This chapter will guide you through setting up a SQLite database and integrating it into our FastAPI application using SQLAlchemy, a powerful SQL toolkit and Object-Relational Mapper (ORM).

Purpose of this Chapter

By the end of this chapter, you will:

  • Understand the basics of ORM and why we use SQLAlchemy.
  • Set up a SQLite database connection.
  • Define database models for users and chat messages.
  • Implement methods to store new messages and retrieve chat history.
  • Update the WebSocket endpoint to save messages.

Concepts Explained: SQLAlchemy and ORM

Object-Relational Mapping (ORM) is a technique that lets you query and manipulate data from a database using an object-oriented paradigm. Instead of writing raw SQL, you interact with database tables as Python classes and objects.

SQLAlchemy is Python’s most popular SQL toolkit and ORM. It provides:

  • Declarative mapping: Allows defining database tables and models using Python classes.
  • Session management: Handles interactions with the database (add, commit, rollback).
  • Abstraction: Works with various database backends (SQLite, PostgreSQL, MySQL, etc.), allowing us to easily switch later if needed.

SQLite is a C library that provides a lightweight, embedded relational database management system. It’s file-based, meaning the entire database is stored in a single file, making it excellent for development and simple deployments.

Step-by-Step Tasks

1. Install SQLAlchemy and Async Drivers

Stop your Uvicorn server (Ctrl+C). We need to install SQLAlchemy for ORM and aiosqlite for asynchronous SQLite database access.

pipenv install sqlalchemy aiosqlite

2. Create app/database.py for Database Setup

Create a new file app/database.py to handle our database engine, session, and base for declarative models.

# app/database.py

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base

# Define the database URL
# For SQLite, this points to a file. For production, you'd use PostgreSQL, etc.
DATABASE_URL = "sqlite:///./chat.db"

# Create the SQLAlchemy engine
# connect_args are specific to SQLite and ensure only one thread can communicate
# if using synchronous operations (though we'll use async)
engine = create_engine(
    DATABASE_URL, connect_args={"check_same_thread": False}, echo=False # Set echo=True for SQL logs
)

# Create a SessionLocal class
# This will be used to create session instances for database interactions
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Base class for our ORM models
Base = declarative_base()

# Dependency to get a database session
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

Code Explanation (app/database.py):

  • DATABASE_URL: Specifies the connection string for our SQLite database file named chat.db.
  • create_engine: Creates the SQLAlchemy engine. echo=True is useful during development to see the SQL queries being executed.
  • SessionLocal: A factory for session objects. autocommit=False means we manually commit() changes, autoflush=False prevents flushing before commits.
  • Base = declarative_base(): All our ORM models will inherit from this Base class.
  • get_db(): A FastAPI dependency function that provides a database session, ensuring it’s properly closed after use.

3. Create app/models.py for Database Models

Create a new file app/models.py to define our User and Message models.

# app/models.py

from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.orm import relationship
from datetime import datetime

from .database import Base

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, unique=True, index=True)
    hashed_password = Column(String)

    messages = relationship("Message", back_populates="owner")

class Message(Base):
    __tablename__ = "messages"

    id = Column(Integer, primary_key=True, index=True)
    content = Column(String, index=True)
    timestamp = Column(DateTime, default=datetime.utcnow)
    owner_id = Column(Integer, ForeignKey("users.id"))

    owner = relationship("User", back_populates="messages")

Code Explanation (app/models.py):

  • User Model:
    • __tablename__: Specifies the database table name.
    • id, username, hashed_password: Define the columns with their types and constraints (e.g., primary_key=True, unique=True, index=True).
    • messages = relationship(...): Defines a relationship with the Message model, indicating a user can have multiple messages. back_populates links it to the owner relationship in Message.
  • Message Model:
    • id, content, timestamp: Columns for message ID, content, and creation timestamp.
    • owner_id = Column(Integer, ForeignKey("users.id")): Creates a foreign key linking messages to users.
    • owner = relationship(...): Defines the reverse relationship, linking a message back to its owner.

4. Create Database Tables

Before running the application, we need to create the database tables. This is typically done once.

Add a small script to app/main.py (temporarily) or a dedicated create_db.py file to do this. For simplicity, let’s add it to app/main.py and then remove it after execution.

# app/main.py (TEMPORARY ADDITION for database creation)
# Add this BEFORE app = FastAPI() line

from .database import Base, engine # New import

# This will create the database tables defined in models.py
Base.metadata.create_all(bind=engine)

# (rest of your app/main.py content)

Now, run app/main.py once to create the chat.db file and tables.

pipenv shell
python -c "from app.database import Base, engine; from app import models; Base.metadata.create_all(bind=engine)"

After this command, you should see a chat.db file created in your realtime-chat-app directory. You can now remove or comment out the Base.metadata.create_all(bind=engine) line from app/main.py if you added it there.

5. Update Authentication to Use Database Users

Now, let’s modify app/main.py to use our User model for user authentication instead of the fake_users_db. We also need Pydantic models for request/response.

Create a new file app/schemas.py:

# app/schemas.py

from pydantic import BaseModel
from datetime import datetime

# Pydantic models for data validation and serialization

class UserCreate(BaseModel):
    username: str
    password: str

class UserResponse(BaseModel):
    id: int
    username: str

    class Config:
        from_attributes = True # Or orm_mode = True for Pydantic v1.x

class MessageCreate(BaseModel):
    content: str

class MessageResponse(BaseModel):
    id: int
    content: str
    timestamp: datetime
    owner_id: int
    owner: UserResponse # Include owner details

    class Config:
        from_attributes = True # Or orm_mode = True for Pydantic v1.x

Update app/main.py to use these models and interact with the database.

# app/main.py (further updated)

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from typing import Dict, List
from sqlalchemy.orm import Session # New import

from .auth import Hasher, create_access_token, get_current_user, ACCESS_TOKEN_EXPIRE_MINUTES, Token
from .connections import ConnectionManager
from .database import get_db # New import
from . import models, schemas # New import
from datetime import timedelta

app = FastAPI()

manager = ConnectionManager()

# --- User Registration Endpoint ---
@app.post("/register", response_model=schemas.UserResponse, status_code=status.HTTP_201_CREATED)
async def register_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = db.query(models.User).filter(models.User.username == user.username).first()
    if db_user:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Username already registered")

    hashed_password = Hasher.get_password_hash(user.password)
    db_user = models.User(username=user.username, hashed_password=hashed_password)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user

@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    user = db.query(models.User).filter(models.User.username == form_data.username).first()
    if not user or not Hasher.verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

# Update get_current_user to return the actual User model, not just username
from .auth import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES, TokenData
async def get_current_user_db(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = db.query(models.User).filter(models.User.username == token_data.username).first()
    if user is None:
        raise credentials_exception
    return user

@app.get("/users/me", response_model=schemas.UserResponse)
async def read_users_me(current_user: models.User = Depends(get_current_user_db)):
    return current_user

# --- Chat History Endpoint ---
@app.get("/messages", response_model=List[schemas.MessageResponse])
async def get_chat_history(db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_db), skip: int = 0, limit: int = 100):
    messages = db.query(models.Message).order_by(models.Message.timestamp.desc()).offset(skip).limit(limit).all()
    # Eagerly load owner to avoid N+1 queries if owner details are needed
    return messages

# Updated WebSocket endpoint to save messages
@app.websocket("/ws/{client_id}") # Renamed for clarity, original was fine too
async def websocket_chat(websocket: WebSocket, client_id: str, db: Session = Depends(get_db)): # Add db dependency
    # NOTE: For production, authenticate WebSocket connection with JWT too.
    # This example skips that for simplicity in message persistence, but it's crucial.
    # You would parse a token from a query parameter or header and validate it here.

    await manager.connect(websocket)
    welcome_message = f"Client #{client_id} joined the chat."
    await manager.broadcast(welcome_message)

    try:
        while True:
            data = await websocket.receive_text()
            full_message = f"Client #{client_id} says: {data}"

            # Save message to database
            new_message = models.Message(content=data, owner_id=1) # For now, hardcode owner_id=1 (e.g., testuser)
                                                                 # In next chapter, we'll use actual authenticated user
            db.add(new_message)
            db.commit()
            db.refresh(new_message)

            await manager.broadcast(full_message)
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(f"Client #{client_id} left the chat.")

Note on get_current_user_db and WebSocket Authentication: The current websocket_chat endpoint does not actually authenticate the WebSocket connection using JWT. It takes a client_id directly. For a secure application, you would need to modify the WebSocket endpoint to:

  1. Expect a JWT in a query parameter (e.g., ws://localhost:8000/ws?token=YOUR_JWT).
  2. Validate that JWT inside the WebSocket endpoint using a similar mechanism to get_current_user_db. We will address securing WebSocket connections in a later chapter. For now, the focus is on message persistence.

6. Run and Test

  1. Start the server:

    pipenv shell
    uvicorn app.main:app --reload
    
  2. Register a user: Go to http://127.0.0.1:8000/docs, use the /register endpoint to create a user (e.g., username: chatuser, password: chatpassword).

  3. Login to get a token: Use the /token endpoint with the new user’s credentials to get an access token.

  4. Authorize in Swagger UI: Use the obtained token to authorize in Swagger.

  5. Test chat history: Access http://127.0.0.1:8000/messages (ensure you are authorized). Initially, it will be empty.

  6. Use client.html for chat: Open client.html and connect. Send a few messages. In app/main.py, the owner_id for new messages is currently hardcoded to 1. In the database, the first registered user (chatuser if you followed step 2) will likely have id=1.

  7. Check chat history again: Refresh http://127.0.0.1:8000/messages (authorized). You should now see the messages you sent, stored in the database!

    If you restart the server, the chat history will still be there, demonstrating persistence.

Tips/Challenges/Errors

  • sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: users: This means you didn’t run the Base.metadata.create_all(bind=engine) step or it failed. Ensure chat.db exists and contains the tables.
  • Pydantic Config.orm_mode / from_attributes: If you are using Pydantic v1.x, use orm_mode = True. For Pydantic v2.x, use from_attributes = True. FastAPI documentation usually defaults to v2.x.
  • Database Session Management: It’s crucial to use Depends(get_db) for every endpoint that interacts with the database to ensure proper session handling (opening and closing connections).

Summary/Key Takeaways

You’ve successfully integrated a SQLite database using SQLAlchemy, allowing for persistent storage of user data and chat messages. You’ve learned how to define models, create tables, and perform basic database operations (add, commit, query). Our chat application can now remember messages! In the next chapter, we’ll enhance the chat functionality by introducing the concept of chat rooms.