Python WebSocket with websockets
The websockets library is the standard choice for Python WebSocket communication. It is built on top of asyncio, supports both client and server roles, and follows the WebSocket specification completely. This guide covers installation, server creation, client connections, broadcasting, authentication, error handling, and common patterns you will encounter in real projects.
Installation
Install the library with pip:
pip install websockets
You need Python 3.7 or newer. The websockets library depends on asyncio, which is part of the Python standard library. No additional dependencies are required.
Verify the installation:
import websockets
print(websockets.__version__)
Basic Server
A WebSocket server built with websockets requires an async handler function. This function receives a connection object for each client that connects. You pass it to websockets.serve() and run the event loop with asyncio.run(). The asyncio WebSocket implementation handles connections concurrently without threading.
import asyncio
import websockets
async def handler(websocket):
async for message in websocket:
print(f"Received: {message}")
await websocket.send(f"Echo: {message}")
async def main():
async with websockets.serve(handler, "localhost", 8765):
await asyncio.Future() # Run forever
asyncio.run(main())
The async for message in websocket pattern automatically handles the connection lifecycle. It iterates over incoming messages until the client disconnects. The server listens on localhost port 8765 and echoes back every message it receives.
The await asyncio.Future() call keeps the server running indefinitely. Without it, the server would start and immediately shut down.
Basic Client
The client side uses websockets.connect() as an async context manager. This opens the connection, gives you a websocket object, and closes the connection when you exit the block.
import asyncio
import websockets
async def client():
uri = "ws://localhost:8765"
async with websockets.connect(uri) as websocket:
await websocket.send("Hello, server!")
response = await websocket.recv()
print(f"Server says: {response}")
asyncio.run(client())
For long-running clients that need to send and receive multiple messages, you can loop inside the context manager:
import asyncio
import websockets
async def interactive_client():
uri = "ws://localhost:8765"
async with websockets.connect(uri) as websocket:
for i in range(5):
message = f"Message {i}"
await websocket.send(message)
response = await websocket.recv()
print(f"Got: {response}")
await asyncio.sleep(1)
asyncio.run(interactive_client())
Handling Multiple Clients
A real server needs to handle many clients at once. The websockets library creates a new asyncio task for each connection automatically, so your handler function runs concurrently for every client. To broadcast messages, keep track of connected clients in a set.
import asyncio
import websockets
CONNECTIONS = set()
async def handler(websocket):
CONNECTIONS.add(websocket)
try:
async for message in websocket:
# Broadcast to all connected clients
websockets.broadcast(CONNECTIONS, message)
finally:
CONNECTIONS.remove(websocket)
async def main():
async with websockets.serve(handler, "localhost", 8765):
await asyncio.Future()
asyncio.run(main())
The websockets.broadcast() function sends a message to every connection in the set. It handles errors internally, so if one client has disconnected, the broadcast still reaches the others. The try/finally block guarantees that a disconnected client is always removed from the set.
If you need to send a message to a specific subset of clients, iterate over the set yourself:
async def send_to_many(targets: set, message: str):
for ws in targets.copy():
try:
await ws.send(message)
except websockets.ConnectionClosed:
targets.discard(ws)
JSON Messages
Most real applications send structured data rather than plain strings. Use Python’s json module to serialize and deserialize messages. Defining a simple message protocol helps keep your code organized.
import asyncio
import json
import websockets
async def handler(websocket):
async for raw in websocket:
data = json.loads(raw)
msg_type = data.get("type")
if msg_type == "ping":
response = {"type": "pong", "timestamp": data.get("timestamp")}
await websocket.send(json.dumps(response))
elif msg_type == "chat":
response = {
"type": "chat",
"user": data.get("user", "anonymous"),
"text": data.get("text", ""),
}
await websocket.send(json.dumps(response))
else:
error = {"type": "error", "message": f"Unknown type: {msg_type}"}
await websocket.send(json.dumps(error))
async def main():
async with websockets.serve(handler, "localhost", 8765):
await asyncio.Future()
asyncio.run(main())
A matching client:
import asyncio
import json
import websockets
async def client():
async with websockets.connect("ws://localhost:8765") as ws:
# Send a chat message
await ws.send(json.dumps({
"type": "chat",
"user": "alice",
"text": "Hello everyone!"
}))
response = json.loads(await ws.recv())
print(response)
asyncio.run(client())
For a deeper look at message formats and protocols, see the WebSocket security guide which covers validation of incoming data.
Authentication
You often need to verify a client’s identity before allowing a WebSocket connection. The websockets library lets you inspect headers and query parameters during the handshake using the process_request callback.
import asyncio
import websockets
from http import HTTPStatus
from urllib.parse import urlparse, parse_qs
VALID_TOKENS = {"secret-token-123", "secret-token-456"}
async def authenticate(connection, request):
# Check token from query parameter
query = parse_qs(urlparse(request.path).query)
tokens = query.get("token", [])
if not tokens or tokens[0] not in VALID_TOKENS:
return connection.respond(HTTPStatus.UNAUTHORIZED, "Invalid token\n")
async def handler(websocket):
async for message in websocket:
await websocket.send(f"Authenticated echo: {message}")
async def main():
async with websockets.serve(
handler,
"localhost",
8765,
process_request=authenticate,
):
await asyncio.Future()
asyncio.run(main())
The client connects with a token in the URL:
async def authenticated_client():
uri = "ws://localhost:8765?token=secret-token-123"
async with websockets.connect(uri) as ws:
await ws.send("Private message")
print(await ws.recv())
You can also check the Authorization header by passing extra headers from the client:
async def header_auth_client():
headers = {"Authorization": "Bearer my-jwt-token"}
async with websockets.connect("ws://localhost:8765", additional_headers=headers) as ws:
await ws.send("Hello")
print(await ws.recv())
Error Handling
WebSocket connections can close at any time. The websockets library raises ConnectionClosed (and its subclasses ConnectionClosedOK and ConnectionClosedError) when this happens. Proper error handling prevents your server from crashing and lets your client recover.
Server-side error handling:
import asyncio
import websockets
async def handler(websocket):
try:
async for message in websocket:
try:
result = process_message(message)
await websocket.send(result)
except ValueError as e:
await websocket.send(f"Error: {e}")
except websockets.ConnectionClosedError as e:
print(f"Client disconnected with error: {e.code} {e.reason}")
except websockets.ConnectionClosedOK:
print("Client disconnected normally")
def process_message(message: str) -> str:
# Your business logic here
return f"Processed: {message}"
Client-side reconnection pattern:
import asyncio
import websockets
async def resilient_client():
uri = "ws://localhost:8765"
while True:
try:
async with websockets.connect(uri) as ws:
print("Connected")
async for message in ws:
print(f"Received: {message}")
except websockets.ConnectionClosed:
print("Connection lost. Reconnecting in 3 seconds...")
await asyncio.sleep(3)
except OSError as e:
print(f"Connection failed: {e}. Retrying in 5 seconds...")
await asyncio.sleep(5)
asyncio.run(resilient_client())
This reconnection loop catches both clean disconnections and network failures. You can add exponential backoff for production use by increasing the wait time on each retry. For more patterns around connection management, check the WebSocket scalability guide.
Rooms and Groups Pattern
Many applications need to group connections into rooms or channels. A dictionary mapping room names to sets of connections works well for this.
import asyncio
import json
import websockets
ROOMS: dict[str, set] = {}
async def handler(websocket):
current_room = None
try:
async for raw in websocket:
data = json.loads(raw)
action = data.get("action")
if action == "join":
room_name = data.get("room", "default")
# Leave current room if in one
if current_room and current_room in ROOMS:
ROOMS[current_room].discard(websocket)
# Join new room
if room_name not in ROOMS:
ROOMS[room_name] = set()
ROOMS[room_name].add(websocket)
current_room = room_name
await websocket.send(json.dumps({
"action": "joined",
"room": room_name,
"members": len(ROOMS[room_name]),
}))
elif action == "message":
if current_room and current_room in ROOMS:
broadcast_data = json.dumps({
"action": "message",
"room": current_room,
"text": data.get("text", ""),
"sender": data.get("sender", "anonymous"),
})
websockets.broadcast(ROOMS[current_room], broadcast_data)
elif action == "leave":
if current_room and current_room in ROOMS:
ROOMS[current_room].discard(websocket)
if not ROOMS[current_room]:
del ROOMS[current_room]
current_room = None
await websocket.send(json.dumps({"action": "left"}))
finally:
if current_room and current_room in ROOMS:
ROOMS[current_room].discard(websocket)
if not ROOMS[current_room]:
del ROOMS[current_room]
async def main():
async with websockets.serve(handler, "localhost", 8765):
await asyncio.Future()
asyncio.run(main())
The finally block cleans up the connection from any room when the client disconnects. Empty rooms are removed from the dictionary to prevent memory leaks.
Full Example: Chat Server and Client
Here is a complete, working chat application. The server tracks users by name, supports joining rooms, and broadcasts messages to everyone in the same room.
Chat Server
import asyncio
import json
import websockets
from datetime import datetime
ROOMS: dict[str, set] = {}
USER_NAMES: dict[websockets.WebSocketServerProtocol, str] = {}
async def handler(websocket):
username = None
room = None
try:
async for raw in websocket:
data = json.loads(raw)
action = data.get("action")
if action == "register":
username = data["username"]
USER_NAMES[websocket] = username
await websocket.send(json.dumps({
"action": "registered",
"username": username,
}))
elif action == "join":
if not username:
await websocket.send(json.dumps({
"action": "error",
"message": "Register first",
}))
continue
room = data["room"]
if room not in ROOMS:
ROOMS[room] = set()
ROOMS[room].add(websocket)
# Notify the room
notification = json.dumps({
"action": "notification",
"text": f"{username} joined {room}",
"timestamp": datetime.now().isoformat(),
})
websockets.broadcast(ROOMS[room], notification)
elif action == "message":
if not room or room not in ROOMS:
continue
outgoing = json.dumps({
"action": "message",
"username": username or "anonymous",
"text": data.get("text", ""),
"room": room,
"timestamp": datetime.now().isoformat(),
})
websockets.broadcast(ROOMS[room], outgoing)
elif action == "list_rooms":
room_list = {
name: len(members) for name, members in ROOMS.items()
}
await websocket.send(json.dumps({
"action": "room_list",
"rooms": room_list,
}))
finally:
if room and room in ROOMS:
ROOMS[room].discard(websocket)
if ROOMS[room] and username:
notification = json.dumps({
"action": "notification",
"text": f"{username} left {room}",
"timestamp": datetime.now().isoformat(),
})
websockets.broadcast(ROOMS[room], notification)
if not ROOMS[room]:
del ROOMS[room]
USER_NAMES.pop(websocket, None)
async def main():
print("Chat server starting on ws://localhost:8765")
async with websockets.serve(handler, "localhost", 8765):
await asyncio.Future()
asyncio.run(main())
Chat Client
import asyncio
import json
import sys
import websockets
async def receive_messages(websocket):
async for raw in websocket:
data = json.loads(raw)
action = data.get("action")
if action == "message":
print(f"[{data['room']}] {data['username']}: {data['text']}")
elif action == "notification":
print(f"** {data['text']} **")
elif action == "registered":
print(f"Registered as {data['username']}")
elif action == "room_list":
print("Active rooms:")
for name, count in data["rooms"].items():
print(f" {name}: {count} user(s)")
elif action == "error":
print(f"Error: {data['message']}")
async def send_messages(websocket, username: str, room: str):
# Register and join
await websocket.send(json.dumps({"action": "register", "username": username}))
await asyncio.sleep(0.1)
await websocket.send(json.dumps({"action": "join", "room": room}))
# Read from stdin in a loop
loop = asyncio.get_event_loop()
while True:
text = await loop.run_in_executor(None, sys.stdin.readline)
text = text.strip()
if not text:
continue
if text == "/quit":
break
if text == "/rooms":
await websocket.send(json.dumps({"action": "list_rooms"}))
continue
await websocket.send(json.dumps({"action": "message", "text": text}))
async def main():
username = input("Enter your username: ").strip()
room = input("Enter room name: ").strip()
uri = "ws://localhost:8765"
async with websockets.connect(uri) as ws:
receiver = asyncio.create_task(receive_messages(ws))
sender = asyncio.create_task(send_messages(ws, username, room))
done, pending = await asyncio.wait(
[receiver, sender],
return_when=asyncio.FIRST_COMPLETED,
)
for task in pending:
task.cancel()
asyncio.run(main())
Run the server in one terminal and start multiple clients in other terminals to test the chat.
Integration with Web Frameworks
The websockets library works well on its own, but you may want to integrate WebSocket support into an existing web application. Here is how it compares with framework-based options.
FastAPI and Starlette have built-in WebSocket support through ASGI. If your project already uses FastAPI for REST endpoints, adding WebSocket routes directly to FastAPI keeps everything in one application:
from fastapi import FastAPI, WebSocket
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Echo: {data}")
When to use websockets standalone: You want a dedicated WebSocket server separate from your HTTP API, you need fine-grained control over the WebSocket handshake, or your project does not use a web framework.
When to use a framework: Your application already serves HTTP endpoints, you want a single deployment unit, or you need the middleware and dependency injection features of the framework.
For testing either approach, see how to test WebSocket connections and try the online WebSocket tester.
Production Tips
Running websockets in production requires attention to timeouts, message limits, and security settings.
Ping and Pong
The websockets library sends ping frames automatically to detect dead connections. You can configure the interval and timeout:
async def main():
async with websockets.serve(
handler,
"localhost",
8765,
ping_interval=20, # Send a ping every 20 seconds
ping_timeout=10, # Wait 10 seconds for a pong response
):
await asyncio.Future()
If a client does not respond to a ping within ping_timeout seconds, the server closes the connection. This prevents zombie connections from accumulating.
Message Size Limits
By default, websockets allows messages up to 1 MB. For applications that transfer large payloads, increase this limit. For applications that expect small messages, decrease it to protect against abuse:
async with websockets.serve(
handler, "localhost", 8765,
max_size=2 ** 20, # 1 MB (default)
):
await asyncio.Future()
Origin Checking
Restrict which origins can connect to your WebSocket server. This helps prevent cross-site WebSocket hijacking:
async with websockets.serve(
handler, "localhost", 8765,
origins=["https://yoursite.com", "https://www.yoursite.com"],
):
await asyncio.Future()
Logging
The websockets library uses Python’s standard logging module. Enable debug logging to troubleshoot connection issues:
import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("websockets")
logger.setLevel(logging.DEBUG)
For production, set the level to WARNING or ERROR to reduce log volume.
Running Behind a Reverse Proxy
When running behind nginx or another reverse proxy, make sure to configure WebSocket proxying. On the Python side, no changes are needed. The websockets library reads the X-Forwarded-For header automatically when available.
Alternatives
The websockets library is not the only option for WebSocket support in Python. Here is a comparison of popular choices:
| Library | Async | Server | Client | Framework Integration |
|---|---|---|---|---|
| websockets | Yes | Yes | Yes | Standalone |
| aiohttp | Yes | Yes | Yes | aiohttp web framework |
| FastAPI | Yes | Yes | No | FastAPI/Starlette |
| Django Channels | Yes | Yes | No | Django |
aiohttp provides WebSocket support as part of a larger HTTP client/server library. If you already use aiohttp for HTTP requests, adding WebSocket support through it avoids an extra dependency.
FastAPI offers WebSocket support through Starlette’s ASGI implementation. It is the best choice when your project already uses FastAPI for REST APIs.
Django Channels extends Django with WebSocket and other async protocol support. If your project is built on Django, Channels is the natural fit, though it requires additional infrastructure like a channel layer (typically Redis).
For a standalone WebSocket server or client with no framework baggage, websockets remains the most focused and well-maintained option. To understand how WebSocket connections work at the protocol level, read the JavaScript WebSocket guide which covers the same concepts from the browser side.
What to Read Next
- JavaScript WebSocket Guide covers the browser-side API, which pairs with a Python websockets server.
- WebSocket Security explains authentication, encryption, input validation, and common attack vectors.
- WebSocket Scalability covers horizontal scaling, load balancing, and managing thousands of concurrent connections.
- How to Test WebSocket Connections walks through unit testing, integration testing, and load testing for WebSocket servers.
- WebSocket Tester Tool provides an interactive browser-based tool for connecting to and debugging WebSocket servers.