Building Custom Transports
Learn to implement custom MCP transport layers including WebSocket transports, message framing, connection lifecycle management, and advanced patterns for specialized deployment environments.
title: "Building Custom Transports"
description: "Learn to implement custom MCP transport layers including WebSocket transports, message framing, connection lifecycle management, and advanced patterns for specialized deployment environments."
order: 16
level: "advanced"
duration: "35 min"
keywords:
- "MCP custom transport"
- "MCP WebSocket transport"
- "MCP message framing"
- "MCP transport implementation"
- "@modelcontextprotocol/sdk Transport interface"
- "mcp-framework custom transport"
- "MCP protocol transport layer"
- "building MCP transports"
date: "2026-04-01"
While the official TypeScript SDK provides stdio, SSE, and Streamable HTTP transports, some deployments need custom transport layers. This lesson teaches you how to implement the MCP Transport interface from scratch, covering the WebSocket transport pattern, message framing, connection lifecycle, error recovery, and how to integrate custom transports with both the official SDK and mcp-framework.
When You Need a Custom Transport
The built-in transports cover most use cases. Custom transports make sense when:
- You need WebSocket communication for bidirectional streaming
- Your infrastructure uses a message queue (Redis Pub/Sub, RabbitMQ, NATS)
- You are building in-process communication between components
- You need a custom protocol layer for security or compliance reasons
- You are embedding MCP in an Electron or React Native app
Custom transports add complexity and maintenance burden. Before building one, verify that stdio, SSE, or Streamable HTTP cannot meet your requirements. Most production deployments work well with the standard transports.
The Transport Interface
Every MCP transport implements a simple interface defined by the SDK:
interface Transport {
// Start the transport (begin accepting messages)
start(): Promise<void>;
// Send a JSON-RPC message to the other end
send(message: JSONRPCMessage): Promise<void>;
// Close the transport
close(): Promise<void>;
// Callback: invoked when a message is received
onmessage?: (message: JSONRPCMessage) => void;
// Callback: invoked when the transport closes
onclose?: () => void;
// Callback: invoked when an error occurs
onerror?: (error: Error) => void;
}
The Transport interface is the contract that all MCP transports must implement. It consists of three methods (start, send, close) and three callbacks (onmessage, onclose, onerror). The MCP server and client set the callbacks after constructing the transport, then call start() to begin communication.
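To make the contract concrete, here is a minimal sketch of an in-process transport: two linked instances that hand messages directly to each other's onmessage callback. The `LinkedTransport` class and its `createPair` helper are hypothetical names for illustration, and the `JSONRPCMessage` and `Transport` types are simplified local stand-ins for the SDK's definitions, not copies of them.

```typescript
// Simplified stand-ins for the SDK types (assumptions for this sketch)
type JSONRPCMessage = { jsonrpc: "2.0"; [key: string]: unknown };

interface Transport {
  start(): Promise<void>;
  send(message: JSONRPCMessage): Promise<void>;
  close(): Promise<void>;
  onmessage?: (message: JSONRPCMessage) => void;
  onclose?: () => void;
  onerror?: (error: Error) => void;
}

// Hypothetical in-process transport: two instances linked as peers
class LinkedTransport implements Transport {
  private peer?: LinkedTransport;
  private started = false;

  onmessage?: (message: JSONRPCMessage) => void;
  onclose?: () => void;
  onerror?: (error: Error) => void;

  static createPair(): [LinkedTransport, LinkedTransport] {
    const a = new LinkedTransport();
    const b = new LinkedTransport();
    a.peer = b;
    b.peer = a;
    return [a, b];
  }

  async start(): Promise<void> {
    this.started = true;
  }

  async send(message: JSONRPCMessage): Promise<void> {
    if (!this.started || !this.peer) {
      throw new Error("Transport is not connected");
    }
    // Deliver straight to the peer's callback; nothing to serialize in-process
    this.peer.onmessage?.(message);
  }

  async close(): Promise<void> {
    if (!this.started) return;
    this.started = false;
    const peer = this.peer;
    this.peer = undefined;
    this.onclose?.();
    // Closing one end also closes the other
    await peer?.close();
  }
}
```

One end of the pair would go to a server and the other to a client. Because the callbacks are plain optional properties, whoever owns the transport can assign them before calling start(), which is the order the interface description above requires.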
Building a WebSocket Transport
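WebSockets provide full-duplex communication over one persistent connection, which suits MCP servers that need to push messages to clients without waiting for a request. The implementation has a server side and a client side: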
Server-Side WebSocket Transport
// src/transports/websocket-server-transport.ts
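// Explicit import: a global crypto object is not available in every Node.js version
import * as crypto from "node:crypto";
import { WebSocket, WebSocketServer } from "ws";
import type { Transport } from "@modelcontextprotocol/sdk/shared/transport.js";
import type { JSONRPCMessage } from "@modelcontextprotocol/sdk/types.js";
export class WebSocketServerTransport implements Transport {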
private wss: WebSocketServer | null = null;
private clients = new Map<string, WebSocket>();
onmessage?: (message: JSONRPCMessage) => void;
onclose?: () => void;
onerror?: (error: Error) => void;
constructor(
private options: {
port: number;
path?: string;
maxConnections?: number;
}
) {}
async start(): Promise<void> {
return new Promise((resolve, reject) => {
this.wss = new WebSocketServer({
port: this.options.port,
path: this.options.path || "/mcp",
});
this.wss.on("listening", () => {
console.error(
`WebSocket MCP transport listening on ws://localhost:${this.options.port}${this.options.path || "/mcp"}`
);
resolve();
});
this.wss.on("error", (error) => {
this.onerror?.(error);
reject(error);
});
this.wss.on("connection", (ws, req) => {
this.handleConnection(ws, req);
});
});
}
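private handleConnection(ws: WebSocket, req: import("node:http").IncomingMessage): void {
// Enforce the limit before registering the client.
// Comparing against undefined means maxConnections: 0 rejects every client
// instead of silently disabling the limit.
if (
this.options.maxConnections !== undefined &&
this.clients.size >= this.options.maxConnections
) {
ws.close(1013, "Maximum connections reached"); // 1013 = Try Again Later
return;
}
const clientId = crypto.randomUUID();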
this.clients.set(clientId, ws);
console.error(`Client connected: ${clientId}`);
ws.on("message", (data) => {
try {
const message = JSON.parse(data.toString()) as JSONRPCMessage;
this.onmessage?.(message);
} catch (error) {
this.onerror?.(
new Error(`Invalid JSON from client ${clientId}: ${error}`)
);
}
});
ws.on("close", () => {
this.clients.delete(clientId);
console.error(`Client disconnected: ${clientId}`);
if (this.clients.size === 0) {
this.onclose?.();
}
});
ws.on("error", (error) => {
this.onerror?.(error);
});
}
async send(message: JSONRPCMessage): Promise<void> {
const data = JSON.stringify(message);
const sendPromises = Array.from(this.clients.values())
.filter((ws) => ws.readyState === WebSocket.OPEN)
.map(
(ws) =>
new Promise<void>((resolve, reject) => {
ws.send(data, (error) => {
if (error) reject(error);
else resolve();
});
})
);
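const results = await Promise.allSettled(sendPromises);
// allSettled never rejects, so report individual failures instead of dropping them
for (const result of results) {
if (result.status === "rejected") {
this.onerror?.(
result.reason instanceof Error ? result.reason : new Error(String(result.reason))
);
}
}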
}
async close(): Promise<void> {
for (const [id, ws] of this.clients) {
ws.close(1000, "Server shutting down");
this.clients.delete(id);
}
return new Promise((resolve) => {
if (this.wss) {
this.wss.close(() => resolve());
} else {
resolve();
}
});
}
}
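Note that send() in the class above writes every outgoing message to all connected clients, so a response meant for one client also reaches the others, and two clients that both use request id 1 look identical to the server. One possible way around this, sketched here under stated assumptions, is to rewrite request ids on the way in and restore them on the way out. `RequestIdRouter` and its method names are hypothetical and are not part of the SDK or of the transport above.

```typescript
type RequestId = string | number;

interface Route {
  clientId: string;
  originalId: RequestId;
}

// Hypothetical helper: maps server-visible ids back to (client, original id)
class RequestIdRouter {
  private nextId = 1;
  private routes = new Map<number, Route>();

  // Call for each incoming request; returns the id to show the MCP server
  registerRequest(clientId: string, originalId: RequestId): number {
    const internalId = this.nextId++;
    this.routes.set(internalId, { clientId, originalId });
    return internalId;
  }

  // Call for each outgoing response; returns where to send it, or undefined
  // when the id is unknown (for example, the client already disconnected)
  resolveResponse(internalId: number): Route | undefined {
    const route = this.routes.get(internalId);
    this.routes.delete(internalId);
    return route;
  }

  // Drop pending routes when a client disconnects
  forgetClient(clientId: string): void {
    this.routes.forEach((route, id) => {
      if (route.clientId === clientId) this.routes.delete(id);
    });
  }
}
```

In the message handler the transport would replace the incoming `id` with the value from registerRequest before calling onmessage, and in send() it would look up the route, restore `originalId`, and write only to that client's socket. Notifications carry no id, so they would still need a separate policy such as broadcasting.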
Client-Side WebSocket Transport
// src/transports/websocket-client-transport.ts
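import WebSocket from "ws";
import type { Transport } from "@modelcontextprotocol/sdk/shared/transport.js";
import type { JSONRPCMessage } from "@modelcontextprotocol/sdk/types.js";
export class WebSocketClientTransport implements Transport {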
private ws: WebSocket | null = null;
private messageQueue: JSONRPCMessage[] = [];
private connected = false;
onmessage?: (message: JSONRPCMessage) => void;
onclose?: () => void;
onerror?: (error: Error) => void;
constructor(
private url: string,
private options: {
reconnect?: boolean;
reconnectIntervalMs?: number;
maxReconnectAttempts?: number;
} = {}
) {}
async start(): Promise<void> {
return new Promise((resolve, reject) => {
this.ws = new WebSocket(this.url);
this.ws.on("open", () => {
this.connected = true;
console.error(`Connected to ${this.url}`);
// Flush queued messages
while (this.messageQueue.length > 0) {
const msg = this.messageQueue.shift()!;
this.ws!.send(JSON.stringify(msg));
}
resolve();
});
this.ws.on("message", (data) => {
try {
const message = JSON.parse(data.toString()) as JSONRPCMessage;
this.onmessage?.(message);
} catch (error) {
this.onerror?.(new Error(`Invalid JSON from server: ${error}`));
}
});
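this.ws.on("close", () => {
// Only the loss of an established connection starts a new reconnect cycle.
// Failed reconnect attempts are retried by scheduleReconnect itself, so
// scheduling here as well would reset the attempt counter and run two loops.
const wasConnected = this.connected;
this.connected = false;
this.onclose?.();
if (wasConnected && this.options.reconnect) {
this.scheduleReconnect();
}
});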
this.ws.on("error", (error) => {
this.onerror?.(error);
if (!this.connected) reject(error);
});
});
}
async send(message: JSONRPCMessage): Promise<void> {
if (!this.connected || !this.ws) {
// Queue message for when connection is restored
this.messageQueue.push(message);
return;
}
return new Promise((resolve, reject) => {
this.ws!.send(JSON.stringify(message), (error) => {
if (error) reject(error);
else resolve();
});
});
}
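// Handle of the pending reconnect timer, kept so close() can cancel it
private reconnectTimer?: ReturnType<typeof setTimeout>;
async close(): Promise<void> {
// Disable reconnection and cancel any attempt that is already scheduled
this.options.reconnect = false;
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = undefined;
}
if (this.ws) {
this.ws.close(1000, "Client closing");
this.ws = null;
}
this.connected = false;
}
private scheduleReconnect(attempt = 0): void {
// close() sets reconnect to false; stop retrying once that happens
if (!this.options.reconnect) return;
const maxAttempts = this.options.maxReconnectAttempts ?? 10;
const interval = this.options.reconnectIntervalMs ?? 3000;
if (attempt >= maxAttempts) {
this.onerror?.(new Error("Max reconnect attempts reached"));
return;
}
// Exponential backoff: the delay grows by a factor of 1.5 per attempt
this.reconnectTimer = setTimeout(() => {
this.reconnectTimer = undefined;
console.error(`Reconnecting (attempt ${attempt + 1})...`);
this.start().catch(() => {
this.scheduleReconnect(attempt + 1);
});
}, interval * Math.pow(1.5, attempt));
}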
}
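The reconnect delay in scheduleReconnect grows as `interval * 1.5^attempt`. The sketch below works through that arithmetic with the default 3000 ms interval. The `maxMs` cap and the `withJitter` helper are assumptions added for illustration; the class above applies neither.

```typescript
// Delay before reconnect attempt `attempt` (0-based), in milliseconds.
// `maxMs` is an assumed upper bound that the class above does not apply.
function reconnectDelay(
  attempt: number,
  baseMs = 3000,
  factor = 1.5,
  maxMs = 30000
): number {
  return Math.min(baseMs * Math.pow(factor, attempt), maxMs);
}

// Optional jitter: pick a delay in the upper half of [0, delayMs] so that many
// clients disconnected at the same moment do not all reconnect together.
function withJitter(delayMs: number, random: () => number = Math.random): number {
  return delayMs / 2 + random() * (delayMs / 2);
}

const schedule = [0, 1, 2, 3].map((attempt) => reconnectDelay(attempt));
// schedule is [3000, 4500, 6750, 10125]
```

Without a cap, the tenth attempt (attempt 9) would wait roughly 115 seconds, which is why a ceiling such as 30 seconds is a common choice.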
Using the WebSocket Transport
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { WebSocketServerTransport } from "./transports/websocket-server-transport.js";
const server = new McpServer({
name: "websocket-server",
version: "1.0.0",
});
// Register tools, resources, prompts...
async function main() {
const transport = new WebSocketServerTransport({
port: 8080,
path: "/mcp",
maxConnections: 50,
});
await server.connect(transport);
console.error("WebSocket MCP server running on ws://localhost:8080/mcp");
}
main().catch(console.error);
Message Framing
Message framing is the process of delimiting individual messages within a stream of bytes. Each transport has its own framing strategy: stdio uses newline-delimited JSON, WebSockets use frame boundaries, and HTTP uses request/response boundaries. Custom transports must define their own framing.
Newline-Delimited JSON (NDJSON)
For stream-based transports like TCP sockets:
// Simple NDJSON framer for stream-based transports
class NDJSONFramer {
private buffer = "";
// Feed raw data from the stream
feed(data: string): JSONRPCMessage[] {
this.buffer += data;
const messages: JSONRPCMessage[] = [];
const lines = this.buffer.split("\n");
// Keep the last incomplete line in the buffer
this.buffer = lines.pop() || "";
for (const line of lines) {
const trimmed = line.trim();
if (trimmed.length === 0) continue;
try {
messages.push(JSON.parse(trimmed));
} catch {
// Skip malformed lines
console.error("Malformed JSON-RPC message:", trimmed.substring(0, 100));
}
}
return messages;
}
// Encode a message for transmission
encode(message: JSONRPCMessage): string {
return JSON.stringify(message) + "\n";
}
}
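The framer above takes strings, so something upstream has to turn socket bytes into text. Decoding each chunk on its own can corrupt a multi-byte UTF-8 character that straddles two chunks. A minimal sketch of one way to handle that, using the standard `TextDecoder` in streaming mode, follows; `Utf8LineReader` is a hypothetical name, and the loose `Json` type stands in for `JSONRPCMessage`.

```typescript
type Json = Record<string, unknown>;

// Hypothetical byte-oriented variant of the NDJSON framer
class Utf8LineReader {
  private decoder = new TextDecoder("utf-8");
  private buffer = "";

  feed(chunk: Uint8Array): Json[] {
    // stream: true holds back an incomplete trailing byte sequence
    // until the next call instead of emitting a replacement character
    this.buffer += this.decoder.decode(chunk, { stream: true });
    const lines = this.buffer.split("\n");
    this.buffer = lines.pop() ?? "";

    const messages: Json[] = [];
    for (const line of lines) {
      if (line.trim().length === 0) continue;
      try {
        messages.push(JSON.parse(line) as Json);
      } catch {
        // Skip malformed lines, as the framer above does
      }
    }
    return messages;
  }
}

// Split a message in the middle of the two-byte encoding of "\u00e9"
const bytes = new TextEncoder().encode('{"text":"h\u00e9llo"}\n');
const splitAt = bytes.indexOf(0xc3) + 1;
const reader = new Utf8LineReader();
const first = reader.feed(bytes.subarray(0, splitAt)); // no complete line yet
const second = reader.feed(bytes.subarray(splitAt)); // one intact message
```

In Node.js, calling `socket.setEncoding("utf8")` on the stream is another way to get correctly decoded string chunks, in which case the string-based framer can be used unchanged.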
Length-Prefixed Framing
For binary transports, or when you cannot guarantee that payloads are free of raw newlines, prefix each message with its byte length:
class LengthPrefixFramer {
private buffer = Buffer.alloc(0);
feed(data: Buffer): JSONRPCMessage[] {
this.buffer = Buffer.concat([this.buffer, data]);
const messages: JSONRPCMessage[] = [];
while (this.buffer.length >= 4) {
const length = this.buffer.readUInt32BE(0);
if (this.buffer.length < 4 + length) {
break; // Incomplete message, wait for more data
}
const messageData = this.buffer.subarray(4, 4 + length);
this.buffer = this.buffer.subarray(4 + length);
try {
messages.push(JSON.parse(messageData.toString("utf-8")));
} catch {
console.error("Malformed framed message");
}
}
return messages;
}
encode(message: JSONRPCMessage): Buffer {
const json = JSON.stringify(message);
const body = Buffer.from(json, "utf-8");
const header = Buffer.alloc(4);
header.writeUInt32BE(body.length, 0);
return Buffer.concat([header, body]);
}
}
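The framer above trusts whatever length the header declares. A corrupt or hostile header can announce up to 4 GiB, and feed() would keep buffering while it waits for the rest. The sketch below shows the wire layout for a small message and one way to reject oversized frames early. `MAX_FRAME_BYTES`, `encodeFrame`, and `readFrameLength` are hypothetical names, and the 1 MiB limit is an assumption to tune for your payloads.

```typescript
const HEADER_BYTES = 4;
const MAX_FRAME_BYTES = 1024 * 1024; // assumed limit of 1 MiB

// Encode a message as a 4-byte big-endian length followed by UTF-8 JSON
function encodeFrame(message: unknown): Buffer {
  const body = Buffer.from(JSON.stringify(message), "utf-8");
  if (body.length > MAX_FRAME_BYTES) {
    throw new Error(`Message of ${body.length} bytes exceeds the frame limit`);
  }
  const header = Buffer.alloc(HEADER_BYTES);
  header.writeUInt32BE(body.length, 0);
  return Buffer.concat([header, body]);
}

// Returns the declared body length, undefined while the header is incomplete,
// and throws as soon as a header announces more than the limit allows
function readFrameLength(buffer: Buffer): number | undefined {
  if (buffer.length < HEADER_BYTES) return undefined;
  const length = buffer.readUInt32BE(0);
  if (length > MAX_FRAME_BYTES) {
    throw new Error(`Frame of ${length} bytes exceeds limit of ${MAX_FRAME_BYTES}`);
  }
  return length;
}

const frame = encodeFrame({ a: 1 });
// frame is 11 bytes: 00 00 00 07 followed by the 7 bytes of {"a":1}
```

When readFrameLength throws, the safest reaction is usually to close the connection, because after a bad header the reader can no longer tell where the next frame starts.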
Message Queue Transport
For distributed systems, a message queue transport decouples the client and server:
// Redis Pub/Sub transport (conceptual)
import Redis from "ioredis";
import type { JSONRPCMessage } from "@modelcontextprotocol/sdk/types.js";
export class RedisPubSubTransport {
private pub: Redis;
private sub: Redis;
onmessage?: (message: JSONRPCMessage) => void;
onclose?: () => void;
onerror?: (error: Error) => void;
constructor(
private redisUrl: string,
private channels: {
incoming: string; // Channel to receive messages on
outgoing: string; // Channel to send messages to
}
) {
this.pub = new Redis(redisUrl);
this.sub = new Redis(redisUrl);
}
async start(): Promise<void> {
// Attach handlers before subscribing so no early message or error is missed
this.sub.on("message", (channel, data) => {
if (channel === this.channels.incoming) {
try {
const message = JSON.parse(data) as JSONRPCMessage;
this.onmessage?.(message);
} catch {
this.onerror?.(new Error(`Invalid message on ${channel}`));
}
}
});
this.sub.on("error", (error) => this.onerror?.(error));
this.pub.on("error", (error) => this.onerror?.(error));
await this.sub.subscribe(this.channels.incoming);
}
async send(message: JSONRPCMessage): Promise<void> {
await this.pub.publish(
this.channels.outgoing,
JSON.stringify(message)
);
}
async close(): Promise<void> {
await this.sub.unsubscribe(this.channels.incoming);
await this.sub.quit();
await this.pub.quit();
this.onclose?.();
}
}
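For the two ends to talk to each other, the server's incoming channel has to be the client's outgoing channel and vice versa. A small sketch of one way to derive mirrored names from a session id follows. The `channelsFor` helper and the `mcp:<sessionId>:c2s` / `mcp:<sessionId>:s2c` naming scheme are assumptions for illustration, not a convention defined by MCP or Redis.

```typescript
interface ChannelPair {
  incoming: string;
  outgoing: string;
}

type Role = "client" | "server";

// Hypothetical naming scheme: one channel per direction per session
function channelsFor(sessionId: string, role: Role): ChannelPair {
  const clientToServer = `mcp:${sessionId}:c2s`;
  const serverToClient = `mcp:${sessionId}:s2c`;
  return role === "server"
    ? { incoming: clientToServer, outgoing: serverToClient }
    : { incoming: serverToClient, outgoing: clientToServer };
}

const serverChannels = channelsFor("session-42", "server");
const clientChannels = channelsFor("session-42", "client");
// serverChannels.incoming and clientChannels.outgoing are both "mcp:session-42:c2s"
```

Each result can be passed as the `channels` argument of the transport constructor. Keep in mind that Redis Pub/Sub does not store messages for absent subscribers, so anything published before the other side has subscribed is lost.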
| Transport Type | Latency | Scalability | Complexity |
|---|---|---|---|
| WebSocket | Low (persistent connection) | Medium | Medium |
| Message Queue | Medium (broker hop) | High (decoupled) | High |
| TCP Socket | Very low (raw) | Low | High (manual framing) |
| IPC (Unix socket) | Very low (no network) | Low (single machine) | Medium |
| SharedWorker | Very low (in-browser) | Low (single tab set) | Low |
Connection Lifecycle Management
Custom transports must handle the full connection lifecycle:
class RobustTransport {
private state: "disconnected" | "connecting" | "connected" | "closing" = "disconnected";
onmessage?: (message: JSONRPCMessage) => void;
onclose?: () => void;
onerror?: (error: Error) => void;
async start(): Promise<void> {
if (this.state !== "disconnected") {
throw new Error(`Cannot start transport in state: ${this.state}`);
}
this.state = "connecting";
try {
await this.connect();
this.state = "connected";
this.startHeartbeat();
} catch (error) {
this.state = "disconnected";
throw error;
}
}
async send(message: JSONRPCMessage): Promise<void> {
if (this.state !== "connected") {
throw new Error(`Cannot send in state: ${this.state}`);
}
await this.transmit(message);
}
async close(): Promise<void> {
if (this.state === "disconnected" || this.state === "closing") return;
this.state = "closing";
this.stopHeartbeat();
await this.disconnect();
this.state = "disconnected";
this.onclose?.();
}
// Heartbeat to detect dead connections
private heartbeatInterval?: ReturnType<typeof setInterval>;
private startHeartbeat(): void {
this.heartbeatInterval = setInterval(async () => {
try {
await this.ping();
} catch {
this.onerror?.(new Error("Heartbeat failed"));
await this.close();
}
}, 30000);
}
private stopHeartbeat(): void {
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
}
}
// Hooks for subclasses to override (the empty bodies are placeholders; a real transport must implement them)
protected async connect(): Promise<void> { /* ... */ }
protected async disconnect(): Promise<void> { /* ... */ }
protected async transmit(message: JSONRPCMessage): Promise<void> { /* ... */ }
protected async ping(): Promise<void> { /* ... */ }
}
Custom transports should include a heartbeat mechanism to detect dead connections. Without heartbeats, a disconnected client can appear connected indefinitely, wasting server resources and causing stale sessions. A 30-second interval is a reasonable default.
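A heartbeat is only useful if something decides when the peer has been silent for too long. The sketch below separates that decision from the timer so it can be tested without waiting: the caller passes in the current time. `HeartbeatMonitor` is a hypothetical helper, and treating two missed 30-second intervals (60 seconds) as dead is an assumption, not a rule from the MCP specification.

```typescript
// Tracks peer liveness from timestamps supplied by the caller
class HeartbeatMonitor {
  private readonly timeoutMs: number;
  private lastSeen: number;

  constructor(timeoutMs: number, now: number = Date.now()) {
    this.timeoutMs = timeoutMs;
    this.lastSeen = now;
  }

  // Call whenever a pong, or any other message, arrives from the peer
  recordActivity(now: number = Date.now()): void {
    this.lastSeen = now;
  }

  // True once the peer has been silent for longer than the timeout
  isExpired(now: number = Date.now()): boolean {
    return now - this.lastSeen > this.timeoutMs;
  }
}

// Assumed policy: two missed 30-second heartbeats mean the peer is gone
const monitor = new HeartbeatMonitor(60_000, 0);
monitor.recordActivity(50_000);
const stillAlive = !monitor.isExpired(100_000); // true: 50 s of silence
const timedOut = monitor.isExpired(110_001); // true: just over 60 s of silence
```

Inside a transport, the interval callback would send a ping, then check isExpired() and call close() when it returns true, while the message handler calls recordActivity() for every frame received.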
Testing Custom Transports
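import { describe, it, expect, beforeEach, afterEach } from "vitest";
import { once } from "node:events";
import WebSocket from "ws";
import { WebSocketServerTransport } from "../transports/websocket-server-transport.js";
// A fixed port is an assumption for this example: the transport does not expose
// the port the OS assigns for port 0, so a test client could not find it.
const PORT = 18080;
const ENDPOINT = `ws://localhost:${PORT}/mcp`;
describe("WebSocketServerTransport", () => {
let transport: WebSocketServerTransport;
beforeEach(async () => {
transport = new WebSocketServerTransport({ port: PORT });
await transport.start();
});
afterEach(async () => {
await transport.close();
});
it("starts and accepts connections", async () => {
const client = new WebSocket(ENDPOINT);
await once(client, "open");
expect(client.readyState).toBe(WebSocket.OPEN);
client.close();
});
it("delivers messages from client to server", async () => {
const received: unknown[] = [];
// Resolve once the transport hands the parsed message to onmessage
const gotMessage = new Promise<void>((resolve) => {
transport.onmessage = (msg) => {
received.push(msg);
resolve();
};
});
const client = new WebSocket(ENDPOINT);
await once(client, "open");
client.send(JSON.stringify({ jsonrpc: "2.0", id: 1, method: "ping" }));
await gotMessage;
expect(received).toHaveLength(1);
client.close();
});
it("handles client disconnection", async () => {
// onclose fires when the last connected client goes away
const closed = new Promise<void>((resolve) => {
transport.onclose = () => resolve();
});
const client = new WebSocket(ENDPOINT);
await once(client, "open");
client.close();
await closed;
});
});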