Building Custom Transports

Learn to implement custom MCP transport layers including WebSocket transports, message framing, connection lifecycle management, and advanced patterns for specialized deployment environments.


order: 16
level: "advanced"
duration: "35 min"
date: "2026-04-01"
keywords:

  • "MCP custom transport"
  • "MCP WebSocket transport"
  • "MCP message framing"
  • "MCP transport implementation"
  • "@modelcontextprotocol/sdk Transport interface"
  • "mcp-framework custom transport"
  • "MCP protocol transport layer"
  • "building MCP transports"

Quick Summary

The official TypeScript SDK provides stdio, SSE, and Streamable HTTP transports, but some deployments need a custom transport layer. This lesson teaches you how to implement the MCP Transport interface from scratch. It covers the WebSocket transport pattern, message framing, connection lifecycle, error recovery, and how to integrate custom transports with both the official SDK and mcp-framework.

When You Need a Custom Transport

The built-in transports cover most use cases. Custom transports make sense when:

  • You need WebSocket communication for bidirectional streaming
  • Your infrastructure uses a message queue (Redis Pub/Sub, RabbitMQ, NATS)
  • You are building in-process communication between components
  • You need a custom protocol layer for security or compliance reasons
  • You are embedding MCP in an Electron or React Native app

Consider Built-In Transports First

Custom transports add complexity and maintenance burden. Before building one, verify that stdio, SSE, or Streamable HTTP cannot meet your requirements. Most production deployments work well with the standard transports.

The Transport Interface

Every MCP transport implements a small interface defined by the SDK. The listing below is simplified to the core members:

interface Transport {
  // Start the transport (begin accepting messages)
  start(): Promise<void>;

  // Send a JSON-RPC message to the other end
  send(message: JSONRPCMessage): Promise<void>;

  // Close the transport
  close(): Promise<void>;

  // Callback: invoked when a message is received
  onmessage?: (message: JSONRPCMessage) => void;

  // Callback: invoked when the transport closes
  onclose?: () => void;

  // Callback: invoked when an error occurs
  onerror?: (error: Error) => void;
}

Transport Interface

The contract that all MCP transports must implement. It consists of three methods (start, send, close) and three callbacks (onmessage, onclose, onerror). The MCP server and client set the callbacks after constructing the transport, then call start() to begin communication.
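
To make the contract concrete, here is a minimal sketch of an in-process transport pair: whatever one side sends arrives at the other side's onmessage callback. The class name `LinkedTransport`, its `createPair()` helper, and the local `JSONRPCMessage` stand-in type are assumptions made for this illustration, not part of the SDK.

```typescript
// Stand-in for the SDK's JSONRPCMessage type so this sketch compiles on its own.
type JSONRPCMessage = {
  jsonrpc: "2.0";
  id?: string | number;
  method?: string;
  params?: unknown;
  result?: unknown;
};

// Hypothetical in-process transport: two instances wired to each other.
class LinkedTransport {
  onmessage?: (message: JSONRPCMessage) => void;
  onclose?: () => void;
  onerror?: (error: Error) => void;

  private peer?: LinkedTransport;
  private started = false;
  private closed = false;

  static createPair(): [LinkedTransport, LinkedTransport] {
    const a = new LinkedTransport();
    const b = new LinkedTransport();
    a.peer = b;
    b.peer = a;
    return [a, b];
  }

  async start(): Promise<void> {
    if (this.started) throw new Error("Transport already started");
    this.started = true;
  }

  async send(message: JSONRPCMessage): Promise<void> {
    if (this.closed || !this.peer) throw new Error("Transport is closed");
    const peer = this.peer;
    // Deliver on a later microtask, the way a real link delivers asynchronously
    queueMicrotask(() => peer.onmessage?.(message));
  }

  async close(): Promise<void> {
    if (this.closed) return; // close() is idempotent
    this.closed = true;
    const peer = this.peer;
    this.peer = undefined;
    this.onclose?.();
    await peer?.close(); // closing one end closes the other
  }
}
```

The three methods and three callbacks are the whole surface. Everything else in this lesson (sockets, framing, reconnects) is detail behind `start()`, `send()`, and `close()`.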

Building a WebSocket Transport

WebSockets provide full-duplex communication over a single persistent connection, which makes them a good fit for real-time MCP servers. Here is a working implementation of both ends:

Server-Side WebSocket Transport

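// src/transports/websocket-server-transport.ts
import { WebSocket, WebSocketServer } from "ws";
import type { Transport } from "@modelcontextprotocol/sdk/shared/transport.js";
import type { JSONRPCMessage } from "@modelcontextprotocol/sdk/types.js";

// Declaring `implements Transport` lets the compiler check the class
// against the SDK's interface.
export class WebSocketServerTransport implements Transport {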
  private wss: WebSocketServer | null = null;
  private clients = new Map<string, WebSocket>();

  onmessage?: (message: JSONRPCMessage) => void;
  onclose?: () => void;
  onerror?: (error: Error) => void;

  constructor(
    private options: {
      port: number;
      path?: string;
      maxConnections?: number;
    }
  ) {}

  async start(): Promise<void> {
    return new Promise((resolve, reject) => {
      this.wss = new WebSocketServer({
        port: this.options.port,
        path: this.options.path || "/mcp",
      });

      this.wss.on("listening", () => {
        console.error(
          `WebSocket MCP transport listening on ws://localhost:${this.options.port}${this.options.path || "/mcp"}`
        );
        resolve();
      });

      this.wss.on("error", (error) => {
        this.onerror?.(error);
        reject(error);
      });

      this.wss.on("connection", (ws, req) => {
        this.handleConnection(ws, req);
      });
    });
  }

  private handleConnection(ws: WebSocket, req: any): void {
    const clientId = crypto.randomUUID();

    if (
      this.options.maxConnections &&
      this.clients.size >= this.options.maxConnections
    ) {
      ws.close(1013, "Maximum connections reached");
      return;
    }

    this.clients.set(clientId, ws);
    console.error(`Client connected: ${clientId}`);

    ws.on("message", (data) => {
      try {
        const message = JSON.parse(data.toString()) as JSONRPCMessage;
        this.onmessage?.(message);
      } catch (error) {
        this.onerror?.(
          new Error(`Invalid JSON from client ${clientId}: ${error}`)
        );
      }
    });

    ws.on("close", () => {
      this.clients.delete(clientId);
      console.error(`Client disconnected: ${clientId}`);
      if (this.clients.size === 0) {
        this.onclose?.();
      }
    });

    ws.on("error", (error) => {
      this.onerror?.(error);
    });
  }

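  // Note: this broadcasts every message to all connected clients. With more
  // than one client, responses also reach clients that did not ask for them.
  async send(message: JSONRPCMessage): Promise<void> {
    const data = JSON.stringify(message);

    const sendPromises = Array.from(this.clients.values())
      .filter((ws) => ws.readyState === WebSocket.OPEN)
      .map(
        (ws) =>
          new Promise<void>((resolve, reject) => {
            ws.send(data, (error) => {
              if (error) reject(error);
              else resolve();
            });
          })
      );

    // allSettled never rejects, so report failed sends through onerror
    const results = await Promise.allSettled(sendPromises);
    for (const result of results) {
      if (result.status === "rejected") {
        this.onerror?.(
          result.reason instanceof Error
            ? result.reason
            : new Error(String(result.reason))
        );
      }
    }
  }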

  async close(): Promise<void> {
    for (const [id, ws] of this.clients) {
      ws.close(1000, "Server shutting down");
      this.clients.delete(id);
    }

    return new Promise((resolve) => {
      if (this.wss) {
        this.wss.close(() => resolve());
      } else {
        resolve();
      }
    });
  }
}
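
The send() method above delivers each message to every connected client, so with several clients a response can reach the wrong peer, and two clients may reuse the same request id. One possible way around this is to rewrite request ids on the way in and route responses on the way out. The sketch below assumes a hypothetical `ResponseRouter` helper with a simplified `RpcMessage` type; neither is part of the SDK. Running one transport instance per connection is another option.

```typescript
type JsonRpcId = string | number;

// Simplified message shape, standing in for the SDK's JSONRPCMessage.
interface RpcMessage {
  jsonrpc: "2.0";
  id?: JsonRpcId;
  method?: string;
  params?: unknown;
  result?: unknown;
  error?: unknown;
}

// Hypothetical helper: remembers which client sent which request.
class ResponseRouter {
  private pending = new Map<string, { clientId: string; originalId: JsonRpcId }>();
  private counter = 0;

  // Inbound: give each request a unique id so clients cannot collide.
  inbound(clientId: string, message: RpcMessage): RpcMessage {
    const isRequest = message.method !== undefined && message.id !== undefined;
    if (!isRequest) return message; // notifications and responses pass through
    const routedId = `r${this.counter++}`;
    this.pending.set(routedId, { clientId, originalId: message.id as JsonRpcId });
    return { ...message, id: routedId };
  }

  // Outbound: a response goes back to its requester with the original id.
  // A clientId of null means "no single target" (for example a notification).
  outbound(message: RpcMessage): { clientId: string | null; message: RpcMessage } {
    const isResponse = message.method === undefined && message.id !== undefined;
    const key = String(message.id);
    const entry = isResponse ? this.pending.get(key) : undefined;
    if (!entry) return { clientId: null, message };
    this.pending.delete(key);
    return { clientId: entry.clientId, message: { ...message, id: entry.originalId } };
  }

  // Drop bookkeeping for a client that disconnected.
  forget(clientId: string): void {
    for (const [key, entry] of this.pending) {
      if (entry.clientId === clientId) this.pending.delete(key);
    }
  }
}
```

In a transport like the one above, `inbound()` would run in the "message" handler before calling onmessage, `outbound()` would run in send() to pick the target socket, and `forget()` would run in the "close" handler.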

Client-Side WebSocket Transport

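// src/transports/websocket-client-transport.ts
import WebSocket from "ws";
import type { Transport } from "@modelcontextprotocol/sdk/shared/transport.js";
import type { JSONRPCMessage } from "@modelcontextprotocol/sdk/types.js";

export class WebSocketClientTransport implements Transport {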
  private ws: WebSocket | null = null;
  private messageQueue: JSONRPCMessage[] = [];
  private connected = false;

  onmessage?: (message: JSONRPCMessage) => void;
  onclose?: () => void;
  onerror?: (error: Error) => void;

  constructor(
    private url: string,
    private options: {
      reconnect?: boolean;
      reconnectIntervalMs?: number;
      maxReconnectAttempts?: number;
    } = {}
  ) {}

  async start(): Promise<void> {
    return new Promise((resolve, reject) => {
      this.ws = new WebSocket(this.url);

      this.ws.on("open", () => {
        this.connected = true;
        console.error(`Connected to ${this.url}`);

        // Flush queued messages
        while (this.messageQueue.length > 0) {
          const msg = this.messageQueue.shift()!;
          this.ws!.send(JSON.stringify(msg));
        }

        resolve();
      });

      this.ws.on("message", (data) => {
        try {
          const message = JSON.parse(data.toString()) as JSONRPCMessage;
          this.onmessage?.(message);
        } catch (error) {
          this.onerror?.(new Error(`Invalid JSON from server: ${error}`));
        }
      });

      this.ws.on("close", () => {
        this.connected = false;
        this.onclose?.();

        if (this.options.reconnect) {
          this.scheduleReconnect();
        }
      });

      this.ws.on("error", (error) => {
        this.onerror?.(error);
        if (!this.connected) reject(error);
      });
    });
  }

  async send(message: JSONRPCMessage): Promise<void> {
    if (!this.connected || !this.ws) {
      // Queue message for when connection is restored
      this.messageQueue.push(message);
      return;
    }

    return new Promise((resolve, reject) => {
      this.ws!.send(JSON.stringify(message), (error) => {
        if (error) reject(error);
        else resolve();
      });
    });
  }

  async close(): Promise<void> {
    this.options.reconnect = false;
    if (this.ws) {
      this.ws.close(1000, "Client closing");
      this.ws = null;
    }
    this.connected = false;
  }

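  // Attempts made since the last successful connection
  private reconnectAttempts = 0;

  private scheduleReconnect(): void {
    const maxAttempts = this.options.maxReconnectAttempts ?? 10;
    const interval = this.options.reconnectIntervalMs ?? 3000;

    if (this.reconnectAttempts >= maxAttempts) {
      this.onerror?.(new Error("Max reconnect attempts reached"));
      return;
    }

    const attempt = this.reconnectAttempts++;
    setTimeout(() => {
      if (!this.options.reconnect) return; // close() was called while waiting
      console.error(`Reconnecting (attempt ${attempt + 1})...`);
      this.start().then(
        () => {
          this.reconnectAttempts = 0;
        },
        // A failed attempt also emits "close", and that handler schedules the
        // next retry. Rescheduling here as well would start a second loop.
        () => {}
      );
    }, interval * Math.pow(1.5, attempt));
  }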
}
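
The reconnect delay above grows by a factor of 1.5 per attempt with no upper bound. As a sketch of one common refinement, the helper below caps the delay and optionally adds jitter so that many clients do not reconnect in lockstep. The function name `reconnectDelay` and its option names are assumptions for this example.

```typescript
interface BackoffOptions {
  baseMs?: number;  // delay before the first retry
  factor?: number;  // growth per attempt
  maxMs?: number;   // upper bound on any delay
  jitter?: number;  // fraction of the delay to randomize, e.g. 0.2 for ±20%
  random?: () => number; // injectable for tests
}

function reconnectDelay(attempt: number, options: BackoffOptions = {}): number {
  const {
    baseMs = 3000,
    factor = 1.5,
    maxMs = 60_000,
    jitter = 0,
    random = Math.random,
  } = options;

  const exponential = baseMs * Math.pow(factor, attempt);
  // random() is in [0, 1), so the spread lies in [-jitter, +jitter) of the delay
  const spread = exponential * jitter * (random() * 2 - 1);
  // Clamp after jitter so the cap always holds
  return Math.round(Math.min(Math.max(exponential + spread, 0), maxMs));
}
```

With the defaults, attempts 0, 1, and 2 wait 3000, 4500, and 6750 ms, and later attempts level off at 60 seconds.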

Using the WebSocket Transport

import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { WebSocketServerTransport } from "./transports/websocket-server-transport.js";

const server = new McpServer({
  name: "websocket-server",
  version: "1.0.0",
});

// Register tools, resources, prompts...

async function main() {
  const transport = new WebSocketServerTransport({
    port: 8080,
    path: "/mcp",
    maxConnections: 50,
  });

  await server.connect(transport);
  console.error("WebSocket MCP server running on ws://localhost:8080/mcp");
}

main().catch(console.error);

Message Framing

The process of delimiting individual messages within a stream of bytes. Each transport has its own framing strategy: stdio uses newline-delimited JSON, WebSockets use frame boundaries, and HTTP uses request/response boundaries. Custom transports must define their own framing.

Newline-Delimited JSON (NDJSON)

For stream-based transports like TCP sockets:

// Simple NDJSON framer for stream-based transports
class NDJSONFramer {
  private buffer = "";

  // Feed raw data from the stream
  feed(data: string): JSONRPCMessage[] {
    this.buffer += data;
    const messages: JSONRPCMessage[] = [];
    const lines = this.buffer.split("\n");

    // Keep the last incomplete line in the buffer
    this.buffer = lines.pop() || "";

    for (const line of lines) {
      const trimmed = line.trim();
      if (trimmed.length === 0) continue;

      try {
        messages.push(JSON.parse(trimmed));
      } catch {
        // Skip malformed lines
        console.error("Malformed JSON-RPC message:", trimmed.substring(0, 100));
      }
    }

    return messages;
  }

  // Encode a message for transmission
  encode(message: JSONRPCMessage): string {
    return JSON.stringify(message) + "\n";
  }
}
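
The framer above takes strings, so the caller has to decode bytes first. Decoding each chunk on its own can corrupt a multi-byte UTF-8 character that is split across two chunks. A minimal sketch of one way to avoid that, using Node's `StringDecoder`; the helper name `createLineSplitter` is an assumption for this example.

```typescript
import { StringDecoder } from "node:string_decoder";

// Returns a function that accepts raw chunks and calls onLine once per
// complete, non-empty line.
function createLineSplitter(
  onLine: (line: string) => void
): (chunk: Buffer) => void {
  const decoder = new StringDecoder("utf8");
  let pending = "";

  return (chunk) => {
    // write() holds back an incomplete multi-byte sequence until the rest arrives
    pending += decoder.write(chunk);
    const lines = pending.split("\n");
    pending = lines.pop() ?? ""; // keep the trailing partial line

    for (const line of lines) {
      if (line.trim().length > 0) onLine(line);
    }
  };
}
```

Each line passed to `onLine` can then go through `JSON.parse` exactly as in the framer above.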

Length-Prefixed Framing

For binary transports, or for payloads that may contain raw newline bytes, prefix each message with its length in bytes:

class LengthPrefixFramer {
  private buffer = Buffer.alloc(0);

  feed(data: Buffer): JSONRPCMessage[] {
    this.buffer = Buffer.concat([this.buffer, data]);
    const messages: JSONRPCMessage[] = [];

    while (this.buffer.length >= 4) {
      const length = this.buffer.readUInt32BE(0);

      if (this.buffer.length < 4 + length) {
        break; // Incomplete message, wait for more data
      }

      const messageData = this.buffer.subarray(4, 4 + length);
      this.buffer = this.buffer.subarray(4 + length);

      try {
        messages.push(JSON.parse(messageData.toString("utf-8")));
      } catch {
        console.error("Malformed framed message");
      }
    }

    return messages;
  }

  encode(message: JSONRPCMessage): Buffer {
    const json = JSON.stringify(message);
    const body = Buffer.from(json, "utf-8");
    const header = Buffer.alloc(4);
    header.writeUInt32BE(body.length, 0);
    return Buffer.concat([header, body]);
  }
}
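
A length prefix read from the network is untrusted input: a corrupt or hostile header can announce a frame of up to 4 GiB and make the receiver buffer indefinitely. The sketch below shows the same framing as standalone functions with a size limit added. The names `encodeFrame`, `decodeFrames`, and `maxFrameBytes`, and the 1 MiB default, are assumptions for this example.

```typescript
const HEADER_BYTES = 4;

// Encode one payload as a 4-byte big-endian length followed by UTF-8 JSON.
function encodeFrame(payload: unknown): Buffer {
  const body = Buffer.from(JSON.stringify(payload), "utf-8");
  const header = Buffer.alloc(HEADER_BYTES);
  header.writeUInt32BE(body.length, 0);
  return Buffer.concat([header, body]);
}

// Decode every complete frame in `buffer`; `rest` holds the leftover bytes
// to prepend to the next chunk.
function decodeFrames(
  buffer: Buffer,
  maxFrameBytes = 1024 * 1024
): { messages: unknown[]; rest: Buffer } {
  const messages: unknown[] = [];
  let offset = 0;

  while (buffer.length - offset >= HEADER_BYTES) {
    const length = buffer.readUInt32BE(offset);

    // Reject oversized frames before waiting for their bodies
    if (length > maxFrameBytes) {
      throw new RangeError(
        `Frame of ${length} bytes exceeds limit of ${maxFrameBytes}`
      );
    }
    if (buffer.length - offset < HEADER_BYTES + length) break; // incomplete

    const start = offset + HEADER_BYTES;
    messages.push(JSON.parse(buffer.subarray(start, start + length).toString("utf-8")));
    offset = start + length;
  }

  return { messages, rest: buffer.subarray(offset) };
}
```

After an oversized frame the stream position is unknown, so closing the connection is usually the safest response.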

Message Queue Transport

For distributed systems, a message queue transport decouples the client and server:

// Redis Pub/Sub transport (conceptual)
import Redis from "ioredis";
import type { JSONRPCMessage } from "@modelcontextprotocol/sdk/types.js";

export class RedisPubSubTransport {
  private pub: Redis;
  private sub: Redis;

  onmessage?: (message: JSONRPCMessage) => void;
  onclose?: () => void;
  onerror?: (error: Error) => void;

  constructor(
    private redisUrl: string,
    private channels: {
      incoming: string;  // Channel to receive messages on
      outgoing: string;  // Channel to send messages to
    }
  ) {
    this.pub = new Redis(redisUrl);
    this.sub = new Redis(redisUrl);
  }

  async start(): Promise<void> {
    // Attach handlers before subscribing so an early message is not missed
    this.sub.on("message", (channel, data) => {
      if (channel === this.channels.incoming) {
        try {
          const message = JSON.parse(data) as JSONRPCMessage;
          this.onmessage?.(message);
        } catch (error) {
          this.onerror?.(new Error(`Invalid message on ${channel}: ${error}`));
        }
      }
    });

    this.sub.on("error", (error) => this.onerror?.(error));
    this.pub.on("error", (error) => this.onerror?.(error));

    await this.sub.subscribe(this.channels.incoming);
  }

  async send(message: JSONRPCMessage): Promise<void> {
    await this.pub.publish(
      this.channels.outgoing,
      JSON.stringify(message)
    );
  }

  async close(): Promise<void> {
    await this.sub.unsubscribe(this.channels.incoming);
    await this.sub.quit();
    await this.pub.quit();
    this.onclose?.();
  }
}
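
For two ends to talk through this transport, their channel names must mirror each other: one side's outgoing channel is the other side's incoming channel. A small sketch of a helper that derives both from a session id follows. The `channelsFor` function and the `mcp:<session>:to-server` naming scheme are assumptions for this example, not a convention defined by MCP or Redis.

```typescript
type Role = "client" | "server";

interface ChannelPair {
  incoming: string; // channel this side subscribes to
  outgoing: string; // channel this side publishes to
}

// Derive mirrored channel names for one session.
function channelsFor(sessionId: string, role: Role, prefix = "mcp"): ChannelPair {
  const toServer = `${prefix}:${sessionId}:to-server`;
  const toClient = `${prefix}:${sessionId}:to-client`;

  return role === "server"
    ? { incoming: toServer, outgoing: toClient }
    : { incoming: toClient, outgoing: toServer };
}
```

The server would pass `channelsFor(sessionId, "server")` to the transport constructor and the client `channelsFor(sessionId, "client")`. Keep in mind that Redis Pub/Sub does not store messages, so anything published while the other side is not subscribed is lost.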
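
| Transport Type    | Latency                     | Scalability          | Complexity            |
|-------------------|-----------------------------|----------------------|-----------------------|
| WebSocket         | Low (persistent connection) | Medium               | Medium                |
| Message Queue     | Medium (broker hop)         | High (decoupled)     | High                  |
| TCP Socket        | Very low (raw)              | Low                  | High (manual framing) |
| IPC (Unix socket) | Very low (no network)       | Low (single machine) | Medium                |
| SharedWorker      | Very low (in-browser)       | Low (single tab set) | Low                   |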

Connection Lifecycle Management

Custom transports must handle the full connection lifecycle:

class RobustTransport {
  private state: "disconnected" | "connecting" | "connected" | "closing" = "disconnected";

  onmessage?: (message: JSONRPCMessage) => void;
  onclose?: () => void;
  onerror?: (error: Error) => void;

  async start(): Promise<void> {
    if (this.state !== "disconnected") {
      throw new Error(`Cannot start transport in state: ${this.state}`);
    }

    this.state = "connecting";

    try {
      await this.connect();
      this.state = "connected";
      this.startHeartbeat();
    } catch (error) {
      this.state = "disconnected";
      throw error;
    }
  }

  async send(message: JSONRPCMessage): Promise<void> {
    if (this.state !== "connected") {
      throw new Error(`Cannot send in state: ${this.state}`);
    }

    await this.transmit(message);
  }

  async close(): Promise<void> {
    if (this.state === "disconnected" || this.state === "closing") return;

    this.state = "closing";
    this.stopHeartbeat();
    await this.disconnect();
    this.state = "disconnected";
    this.onclose?.();
  }

  // Heartbeat to detect dead connections
  private heartbeatInterval?: ReturnType<typeof setInterval>;

  private startHeartbeat(): void {
    this.heartbeatInterval = setInterval(async () => {
      try {
        await this.ping();
      } catch {
        this.onerror?.(new Error("Heartbeat failed"));
        await this.close();
      }
    }, 30000);
  }

  private stopHeartbeat(): void {
    if (this.heartbeatInterval) {
      clearInterval(this.heartbeatInterval);
    }
  }

  // Hooks for subclasses to override (no-ops in this base class)
  protected async connect(): Promise<void> { /* ... */ }
  protected async disconnect(): Promise<void> { /* ... */ }
  protected async transmit(message: JSONRPCMessage): Promise<void> { /* ... */ }
  protected async ping(): Promise<void> { /* ... */ }
}

Always Implement Heartbeats

Custom transports should include a heartbeat mechanism to detect dead connections. Without heartbeats, a disconnected client can appear connected indefinitely, wasting server resources and causing stale sessions. A 30-second interval is a reasonable default.
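
Sending a ping is only half of a heartbeat; the transport also has to decide when silence means the peer is gone. A minimal sketch of that bookkeeping follows, with the clock passed in so it is easy to test. The class name `LivenessTracker` and the choice of two missed intervals are assumptions for this example.

```typescript
// Tracks when the peer was last heard from and decides when to give up.
class LivenessTracker {
  private lastSeenMs: number;
  private readonly intervalMs: number;
  private readonly maxMissed: number;

  constructor(intervalMs = 30_000, maxMissed = 2, nowMs = Date.now()) {
    this.intervalMs = intervalMs;
    this.maxMissed = maxMissed;
    this.lastSeenMs = nowMs;
  }

  // Call on any inbound traffic: a pong or a regular message
  markAlive(nowMs = Date.now()): void {
    this.lastSeenMs = nowMs;
  }

  // True once the peer has been silent for more than maxMissed intervals
  isDead(nowMs = Date.now()): boolean {
    return nowMs - this.lastSeenMs > this.intervalMs * this.maxMissed;
  }
}
```

A heartbeat timer would call `isDead()` on each tick and close the transport when it returns true, while the message and pong handlers call `markAlive()`.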

Testing Custom Transports

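import { describe, it, expect, beforeEach, afterEach } from "vitest";
import WebSocket from "ws";
import type { AddressInfo } from "node:net";
import { WebSocketServerTransport } from "../transports/websocket-server-transport.js";

describe("WebSocketServerTransport", () => {
  let transport: WebSocketServerTransport;

  beforeEach(() => {
    transport = new WebSocketServerTransport({ port: 0 }); // OS picks a free port
  });

  afterEach(async () => {
    await transport.close();
  });

  // The transport keeps its WebSocketServer private, so the test reads the
  // assigned port through a cast. A public accessor would be cleaner.
  async function connectClient(): Promise<WebSocket> {
    const { port } = (transport as any).wss.address() as AddressInfo;
    const client = new WebSocket(`ws://127.0.0.1:${port}/mcp`);
    await new Promise<void>((resolve, reject) => {
      client.once("open", () => resolve());
      client.once("error", reject);
    });
    return client;
  }

  it("starts and accepts connections", async () => {
    await transport.start();
    const client = await connectClient();
    expect(client.readyState).toBe(WebSocket.OPEN);
    client.close();
  });

  it("delivers messages between client and server", async () => {
    const received = new Promise<unknown>((resolve) => {
      transport.onmessage = (msg) => resolve(msg);
    });

    await transport.start();
    const client = await connectClient();
    const ping = { jsonrpc: "2.0", id: 1, method: "ping" };
    client.send(JSON.stringify(ping));

    expect(await received).toEqual(ping);
    client.close();
  });

  it("handles client disconnection", async () => {
    const closed = new Promise<boolean>((resolve) => {
      transport.onclose = () => resolve(true);
    });

    await transport.start();
    const client = await connectClient();
    client.close();

    // onclose fires once the last client has disconnected
    expect(await closed).toBe(true);
  });
});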

Frequently Asked Questions