
Identity Package

The identity packages verify ES256-signed Caracal mandates against a zone’s JWKS endpoint and handle JWKS fetching and caching internally. Each package exposes the same conceptual API (verify in TypeScript, verify_config in Python, Verify in Go) with consistent config and claims types across languages.

npm install @caracalai/identity
import { verify } from '@caracalai/identity';
async function verify(token: string, config: JwtConfig): Promise<Claims>

Fetches the JWKS from {config.issuer}/.well-known/jwks.json, selects the key matching the kid header, verifies the ES256 signature, checks expiry, and enforces every constraint set in config. JWKS responses are cached per issuer for 5 minutes; if a refresh fails, the stale entry is served instead (stale-while-revalidate).

interface JwtConfig {
  issuer: string;                  // STS base URL; JWKS at {issuer}/.well-known/jwks.json
  audience: string;                // Must be present in the 'aud' claim
  zoneId?: string;                 // If set, 'zone_id' claim must match
  requiredScopes?: string[];       // All listed scopes must be in the 'scope' claim
  requireAgent?: boolean;          // Reject tokens without 'agent_session_id'
  requireDelegation?: boolean;     // Reject tokens without 'delegation_edge_id'
  requireChainContains?: string[]; // All IDs must appear in 'delegation_chain'
  maxHopCount?: number;            // Reject tokens with 'hop_count' above this (default: 10)
}

interface Claims {
  sub: string;
  zoneId: string;
  clientId: string; // Application ID
  sid: string;      // Session ID (revocation key)
  scope: string;    // Space-separated OAuth scopes
  agentSessionId?: string;
  delegationEdgeId?: string;
  sourceSessionId?: string;
  targetSessionId?: string;
  delegationPath?: string[];
  delegationChain?: ChainHop[];
  graphEpoch?: number;
  hopCount?: number;
}

interface ChainHop {
  applicationId: string;
  agentSessionId?: string;
  delegationEdgeId?: string;
}

// Check if a space-separated scope string contains the target scope
function hasScope(scopeStr: string, target: string): boolean
// Check if an application ID appears in the delegation chain or as clientId
function verifyChainContains(claims: Claims, applicationId: string): boolean

All errors extend Error. Catch the specific class to identify the failure:

| Class | Meaning |
| --- | --- |
| TokenInvalidError | JWT signature invalid, expired, or malformed |
| ZoneInvalidError | zone_id claim does not match config.zoneId |
| ScopeInsufficientError | Missing a required scope; has .missingScope: string |
| AgentIdentityRequiredError | requireAgent is true but agent_session_id is absent |
| DelegationRequiredError | requireDelegation is true but delegation_edge_id is absent |
| ChainMismatchError | Required app not in chain; has .missingApplicationId: string |
| HopCountExceededError | hop_count exceeds maxHopCount |

pip install caracalai-identity
from caracalai_identity import verify_config, JwtConfig
async def verify_config(token: str, config: JwtConfig) -> Claims: ...

verify_token(token, issuer, audience, ...)

Convenience function for simple cases:

async def verify_token(
    token: str,
    issuer: str,
    audience: str,
    required_scopes: list[str] | None = None,
    expected_zone_id: str | None = None,
) -> dict[str, Any]: ...

Returns the raw decoded claims dict. Use verify_config for full constraint enforcement.

@dataclass
class JwtConfig:
    issuer: str
    audience: str
    expected_zone_id: str | None = None
    required_scopes: list[str] = field(default_factory=list)
    require_agent: bool = False
    require_delegation: bool = False
    require_chain_contains: list[str] = field(default_factory=list)
    max_hop_count: int | None = None

@dataclass
class Claims:
    sub: str
    zone_id: str
    client_id: str
    sid: str
    scope: str
    agent_session_id: str | None = None
    delegation_edge_id: str | None = None
    source_session_id: str | None = None
    target_session_id: str | None = None
    delegation_path: list[str] = field(default_factory=list)
    delegation_chain: list[ChainHop] = field(default_factory=list)
    graph_epoch: int | None = None
    hop_count: int | None = None

@dataclass
class ChainHop:
    application_id: str
    agent_session_id: str | None = None
    delegation_edge_id: str | None = None

from caracalai_identity import has_scope, verify_chain_contains
def has_scope(scope_str: str, target: str) -> bool: ...
def verify_chain_contains(claims: Claims, application_id: str) -> bool: ...
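
The semantics of the two helpers can be illustrated with a small standalone sketch. This is not the package's implementation: `scope_contains`, `chain_contains`, `TokenClaims`, and `Hop` are hypothetical stand-ins written from the one-line descriptions of the helpers (a space-separated scope check, and a match against either the client ID or any hop in the delegation chain).

```python
from dataclasses import dataclass, field


@dataclass
class Hop:
    """Hypothetical stand-in for ChainHop (only the field used here)."""
    application_id: str


@dataclass
class TokenClaims:
    """Hypothetical stand-in for Claims (only the fields used here)."""
    client_id: str
    scope: str
    delegation_chain: list[Hop] = field(default_factory=list)


def scope_contains(scope_str: str, target: str) -> bool:
    # Split on whitespace so "invoices" does not match inside "invoices:read".
    return target in scope_str.split()


def chain_contains(claims: TokenClaims, application_id: str) -> bool:
    # Match either the token's own client or any hop in the delegation chain.
    if claims.client_id == application_id:
        return True
    return any(hop.application_id == application_id for hop in claims.delegation_chain)


claims = TokenClaims(
    client_id="app-billing",
    scope="invoices:read invoices:write",
    delegation_chain=[Hop("app-gateway"), Hop("app-orders")],
)
print(scope_contains(claims.scope, "invoices:read"))  # True
print(scope_contains(claims.scope, "invoices"))       # False
print(chain_contains(claims, "app-orders"))           # True
print(chain_contains(claims, "app-unknown"))          # False
```

Comparing whole tokens rather than substrings is the important detail: a plain `in` test on the scope string would accept `invoices` for a token that only carries `invoices:read`.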

| Class | Meaning |
| --- | --- |
| TokenInvalidError(ValueError) | JWT signature invalid, expired, or malformed |
| ZoneInvalidError(ValueError) | Zone mismatch |
| ScopeInsufficientError(PermissionError) | Missing scope; .missing_scope: str |
| AgentIdentityRequiredError(PermissionError) | Agent session required but absent |
| DelegationRequiredError(PermissionError) | Delegation edge required but absent |
| ChainMismatchError(PermissionError) | Required app not in chain; .missing_application_id: str |
| HopCountExceededError(PermissionError) | Hop count exceeded |
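
Because the Python errors derive from either ValueError or PermissionError, a caller can branch on the base class instead of listing every error. The sketch below shows one possible mapping to HTTP status codes. The 401/403 split, `status_for`, and `fake_verify` are assumptions made for illustration, and the two error classes are local stand-ins declared only so the example runs without the package installed.

```python
# Local stand-ins that mirror the base classes listed in the table above.
class TokenInvalidError(ValueError):
    pass


class ScopeInsufficientError(PermissionError):
    def __init__(self, missing_scope: str):
        super().__init__(f"missing scope: {missing_scope}")
        self.missing_scope = missing_scope


def status_for(exc: Exception) -> int:
    """Hypothetical mapping from a verification failure to an HTTP status."""
    if isinstance(exc, PermissionError):
        return 403  # token verified, but it is not allowed to do this
    if isinstance(exc, ValueError):
        return 401  # token could not be accepted at all
    raise exc  # anything else is not a verification failure


def fake_verify(token: str) -> None:
    """Hypothetical verifier used only to trigger each error path."""
    if token == "garbage":
        raise TokenInvalidError("malformed token")
    if token == "no-scope":
        raise ScopeInsufficientError("invoices:write")


for token in ("garbage", "no-scope"):
    try:
        fake_verify(token)
    except (ValueError, PermissionError) as exc:
        print(token, status_for(exc), type(exc).__name__)
```

Under this mapping a zone mismatch would also produce 401, since ZoneInvalidError is listed as a ValueError; services that prefer 403 for that case would need to catch it explicitly first.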

Go — github.com/garudex-labs/caracal/identity

go get github.com/garudex-labs/caracal/identity
import "github.com/garudex-labs/caracal/identity"
func Verify(tokenStr string, cfg Config) (Claims, error)
type Config struct {
	Issuer               string
	Audience             string
	ZoneID               string
	RequiredScopes       []string
	RequireAgent         bool
	RequireDelegation    bool
	RequireChainContains []string
	MaxHopCount          int // 0 uses DefaultMaxHopCount (10)
}
const DefaultMaxHopCount = 10
type Claims struct {
	Sub              string
	ZoneID           string
	ClientID         string
	Sid              string
	Scope            string
	AgentSessionID   string
	DelegationEdgeID string
	SourceSessionID  string
	TargetSessionID  string
	DelegationPath   []string
	DelegationChain  []ChainHop
	GraphEpoch       int64
	HopCount         int
}

type ChainHop struct {
	ApplicationID    string
	AgentSessionID   string
	DelegationEdgeID string
}

func HasScope(scopeStr, target string) bool
func VerifyChainContains(claims Claims, applicationID string) bool

var (
	ErrTokenInvalid          = errors.New("token validation failed")
	ErrZoneInvalid           = errors.New("token zone validation failed")
	ErrAgentIdentityRequired = errors.New("agent identity required")
	ErrDelegationRequired    = errors.New("delegation required")
	ErrHopCountExceeded      = errors.New("hop count exceeded")
)

type ScopeMissingError struct {
	Scope string
}

type ChainMismatchError struct {
	ApplicationID string
}

Use errors.As to inspect structured errors:

claims, err := identity.Verify(token, cfg)
var scopeErr *identity.ScopeMissingError
if errors.As(err, &scopeErr) {
	log.Printf("missing scope: %s", scopeErr.Scope)
}

All three packages cache JWKS responses per issuer with a 5-minute TTL. On a cache miss or after expiry, the package fetches {issuer}/.well-known/jwks.json. If that fetch fails while a stale entry exists, the stale entry is used (stale-while-revalidate) and an error is logged. The kid header in the JWT determines which key from the JWKS response is used for verification.
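
The caching behaviour described above can be sketched in a few lines of Python. This is an illustration of the pattern only, not the packages' code: `JwksCache`, its injected `fetch` and `now` callables, and the fake fetcher in the usage section are all hypothetical names chosen so the example runs without network access.

```python
import time
from typing import Callable

TTL_SECONDS = 300  # 5-minute TTL, as described above


class JwksCache:
    """Hypothetical per-issuer cache; not the packages' actual implementation."""

    def __init__(self, fetch: Callable[[str], dict],
                 now: Callable[[], float] = time.monotonic):
        self._fetch = fetch  # in real use, an HTTP GET returning parsed JSON
        self._now = now
        self._entries: dict[str, tuple[float, dict]] = {}  # issuer -> (fetched_at, jwks)

    def get(self, issuer: str) -> dict:
        entry = self._entries.get(issuer)
        if entry is not None and self._now() - entry[0] < TTL_SECONDS:
            return entry[1]  # fresh hit, no network call
        try:
            jwks = self._fetch(f"{issuer}/.well-known/jwks.json")
        except Exception as exc:
            if entry is None:
                raise  # nothing cached to fall back on
            print(f"JWKS refresh failed, serving stale entry: {exc}")
            return entry[1]
        self._entries[issuer] = (self._now(), jwks)
        return jwks


# Usage with a fake clock and a fetcher that fails after the first call.
clock = [0.0]
calls: list[str] = []


def fake_fetch(url: str) -> dict:
    calls.append(url)
    if len(calls) > 1:
        raise ConnectionError("STS unreachable")
    return {"keys": [{"kid": "k1"}]}


cache = JwksCache(fake_fetch, now=lambda: clock[0])
first = cache.get("https://sts.example.com")   # miss: fetches
clock[0] = 100.0
second = cache.get("https://sts.example.com")  # fresh hit: no fetch
clock[0] = 400.0
third = cache.get("https://sts.example.com")   # expired, fetch fails, stale entry served
print(len(calls), first == second == third)    # 2 True
```

Injecting the clock and fetcher keeps the sketch testable; a failure with an empty cache still propagates, since there is no stale entry to serve.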

The JWKS endpoint returns the two most recent zone signing keys, enabling zero-downtime key rotation.
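
To see why publishing two keys allows rotation without downtime, consider how a verifier picks a key by kid. The sketch below uses only the standard library and performs no signature check; `select_key`, `fake_token`, and the kid values are hypothetical and exist only for illustration.

```python
import base64
import json


def b64url_decode(segment: str) -> bytes:
    # JWTs strip base64url padding; restore it before decoding.
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def select_key(token: str, jwks: dict) -> dict:
    """Return the JWK whose kid matches the token header (no signature check here)."""
    header = json.loads(b64url_decode(token.split(".")[0]))
    for key in jwks.get("keys", []):
        if key.get("kid") == header.get("kid"):
            return key
    raise LookupError(f"no JWKS key with kid {header.get('kid')!r}")


def fake_token(kid: str) -> str:
    # Hypothetical unsigned token; only the header matters for key selection.
    header = json.dumps({"alg": "ES256", "kid": kid}).encode()
    return base64.urlsafe_b64encode(header).rstrip(b"=").decode() + ".e30.sig"


# During rotation the JWKS lists both the previous and the new signing key.
jwks = {"keys": [{"kid": "2024-05", "kty": "EC"}, {"kid": "2024-06", "kty": "EC"}]}
print(select_key(fake_token("2024-05"), jwks)["kid"])  # 2024-05
print(select_key(fake_token("2024-06"), jwks)["kid"])  # 2024-06
```

Tokens signed with the previous key keep resolving to a key until that key drops out of the JWKS, while tokens signed with the new key resolve immediately.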