Rust WebSocket with Tungstenite

Tungstenite is a lightweight, spec-compliant WebSocket library for Rust. It provides both a synchronous API through the tungstenite crate and an async API through tokio-tungstenite, which integrates with the Tokio runtime. If you need low-level control over WebSocket connections in Rust without pulling in a full web framework, tungstenite is the standard choice.

The library handles the WebSocket handshake, frame parsing, control frames (ping, pong, close), and message fragmentation. It works with both text and binary messages and follows RFC 6455 closely. For most production services, you will want the async variant, tokio-tungstenite, since it allows you to handle thousands of concurrent connections on a single thread.

Installation

Add the dependencies to your Cargo.toml. For async usage, you need both tokio and tokio-tungstenite:

[dependencies]
tokio = { version = "1", features = ["full"] }
tokio-tungstenite = "0.24"
tungstenite = "0.24"
futures-util = "0.3"

The futures-util crate provides stream and sink extensions that make working with async WebSocket connections more ergonomic. You will use StreamExt and SinkExt traits throughout your async code.

If you only need the synchronous API (for simple scripts or testing), you can depend on tungstenite alone:

[dependencies]
tungstenite = "0.24"

Run cargo build to fetch and compile the dependencies. Tungstenite compiles quickly since it has a small dependency tree.

Synchronous Server

The tungstenite crate provides a blocking API that is useful for prototyping or simple single-threaded tools. You bind a TCP listener and accept connections manually.

use std::net::TcpListener;
use tungstenite::accept;
use tungstenite::Message;

fn main() {
    let server = TcpListener::bind("127.0.0.1:9001").expect("Failed to bind");
    println!("Listening on ws://127.0.0.1:9001");

    for stream in server.incoming() {
        let stream = stream.expect("Failed to accept connection");
        let mut ws = accept(stream).expect("WebSocket handshake failed");

        loop {
            let msg = match ws.read() {
                Ok(msg) => msg,
                Err(e) => {
                    eprintln!("Read error: {}", e);
                    break;
                }
            };

            match msg {
                Message::Text(text) => {
                    println!("Received: {}", text);
                    ws.send(Message::Text(format!("Echo: {}", text))).ok();
                }
                Message::Binary(data) => {
                    ws.send(Message::Binary(data)).ok();
                }
                Message::Close(_) => {
                    println!("Client disconnected");
                    break;
                }
                _ => {}
            }
        }
    }
}

This server handles one client at a time. Each call to ws.read() blocks the thread until a message arrives. For anything beyond a quick test, you should use the async API instead.
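
If you want to stay with the blocking API but still serve more than one client, the simplest option is to spawn an OS thread per connection. A minimal sketch, reusing the echo logic above (the async server below remains the better choice at scale):

use std::net::TcpListener;
use std::thread;
use tungstenite::{accept, Message};

fn main() {
    let server = TcpListener::bind("127.0.0.1:9001").expect("Failed to bind");

    for stream in server.incoming() {
        let stream = match stream {
            Ok(s) => s,
            Err(_) => continue,
        };

        // One OS thread per client: fine for a handful of connections,
        // wasteful compared to the async API at scale.
        thread::spawn(move || {
            let mut ws = match accept(stream) {
                Ok(ws) => ws,
                Err(_) => return,
            };
            while let Ok(msg) = ws.read() {
                match msg {
                    Message::Text(text) => {
                        let _ = ws.send(Message::Text(format!("Echo: {}", text)));
                    }
                    Message::Close(_) => break,
                    _ => {}
                }
            }
        });
    }
}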

You can test this with a browser console or a WebSocket testing tool:

const ws = new WebSocket("ws://127.0.0.1:9001");
ws.onmessage = (e) => console.log(e.data);
ws.onopen = () => ws.send("hello");

Async Server with Tokio-Tungstenite

The async API is what you will use in production. It integrates with Tokio’s TCP listener and spawns a task per connection. Each task is lightweight, so you can handle many thousands of concurrent clients.

use futures_util::{SinkExt, StreamExt};
use tokio::net::TcpListener;
use tokio_tungstenite::accept_async;
use tungstenite::Message;

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:9001").await.unwrap();
    println!("Listening on ws://127.0.0.1:9001");

    while let Ok((stream, addr)) = listener.accept().await {
        tokio::spawn(async move {
            let ws_stream = match accept_async(stream).await {
                Ok(ws) => ws,
                Err(e) => {
                    eprintln!("Handshake failed for {}: {}", addr, e);
                    return;
                }
            };

            let (mut write, mut read) = ws_stream.split();

            while let Some(result) = read.next().await {
                match result {
                    Ok(Message::Text(text)) => {
                        println!("[{}] {}", addr, text);
                        if write.send(Message::Text(format!("Echo: {}", text))).await.is_err() {
                            break;
                        }
                    }
                    Ok(Message::Binary(data)) => {
                        if write.send(Message::Binary(data)).await.is_err() {
                            break;
                        }
                    }
                    Ok(Message::Close(_)) => break,
                    Err(e) => {
                        eprintln!("Error from {}: {}", addr, e);
                        break;
                    }
                    _ => {}
                }
            }

            println!("{} disconnected", addr);
        });
    }
}

The split() method divides the WebSocket stream into a read half and a write half. This is important because it lets you read and write concurrently within the same task, or pass the write half to another task entirely.

Each spawned task runs independently. When a client disconnects or an error occurs, the task exits and its resources are cleaned up automatically.
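
A common way to use that flexibility is to give each connection its own outbound queue: move the write half into a dedicated writer task fed by an mpsc channel, so any other task can send to the client by cloning the sender. A minimal sketch (the channel capacity of 32 is an arbitrary choice):

use futures_util::{SinkExt, StreamExt};
use tokio::sync::mpsc;
use tungstenite::Message;

// Inside the per-connection task, after accept_async:
let (mut write, mut read) = ws_stream.split();
let (out_tx, mut out_rx) = mpsc::channel::<Message>(32);

// Writer task: the only place that touches the write half.
tokio::spawn(async move {
    while let Some(msg) = out_rx.recv().await {
        if write.send(msg).await.is_err() {
            break;
        }
    }
});

// The read loop (and anything else holding a clone of out_tx)
// sends through the channel instead of writing directly.
while let Some(Ok(msg)) = read.next().await {
    if let Message::Text(text) = msg {
        if out_tx.send(Message::Text(format!("Echo: {}", text))).await.is_err() {
            break;
        }
    }
}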

Client Connection

Tokio-tungstenite also provides a client API. This is useful for building WebSocket clients, writing integration tests, or connecting to external services.

use futures_util::{SinkExt, StreamExt};
use tokio_tungstenite::connect_async;
use tungstenite::Message;

#[tokio::main]
async fn main() {
    let url = "ws://127.0.0.1:9001";
    let (mut ws_stream, response) = connect_async(url)
        .await
        .expect("Failed to connect");

    println!("Connected with status: {}", response.status());

    ws_stream.send(Message::Text("Hello from Rust".into())).await.unwrap();

    while let Some(result) = ws_stream.next().await {
        match result {
            Ok(Message::Text(text)) => {
                println!("Server says: {}", text);
                break;
            }
            Ok(Message::Close(frame)) => {
                println!("Server closed: {:?}", frame);
                break;
            }
            Err(e) => {
                eprintln!("Error: {}", e);
                break;
            }
            _ => {}
        }
    }

    ws_stream.close(None).await.ok();
}

The connect_async function performs the WebSocket handshake and returns both the stream and the HTTP response from the upgrade. You can inspect response headers if the server sends custom headers during the handshake.
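
For example, if the server negotiated a subprotocol during the upgrade, it comes back in the Sec-WebSocket-Protocol response header (a small sketch; the header is only present when the server set it):

if let Some(proto) = response.headers().get("Sec-WebSocket-Protocol") {
    println!("Negotiated subprotocol: {:?}", proto);
}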

If you need to set custom headers on the client request (for authentication tokens, for example), build a custom Request and pass it to connect_async, which accepts anything that implements the IntoClientRequest trait:

use tokio_tungstenite::tungstenite::http::Request;

let request = Request::builder()
    .uri("ws://127.0.0.1:9001")
    .header("Authorization", "Bearer my-token")
    .header("Sec-WebSocket-Protocol", "chat")
    .body(())
    .unwrap();

let (ws_stream, _response) = connect_async(request).await.unwrap();

JSON Messaging

Most real applications send structured data over WebSocket connections. Use serde and serde_json to serialize and deserialize messages.

Add the dependencies:

[dependencies]
serde = { version = "1", features = ["derive"] }
serde_json = "1"

Define your message types:

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "type")]
enum ClientMessage {
    #[serde(rename = "chat")]
    Chat { text: String },
    #[serde(rename = "join")]
    Join { room: String },
    #[serde(rename = "ping")]
    Ping,
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "type")]
enum ServerMessage {
    #[serde(rename = "chat")]
    Chat { from: String, text: String },
    #[serde(rename = "error")]
    Error { message: String },
    #[serde(rename = "pong")]
    Pong,
}

Use them in your message handler:

Ok(Message::Text(text)) => {
    match serde_json::from_str::<ClientMessage>(&text) {
        Ok(ClientMessage::Chat { text }) => {
            let reply = ServerMessage::Chat {
                from: addr.to_string(),
                text,
            };
            let json = serde_json::to_string(&reply).unwrap();
            write.send(Message::Text(json)).await.ok();
        }
        Ok(ClientMessage::Ping) => {
            let reply = serde_json::to_string(&ServerMessage::Pong).unwrap();
            write.send(Message::Text(reply)).await.ok();
        }
        Err(e) => {
            let reply = ServerMessage::Error {
                message: format!("Invalid message: {}", e),
            };
            let json = serde_json::to_string(&reply).unwrap();
            write.send(Message::Text(json)).await.ok();
        }
        _ => {}
    }
}

The #[serde(tag = "type")] attribute creates tagged enums, so each message includes a "type" field in the JSON. This is a common pattern for WebSocket protocols where the client and server exchange different kinds of messages on the same connection.
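
To see what actually goes over the wire, round-trip a value through serde_json; with the tag attribute, the variant name becomes the "type" field and the struct fields sit alongside it:

let msg = ClientMessage::Chat { text: "hi".to_string() };
println!("{}", serde_json::to_string(&msg).unwrap());
// Prints: {"type":"chat","text":"hi"}

let parsed: ClientMessage = serde_json::from_str(r#"{"type":"ping"}"#).unwrap();
println!("{:?}", parsed); // Ping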

Broadcasting with Shared State

A real-time application usually needs to send messages to all connected clients or a subset of them. The standard approach in Rust is to wrap a shared collection in Arc<Mutex<>> or, for better performance, use Tokio’s broadcast or mpsc channels.

Here is a broadcast approach using tokio::sync::broadcast:

use futures_util::{SinkExt, StreamExt};
use tokio::net::TcpListener;
use tokio::sync::broadcast;
use tokio_tungstenite::accept_async;
use tungstenite::Message;

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:9001").await.unwrap();
    let (tx, _) = broadcast::channel::<String>(100);

    println!("Listening on ws://127.0.0.1:9001");

    while let Ok((stream, addr)) = listener.accept().await {
        let tx = tx.clone();
        let mut rx = tx.subscribe();

        tokio::spawn(async move {
            let ws_stream = match accept_async(stream).await {
                Ok(ws) => ws,
                Err(_) => return,
            };

            let (mut write, mut read) = ws_stream.split();

            let write_task = tokio::spawn(async move {
                while let Ok(msg) = rx.recv().await {
                    if write.send(Message::Text(msg)).await.is_err() {
                        break;
                    }
                }
            });

            while let Some(Ok(msg)) = read.next().await {
                if let Message::Text(text) = msg {
                    let formatted = format!("[{}]: {}", addr, text);
                    let _ = tx.send(formatted);
                }
            }

            write_task.abort();
            println!("{} disconnected", addr);
        });
    }
}

Each client task subscribes to the broadcast channel. When any client sends a text message, it is published to the channel, and every subscriber (including the sender) receives it. The broadcast channel has a fixed capacity; if a receiver falls behind, it will get a RecvError::Lagged error and skip the missed messages.
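
Note that the write task above exits on the first error from rx.recv(), including a lag. If you would rather drop the missed messages and keep the slow client connected, match on the error explicitly; a sketch of just the write loop:

use tokio::sync::broadcast::error::RecvError;

loop {
    match rx.recv().await {
        Ok(msg) => {
            if write.send(Message::Text(msg)).await.is_err() {
                break;
            }
        }
        // The receiver fell behind; the skipped count says how many
        // messages were dropped. Keep going from the oldest retained one.
        Err(RecvError::Lagged(skipped)) => {
            eprintln!("Dropped {} messages for a slow client", skipped);
        }
        // The sender side is gone; nothing more will arrive.
        Err(RecvError::Closed) => break,
    }
}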

For more advanced room-based routing, you can use a HashMap<String, broadcast::Sender<String>> wrapped in Arc<RwLock<>> to manage multiple named channels.
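
A minimal sketch of that idea, with names (Rooms, get_or_create) chosen here for illustration:

use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};

// One broadcast channel per named room.
type Rooms = Arc<RwLock<HashMap<String, broadcast::Sender<String>>>>;

async fn get_or_create(rooms: &Rooms, name: &str) -> broadcast::Sender<String> {
    // Fast path: the room already exists.
    if let Some(tx) = rooms.read().await.get(name) {
        return tx.clone();
    }
    // Slow path: create it under the write lock. entry() keeps this
    // correct even if two tasks race to create the same room.
    rooms
        .write()
        .await
        .entry(name.to_string())
        .or_insert_with(|| broadcast::channel(100).0)
        .clone()
}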

Ping/Pong Handling

The WebSocket protocol defines ping and pong frames as a keepalive mechanism. Tungstenite handles pong responses automatically by default. When a ping frame arrives, the library sends back a pong frame with the same payload without requiring any action from your code.

If you want to actively send pings to detect dead connections, you can do so with a timer:

use tokio::time::{interval, Duration};
use futures_util::{SinkExt, StreamExt};
use tungstenite::Message;

// Inside your connection handler, after splitting:
let mut ping_interval = interval(Duration::from_secs(30));

loop {
    tokio::select! {
        msg = read.next() => {
            match msg {
                Some(Ok(Message::Pong(_))) => {
                    // Client is alive
                }
                Some(Ok(_other)) => {
                    // Handle other messages
                }
                _ => break,
            }
        }
        _ = ping_interval.tick() => {
            if write.send(Message::Ping(Vec::new())).await.is_err() {
                break;
            }
        }
    }
}

The tokio::select! macro multiplexes reading messages and sending pings on the same task. If the client does not respond to a ping within a reasonable time, close the connection; a typical threshold is two or three missed ping intervals.
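
One way to enforce that timeout is to count unanswered pings and give up after a few. A sketch extending the select! loop above (the threshold of 3 is an arbitrary choice):

let mut missed_pongs: u32 = 0;

loop {
    tokio::select! {
        msg = read.next() => {
            match msg {
                Some(Ok(Message::Pong(_))) => missed_pongs = 0,
                Some(Ok(_other)) => { /* handle other messages */ }
                _ => break,
            }
        }
        _ = ping_interval.tick() => {
            if missed_pongs >= 3 {
                // Three intervals with no pong: assume the peer is gone.
                let _ = write.send(Message::Close(None)).await;
                break;
            }
            missed_pongs += 1;
            if write.send(Message::Ping(Vec::new())).await.is_err() {
                break;
            }
        }
    }
}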

As for closing, RFC 6455 specifies that both sides should exchange close frames before dropping the TCP connection, so prefer calling close() and letting the closing handshake complete over simply dropping the socket.

Error Handling

Tungstenite defines tungstenite::Error with several variants you should handle:

use tungstenite::Error;

match result {
    Err(Error::ConnectionClosed) => {
        // The connection was closed normally.
        println!("Connection closed");
    }
    Err(Error::AlreadyClosed) => {
        // You tried to send on a closed connection.
        println!("Attempted to use closed connection");
    }
    Err(Error::Protocol(reason)) => {
        // The peer violated the WebSocket protocol.
        eprintln!("Protocol violation: {}", reason);
    }
    Err(Error::Io(e)) => {
        // Underlying TCP I/O error.
        eprintln!("I/O error: {}", e);
    }
    Err(Error::Capacity(msg)) => {
        // Message or frame exceeded the configured size limit.
        eprintln!("Capacity error: {}", msg);
    }
    Err(e) => {
        eprintln!("Other error: {}", e);
    }
    Ok(_) => {}
}

The ConnectionClosed variant is expected during normal shutdown. Do not treat it as a fatal error. Protocol errors indicate a misbehaving client, which can happen when bots or broken clients connect. Capacity errors fire when a message exceeds the configured maximum size, which is an important safeguard against memory exhaustion from malicious payloads.

To set the maximum message size, configure the WebSocket connection:

use tokio_tungstenite::accept_async_with_config;
use tungstenite::protocol::WebSocketConfig;

let config = WebSocketConfig {
    max_message_size: Some(64 * 1024 * 1024), // 64 MB
    max_frame_size: Some(16 * 1024 * 1024),   // 16 MB
    ..Default::default()
};

let ws_stream = accept_async_with_config(stream, Some(config)).await?;

TLS Connections

For production deployments, you should encrypt WebSocket traffic using TLS (the wss:// scheme). Tokio-tungstenite supports TLS through either native-tls or rustls.

To use rustls (a pure-Rust TLS implementation), enable the feature:

[dependencies]
tokio-tungstenite = { version = "0.24", features = ["rustls-tls-webpki-roots"] }

With this feature enabled, connect_async will automatically handle wss:// URLs:

let (ws_stream, _) = connect_async("wss://echo.websocket.org").await?;

For a server with TLS, you need to load your certificate and key, then wrap the TCP stream before passing it to the WebSocket acceptor:

use std::sync::Arc;
use tokio::net::TcpListener;
use tokio_rustls::rustls::{Certificate, PrivateKey, ServerConfig};
use tokio_rustls::TlsAcceptor;
use tokio_tungstenite::accept_async;

fn load_tls_config() -> Arc<ServerConfig> {
    let cert_pem = std::fs::read("cert.pem").unwrap();
    let key_pem = std::fs::read("key.pem").unwrap();

    let certs: Vec<Certificate> = rustls_pemfile::certs(&mut &*cert_pem)
        .unwrap()
        .into_iter()
        .map(Certificate)
        .collect();

    let key = rustls_pemfile::pkcs8_private_keys(&mut &*key_pem)
        .unwrap()
        .remove(0);

    let config = ServerConfig::builder()
        .with_safe_defaults()
        .with_no_client_auth()
        .with_single_cert(certs, PrivateKey(key))
        .unwrap();

    Arc::new(config)
}

#[tokio::main]
async fn main() {
    let tls_config = load_tls_config();
    let tls_acceptor = TlsAcceptor::from(tls_config);
    let listener = TcpListener::bind("0.0.0.0:9443").await.unwrap();

    while let Ok((stream, addr)) = listener.accept().await {
        let acceptor = tls_acceptor.clone();
        tokio::spawn(async move {
            let tls_stream = match acceptor.accept(stream).await {
                Ok(s) => s,
                Err(e) => {
                    eprintln!("TLS handshake failed for {}: {}", addr, e);
                    return;
                }
            };
            let ws_stream = accept_async(tls_stream).await.unwrap();
            // Handle WebSocket messages as before
        });
    }
}
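
The TLS sketch above is written against the tokio-rustls 0.24 / rustls 0.21 API (the with_safe_defaults builder plus the Certificate and PrivateKey wrapper types) and rustls-pemfile 1.x, which returns plain byte vectors. Newer rustls releases changed these types, so pin compatible versions if you follow this example:

[dependencies]
tokio-rustls = "0.24"
rustls-pemfile = "1"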

Production Tips

Buffer Sizes and Backpressure

Set max_message_size and max_frame_size in WebSocketConfig to prevent a single client from consuming excessive memory. The defaults are 64 MB and 16 MB respectively, which may be too generous for your use case. For a chat application, 1 MB is usually more than enough.

When broadcasting to many clients, a slow consumer can cause messages to queue up. Use Tokio’s broadcast channel with a reasonable capacity, and handle the Lagged error by dropping the slow client or skipping messages. Never let an unbounded queue grow without limits.

Graceful Shutdown

Use tokio::signal to catch shutdown signals, then stop accepting new connections and close existing ones. The example below listens for Ctrl-C (SIGINT); on Unix you can also watch for SIGTERM via tokio::signal::unix. The CancellationToken type comes from the tokio-util crate:

use tokio::signal;
use tokio_util::sync::CancellationToken;

let token = CancellationToken::new();
let shutdown_token = token.clone();

tokio::spawn(async move {
    signal::ctrl_c().await.unwrap();
    shutdown_token.cancel();
});

loop {
    tokio::select! {
        Ok((stream, addr)) = listener.accept() => {
            let token = token.clone();
            tokio::spawn(async move {
                // Pass token into connection handler
                // Check token.is_cancelled() periodically
            });
        }
        _ = token.cancelled() => {
            println!("Shutting down");
            break;
        }
    }
}

Pass the cancellation token into each connection handler so that active connections can send a close frame before exiting.
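
Inside the handler, one way to honor the token is to race it against the read loop and send a close frame when shutdown is requested. A sketch, assuming read, write, and token are in scope; close code 1001 ("going away") is the conventional choice:

use tungstenite::protocol::frame::coding::CloseCode;
use tungstenite::protocol::CloseFrame;

loop {
    tokio::select! {
        msg = read.next() => {
            match msg {
                Some(Ok(_msg)) => { /* handle the message as before */ }
                _ => break,
            }
        }
        _ = token.cancelled() => {
            // Tell the client we are going away, then exit the task.
            let _ = write.send(Message::Close(Some(CloseFrame {
                code: CloseCode::Away,
                reason: "server shutting down".into(),
            }))).await;
            break;
        }
    }
}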

Connection Limits

Track the number of active connections with an AtomicUsize or a semaphore. Reject new connections when the count exceeds your limit. This prevents resource exhaustion under load:

use tokio::sync::Semaphore;
use std::sync::Arc;

let max_connections = Arc::new(Semaphore::new(10_000));

while let Ok((stream, addr)) = listener.accept().await {
    let permit = match max_connections.clone().try_acquire_owned() {
        Ok(p) => p,
        Err(_) => {
            eprintln!("Connection limit reached, rejecting {}", addr);
            continue;
        }
    };

    tokio::spawn(async move {
        let _permit = permit; // Held until this task exits
        // Handle connection...
    });
}

Logging and Metrics

Use the tracing crate for structured logging. Assign each connection a span with its address so that all log messages from that connection are tagged automatically. For metrics, track connection count, messages per second, and error rates, and export them with a Prometheus endpoint or similar.
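
A sketch of the per-connection span using tracing (you also need a subscriber, such as tracing-subscriber, installed at startup for the output to appear):

use tracing::{info, info_span, Instrument};

// When accepting a connection:
let span = info_span!("connection", client = %addr);
tokio::spawn(
    async move {
        info!("connected");
        // ... handle the WebSocket as before ...
        info!("disconnected");
    }
    .instrument(span),
);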

Alternatives

Tungstenite is not the only way to add WebSocket support to a Rust application. Depending on your architecture, one of these alternatives might be a better fit.

Axum

Axum is a web framework built on top of Tokio and Hyper. It has first-class WebSocket support through its extract::ws module. If you are already building an HTTP API with Axum, adding WebSocket endpoints is straightforward and you do not need tungstenite directly. Under the hood, Axum uses tungstenite for its WebSocket implementation.
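
For comparison, a minimal Axum echo endpoint looks roughly like this (a sketch assuming axum 0.7; note that Axum exposes its own Message type rather than tungstenite's):

use axum::extract::ws::{WebSocket, WebSocketUpgrade};
use axum::response::IntoResponse;
use axum::routing::get;
use axum::Router;

async fn ws_handler(ws: WebSocketUpgrade) -> impl IntoResponse {
    // Complete the HTTP upgrade, then hand the socket to an async handler.
    ws.on_upgrade(handle_socket)
}

async fn handle_socket(mut socket: WebSocket) {
    // Echo every message back until the client goes away.
    while let Some(Ok(msg)) = socket.recv().await {
        if socket.send(msg).await.is_err() {
            break;
        }
    }
}

#[tokio::main]
async fn main() {
    let app = Router::new().route("/ws", get(ws_handler));
    let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}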

Warp

Warp is a filter-based web framework that includes WebSocket support. Its composable filter model works well when you want to combine WebSocket routes with REST endpoints. Warp also uses tungstenite internally.

Actix-Web

Actix-Web has its own WebSocket implementation based on the actor model. If you are using the Actix ecosystem, its WebSocket support integrates naturally with actors and message passing. The API is different from tungstenite, so you would not mix them.

When to Choose Tungstenite Directly

Use tungstenite or tokio-tungstenite when you want a standalone WebSocket server without an HTTP framework, when you need fine-grained control over the connection lifecycle, or when you are building a client that connects to an external WebSocket service. If you already have a web framework in place, use its built-in WebSocket support instead.

FAQ

How do I send binary data with tungstenite?

Use Message::Binary instead of Message::Text. The binary variant takes a Vec<u8> as its payload. On the receiving end, match on Message::Binary(data) to get the raw bytes. This is useful for sending protocol buffers, images, or any non-text payload. The WebSocket protocol treats text and binary frames identically in terms of framing; the only difference is the opcode in the frame header.

let image_data: Vec<u8> = std::fs::read("photo.png").unwrap();
write.send(Message::Binary(image_data)).await.unwrap();

What is the maximum message size?

By default, tungstenite allows messages up to 64 MB and individual frames up to 16 MB. You can change these limits through WebSocketConfig. Setting them too high makes your server vulnerable to memory exhaustion attacks. Setting them too low will reject legitimate large payloads. Choose a limit based on your application’s actual needs. For a JSON-based API, 1 MB is usually generous. For file transfer, you might need 100 MB or more, but consider using chunked uploads over HTTP instead.

How do I authenticate WebSocket connections?

Authentication typically happens during the HTTP upgrade request, before the WebSocket handshake completes. On the client side, send a token in a header or query parameter. On the server side, inspect the upgrade request before accepting the connection: tokio-tungstenite provides accept_hdr_async, which takes a callback that can examine the request headers and reject the handshake with an HTTP error response. Alternatively, require the client to send an authentication message as the first WebSocket message after connecting, and close the connection if it does not arrive within a timeout.
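
A sketch of the first-message approach, using tokio::time::timeout and a hypothetical validate_token function (the five-second window is an arbitrary choice):

use std::time::Duration;
use futures_util::{SinkExt, StreamExt};
use tokio::time::timeout;
use tungstenite::Message;

// Hypothetical token check; replace with your real validation.
fn validate_token(token: &str) -> bool {
    !token.is_empty()
}

// Right after accept_async, before entering the normal message loop:
match timeout(Duration::from_secs(5), read.next()).await {
    Ok(Some(Ok(Message::Text(token)))) if validate_token(&token) => {
        // Authenticated: fall through to the normal message loop.
    }
    _ => {
        // No message in time, or an invalid token: close and drop the client.
        let _ = write.send(Message::Close(None)).await;
        return;
    }
}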

Can I use tungstenite with other async runtimes?

Tungstenite itself is runtime-agnostic because the core crate is synchronous. The tokio-tungstenite wrapper is specific to Tokio. If you use async-std, there is a community crate called async-tungstenite that provides the same async API on top of async-std or other runtimes. The message types and error types are shared across all variants since they come from the base tungstenite crate.