tests.ws

Angular WebSocket Guide

angular websocket rxjs typescript real-time

Angular WebSocket integration enables real-time bidirectional communication between your application and servers. The framework’s reactive architecture, combined with RxJS observables, provides a powerful foundation for managing WebSocket connections. This guide demonstrates how to implement WebSocket functionality in Angular using the native RxJS webSocket() function and Socket.IO, covering service patterns, reconnection strategies, and testing approaches.

RxJS webSocket() Function

The RxJS library includes a built-in webSocket() function that creates an observable WebSocket connection. This function returns a WebSocketSubject that acts as both an observable and observer, allowing you to send and receive messages through a unified interface.

import { webSocket } from 'rxjs/webSocket';

const socket$ = webSocket('wss://api.example.com/socket');

socket$.subscribe({
  next: (message) => console.log('Received:', message),
  error: (error) => console.error('WebSocket error:', error),
  complete: () => console.log('Connection closed')
});

// Send messages
socket$.next({ type: 'subscribe', channel: 'updates' });

The webSocket() function accepts either a URL string or a configuration object with advanced options:

import { webSocket, WebSocketSubjectConfig } from 'rxjs/webSocket';

const config: WebSocketSubjectConfig<any> = {
  url: 'wss://api.example.com/socket',
  protocol: 'v1.socket.protocol',
  openObserver: {
    next: () => console.log('Connection established')
  },
  closeObserver: {
    next: () => console.log('Connection closed')
  },
  serializer: (value) => JSON.stringify(value),
  deserializer: (event) => JSON.parse(event.data)
};

const socket$ = webSocket(config);

Learn more about the underlying technology in our WebSocket fundamentals guide.

Creating a WebSocket Service

Building an Angular WebSocket service centralizes connection management and provides a reusable interface for components. This pattern ensures proper lifecycle handling and enables dependency injection across your application.

// websocket.service.ts
import { Injectable } from '@angular/core';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { Observable, Subject, timer } from 'rxjs';
import { retryWhen, tap, delayWhen } from 'rxjs/operators';

export interface WebSocketMessage {
  type: string;
  payload: any;
  timestamp?: number;
}

@Injectable({
  providedIn: 'root'
})
export class WebSocketService {
  private socket$: WebSocketSubject<WebSocketMessage> | null = null;
  private messagesSubject$ = new Subject<WebSocketMessage>();
  public messages$ = this.messagesSubject$.asObservable();

  private readonly WS_ENDPOINT = 'wss://api.example.com/socket';
  private reconnectAttempts = 0;
  private readonly MAX_RECONNECT_ATTEMPTS = 5;

  constructor() {}

  public connect(): void {
    if (this.socket$) {
      return;
    }

    this.socket$ = webSocket({
      url: this.WS_ENDPOINT,
      openObserver: {
        next: () => {
          console.log('WebSocket connected');
          this.reconnectAttempts = 0;
        }
      },
      closeObserver: {
        next: () => {
          console.log('WebSocket disconnected');
          this.socket$ = null;
        }
      }
    });

    this.socket$
      .pipe(
        retryWhen(errors =>
          errors.pipe(
            tap(error => {
              this.reconnectAttempts++;
              console.error('WebSocket error:', error);
            }),
            delayWhen(() => {
              const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
              return timer(delay);
            })
          )
        )
      )
      .subscribe({
        next: (message) => this.messagesSubject$.next(message),
        error: (error) => console.error('Unrecoverable WebSocket error:', error)
      });
  }

  public send(message: WebSocketMessage): void {
    if (this.socket$) {
      this.socket$.next(message);
    } else {
      console.error('WebSocket not connected');
    }
  }

  public disconnect(): void {
    if (this.socket$) {
      this.socket$.complete();
      this.socket$ = null;
    }
  }

  public onMessage(type: string): Observable<WebSocketMessage> {
    return new Observable(observer => {
      const subscription = this.messages$.subscribe({
        next: (message) => {
          if (message.type === type) {
            observer.next(message);
          }
        },
        error: (error) => observer.error(error),
        complete: () => observer.complete()
      });

      return () => subscription.unsubscribe();
    });
  }
}

This service provides methods to connect, send messages, disconnect, and filter incoming messages by type. The implementation handles connection lifecycle and prepares for reconnection logic.

Connecting Components

Components consume the WebSocket service through dependency injection and subscribe to message streams. Proper cleanup in the ngOnDestroy lifecycle hook prevents memory leaks.

// chat.component.ts
import { Component, OnInit, OnDestroy } from '@angular/core';
import { WebSocketService, WebSocketMessage } from './websocket.service';
import { Subscription } from 'rxjs';

@Component({
  selector: 'app-chat',
  template: `
    <div class="chat-container">
      <div class="messages">
        <div *ngFor="let msg of messages" class="message">
          <span class="user">{{ msg.payload.user }}:</span>
          <span class="text">{{ msg.payload.text }}</span>
        </div>
      </div>
      <div class="input-area">
        <input
          [(ngModel)]="messageText"
          (keyup.enter)="sendMessage()"
          placeholder="Type a message..."
        />
        <button (click)="sendMessage()">Send</button>
      </div>
    </div>
  `
})
export class ChatComponent implements OnInit, OnDestroy {
  messages: WebSocketMessage[] = [];
  messageText = '';
  private subscription: Subscription = new Subscription();

  constructor(private wsService: WebSocketService) {}

  ngOnInit(): void {
    this.wsService.connect();

    const chatSub = this.wsService.onMessage('chat')
      .subscribe({
        next: (message) => {
          this.messages.push(message);
        },
        error: (error) => console.error('Chat error:', error)
      });

    this.subscription.add(chatSub);
  }

  sendMessage(): void {
    if (this.messageText.trim()) {
      this.wsService.send({
        type: 'chat',
        payload: {
          user: 'CurrentUser',
          text: this.messageText
        },
        timestamp: Date.now()
      });
      this.messageText = '';
    }
  }

  ngOnDestroy(): void {
    this.subscription.unsubscribe();
  }
}

For multiple components sharing the same connection, the service singleton pattern ensures efficient resource usage while each component maintains its own message subscriptions.

Reconnection with RxJS Operators

Implementing robust reconnection logic requires combining RxJS operators to handle errors, delays, and exponential backoff. The retry and delayWhen operators provide fine-grained control over reconnection behavior.

import { webSocket } from 'rxjs/webSocket';
import { retryWhen, delay, tap, take } from 'rxjs/operators';
import { timer, throwError } from 'rxjs';

export class ReconnectingWebSocketService {
  private socket$: WebSocketSubject<any> | null = null;
  private reconnectInterval = 1000;
  private maxReconnectAttempts = 10;

  connect(): Observable<any> {
    return webSocket({
      url: 'wss://api.example.com/socket',
      openObserver: {
        next: () => {
          console.log('Connected successfully');
          this.reconnectInterval = 1000; // Reset on successful connection
        }
      }
    }).pipe(
      retryWhen(errors =>
        errors.pipe(
          tap(error => console.error('Connection error:', error)),
          delayWhen((_, index) => {
            const attempt = index + 1;

            if (attempt > this.maxReconnectAttempts) {
              return throwError(() => new Error('Max reconnection attempts reached'));
            }

            // Exponential backoff: 1s, 2s, 4s, 8s, up to 30s
            const delayMs = Math.min(
              this.reconnectInterval * Math.pow(2, index),
              30000
            );

            console.log(`Reconnecting in ${delayMs}ms (attempt ${attempt})`);
            return timer(delayMs);
          })
        )
      )
    );
  }
}

For production environments, consider adding connection health checks:

import { interval, EMPTY } from 'rxjs';
import { switchMap } from 'rxjs/operators';

export class HealthCheckWebSocketService extends WebSocketService {
  private pingInterval = 30000; // 30 seconds

  connect(): void {
    super.connect();
    this.startHealthCheck();
  }

  private startHealthCheck(): void {
    interval(this.pingInterval)
      .pipe(
        switchMap(() => {
          if (this.socket$) {
            this.send({ type: 'ping', payload: {} });
            return EMPTY;
          }
          return EMPTY;
        })
      )
      .subscribe();
  }
}

Compare this approach with traditional implementations in our JavaScript WebSocket guide.

Socket.IO with Angular

Socket.IO provides enhanced features including automatic reconnection, fallback transport options, and room-based messaging. Integrate Socket.IO in Angular using the official client library.

Install the Socket.IO client:

npm install socket.io-client
npm install --save-dev @types/socket.io-client

Create a Socket.IO service:

// socket-io.service.ts
import { Injectable } from '@angular/core';
import { Observable } from 'rxjs';
import { io, Socket } from 'socket.io-client';

@Injectable({
  providedIn: 'root'
})
export class SocketIoService {
  private socket: Socket | null = null;

  constructor() {}

  connect(url: string, options?: any): void {
    this.socket = io(url, {
      transports: ['websocket', 'polling'],
      reconnection: true,
      reconnectionDelay: 1000,
      reconnectionDelayMax: 5000,
      reconnectionAttempts: 5,
      ...options
    });

    this.socket.on('connect', () => {
      console.log('Socket.IO connected:', this.socket?.id);
    });

    this.socket.on('disconnect', (reason) => {
      console.log('Socket.IO disconnected:', reason);
    });

    this.socket.on('connect_error', (error) => {
      console.error('Socket.IO connection error:', error);
    });
  }

  emit(eventName: string, data: any): void {
    if (this.socket) {
      this.socket.emit(eventName, data);
    }
  }

  on(eventName: string): Observable<any> {
    return new Observable(observer => {
      if (!this.socket) {
        observer.error('Socket not initialized');
        return;
      }

      this.socket.on(eventName, (data: any) => {
        observer.next(data);
      });

      return () => {
        if (this.socket) {
          this.socket.off(eventName);
        }
      };
    });
  }

  disconnect(): void {
    if (this.socket) {
      this.socket.disconnect();
      this.socket = null;
    }
  }

  joinRoom(room: string): void {
    this.emit('join', { room });
  }

  leaveRoom(room: string): void {
    this.emit('leave', { room });
  }
}

Using Socket.IO in components:

import { Component, OnInit, OnDestroy } from '@angular/core';
import { SocketIoService } from './socket-io.service';
import { Subscription } from 'rxjs';

@Component({
  selector: 'app-notifications',
  template: `
    <div class="notifications">
      <div *ngFor="let notification of notifications" class="notification">
        {{ notification.message }}
      </div>
    </div>
  `
})
export class NotificationsComponent implements OnInit, OnDestroy {
  notifications: any[] = [];
  private subscription = new Subscription();

  constructor(private socketService: SocketIoService) {}

  ngOnInit(): void {
    this.socketService.connect('https://api.example.com');

    const notificationSub = this.socketService.on('notification')
      .subscribe({
        next: (data) => {
          this.notifications.unshift(data);
        }
      });

    this.subscription.add(notificationSub);
    this.socketService.joinRoom('user-notifications');
  }

  ngOnDestroy(): void {
    this.socketService.leaveRoom('user-notifications');
    this.subscription.unsubscribe();
    this.socketService.disconnect();
  }
}

Explore more Socket.IO patterns in our dedicated Socket.IO guide.

Testing WebSocket Services

Testing WebSocket services in Angular requires mocking the WebSocket connection and simulating message flows. Use Jasmine or Jest with RxJS testing utilities.

// websocket.service.spec.ts
import { TestBed } from '@angular/core/testing';
import { WebSocketService } from './websocket.service';
import { WebSocketSubject } from 'rxjs/webSocket';
import { Subject } from 'rxjs';

describe('WebSocketService', () => {
  let service: WebSocketService;
  let mockWebSocket: Subject<any>;

  beforeEach(() => {
    mockWebSocket = new Subject();

    TestBed.configureTestingModule({
      providers: [WebSocketService]
    });

    service = TestBed.inject(WebSocketService);

    // Mock the webSocket function
    spyOn<any>(service as any, 'createWebSocket').and.returnValue(mockWebSocket);
  });

  it('should connect and receive messages', (done) => {
    service.connect();

    service.messages$.subscribe(message => {
      expect(message.type).toBe('test');
      expect(message.payload).toBe('hello');
      done();
    });

    mockWebSocket.next({ type: 'test', payload: 'hello' });
  });

  it('should send messages through the socket', () => {
    const nextSpy = spyOn(mockWebSocket, 'next');
    service.connect();

    const testMessage = { type: 'test', payload: 'data' };
    service.send(testMessage);

    expect(nextSpy).toHaveBeenCalledWith(testMessage);
  });

  it('should filter messages by type', (done) => {
    service.connect();

    service.onMessage('chat').subscribe(message => {
      expect(message.type).toBe('chat');
      done();
    });

    mockWebSocket.next({ type: 'system', payload: 'ignore' });
    mockWebSocket.next({ type: 'chat', payload: 'show this' });
  });

  it('should handle disconnection', () => {
    service.connect();
    const completeSpy = spyOn(mockWebSocket, 'complete');

    service.disconnect();

    expect(completeSpy).toHaveBeenCalled();
  });
});

Testing Socket.IO services:

// socket-io.service.spec.ts
import { TestBed } from '@angular/core/testing';
import { SocketIoService } from './socket-io.service';

describe('SocketIoService', () => {
  let service: SocketIoService;
  let mockSocket: any;

  beforeEach(() => {
    mockSocket = {
      on: jasmine.createSpy('on'),
      emit: jasmine.createSpy('emit'),
      disconnect: jasmine.createSpy('disconnect'),
      off: jasmine.createSpy('off')
    };

    TestBed.configureTestingModule({
      providers: [SocketIoService]
    });

    service = TestBed.inject(SocketIoService);
    (service as any).socket = mockSocket;
  });

  it('should emit events', () => {
    service.emit('test-event', { data: 'value' });

    expect(mockSocket.emit).toHaveBeenCalledWith('test-event', { data: 'value' });
  });

  it('should create observable for events', (done) => {
    let eventCallback: Function;

    mockSocket.on.and.callFake((event: string, callback: Function) => {
      eventCallback = callback;
    });

    service.on('test-event').subscribe(data => {
      expect(data).toBe('test-data');
      done();
    });

    eventCallback('test-data');
  });
});

Learn comprehensive testing strategies in our WebSocket testing guide.

Production Tips

Deploying Angular WebSocket applications requires consideration of performance, security, and reliability.

Environment Configuration: Use Angular environment files to manage WebSocket endpoints:

// environment.prod.ts
export const environment = {
  production: true,
  wsEndpoint: 'wss://api.production.com/socket'
};

// environment.ts
export const environment = {
  production: false,
  wsEndpoint: 'ws://localhost:8080/socket'
};

// In service
import { environment } from '../environments/environment';

@Injectable({ providedIn: 'root' })
export class WebSocketService {
  private readonly WS_ENDPOINT = environment.wsEndpoint;
}

Connection Pooling: Reuse connections across components:

@Injectable({ providedIn: 'root' })
export class WebSocketConnectionPool {
  private connections = new Map<string, WebSocketSubject<any>>();

  getConnection(url: string): WebSocketSubject<any> {
    if (!this.connections.has(url)) {
      this.connections.set(url, webSocket(url));
    }
    return this.connections.get(url)!;
  }

  closeConnection(url: string): void {
    const connection = this.connections.get(url);
    if (connection) {
      connection.complete();
      this.connections.delete(url);
    }
  }
}

Authentication: Include tokens in WebSocket connections:

connect(token: string): void {
  this.socket$ = webSocket({
    url: `${this.WS_ENDPOINT}?token=${token}`,
    protocol: ['bearer', token]
  });
}

Message Queuing: Queue messages when disconnected:

private messageQueue: WebSocketMessage[] = [];
private isConnected = false;

send(message: WebSocketMessage): void {
  if (this.isConnected && this.socket$) {
    this.socket$.next(message);
  } else {
    this.messageQueue.push(message);
  }
}

private flushQueue(): void {
  while (this.messageQueue.length > 0) {
    const message = this.messageQueue.shift();
    if (message && this.socket$) {
      this.socket$.next(message);
    }
  }
}

Memory Management: Implement proper cleanup:

@Component({
  selector: 'app-root'
})
export class AppComponent implements OnDestroy {
  private destroy$ = new Subject<void>();

  ngOnInit(): void {
    this.wsService.messages$
      .pipe(takeUntil(this.destroy$))
      .subscribe(message => this.handleMessage(message));
  }

  ngOnDestroy(): void {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

Frequently Asked Questions

How do I handle authentication with Angular WebSocket connections?

Pass authentication tokens through WebSocket connection parameters or headers. For initial connection, include the token in the URL query string or as a protocol parameter. After connection, send an authentication message as the first event. Store tokens securely using Angular’s HTTP interceptors and include them when establishing WebSocket connections:

const token = this.authService.getToken();
this.socket$ = webSocket({
  url: `wss://api.example.com/socket?token=${token}`,
  openObserver: {
    next: () => {
      this.send({ type: 'auth', payload: { token } });
    }
  }
});

What is the difference between RxJS webSocket and Socket.IO for Angular?

RxJS webSocket provides a lightweight observable wrapper around native WebSocket APIs, integrating seamlessly with Angular’s reactive patterns. Socket.IO offers additional features including automatic reconnection, transport fallbacks (long-polling when WebSocket is unavailable), room-based messaging, and broader browser compatibility. Choose RxJS webSocket for simple real-time communication with full control, and Socket.IO when you need robust fallback mechanisms and advanced features.

How do I prevent memory leaks in Angular WebSocket applications?

Always unsubscribe from WebSocket observables in the ngOnDestroy lifecycle hook. Use the Subscription object to collect all subscriptions and unsubscribe them together, or leverage RxJS operators like takeUntil with a destroy subject. Ensure WebSocket connections are closed when components are destroyed. Implement proper cleanup in services that maintain persistent connections across multiple components.

Can I use Angular WebSocket with server-sent events (SSE)?

While WebSocket provides bidirectional communication, server-sent events offer unidirectional server-to-client streaming over HTTP. Angular can consume SSE using the EventSource API wrapped in an observable. For bidirectional communication, WebSocket is the appropriate choice. For scenarios where only server-to-client messages are needed, SSE provides a simpler alternative with automatic reconnection. Evaluate your communication requirements to choose between WebSocket, SSE, or HTTP polling.