Tasuke Hubのロゴ

ITを中心に困っている人を助けるメディア

分かりやすく解決策を提供することで、あなたの困ったをサポート。 全ての人々がスムーズに生活できる世界を目指します。

TypeScriptとNode.jsでリアルタイムAPI設計完全ガイド:WebSocketから始めるスケーラブルなアプリケーション構築法

記事のサムネイル

リアルタイムAPIが必要になる場面と基本設計思想

リアルタイムAPIは、従来のHTTPリクエスト・レスポンスモデルでは対応が困難な場面で威力を発揮します。

代表的な使用場面

チャットアプリケーション、ライブコメント機能、株価や為替のリアルタイム更新、オンラインゲーム、コラボレーションツール(GoogleDocsのような同時編集)などで必須の技術です。

設計思想の基本原則

// リアルタイムAPIの基本概念
interface RealtimeAPIDesign {
  // 1. 双方向通信
  bidirectional: boolean;
  
  // 2. 低遅延
  latency: 'low' | 'medium' | 'high';
  
  // 3. 状態管理
  stateManagement: 'stateful' | 'stateless';
  
  // 4. スケーラビリティ
  scalability: {
    horizontal: boolean;
    vertical: boolean;
  };
}

従来のREST APIとの最大の違いは、サーバーからクライアントへの能動的な通信が可能な点です。これにより、ポーリングによる無駄な通信を削減し、よりリアルタイムなユーザー体験を実現できます。

TypeScriptとNode.jsでWebSocket通信の基礎実装

WebSocketを使ったリアルタイム通信の基本実装から始めましょう。

サーバーサイドの基本実装

// server.ts
import { WebSocket, WebSocketServer } from 'ws';
import { IncomingMessage } from 'http';

interface ExtendedWebSocket extends WebSocket {
  id: string;
  userId?: string;
}

class WebSocketManager {
  private wss: WebSocketServer;
  private clients: Map<string, ExtendedWebSocket> = new Map();

  constructor(port: number) {
    this.wss = new WebSocketServer({ port });
    this.initialize();
  }

  private initialize(): void {
    this.wss.on('connection', (ws: ExtendedWebSocket, req: IncomingMessage) => {
      // 一意のIDを割り当て
      ws.id = this.generateId();
      this.clients.set(ws.id, ws);

      console.log(`新しい接続: ${ws.id}`);

      ws.on('message', (data: Buffer) => {
        this.handleMessage(ws, data.toString());
      });

      ws.on('close', () => {
        this.clients.delete(ws.id);
        console.log(`接続終了: ${ws.id}`);
      });
    });
  }

  private handleMessage(ws: ExtendedWebSocket, message: string): void {
    try {
      const data = JSON.parse(message);
      this.broadcastToAll(data, ws.id);
    } catch (error) {
      ws.send(JSON.stringify({ error: 'Invalid JSON' }));
    }
  }

  private broadcastToAll(data: any, senderId: string): void {
    this.clients.forEach((client, id) => {
      if (id !== senderId && client.readyState === WebSocket.OPEN) {
        client.send(JSON.stringify({ ...data, from: senderId }));
      }
    });
  }

  private generateId(): string {
    return Math.random().toString(36).substring(2, 15);
  }
}

// サーバー起動
const wsManager = new WebSocketManager(8080);
console.log('WebSocketサーバーがポート8080で起動しました');

クライアントサイドの実装

// client.ts
class RealtimeClient {
  private ws: WebSocket | null = null;
  private reconnectInterval = 1000;
  private maxReconnectAttempts = 5;
  private reconnectAttempts = 0;

  constructor(private url: string) {
    this.connect();
  }

  private connect(): void {
    try {
      this.ws = new WebSocket(this.url);
      this.setupEventListeners();
    } catch (error) {
      console.error('接続エラー:', error);
      this.handleReconnect();
    }
  }

  private setupEventListeners(): void {
    if (!this.ws) return;

    this.ws.onopen = () => {
      console.log('WebSocket接続が確立されました');
      this.reconnectAttempts = 0;
    };

    this.ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      this.handleMessage(data);
    };

    this.ws.onclose = () => {
      console.log('WebSocket接続が閉じられました');
      this.handleReconnect();
    };

    this.ws.onerror = (error) => {
      console.error('WebSocketエラー:', error);
    };
  }

  private handleMessage(data: any): void {
    // メッセージ処理のロジック
    console.log('受信:', data);
  }

  private handleReconnect(): void {
    if (this.reconnectAttempts < this.maxReconnectAttempts) {
      this.reconnectAttempts++;
      setTimeout(() => {
        console.log(`再接続試行 ${this.reconnectAttempts}/${this.maxReconnectAttempts}`);
        this.connect();
      }, this.reconnectInterval);
    }
  }

  public send(data: any): void {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(data));
    } else {
      console.warn('WebSocket接続が確立されていません');
    }
  }
}

// 使用例
const client = new RealtimeClient('ws://localhost:8080');
client.send({ type: 'message', content: 'Hello, WebSocket!' });

この基本実装では、接続管理、メッセージ送信、エラーハンドリング、自動再接続機能を含めています。次のセクションでより高度な機能を実装していきます。

Socket.ioを活用した高度なリアルタイム機能の構築

Socket.ioは生のWebSocketよりも高機能で、実践的なリアルタイムアプリケーション開発に適しています。

Socket.ioサーバーの実装

// socket-server.ts
import { Server } from 'socket.io';
import { createServer } from 'http';

interface UserData {
  id: string;
  username: string;
  room?: string;
}

class SocketIOManager {
  private io: Server;
  private users: Map<string, UserData> = new Map();

  constructor(port: number) {
    const httpServer = createServer();
    this.io = new Server(httpServer, {
      cors: { origin: "*" },
      pingTimeout: 60000,
      pingInterval: 25000
    });
    
    this.setupEventHandlers();
    httpServer.listen(port);
    console.log(`Socket.IOサーバーがポート${port}で起動しました`);
  }

  private setupEventHandlers(): void {
    this.io.on('connection', (socket) => {
      console.log('新しい接続:', socket.id);

      // ユーザー登録
      socket.on('register', (userData: Omit<UserData, 'id'>) => {
        this.users.set(socket.id, { ...userData, id: socket.id });
        socket.emit('registered', { id: socket.id, ...userData });
      });

      // ルーム参加
      socket.on('join-room', (roomName: string) => {
        const user = this.users.get(socket.id);
        if (user) {
          socket.join(roomName);
          user.room = roomName;
          socket.to(roomName).emit('user-joined', user);
        }
      });

      // メッセージ送信
      socket.on('send-message', (data: { message: string; timestamp: number }) => {
        const user = this.users.get(socket.id);
        if (user?.room) {
          this.io.to(user.room).emit('message', {
            ...data,
            from: user,
            messageId: this.generateMessageId()
          });
        }
      });

      // プライベートメッセージ
      socket.on('private-message', (data: { to: string; message: string }) => {
        const sender = this.users.get(socket.id);
        const recipient = this.users.get(data.to);
        
        if (sender && recipient) {
          socket.to(data.to).emit('private-message', {
            message: data.message,
            from: sender,
            timestamp: Date.now()
          });
        }
      });

      // 接続終了
      socket.on('disconnect', () => {
        const user = this.users.get(socket.id);
        if (user?.room) {
          socket.to(user.room).emit('user-left', user);
        }
        this.users.delete(socket.id);
        console.log('接続終了:', socket.id);
      });
    });
  }

  private generateMessageId(): string {
    return `msg_${Date.now()}_${Math.random().toString(36).substring(2, 9)}`;
  }
}

new SocketIOManager(3000);

型安全なクライアント実装

// socket-client.ts
import { io, Socket } from 'socket.io-client';

interface ServerToClientEvents {
  'registered': (user: UserData) => void;
  'user-joined': (user: UserData) => void;
  'user-left': (user: UserData) => void;
  'message': (message: MessageData) => void;
  'private-message': (message: PrivateMessageData) => void;
}

interface ClientToServerEvents {
  'register': (userData: Omit<UserData, 'id'>) => void;
  'join-room': (roomName: string) => void;
  'send-message': (data: { message: string; timestamp: number }) => void;
  'private-message': (data: { to: string; message: string }) => void;
}

class RealtimeChatClient {
  private socket: Socket<ServerToClientEvents, ClientToServerEvents>;
  private currentUser: UserData | null = null;

  constructor(serverUrl: string) {
    this.socket = io(serverUrl, {
      autoConnect: false,
      timeout: 20000,
      retries: 3
    });
    
    this.setupEventListeners();
  }

  private setupEventListeners(): void {
    this.socket.on('connect', () => {
      console.log('サーバーに接続しました');
    });

    this.socket.on('registered', (user) => {
      this.currentUser = user;
      console.log('ユーザー登録完了:', user);
    });

    this.socket.on('message', (message) => {
      this.handleMessage(message);
    });

    this.socket.on('private-message', (message) => {
      this.handlePrivateMessage(message);
    });

    this.socket.on('disconnect', (reason) => {
      console.log('接続が切断されました:', reason);
    });
  }

  public connect(username: string): Promise<UserData> {
    return new Promise((resolve, reject) => {
      this.socket.connect();
      
      this.socket.once('registered', (user) => {
        resolve(user);
      });

      this.socket.once('connect_error', (error) => {
        reject(error);
      });

      this.socket.emit('register', { username });
    });
  }

  public joinRoom(roomName: string): void {
    this.socket.emit('join-room', roomName);
  }

  public sendMessage(message: string): void {
    this.socket.emit('send-message', {
      message,
      timestamp: Date.now()
    });
  }

  public sendPrivateMessage(to: string, message: string): void {
    this.socket.emit('private-message', { to, message });
  }

  private handleMessage(message: MessageData): void {
    console.log('メッセージ受信:', message);
    // UIの更新ロジック
  }

  private handlePrivateMessage(message: PrivateMessageData): void {
    console.log('プライベートメッセージ受信:', message);
    // プライベートメッセージのUI更新
  }
}

// 使用例
const client = new RealtimeChatClient('http://localhost:3000');

client.connect('ユーザー名')
  .then(user => {
    console.log('接続成功:', user);
    client.joinRoom('general');
    client.sendMessage('Hello, everyone!');
  })
  .catch(error => {
    console.error('接続失敗:', error);
  });

Socket.ioを使うことで、ルーム機能、自動再接続、型安全な通信が簡単に実装できます。

スケーラブルなリアルタイムアーキテクチャの設計パターン

大規模なリアルタイムアプリケーションでは、単一サーバーでは限界があります。効果的なスケーリング戦略が必要です。

Redisアダプターを使った水平スケーリング

// scaled-socket-server.ts
import { Server } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';
import { createServer } from 'http';

class ScalableSocketIOManager {
  private io: Server;
  private redisClient: any;

  constructor(port: number, redisUrl: string) {
    const httpServer = createServer();
    this.io = new Server(httpServer, {
      cors: { origin: "*" },
      adapter: this.setupRedisAdapter(redisUrl)
    });

    this.setupEventHandlers();
    httpServer.listen(port);
    console.log(`スケーラブルSocket.IOサーバーがポート${port}で起動しました`);
  }

  private setupRedisAdapter(redisUrl: string) {
    const pubClient = createClient({ url: redisUrl });
    const subClient = pubClient.duplicate();

    Promise.all([pubClient.connect(), subClient.connect()]).then(() => {
      console.log('Redis接続完了');
    });

    return createAdapter(pubClient, subClient);
  }

  private setupEventHandlers(): void {
    this.io.on('connection', (socket) => {
      // イベントハンドリングは同様
      socket.on('broadcast-to-all', (data) => {
        // 全サーバーインスタンス間でブロードキャスト
        this.io.emit('global-message', data);
      });

      socket.on('join-global-room', (roomName) => {
        socket.join(roomName);
        // Redis経由で他のサーバーにも通知
        this.io.to(roomName).emit('user-joined-global', {
          socketId: socket.id,
          serverId: process.env.SERVER_ID || 'unknown'
        });
      });
    });
  }
}

new ScalableSocketIOManager(
  parseInt(process.env.PORT || '3000'),
  process.env.REDIS_URL || 'redis://localhost:6379'
);

ロードバランサー設定例(Nginx)

# nginx.conf
upstream websocket_backend {
    ip_hash; # セッション継続性のため
    server localhost:3000;
    server localhost:3001;
    server localhost:3002;
}

server {
    listen 80;
    server_name your-domain.com;

    location /socket.io/ {
        proxy_pass http://websocket_backend;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        
        # WebSocket用のタイムアウト設定
        proxy_read_timeout 86400;
        proxy_send_timeout 86400;
    }
}

マイクロサービス向けイベント駆動アーキテクチャ

// event-driven-realtime-service.ts
import { EventEmitter } from 'events';
import { Server } from 'socket.io';

interface RealtimeEvent {
  type: string;
  payload: any;
  metadata: {
    timestamp: number;
    sourceService: string;
    targetRoom?: string;
  };
}

class RealtimeEventBus extends EventEmitter {
  private io: Server;
  private serviceId: string;

  constructor(io: Server, serviceId: string) {
    super();
    this.io = io;
    this.serviceId = serviceId;
    this.setupInternalEventHandlers();
  }

  private setupInternalEventHandlers(): void {
    // 他のマイクロサービスからのイベントを受信
    this.on('external-event', (event: RealtimeEvent) => {
      this.broadcastEvent(event);
    });

    // 内部イベントの処理
    this.on('internal-event', (event: RealtimeEvent) => {
      this.processInternalEvent(event);
    });
  }

  public emitToRoom(room: string, eventType: string, data: any): void {
    const event: RealtimeEvent = {
      type: eventType,
      payload: data,
      metadata: {
        timestamp: Date.now(),
        sourceService: this.serviceId,
        targetRoom: room
      }
    };

    this.io.to(room).emit(eventType, event);
    this.emit('event-sent', event);
  }

  public emitToAll(eventType: string, data: any): void {
    const event: RealtimeEvent = {
      type: eventType,
      payload: data,
      metadata: {
        timestamp: Date.now(),
        sourceService: this.serviceId
      }
    };

    this.io.emit(eventType, event);
    this.emit('event-sent', event);
  }

  private broadcastEvent(event: RealtimeEvent): void {
    if (event.metadata.targetRoom) {
      this.io.to(event.metadata.targetRoom).emit(event.type, event);
    } else {
      this.io.emit(event.type, event);
    }
  }

  private processInternalEvent(event: RealtimeEvent): void {
    console.log(`[${this.serviceId}] 内部イベント処理:`, event.type);
    // ビジネスロジックの実装
  }
}

// マイクロサービス統合例
class MicroserviceRealtimeManager {
  private eventBus: RealtimeEventBus;
  private httpClient: any; // HTTPクライアント(axiosなど)

  constructor(io: Server, serviceId: string) {
    this.eventBus = new RealtimeEventBus(io, serviceId);
    this.setupCrossServiceCommunication();
  }

  private setupCrossServiceCommunication(): void {
    // 他のサービスからのWebhookを受信
    this.eventBus.on('webhook-received', async (data) => {
      await this.notifyOtherServices(data);
    });
  }

  private async notifyOtherServices(data: any): Promise<void> {
    // 他のマイクロサービスへHTTP通知
    const services = ['user-service', 'notification-service', 'analytics-service'];
    
    for (const service of services) {
      try {
        await this.httpClient.post(`http://${service}/realtime-event`, data);
      } catch (error) {
        console.error(`${service}への通知失敗:`, error);
      }
    }
  }

  public handleUserAction(userId: string, action: string, data: any): void {
    // ユーザーアクションを他のユーザーにリアルタイム通知
    this.eventBus.emitToRoom(`user-${userId}`, 'user-action', {
      userId,
      action,
      data,
      timestamp: Date.now()
    });

    // 分析サービスにも送信
    this.eventBus.emit('internal-event', {
      type: 'user-action-analytics',
      payload: { userId, action, data },
      metadata: {
        timestamp: Date.now(),
        sourceService: 'realtime-service'
      }
    });
  }
}

このようなアーキテクチャにより、複数サーバーインスタンス間での一貫したリアルタイム通信と、マイクロサービス間の効率的な連携が実現できます。

エラーハンドリングと接続管理のベストプラクティス

堅牢なリアルタイムアプリケーションには、適切なエラーハンドリングと接続管理が不可欠です。

包括的なエラーハンドリング実装

// robust-error-handling.ts
enum ErrorType {
  CONNECTION_FAILED = 'CONNECTION_FAILED',
  MESSAGE_PARSING_ERROR = 'MESSAGE_PARSING_ERROR',
  AUTHENTICATION_ERROR = 'AUTHENTICATION_ERROR',
  RATE_LIMIT_EXCEEDED = 'RATE_LIMIT_EXCEEDED',
  SERVER_ERROR = 'SERVER_ERROR'
}

interface RealtimeError {
  type: ErrorType;
  message: string;
  details?: any;
  timestamp: number;
}

class RobustRealtimeClient {
  private socket: Socket | null = null;
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 5;
  private reconnectDelay = 1000;
  private heartbeatInterval: NodeJS.Timeout | null = null;
  private messageQueue: any[] = [];

  constructor(private url: string) {
    this.connect();
  }

  private async connect(): Promise<void> {
    try {
      this.socket = io(this.url, {
        timeout: 10000,
        forceNew: true
      });

      this.setupEventHandlers();
      this.startHeartbeat();
      
    } catch (error) {
      this.handleError({
        type: ErrorType.CONNECTION_FAILED,
        message: 'WebSocket接続に失敗しました',
        details: error,
        timestamp: Date.now()
      });
    }
  }

  private setupEventHandlers(): void {
    if (!this.socket) return;

    this.socket.on('connect', () => {
      console.log('接続成功');
      this.reconnectAttempts = 0;
      this.flushMessageQueue();
    });

    this.socket.on('connect_error', (error) => {
      this.handleError({
        type: ErrorType.CONNECTION_FAILED,
        message: '接続エラーが発生しました',
        details: error,
        timestamp: Date.now()
      });
      this.handleReconnect();
    });

    this.socket.on('disconnect', (reason) => {
      console.log('接続が切断されました:', reason);
      this.handleReconnect();
    });
  }

  private handleError(error: RealtimeError): void {
    console.error(`[${error.type}] ${error.message}:`, error.details);
    
    // エラーレポートの送信
    this.reportError(error);
    
    // ユーザーへの通知
    this.notifyUser(error);
  }

  private async reportError(error: RealtimeError): Promise<void> {
    try {
      await fetch('/api/error-reporting', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(error)
      });
    } catch (reportingError) {
      console.error('エラーレポート送信失敗:', reportingError);
    }
  }

  private handleReconnect(): void {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      this.handleError({
        type: ErrorType.CONNECTION_FAILED,
        message: '最大再接続試行回数に達しました',
        timestamp: Date.now()
      });
      return;
    }

    this.reconnectAttempts++;
    const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
    
    setTimeout(() => {
      console.log(`再接続試行 ${this.reconnectAttempts}/${this.maxReconnectAttempts}`);
      this.connect();
    }, delay);
  }

  private startHeartbeat(): void {
    this.heartbeatInterval = setInterval(() => {
      if (this.socket?.connected) {
        this.socket.emit('ping', Date.now());
      }
    }, 30000);
  }

  private flushMessageQueue(): void {
    while (this.messageQueue.length > 0) {
      const message = this.messageQueue.shift();
      this.sendMessage(message);
    }
  }

  public sendMessage(data: any): void {
    if (!this.socket?.connected) {
      this.messageQueue.push(data);
      return;
    }

    try {
      this.socket.emit('message', data);
    } catch (error) {
      this.handleError({
        type: ErrorType.MESSAGE_PARSING_ERROR,
        message: 'メッセージ送信に失敗しました',
        details: error,
        timestamp: Date.now()
      });
    }
  }

  private notifyUser(error: RealtimeError): void {
    // ユーザーフレンドリーなエラーメッセージの表示
    const userMessage = this.getUserFriendlyMessage(error.type);
    // UI更新ロジック(実際の実装に応じて)
    console.log('ユーザー通知:', userMessage);
  }

  private getUserFriendlyMessage(errorType: ErrorType): string {
    switch (errorType) {
      case ErrorType.CONNECTION_FAILED:
        return 'ネットワーク接続を確認してください';
      case ErrorType.AUTHENTICATION_ERROR:
        return 'ログインが必要です';
      case ErrorType.RATE_LIMIT_EXCEEDED:
        return 'しばらく待ってから再試行してください';
      default:
        return '一時的な問題が発生しました';
    }
  }
}

接続状態管理とレート制限

// connection-manager.ts
class ConnectionManager {
  private connections: Map<string, Socket> = new Map();
  private rateLimiter: Map<string, number[]> = new Map();
  private readonly MAX_CONNECTIONS_PER_IP = 10;
  private readonly RATE_LIMIT_WINDOW = 60000; // 1分
  private readonly MAX_MESSAGES_PER_WINDOW = 100;

  public addConnection(socket: Socket, ipAddress: string): boolean {
    const currentConnections = this.getConnectionsByIP(ipAddress);
    
    if (currentConnections >= this.MAX_CONNECTIONS_PER_IP) {
      socket.emit('error', { 
        type: 'CONNECTION_LIMIT_EXCEEDED',
        message: '接続数の上限に達しました' 
      });
      socket.disconnect();
      return false;
    }

    this.connections.set(socket.id, socket);
    this.setupRateLimit(socket, ipAddress);
    return true;
  }

  private setupRateLimit(socket: Socket, ipAddress: string): void {
    socket.on('message', (data) => {
      if (!this.checkRateLimit(ipAddress)) {
        socket.emit('error', {
          type: 'RATE_LIMIT_EXCEEDED',
          message: 'メッセージ送信頻度が高すぎます'
        });
        return;
      }
      // メッセージ処理の続行
    });
  }

  private checkRateLimit(ipAddress: string): boolean {
    const now = Date.now();
    const timestamps = this.rateLimiter.get(ipAddress) || [];
    
    // 古いタイムスタンプを削除
    const validTimestamps = timestamps.filter(
      timestamp => now - timestamp < this.RATE_LIMIT_WINDOW
    );
    
    if (validTimestamps.length >= this.MAX_MESSAGES_PER_WINDOW) {
      return false;
    }

    validTimestamps.push(now);
    this.rateLimiter.set(ipAddress, validTimestamps);
    return true;
  }

  private getConnectionsByIP(ipAddress: string): number {
    // 実装の詳細は省略
    return 0;
  }
}

これらの実装により、ネットワーク障害やサーバーエラーに対して resilient なリアルタイムアプリケーションを構築できます。

本番環境でのパフォーマンス最適化とモニタリング

本番環境でのリアルタイムアプリケーションでは、パフォーマンス最適化と適切なモニタリングが重要です。

パフォーマンス最適化テクニック

// performance-optimization.ts
class OptimizedRealtimeServer {
  private io: Server;
  private connectionPool: Map<string, Socket> = new Map();
  private messageBuffer: Map<string, any[]> = new Map();
  private bufferFlushInterval = 100; // 100ms

  constructor(port: number) {
    this.io = new Server(createServer().listen(port), {
      // パフォーマンス最適化設定
      pingInterval: 25000,
      pingTimeout: 20000,
      upgradeTimeout: 10000,
      maxHttpBufferSize: 1e6, // 1MB
      compression: true,
      perMessageDeflate: true
    });

    this.setupOptimizedHandlers();
    this.startBufferFlush();
  }

  private setupOptimizedHandlers(): void {
    this.io.on('connection', (socket) => {
      // 接続プールに追加
      this.connectionPool.set(socket.id, socket);

      // バッチ処理でメッセージを処理
      socket.on('batch-messages', (messages: any[]) => {
        this.processBatchMessages(socket, messages);
      });

      // 効率的なルーム管理
      socket.on('join-optimized-room', (roomName: string) => {
        socket.join(roomName);
        // ルーム参加通知を遅延送信
        this.addToBuffer(roomName, {
          type: 'user-joined',
          userId: socket.id,
          timestamp: Date.now()
        });
      });

      socket.on('disconnect', () => {
        this.connectionPool.delete(socket.id);
      });
    });
  }

  private processBatchMessages(socket: Socket, messages: any[]): void {
    // メッセージをバッチで処理
    messages.forEach(message => {
      if (message.targetRoom) {
        this.addToBuffer(message.targetRoom, message);
      }
    });
  }

  private addToBuffer(room: string, message: any): void {
    if (!this.messageBuffer.has(room)) {
      this.messageBuffer.set(room, []);
    }
    this.messageBuffer.get(room)!.push(message);
  }

  private startBufferFlush(): void {
    setInterval(() => {
      this.messageBuffer.forEach((messages, room) => {
        if (messages.length > 0) {
          // バッチでメッセージを送信
          this.io.to(room).emit('batch-update', messages);
          this.messageBuffer.set(room, []);
        }
      });
    }, this.bufferFlushInterval);
  }
}

リアルタイム監視とメトリクス収集

// monitoring.ts
interface RealtimeMetrics {
  activeConnections: number;
  messagesPerSecond: number;
  averageLatency: number;
  errorRate: number;
  memoryUsage: number;
}

class RealtimeMonitor {
  private metrics: RealtimeMetrics = {
    activeConnections: 0,
    messagesPerSecond: 0,
    averageLatency: 0,
    errorRate: 0,
    memoryUsage: 0
  };

  private messageCount = 0;
  private errorCount = 0;
  private latencySum = 0;
  private latencyCount = 0;

  constructor(private io: Server) {
    this.setupMonitoring();
    this.startMetricsCollection();
  }

  private setupMonitoring(): void {
    this.io.on('connection', (socket) => {
      this.metrics.activeConnections++;

      socket.on('message', (data, callback) => {
        const startTime = Date.now();
        this.messageCount++;

        // メッセージ処理後のレイテンシ測定
        process.nextTick(() => {
          const latency = Date.now() - startTime;
          this.latencySum += latency;
          this.latencyCount++;
          
          if (callback) callback({ latency });
        });
      });

      socket.on('error', () => {
        this.errorCount++;
      });

      socket.on('disconnect', () => {
        this.metrics.activeConnections--;
      });
    });
  }

  private startMetricsCollection(): void {
    setInterval(() => {
      // メトリクス計算
      this.metrics.messagesPerSecond = this.messageCount;
      this.metrics.averageLatency = this.latencyCount > 0 
        ? this.latencySum / this.latencyCount 
        : 0;
      this.metrics.errorRate = this.errorCount / Math.max(this.messageCount, 1);
      this.metrics.memoryUsage = process.memoryUsage().heapUsed;

      // ログ出力
      console.log('リアルタイムメトリクス:', this.metrics);

      // 外部監視システムに送信
      this.sendToMonitoringSystem(this.metrics);

      // カウンターリセット
      this.resetCounters();
    }, 60000); // 1分ごと
  }

  private async sendToMonitoringSystem(metrics: RealtimeMetrics): Promise<void> {
    try {
      await fetch('https://monitoring-system.com/api/metrics', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          service: 'realtime-api',
          timestamp: Date.now(),
          metrics
        })
      });
    } catch (error) {
      console.error('監視システムへのメトリクス送信失敗:', error);
    }
  }

  private resetCounters(): void {
    this.messageCount = 0;
    this.errorCount = 0;
    this.latencySum = 0;
    this.latencyCount = 0;
  }

  public getMetrics(): RealtimeMetrics {
    return { ...this.metrics };
  }
}

これらの最適化により、大規模な本番環境でも安定したリアルタイム通信を提供できます。TypeScriptの型安全性とNode.jsの非同期処理を活用して、スケーラブルで保守性の高いリアルタイムAPIを構築しましょう。

TH

Tasuke Hub管理人

東証プライム市場上場企業エンジニア

情報系修士卒業後、大手IT企業にてフルスタックエンジニアとして活躍。 Webアプリケーション開発からクラウドインフラ構築まで幅広い技術に精通し、 複数のプロジェクトでリードエンジニアを担当。 技術ブログやオープンソースへの貢献を通じて、日本のIT技術コミュニティに積極的に関わっている。

🎓情報系修士🏢東証プライム上場企業💻フルスタックエンジニア📝技術ブログ執筆者

このトピックはこちらの書籍で勉強するのがおすすめ!

この記事の内容をさらに深く理解したい方におすすめの一冊です。実践的な知識を身につけたい方は、ぜひチェックしてみてください!

おすすめ記事

おすすめコンテンツ