Skip to content

Protect an MCP Server

The MCP transport packages provide authenticate() and extractBearer() as the two building blocks for mandate-gated MCP servers. Every tool call must arrive with an Authorization: Bearer <mandate> header. The mandate is verified against the zone’s JWKS and checked against the revocation store before the tool handler runs.

  • An MCP server that receives HTTP requests (not stdio transport — stdio has no auth header).
  • A Caracal zone with the STS JWKS endpoint reachable from the server process.
  • A revocation store (in-memory for development; Redis-backed for production).
Terminal window
npm install @caracalai/transport-mcp @caracalai/revocation
import { authenticate, extractBearer } from '@caracalai/transport-mcp';
import { InMemoryRevocationStore } from '@caracalai/revocation';
const revocations = new InMemoryRevocationStore();
async function handleToolCall(req: Request): Promise<Response> {
const token = extractBearer(req.headers.get('Authorization') ?? undefined);
if (!token) {
return Response.json({ error: 'missing_token' }, { status: 401 });
}
const result = await authenticate(token, {
issuer: process.env.CARACAL_STS_URL!, // e.g. http://sts:8080
audience: 'resource://my-mcp-server',
zoneId: process.env.CARACAL_ZONE_ID!,
requiredScopes: ['tool:call'],
requireAgent: true,
revocations,
});
if (!result.ok) {
const status = result.error.code === 'missing_token' ? 401 : 403;
return Response.json({ error: result.error.code }, { status });
}
const claims = result.principal;
// claims.sub, claims.agentSessionId, claims.scope, claims.zoneId, ...
return runTool(claims, req);
}
FieldTypeRequiredDescription
issuerstringYesSTS base URL; JWKS fetched from {issuer}/.well-known/jwks.json
audiencestringYesMust match the aud claim in the mandate
zoneIdstringNoRejects mandates from a different zone
requiredScopesstring[]NoAll listed scopes must be present in the mandate
requireAgentbooleanNoRejects mandates without agent_session_id
requireDelegationbooleanNoRejects mandates without delegation_edge_id
requireChainContainsstring[]NoAll listed application IDs must appear in delegation_chain
maxHopCountnumberNoRejects mandates with hop_count above this value (default: 10)
revocationsRevocationStoreYesChecked against the mandate’s sid claim
CodeHTTPMeaning
missing_token401No Authorization header or not a bearer token
invalid_token401JWT signature invalid, expired, or malformed
invalid_zone401zone_id claim does not match zoneId option
insufficient_scope403Mandate is missing one or more requiredScopes
session_revoked403The mandate’s sid is in the revocation store
agent_required403requireAgent is true but agent_session_id is absent
delegation_required403requireDelegation is true but delegation_edge_id is absent
chain_mismatch403A required application ID is not in delegation_chain
hop_count_exceeded403hop_count exceeds maxHopCount

Terminal window
pip install caracalai-transport-mcp caracalai-revocation
from caracalai_transport_mcp import authenticate, extract_bearer
from caracalai_revocation import InMemoryRevocationStore
revocations = InMemoryRevocationStore()
async def handle_tool_call(request) -> dict:
token = extract_bearer(request.headers.get('Authorization'))
if not token:
return {'error': 'missing_token'}, 401
result = await authenticate(
token,
issuer='http://sts:8080',
audience='resource://my-mcp-server',
expected_zone_id='my-zone',
required_scopes=['tool:call'],
require_agent=True,
revocations=revocations,
)
if not result.ok:
status = 401 if result.error.code in ('missing_token', 'invalid_token') else 403
return {'error': result.error.code, 'description': result.error.description}, status
claims = result.principal
# claims.sub, claims.agent_session_id, claims.scope, claims.zone_id, ...
return await run_tool(claims, request)
@dataclass(frozen=True)
class AuthResult:
principal: Claims | None # Populated on success
error: AuthError | None # Populated on failure
@property
def ok(self) -> bool: ...
@dataclass(frozen=True)
class AuthError:
code: str # One of the error codes listed above
description: str

In production, replace InMemoryRevocationStore with the Redis-backed store so all replicas share revocation state:

TypeScript:

import { RedisRevocationStore } from '@caracalai/revocation-redis';
import { createClient } from 'redis';
const redis = createClient({ url: process.env.REDIS_URL });
await redis.connect();
const revocations = new RedisRevocationStore(redis, {
keyPrefix: 'caracal:revoked:sessions:',
defaultTtlMs: 24 * 60 * 60 * 1000,
failClosed: true,
});

Python:

import redis
from caracalai_revocation_redis import RedisRevocationStore, RedisRevocationConsumer
r = redis.Redis.from_url(os.environ['REDIS_URL'])
revocations = RedisRevocationStore(redis=r, fail_closed=True)
consumer = RedisRevocationConsumer(
redis=r,
store=revocations,
consumer='mcp-server-1',
)
consumer.ensure_group()
# Run consumer.poll_once() in a background thread/task

fail_closed=True means any Redis error during is_revoked() is treated as revoked — the mandate is rejected rather than let through. Set fail_closed=False only if availability is more important than security in your threat model.