Our current WebSocket endpoint only echoes messages back to the sender. A real chat application needs to handle multiple users, allowing them to join, leave, and send messages that are broadcast to all other connected users (or users in a specific room). This chapter introduces a WebSocket connection manager to address this.
Purpose of this Chapter
By the end of this chapter, you will:
- Understand the need for a connection manager in a real-time application.
- Implement a ConnectionManager class to store and manage active WebSocket connections.
- Modify the WebSocket endpoint to use the manager for connecting, disconnecting, and broadcasting messages.
- Test broadcasting functionality with multiple client connections.
Concepts Explained: Connection Management
When a client connects via WebSocket, the WebSocket object represents that specific connection. To send a message to all active clients, or a subset of them, our server needs a way to keep track of these individual WebSocket objects. A ConnectionManager class typically serves this purpose.
It will include methods for:
- connect(websocket): Adds a new WebSocket connection to a list of active connections.
- disconnect(websocket): Removes a WebSocket connection when a client disconnects.
- send_personal_message(message, websocket): Sends a message to a specific client.
- broadcast(message): Sends a message to all currently connected clients.
Step-by-Step Tasks
1. Create app/connections.py for the ConnectionManager
Create a new file app/connections.py to define our ConnectionManager class.
# app/connections.py
from typing import List

from fastapi import WebSocket, WebSocketDisconnect


class ConnectionManager:
    def __init__(self):
        # All currently open WebSocket connections
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        # Complete the WebSocket handshake, then start tracking the connection
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        # The connection may already have been removed during a broadcast
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        # Iterate over a copy so that removing a dead connection
        # does not skip the next one in the list
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except (RuntimeError, WebSocketDisconnect):
                # The connection closed unexpectedly; stop tracking it
                self.disconnect(connection)
                print("Removed disconnected client during broadcast.")
Code Explanation (app/connections.py):
- active_connections: A list to hold all active WebSocket objects.
- connect(): Accepts the connection and adds it to the list.
- disconnect(): Removes the connection from the list.
- send_personal_message(): Sends text to a single client.
- broadcast(): Iterates through all active connections and sends the message. It includes a basic try-except block to remove connections that might have closed unexpectedly during the broadcast.
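The broadcast cleanup can be checked without starting a server by running the same logic against stand-in connection objects. The sketch below is self-contained and does not import FastAPI: FakeWebSocket and SketchManager are hypothetical names that only mirror the shape of ConnectionManager, and the stand-in raising RuntimeError once it is marked closed is an assumption about how a dead connection surfaces.

```python
import asyncio


class FakeWebSocket:
    """Hypothetical stand-in for a WebSocket; not part of FastAPI."""

    def __init__(self, name, closed=False):
        self.name = name
        self.closed = closed
        self.sent = []  # messages successfully "sent" to this client

    async def accept(self):
        pass  # nothing to negotiate in the stand-in

    async def send_text(self, message):
        if self.closed:
            # Assumption: a dead connection fails with RuntimeError
            raise RuntimeError("connection is closed")
        self.sent.append(message)


class SketchManager:
    """Mirrors the connect/disconnect/broadcast shape described above."""

    def __init__(self):
        self.active_connections = []

    async def connect(self, websocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket):
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast(self, message):
        # Loop over a copy of the list so removals do not disturb iteration
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except RuntimeError:
                self.disconnect(connection)


async def demo():
    manager = SketchManager()
    alice, bob, carol = FakeWebSocket("alice"), FakeWebSocket("bob"), FakeWebSocket("carol")
    for ws in (alice, bob, carol):
        await manager.connect(ws)
    bob.closed = True  # simulate a client that vanished without a clean close
    await manager.broadcast("hello")
    return manager, alice, bob, carol


manager, alice, bob, carol = asyncio.run(demo())
```

The sketch iterates over a copy of the list. Removing an item from a list while looping over that same list would skip the element that follows the removed one, so the client after the dead connection would miss the message.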
2. Update app/main.py to Use the ConnectionManager
Now, let’s integrate ConnectionManager into our app/main.py and modify the WebSocket endpoint.
# app/main.py (updated)
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from typing import Dict, List
from .auth import Hasher, create_access_token, get_current_user, ACCESS_TOKEN_EXPIRE_MINUTES, Token
from .connections import ConnectionManager  # Import our new ConnectionManager
from datetime import timedelta

app = FastAPI()

# --- Temporary User Storage (Replace with a real database later) ---
fake_users_db = {
    "testuser": {
        "username": "testuser",
        "hashed_password": Hasher.get_password_hash("password123"),
    },
    "user2": {
        "username": "user2",
        "hashed_password": Hasher.get_password_hash("password123"),
    }
}

# Instantiate the ConnectionManager globally
# In a larger application, this might be handled via dependency injection
manager = ConnectionManager()


@app.get("/")
async def read_root():
    return {"message": "Welcome to the Real-time Chat API!"}


@app.get("/health")
async def health_check():
    return {"status": "ok"}


@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = fake_users_db.get(form_data.username)
    if not user or not Hasher.verify_password(form_data.password, user["hashed_password"]):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user["username"]}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}


@app.get("/users/me")
async def read_users_me(current_user: str = Depends(get_current_user)):
    return {"username": current_user, "message": "You are authenticated!"}


# Updated WebSocket endpoint
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await manager.connect(websocket)
    await manager.broadcast(f"Client #{client_id} joined the chat.")
    try:
        while True:
            data = await websocket.receive_text()
            # Broadcast this client's message to every connected client,
            # including the sender
            await manager.broadcast(f"Client #{client_id} says: {data}")
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(f"Client #{client_id} left the chat.")
Code Explanation (app/main.py updates):
- from .connections import ConnectionManager: Imports our newly created manager.
- manager = ConnectionManager(): An instance of the manager is created globally.
- @app.websocket("/ws/{client_id}"): The WebSocket endpoint now takes a client_id path parameter. This will help us identify who is sending messages.
- await manager.connect(websocket): When a client connects, we add their websocket object to our manager.
- await manager.broadcast(f"Client #{client_id} joined the chat."): A message is broadcast to all clients when a new user connects.
- await manager.broadcast(f"Client #{client_id} says: {data}"): Any message received from a client is now broadcast to everyone.
- manager.disconnect(websocket) and await manager.broadcast(...): When a client disconnects, they are removed from the manager, and a “left chat” message is broadcast.
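Because broadcast sends to every connection, the sender also receives its own message. If that is not wanted, one possible variation is an optional argument naming a connection to skip. The following is a sketch only, not part of the chapter's ConnectionManager: the exclude parameter, the standalone broadcast function, and the Recorder stand-in are all hypothetical.

```python
import asyncio
from typing import Any, List, Optional


class Recorder:
    """Hypothetical stand-in for a WebSocket; records what it is sent."""

    def __init__(self):
        self.sent = []

    async def send_text(self, message: str) -> None:
        self.sent.append(message)


async def broadcast(
    connections: List[Any], message: str, exclude: Optional[Any] = None
) -> int:
    """Send message to every connection except `exclude`; return the count sent."""
    delivered = 0
    for connection in list(connections):
        if connection is exclude:
            continue  # skip the sender's own connection
        await connection.send_text(message)
        delivered += 1
    return delivered


sender, other = Recorder(), Recorder()
count = asyncio.run(
    broadcast([sender, other], "Client #guest says: hi", exclude=sender)
)
```

In the endpoint, the same idea would amount to passing the current websocket as the connection to skip when relaying a chat message, while join and leave notices could still go to everyone.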
3. Test with Multiple JavaScript Clients
Start the server:
pipenv shell
uvicorn app.main:app --reload

Modify client.html: Update client.html to connect to ws://localhost:8000/ws/SOME_CLIENT_ID. You can change SOME_CLIENT_ID for each browser tab you open to simulate multiple users.
<!-- client.html (updated) -->
<!DOCTYPE html>
<html>
<head>
    <title>FastAPI WebSocket Chat Client</title>
</head>
<body>
    <h1>WebSocket Chat Test</h1>
    <label for="clientIdInput">Client ID:</label>
    <input type="text" id="clientIdInput" value="guest" onchange="updateWebSocket()" placeholder="Enter your ID">
    <input type="text" id="messageInput" placeholder="Type a message">
    <button onclick="sendMessage()">Send</button>
    <div id="messages"></div>
    <script>
        let ws;
        const messagesDiv = document.getElementById("messages");
        const messageInput = document.getElementById("messageInput");
        const clientIdInput = document.getElementById("clientIdInput");

        function connectWebSocket(clientId) {
            if (ws) {
                ws.close();
            }
            ws = new WebSocket(`ws://localhost:8000/ws/${clientId}`);
            ws.onopen = (event) => {
                messagesDiv.innerHTML += `<p>Connected as ${clientId} to WebSocket server!</p>`;
                console.log("WebSocket opened:", event);
            };
            ws.onmessage = (event) => {
                messagesDiv.innerHTML += `<p>Received: ${event.data}</p>`;
                console.log("WebSocket message:", event.data);
            };
            ws.onclose = (event) => {
                messagesDiv.innerHTML += `<p>Disconnected as ${clientId} from WebSocket server.</p>`;
                console.log("WebSocket closed:", event);
            };
            ws.onerror = (event) => {
                messagesDiv.innerHTML += "<p style='color:red;'>WebSocket error!</p>";
                console.error("WebSocket error:", event);
            };
        }

        function updateWebSocket() {
            const clientId = clientIdInput.value || "guest";
            connectWebSocket(clientId);
        }

        function sendMessage() {
            const message = messageInput.value;
            if (message && ws && ws.readyState === WebSocket.OPEN) {
                ws.send(message);
                messageInput.value = ""; // Clear input field
            } else if (ws && ws.readyState !== WebSocket.OPEN) {
                messagesDiv.innerHTML += "<p style='color:orange;'>WebSocket is not open. Trying to reconnect...</p>";
                updateWebSocket(); // Attempt to reconnect
            }
        }

        // Initial connection
        updateWebSocket();

        // Send a message when Enter key is pressed
        messageInput.addEventListener("keypress", (event) => {
            if (event.key === "Enter") {
                sendMessage();
            }
        });
    </script>
</body>
</html>

Open multiple client.html tabs: Open client.html in one browser tab. It will connect with “guest” as the ID. Open another tab with client.html, change the “Client ID” input to “user2”, and press Enter or leave the field so that the onchange handler reconnects under the new ID. Now, when you type a message in one tab, you should see it appear in both tabs, demonstrating the broadcast functionality. You’ll also see join/leave messages.
Tips/Challenges/Errors
- Order of Imports: Ensure your imports are correct (.auth and .connections).
- Missing client_id in URL: The WebSocket connection will fail if you don’t provide a client_id in the URL (e.g., ws://localhost:8000/ws/myuser).
- Duplicate client_id: Currently, our system allows duplicate client IDs. For real authentication, the client_id would actually be the authenticated user’s ID, which is unique.
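Until real authentication is in place, one way to deal with duplicate IDs is to key connections by client_id and refuse an ID that is already in use. Below is a minimal sketch under that assumption. IdRegistry and its methods are hypothetical names, not part of FastAPI or of this chapter's code, and the connection objects are plain placeholders.

```python
from typing import Any, Dict, List


class IdRegistry:
    """Hypothetical helper: tracks at most one connection per client_id."""

    def __init__(self) -> None:
        self._by_id: Dict[str, Any] = {}

    def register(self, client_id: str, connection: Any) -> bool:
        """Store the connection; return False if the ID is already taken."""
        if client_id in self._by_id:
            return False
        self._by_id[client_id] = connection
        return True

    def unregister(self, client_id: str) -> None:
        """Forget the ID; unknown IDs are ignored."""
        self._by_id.pop(client_id, None)

    def ids(self) -> List[str]:
        """Return the registered IDs in sorted order."""
        return sorted(self._by_id)


registry = IdRegistry()
first = registry.register("guest", object())   # ID is free, so it is accepted
second = registry.register("guest", object())  # same ID again, so it is refused
registry.register("user2", object())
```

When register returns False, the endpoint could decline the connection, for example by closing it with WebSocket close code 1008 (policy violation). How to respond is a design choice this chapter leaves open.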
Summary/Key Takeaways
You’ve successfully implemented a ConnectionManager to keep track of active WebSocket connections and broadcast messages to all connected clients. This is a fundamental step towards a fully interactive chat application. In the next chapter, we will introduce a database to store messages, ensuring that chat history is persistent even if the server restarts.