Advanced Tool Patterns
Master advanced MCP tool patterns including complex Zod input schemas, async tool operations, robust error handling, tool composition, and progress reporting for production-grade servers.
title: "Advanced Tool Patterns" description: "Master advanced MCP tool patterns including complex Zod input schemas, async tool operations, robust error handling, tool composition, and progress reporting for production-grade servers." order: 10 level: "intermediate" duration: "30 min" keywords:
- "MCP advanced tools"
- "MCP tool composition"
- "MCP error handling tools"
- "MCP progress reporting"
- "Zod complex schemas MCP"
- "async MCP tools"
- "mcp-framework tools"
- "@modelcontextprotocol/sdk tools"
- "MCP tool patterns" date: "2026-04-01"
Production MCP servers need tools that go beyond simple request-response. This lesson covers complex Zod schemas with unions, arrays, and nested objects; async tools with external API calls; structured error handling; composing multiple tools together; and reporting progress for long-running operations. You will learn patterns that work with both the official TypeScript SDK and mcp-framework.
Complex Input Schemas with Zod
Real-world tools need schemas that go well beyond simple strings and numbers. Zod gives you the full power of TypeScript's type system at runtime.
Nested Objects
import { z } from "zod";
server.tool(
"create-issue",
"Create a new issue in the project tracker",
{
title: z.string().min(1).max(200).describe("Issue title"),
body: z.string().optional().describe("Detailed description"),
labels: z.array(z.string()).default([]).describe("Labels to apply"),
assignee: z.object({
id: z.string().describe("User ID"),
role: z.enum(["developer", "reviewer", "tester"])
.describe("Assignment role"),
}).optional().describe("Person to assign"),
priority: z.union([
z.literal("critical"),
z.literal("high"),
z.literal("medium"),
z.literal("low"),
]).default("medium").describe("Issue priority level"),
metadata: z.record(z.string(), z.unknown())
.optional()
.describe("Arbitrary key-value metadata"),
},
async ({ title, body, labels, assignee, priority, metadata }) => {
const issue = await tracker.createIssue({
title, body, labels, assignee, priority, metadata,
});
return {
content: [{
type: "text",
text: `Created issue #${issue.id}: ${issue.title}`,
}],
};
}
);
Discriminated Unions
When a tool accepts different shapes of input depending on a type field, use discriminated unions:
const DataSourceSchema = z.discriminatedUnion("type", [
z.object({
type: z.literal("url"),
url: z.string().url().describe("URL to fetch data from"),
headers: z.record(z.string()).optional().describe("Custom headers"),
}),
z.object({
type: z.literal("file"),
path: z.string().describe("Local file path"),
encoding: z.enum(["utf-8", "base64"]).default("utf-8"),
}),
z.object({
type: z.literal("inline"),
content: z.string().describe("Inline data content"),
format: z.enum(["json", "csv", "text"]).default("text"),
}),
]);
server.tool(
"import-data",
"Import data from various sources",
{ source: DataSourceSchema },
async ({ source }) => {
switch (source.type) {
case "url":
return await importFromUrl(source.url, source.headers);
case "file":
return await importFromFile(source.path, source.encoding);
case "inline":
return await importInline(source.content, source.format);
}
}
);
Keep schemas as flat as possible — deeply nested objects make it harder for AI models to construct correct inputs. Use discriminated unions instead of optional fields when different "modes" of a tool require different parameters. Always provide .describe() on every field.
Array and Tuple Schemas
server.tool(
"batch-process",
"Process multiple items in a single operation",
{
items: z.array(
z.object({
id: z.string().describe("Item identifier"),
action: z.enum(["update", "delete", "archive"])
.describe("Action to perform"),
data: z.record(z.unknown()).optional()
.describe("Data for update action"),
})
).min(1).max(50).describe("Items to process (1-50)"),
options: z.object({
dryRun: z.boolean().default(false)
.describe("Simulate without making changes"),
stopOnError: z.boolean().default(true)
.describe("Stop processing on first error"),
}).default({}).describe("Processing options"),
},
async ({ items, options }) => {
const results = [];
for (const item of items) {
try {
const result = await processItem(item, options);
results.push({ id: item.id, status: "success", result });
} catch (error) {
results.push({ id: item.id, status: "error", error: String(error) });
if (options.stopOnError) break;
}
}
return {
content: [{ type: "text", text: JSON.stringify(results, null, 2) }],
};
}
);
Async Tools and External API Calls
Most production tools interact with external services. Proper async handling is critical.
Handling Timeouts
function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
return Promise.race([
promise,
new Promise<never>((_, reject) =>
setTimeout(() => reject(new Error(`Operation timed out after ${ms}ms`)), ms)
),
]);
}
server.tool(
"fetch-api",
"Fetch data from an external API endpoint",
{
url: z.string().url().describe("API endpoint URL"),
method: z.enum(["GET", "POST", "PUT", "DELETE"]).default("GET"),
timeout: z.number().min(1000).max(30000).default(10000)
.describe("Request timeout in milliseconds"),
},
async ({ url, method, timeout }) => {
try {
const response = await withTimeout(
fetch(url, { method }),
timeout
);
if (!response.ok) {
return {
content: [{
type: "text",
text: `API request failed: ${response.status} ${response.statusText}`,
}],
isError: true,
};
}
const data = await response.json();
return {
content: [{
type: "text",
text: JSON.stringify(data, null, 2),
}],
};
} catch (error) {
return {
content: [{
type: "text",
text: `Request failed: ${error instanceof Error ? error.message : String(error)}`,
}],
isError: true,
};
}
}
);
When a tool handler returns isError: true in its response, it signals to the AI model that the operation failed. The model can then decide to retry, ask the user for help, or try an alternative approach. Always use this flag instead of throwing exceptions.
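Before surfacing an error at all, you can also retry transient failures (rate limits, flaky networks) on the server side. Here is a minimal sketch of a generic retry wrapper with exponential backoff; it is not part of the SDK, and the attempt count, base delay, and `isTransient` predicate are assumptions you would tune for the API you are calling:

```typescript
// Sketch of a retry wrapper with exponential backoff (not an SDK API).
// attempts, baseDelayMs, and isTransient are tunable assumptions.
async function withRetry<T>(
  fn: () => Promise<T>,
  attempts = 3,
  baseDelayMs = 250,
  isTransient: (error: unknown) => boolean = () => true
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < attempts; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;
      // Give up on non-transient errors or after the final attempt
      if (!isTransient(error) || attempt === attempts - 1) break;
      // Delay doubles each attempt: 250ms, 500ms, 1000ms, ...
      await new Promise((resolve) =>
        setTimeout(resolve, baseDelayMs * 2 ** attempt)
      );
    }
  }
  throw lastError;
}
```

This composes naturally with the timeout helper above, e.g. `await withRetry(() => withTimeout(fetch(url, { method }), timeout))`, so each attempt gets its own timeout budget.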
Concurrent Operations
server.tool(
"multi-search",
"Search across multiple data sources simultaneously",
{
query: z.string().describe("Search query"),
sources: z.array(z.enum(["database", "elasticsearch", "cache"]))
.min(1).describe("Data sources to search"),
},
async ({ query, sources }) => {
const searchFns: Record<string, (q: string) => Promise<string[]>> = {
database: searchDatabase,
elasticsearch: searchElastic,
cache: searchCache,
};
const results = await Promise.allSettled(
sources.map(async (source) => ({
source,
results: await searchFns[source](query),
}))
);
const output = results.map((result, i) => {
if (result.status === "fulfilled") {
return `## ${result.value.source}\n${result.value.results.join("\n")}`;
}
return `## ${sources[i]}\nError: ${result.reason}`;
});
return {
content: [{ type: "text", text: output.join("\n\n") }],
};
}
);
Error Handling Patterns
An unhandled exception in a tool handler can crash your MCP server. Always wrap tool logic in try/catch blocks and return structured error responses using the isError flag.
Structured Error Responses
interface ToolError {
code: string;
message: string;
details?: Record<string, unknown>;
retryable: boolean;
}
function errorResponse(error: ToolError) {
return {
content: [{
type: "text" as const,
text: JSON.stringify({
error: error.code,
message: error.message,
details: error.details,
retryable: error.retryable,
}, null, 2),
}],
isError: true,
};
}
server.tool(
"database-query",
"Execute a read-only database query",
{
sql: z.string().describe("SQL SELECT query to execute"),
params: z.array(z.unknown()).default([])
.describe("Query parameters"),
},
async ({ sql, params }) => {
// Validate query safety
if (!sql.trim().toUpperCase().startsWith("SELECT")) {
return errorResponse({
code: "INVALID_QUERY",
message: "Only SELECT queries are allowed",
retryable: false,
});
}
try {
const results = await db.query(sql, params);
return {
content: [{
type: "text",
text: JSON.stringify(results.rows, null, 2),
}],
};
} catch (error) {
if (error instanceof DatabaseConnectionError) {
return errorResponse({
code: "CONNECTION_ERROR",
message: "Database connection failed",
retryable: true,
});
}
return errorResponse({
code: "QUERY_ERROR",
message: error instanceof Error ? error.message : "Unknown error",
retryable: false,
});
}
}
);
Error Handling in mcp-framework
The mcp-framework equivalent uses class-level error handling:
import { MCPTool } from "mcp-framework";
import { z } from "zod";
class DatabaseQueryTool extends MCPTool<{ sql: string; params: unknown[] }> {
name = "database-query";
description = "Execute a read-only database query";
schema = {
sql: { type: z.string(), description: "SQL SELECT query" },
params: { type: z.array(z.unknown()).default([]), description: "Query parameters" },
};
async execute({ sql, params }: { sql: string; params: unknown[] }) {
if (!sql.trim().toUpperCase().startsWith("SELECT")) {
throw new Error("Only SELECT queries are allowed");
}
const results = await db.query(sql, params);
return JSON.stringify(results.rows, null, 2);
}
}
In mcp-framework, thrown errors are automatically caught and converted to MCP error responses. The SDK requires you to handle this manually, giving you more control over the error format.
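If you want mcp-framework-style behavior with the SDK, a small wrapper can centralize the conversion of thrown errors into `isError` responses. This is a sketch, not an SDK API; the `ToolResult` shape mirrors the text-content responses used throughout this lesson, and the wrapper covers handlers that only take their parsed arguments:

```typescript
// Illustrative wrapper (not an SDK API): catch anything a handler throws
// and convert it into an MCP-style error response with isError: true.
type ToolResult = {
  content: { type: "text"; text: string }[];
  isError?: boolean;
};

function withErrorHandling<Args>(
  handler: (args: Args) => Promise<ToolResult>
): (args: Args) => Promise<ToolResult> {
  return async (args) => {
    try {
      return await handler(args);
    } catch (error) {
      return {
        content: [{
          type: "text",
          text: error instanceof Error ? error.message : String(error),
        }],
        isError: true,
      };
    }
  };
}
```

Registering tools as `server.tool(name, description, schema, withErrorHandling(handler))` then gives every handler the same failure format without repeating try/catch blocks.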
Tool Composition
Complex workflows often require multiple tools working together. While AI models can chain tool calls naturally, you can also compose tools within your server.
Internal Helper Functions
// Shared logic used by multiple tools
async function resolveEntity(
type: string,
identifier: string
): Promise<{ id: string; data: Record<string, unknown> }> {
const entity = await db.findOne(type, identifier);
if (!entity) throw new Error(`${type} '${identifier}' not found`);
return entity;
}
server.tool(
"get-user-activity",
"Get recent activity for a user",
{ userId: z.string().describe("User ID or username") },
async ({ userId }) => {
const user = await resolveEntity("user", userId);
const activity = await getActivity(user.id);
return {
content: [{ type: "text", text: JSON.stringify(activity, null, 2) }],
};
}
);
server.tool(
"get-user-permissions",
"Get permissions for a user",
{ userId: z.string().describe("User ID or username") },
async ({ userId }) => {
const user = await resolveEntity("user", userId);
const permissions = await getPermissions(user.id);
return {
content: [{ type: "text", text: JSON.stringify(permissions, null, 2) }],
};
}
);
Meta-Tools
Build tools that orchestrate other operations:
server.tool(
"workspace-summary",
"Generate a comprehensive workspace summary",
{
includeStats: z.boolean().default(true),
includeRecentChanges: z.boolean().default(true),
includeIssues: z.boolean().default(false),
},
async ({ includeStats, includeRecentChanges, includeIssues }) => {
const sections: string[] = [];
if (includeStats) {
const stats = await gatherStats();
sections.push(`## Statistics\n${formatStats(stats)}`);
}
if (includeRecentChanges) {
const changes = await getRecentChanges(7);
sections.push(`## Recent Changes\n${formatChanges(changes)}`);
}
if (includeIssues) {
const issues = await getOpenIssues();
sections.push(`## Open Issues\n${formatIssues(issues)}`);
}
return {
content: [{ type: "text", text: sections.join("\n\n") }],
};
}
);
Progress Reporting
For long-running tools, progress reporting keeps the AI model and user informed.
The MCP protocol supports progress notifications via tokens. When a client sends a request with a progressToken, the server can emit progress updates that the client displays to the user.
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
server.tool(
"process-large-dataset",
"Process a large dataset with progress tracking",
{
datasetId: z.string().describe("Dataset identifier"),
batchSize: z.number().default(100).describe("Items per batch"),
},
async ({ datasetId, batchSize }, extra) => {
  // The client opts into progress updates by sending a progressToken
  const progressToken = extra._meta?.progressToken;
  const dataset = await loadDataset(datasetId);
  const totalItems = dataset.length;
  const results: ProcessedItem[] = [];
  for (let i = 0; i < totalItems; i += batchSize) {
    const batch = dataset.slice(i, i + batchSize);
    const batchResults = await processBatch(batch);
    results.push(...batchResults);
    // Report progress with a notifications/progress message
    if (progressToken !== undefined) {
      await extra.sendNotification({
        method: "notifications/progress",
        params: {
          progressToken,
          progress: Math.min(i + batchSize, totalItems),
          total: totalItems,
        },
      });
    }
}
return {
content: [{
type: "text",
text: `Processed ${results.length} items. ${results.filter(r => r.success).length} succeeded.`,
}],
};
}
);
Report progress at meaningful intervals, not after every single item. For a 10,000-item dataset, reporting every 100 items (100 progress events) is reasonable. Reporting after every item would flood the transport with 10,000 notifications.
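That interval logic can be packaged as a small closure. A sketch, with illustrative names: it forwards a report only when at least `step` items have completed since the last report, and always lets the completion report through:

```typescript
// Illustrative helper: throttle progress reports to one per `step` items.
// The final report (progress >= total) always goes through.
function makeProgressThrottle(
  total: number,
  step: number,
  report: (progress: number, total: number) => void
): (progress: number) => void {
  let lastReported = 0;
  return (progress) => {
    if (progress - lastReported >= step || progress >= total) {
      lastReported = progress;
      report(progress, total);
    }
  };
}
```

In a per-item loop you would call the returned function after every item and let the throttle decide which updates actually become notifications.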
Patterns for Different Tool Categories
| Category | Pattern | Example |
|---|---|---|
| CRUD operations | Validate input, check permissions, execute, return result | create-record, update-user |
| Search/Query | Validate query, set defaults, execute, format results | search-docs, find-users |
| File operations | Validate path, check access, read/write, return status | read-file, write-config |
| External APIs | Validate params, handle auth, timeout, parse response | github-search, slack-post |
| Computation | Validate input, compute, report progress, return output | analyze-data, transform-csv |
Testing Your Tools
Always test tools with edge cases:
// Integration-test helper: connect a client to the server in-process
// over the SDK's InMemoryTransport, then call the tool as a client would
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { InMemoryTransport } from "@modelcontextprotocol/sdk/inMemory.js";

async function testTool(
  server: McpServer,
  toolName: string,
  args: Record<string, unknown>
) {
  const client = new Client({ name: "test-client", version: "1.0.0" });
  const [clientTransport, serverTransport] =
    InMemoryTransport.createLinkedPair();
  await Promise.all([
    client.connect(clientTransport),
    server.connect(serverTransport),
  ]);
  return client.callTool({ name: toolName, arguments: args });
}
Before shipping a tool, verify: (1) All schema fields have descriptions. (2) Error cases return isError: true with helpful messages. (3) Timeouts are set for external calls. (4) The tool name and description clearly communicate its purpose. (5) Input validation rejects dangerous operations. (6) Long operations report progress.