tests.ws

GraphQL Subscriptions over WebSocket

graphql websocket subscriptions apollo real-time

GraphQL subscriptions enable real-time data streaming from server to client using WebSocket connections. Unlike queries and mutations that follow a request-response pattern, subscription graphql operations maintain persistent connections that push updates as events occur. This makes WebSocket the natural transport protocol for GraphQL subscriptions, providing full-duplex communication channels required for server-initiated data delivery.

The graphql websocket connection stays open throughout the subscription lifecycle, allowing servers to push new data whenever the subscribed event triggers. This architecture powers real-time features like live chat messages, stock price updates, collaborative editing, and activity feeds without polling overhead.

Why GraphQL Uses WebSocket for Subscriptions

GraphQL subscriptions require bidirectional communication where servers can initiate data transfer to clients. Traditional HTTP requests fail this requirement since they follow a strict request-response model. WebSocket protocol solves this by establishing persistent connections that support both client-to-server and server-to-client messaging.

When implementing apollo graphql subscriptions or any graphql over websockets solution, the connection lifecycle involves:

  1. Client initiates WebSocket handshake
  2. Protocol negotiation for graphql-ws subprotocol
  3. Connection initialization with authentication
  4. Subscription registration with GraphQL operation
  5. Server pushes data updates as events occur
  6. Connection cleanup on unsubscribe or disconnect

This differs from REST or standard GraphQL queries where each request creates a new HTTP connection. The persistent nature of graphql socket connections reduces latency and eliminates the need for polling or long-polling workarounds.

The graphql-ws Protocol

The graphql-ws library implements the modern WebSocket subprotocol for GraphQL subscriptions. It replaced the deprecated subscriptions-transport-ws library that used the older graphql-ws subprotocol (confusingly different despite similar naming). The new protocol specification provides better error handling, connection management, and TypeScript support.

The graphql ws protocol defines specific message types:

  • ConnectionInit: Client initiates connection with optional auth payload
  • ConnectionAck: Server acknowledges successful connection
  • Subscribe: Client registers a subscription operation
  • Next: Server sends subscription data updates
  • Error: Server reports operation errors
  • Complete: Subscription or connection termination

This standardized protocol ensures compatibility between different GraphQL server implementations and client libraries. Both Apollo Server and NestJS GraphQL modules support graphql-ws, making it the current standard for apollo subscription implementations.

Server Setup with graphql-ws

Setting up a GraphQL WebSocket server requires the graphql-ws package and a WebSocket server instance. Here’s a standalone implementation using Node.js WebSocket:

import { WebSocketServer } from 'ws';
import { useServer } from 'graphql-ws/lib/use/ws';
import { makeExecutableSchema } from '@graphql-tools/schema';
import { createServer } from 'http';

// Define GraphQL schema
const typeDefs = `
  type Query {
    hello: String
  }

  type Subscription {
    messageAdded: Message
    counterUpdated: Int
  }

  type Message {
    id: ID!
    content: String!
    timestamp: String!
  }
`;

// Create async iterators for subscriptions
const resolvers = {
  Query: {
    hello: () => 'Hello World',
  },
  Subscription: {
    messageAdded: {
      subscribe: async function* () {
        let id = 0;
        while (true) {
          await new Promise(resolve => setTimeout(resolve, 2000));
          yield {
            messageAdded: {
              id: String(++id),
              content: `Message ${id}`,
              timestamp: new Date().toISOString(),
            },
          };
        }
      },
    },
    counterUpdated: {
      subscribe: async function* () {
        let count = 0;
        while (true) {
          await new Promise(resolve => setTimeout(resolve, 1000));
          yield { counterUpdated: ++count };
        }
      },
    },
  },
};

// Build executable schema
const schema = makeExecutableSchema({ typeDefs, resolvers });

// Create HTTP server
const httpServer = createServer((req, res) => {
  res.writeHead(404);
  res.end();
});

// Create WebSocket server
const wsServer = new WebSocketServer({
  server: httpServer,
  path: '/graphql',
});

// Integrate graphql-ws
useServer(
  {
    schema,
    onConnect: async (ctx) => {
      console.log('Client connected');
      return true;
    },
    onDisconnect: (ctx, code, reason) => {
      console.log('Client disconnected:', code, reason);
    },
    onSubscribe: (ctx, msg) => {
      console.log('Subscription started:', msg.payload.query);
    },
  },
  wsServer
);

httpServer.listen(4000, () => {
  console.log('GraphQL WebSocket server running on ws://localhost:4000/graphql');
});

This server implements subscribe graphql operations using async generator functions. The useServer function from graphql-ws handles protocol message parsing, subscription lifecycle management, and error propagation.

Apollo Server with Subscriptions

Apollo Server integrates graphql-ws for apollo client subscriptions support. Version 4+ requires explicit WebSocket server configuration:

import { ApolloServer } from '@apollo/server';
import { expressMiddleware } from '@apollo/server/express4';
import { ApolloServerPluginDrainHttpServer } from '@apollo/server/plugin/drainHttpServer';
import { makeExecutableSchema } from '@graphql-tools/schema';
import { WebSocketServer } from 'ws';
import { useServer } from 'graphql-ws/lib/use/ws';
import express from 'express';
import { createServer } from 'http';
import cors from 'cors';
import { PubSub } from 'graphql-subscriptions';

const pubsub = new PubSub();

const typeDefs = `
  type Query {
    posts: [Post!]!
  }

  type Mutation {
    createPost(title: String!, content: String!): Post!
  }

  type Subscription {
    postCreated: Post!
  }

  type Post {
    id: ID!
    title: String!
    content: String!
    createdAt: String!
  }
`;

const resolvers = {
  Query: {
    posts: () => [],
  },
  Mutation: {
    createPost: async (_: any, { title, content }: any) => {
      const post = {
        id: String(Date.now()),
        title,
        content,
        createdAt: new Date().toISOString(),
      };
      await pubsub.publish('POST_CREATED', { postCreated: post });
      return post;
    },
  },
  Subscription: {
    postCreated: {
      subscribe: () => pubsub.asyncIterator(['POST_CREATED']),
    },
  },
};

const schema = makeExecutableSchema({ typeDefs, resolvers });

const app = express();
const httpServer = createServer(app);

// WebSocket server for subscriptions
const wsServer = new WebSocketServer({
  server: httpServer,
  path: '/graphql',
});

const serverCleanup = useServer({ schema }, wsServer);

// Apollo Server instance
const apolloServer = new ApolloServer({
  schema,
  plugins: [
    ApolloServerPluginDrainHttpServer({ httpServer }),
    {
      async serverWillStart() {
        return {
          async drainServer() {
            await serverCleanup.dispose();
          },
        };
      },
    },
  ],
});

await apolloServer.start();

app.use(
  '/graphql',
  cors<cors.CorsRequest>(),
  express.json(),
  expressMiddleware(apolloServer)
);

httpServer.listen(4000, () => {
  console.log('Server running on http://localhost:4000/graphql');
  console.log('Subscriptions on ws://localhost:4000/graphql');
});

The PubSub class provides in-memory publish-subscribe functionality for triggering subscription updates. When mutations execute, they publish events that active subscriptions receive through async iterators.

Apollo Client Subscription Setup

Configuring apollo subscription support in React applications requires the graphql-ws client and split transport links:

import { ApolloClient, InMemoryCache, HttpLink, split } from '@apollo/client';
import { GraphQLWsLink } from '@apollo/client/link/subscriptions';
import { getMainDefinition } from '@apollo/client/utilities';
import { createClient } from 'graphql-ws';

// HTTP link for queries and mutations
const httpLink = new HttpLink({
  uri: 'http://localhost:4000/graphql',
});

// WebSocket link for subscriptions
const wsLink = new GraphQLWsLink(
  createClient({
    url: 'ws://localhost:4000/graphql',
    connectionParams: {
      authToken: localStorage.getItem('token'),
    },
    retryAttempts: 5,
    shouldRetry: () => true,
  })
);

// Split traffic between HTTP and WebSocket
const splitLink = split(
  ({ query }) => {
    const definition = getMainDefinition(query);
    return (
      definition.kind === 'OperationDefinition' &&
      definition.operation === 'subscription'
    );
  },
  wsLink,
  httpLink
);

export const client = new ApolloClient({
  link: splitLink,
  cache: new InMemoryCache(),
});

Using subscriptions in React components with the useSubscription hook:

import { useSubscription, gql, useMutation } from '@apollo/client';
import { useState } from 'react';

const POST_CREATED = gql`
  subscription OnPostCreated {
    postCreated {
      id
      title
      content
      createdAt
    }
  }
`;

const CREATE_POST = gql`
  mutation CreatePost($title: String!, $content: String!) {
    createPost(title: $title, content: $content) {
      id
      title
      content
      createdAt
    }
  }
`;

export function PostFeed() {
  const [posts, setPosts] = useState<any[]>([]);
  const [createPost] = useMutation(CREATE_POST);

  useSubscription(POST_CREATED, {
    onData: ({ data }) => {
      if (data.data?.postCreated) {
        setPosts(prev => [...prev, data.data.postCreated]);
      }
    },
    onError: (error) => {
      console.error('Subscription error:', error);
    },
  });

  const handleSubmit = async (e: React.FormEvent<HTMLFormElement>) => {
    e.preventDefault();
    const formData = new FormData(e.currentTarget);
    await createPost({
      variables: {
        title: formData.get('title'),
        content: formData.get('content'),
      },
    });
    e.currentTarget.reset();
  };

  return (
    <div>
      <form onSubmit={handleSubmit}>
        <input name="title" placeholder="Title" required />
        <textarea name="content" placeholder="Content" required />
        <button type="submit">Create Post</button>
      </form>

      <div>
        {posts.map(post => (
          <article key={post.id}>
            <h3>{post.title}</h3>
            <p>{post.content}</p>
            <time>{new Date(post.createdAt).toLocaleString()}</time>
          </article>
        ))}
      </div>
    </div>
  );
}

The useSubscription hook manages WebSocket connection lifecycle automatically, reconnecting on network failures and cleaning up when components unmount.

NestJS GraphQL Subscriptions

NestJS WebSocket integration provides decorator-based nestjs graphql subscription configuration:

// app.module.ts
import { Module } from '@nestjs/common';
import { GraphQLModule } from '@nestjs/graphql';
import { ApolloDriver, ApolloDriverConfig } from '@nestjs/apollo';
import { PostsModule } from './posts/posts.module';

@Module({
  imports: [
    GraphQLModule.forRoot<ApolloDriverConfig>({
      driver: ApolloDriver,
      autoSchemaFile: true,
      subscriptions: {
        'graphql-ws': true,
        'subscriptions-transport-ws': false,
      },
      installSubscriptionHandlers: true,
    }),
    PostsModule,
  ],
})
export class AppModule {}

Creating subscription resolvers with NestJS:

// posts/posts.resolver.ts
import { Resolver, Query, Mutation, Subscription, Args } from '@nestjs/graphql';
import { PubSub } from 'graphql-subscriptions';
import { Inject } from '@nestjs/common';
import { Post } from './post.model';

const pubSub = new PubSub();

@Resolver(() => Post)
export class PostsResolver {
  @Query(() => [Post])
  async posts() {
    return [];
  }

  @Mutation(() => Post)
  async createPost(
    @Args('title') title: string,
    @Args('content') content: string,
  ) {
    const post = {
      id: String(Date.now()),
      title,
      content,
      createdAt: new Date().toISOString(),
    };
    await pubSub.publish('postCreated', { postCreated: post });
    return post;
  }

  @Subscription(() => Post, {
    filter: (payload, variables) => {
      // Optional filtering logic
      return true;
    },
    resolve: (payload) => {
      return payload.postCreated;
    },
  })
  postCreated() {
    return pubSub.asyncIterator('postCreated');
  }

  @Subscription(() => Post, {
    filter: (payload, variables) => {
      return payload.post.authorId === variables.authorId;
    },
  })
  postsByAuthor(@Args('authorId') authorId: string) {
    return pubSub.asyncIterator('postCreated');
  }
}

Model definition:

// posts/post.model.ts
import { ObjectType, Field, ID } from '@nestjs/graphql';

@ObjectType()
export class Post {
  @Field(() => ID)
  id: string;

  @Field()
  title: string;

  @Field()
  content: string;

  @Field()
  createdAt: string;
}

The @Subscription decorator handles graphql-ws protocol details automatically. The filter option enables server-side filtering before sending updates to clients, reducing unnecessary data transfer.

Scaling with Redis PubSub

The in-memory PubSub class doesn’t scale across multiple server instances. For production graphql redis subscriptions, use Redis as a shared message broker:

import { RedisPubSub } from 'graphql-redis-subscriptions';
import Redis from 'ioredis';

const options = {
  host: process.env.REDIS_HOST || 'localhost',
  port: parseInt(process.env.REDIS_PORT || '6379'),
  retryStrategy: (times: number) => Math.min(times * 50, 2000),
};

const pubsub = new RedisPubSub({
  publisher: new Redis(options),
  subscriber: new Redis(options),
});

// Use in resolvers
const resolvers = {
  Mutation: {
    createPost: async (_: any, { title, content }: any) => {
      const post = {
        id: String(Date.now()),
        title,
        content,
        createdAt: new Date().toISOString(),
      };
      await pubsub.publish('POST_CREATED', { postCreated: post });
      return post;
    },
  },
  Subscription: {
    postCreated: {
      subscribe: () => pubsub.asyncIterator(['POST_CREATED']),
    },
  },
};

Redis PubSub enables horizontal scaling where multiple GraphQL server instances share subscription state. When any server publishes an event, all servers with active subscriptions receive the update and forward it to their connected clients.

For NestJS applications, inject Redis PubSub as a provider:

// pubsub.provider.ts
import { RedisPubSub } from 'graphql-redis-subscriptions';
import Redis from 'ioredis';

export const PUB_SUB = 'PUB_SUB';

export const pubSubProvider = {
  provide: PUB_SUB,
  useFactory: () => {
    return new RedisPubSub({
      publisher: new Redis({
        host: process.env.REDIS_HOST,
        port: parseInt(process.env.REDIS_PORT || '6379'),
      }),
      subscriber: new Redis({
        host: process.env.REDIS_HOST,
        port: parseInt(process.env.REDIS_PORT || '6379'),
      }),
    });
  },
};

// In resolver
import { Inject } from '@nestjs/common';

export class PostsResolver {
  constructor(@Inject(PUB_SUB) private pubSub: RedisPubSub) {}

  @Mutation(() => Post)
  async createPost(@Args('title') title: string) {
    const post = { id: '1', title, createdAt: new Date().toISOString() };
    await this.pubSub.publish('postCreated', { postCreated: post });
    return post;
  }

  @Subscription(() => Post)
  postCreated() {
    return this.pubSub.asyncIterator('postCreated');
  }
}

Testing GraphQL Subscriptions

Testing subscription graphql operations requires WebSocket client simulation. Using the graphql-ws client directly:

import { createClient } from 'graphql-ws';
import WebSocket from 'ws';

describe('GraphQL Subscriptions', () => {
  let client: any;

  beforeEach(() => {
    client = createClient({
      url: 'ws://localhost:4000/graphql',
      webSocketImpl: WebSocket,
    });
  });

  afterEach(() => {
    client.dispose();
  });

  it('should receive subscription updates', (done) => {
    const subscription = client.subscribe(
      {
        query: `
          subscription {
            postCreated {
              id
              title
            }
          }
        `,
      },
      {
        next: (data: any) => {
          expect(data.data.postCreated).toBeDefined();
          expect(data.data.postCreated.title).toBe('Test Post');
          done();
        },
        error: (error: any) => {
          done(error);
        },
        complete: () => {},
      }
    );

    // Trigger event that should emit subscription update
    setTimeout(() => {
      fetch('http://localhost:4000/graphql', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          query: `
            mutation {
              createPost(title: "Test Post", content: "Content") {
                id
              }
            }
          `,
        }),
      });
    }, 100);
  });

  it('should handle subscription errors', (done) => {
    const subscription = client.subscribe(
      {
        query: `
          subscription {
            invalidSubscription {
              id
            }
          }
        `,
      },
      {
        next: () => {},
        error: (error: any) => {
          expect(error).toBeDefined();
          done();
        },
        complete: () => {},
      }
    );
  });
});

For integration tests with Apollo Client:

import { ApolloClient, InMemoryCache, gql } from '@apollo/client';
import { GraphQLWsLink } from '@apollo/client/link/subscriptions';
import { createClient } from 'graphql-ws';
import WebSocket from 'ws';

describe('Apollo Client Subscriptions', () => {
  let client: ApolloClient<any>;

  beforeEach(() => {
    const wsLink = new GraphQLWsLink(
      createClient({
        url: 'ws://localhost:4000/graphql',
        webSocketImpl: WebSocket,
      })
    );

    client = new ApolloClient({
      link: wsLink,
      cache: new InMemoryCache(),
    });
  });

  it('should subscribe and receive updates', (done) => {
    const observable = client.subscribe({
      query: gql`
        subscription {
          postCreated {
            id
            title
          }
        }
      `,
    });

    const subscription = observable.subscribe({
      next: (result) => {
        expect(result.data.postCreated).toBeDefined();
        subscription.unsubscribe();
        done();
      },
      error: (error) => {
        done(error);
      },
    });

    // Trigger mutation
    setTimeout(() => {
      client.mutate({
        mutation: gql`
          mutation {
            createPost(title: "Test", content: "Content") {
              id
            }
          }
        `,
      });
    }, 100);
  });
});

Migration from subscriptions-transport-ws

The deprecated subscriptions-transport-ws library used a different protocol. Migrating to graphql-ws requires updating both server and client code.

Server migration:

// OLD: subscriptions-transport-ws
import { SubscriptionServer } from 'subscriptions-transport-ws';
import { execute, subscribe } from 'graphql';

SubscriptionServer.create(
  { schema, execute, subscribe },
  { server: httpServer, path: '/graphql' }
);

// NEW: graphql-ws
import { useServer } from 'graphql-ws/lib/use/ws';
import { WebSocketServer } from 'ws';

const wsServer = new WebSocketServer({
  server: httpServer,
  path: '/graphql',
});

useServer({ schema }, wsServer);

Client migration:

// OLD: subscriptions-transport-ws
import { WebSocketLink } from '@apollo/client/link/ws';
import { SubscriptionClient } from 'subscriptions-transport-ws';

const wsLink = new WebSocketLink(
  new SubscriptionClient('ws://localhost:4000/graphql', {
    reconnect: true,
  })
);

// NEW: graphql-ws
import { GraphQLWsLink } from '@apollo/client/link/subscriptions';
import { createClient } from 'graphql-ws';

const wsLink = new GraphQLWsLink(
  createClient({
    url: 'ws://localhost:4000/graphql',
    retryAttempts: 5,
    shouldRetry: () => true,
  })
);

Key differences in the new protocol:

  • Better error handling with standardized error messages
  • Improved connection lifecycle management
  • Native TypeScript support with full type definitions
  • Cleaner async iterator patterns for subscriptions
  • Active maintenance and regular updates

The transition is straightforward but requires coordinated deployment if you need to support both protocols temporarily. Configure servers to accept both protocols during migration:

import { useServer as useGraphQLWs } from 'graphql-ws/lib/use/ws';
import { SubscriptionServer } from 'subscriptions-transport-ws';
import { execute, subscribe } from 'graphql';

// Support both protocols
useGraphQLWs({ schema }, wsServer);

SubscriptionServer.create(
  { schema, execute, subscribe },
  { server: httpServer, path: '/graphql' }
);

Frequently Asked Questions

What is the difference between graphql-ws and subscriptions-transport-ws?

The graphql-ws library implements a newer WebSocket subprotocol that replaced subscriptions-transport-ws. The new protocol provides better error handling, improved connection lifecycle management, native TypeScript support, and active maintenance. The old subscriptions-transport-ws library is deprecated and no longer recommended for production use. Migration involves updating both server and client libraries to use the new graphql-ws protocol.

Can I use GraphQL subscriptions without WebSocket?

While graphql over websockets is the standard approach, alternatives exist for specific use cases. Server-Sent Events (SSE) provide unidirectional streaming for subscriptions but lack the bidirectional communication of WebSocket. Some implementations use long-polling as a fallback for restricted networks, though this adds significant overhead. WebSocket remains the recommended transport for GraphQL subscriptions due to its efficiency, browser support, and standardization in the GraphQL ecosystem.

How do I authenticate GraphQL WebSocket connections?

Authentication for graphql socket connections happens during the connection initialization phase. The client sends authentication credentials in the connectionParams when creating the WebSocket connection. The server validates these credentials in the onConnect callback before accepting the connection. This approach differs from HTTP-based authentication since WebSocket connections persist across multiple subscription operations. Token expiration requires connection termination and re-authentication with fresh tokens.

How do I handle subscription performance with many concurrent clients?

Optimizing graphql redis subscriptions for high concurrency involves several strategies. Use Redis PubSub to distribute subscription state across multiple server instances for horizontal scaling. Implement server-side filtering in subscription resolvers to reduce unnecessary data transfer to clients. Use batching and throttling for high-frequency updates to prevent overwhelming clients. Monitor WebSocket connection counts and implement connection limits per client to prevent resource exhaustion. Consider using GraphQL field-level subscriptions instead of full object subscriptions to minimize payload sizes.