Redis Token State

The Redis connector packages provide a production-ready RevocationStore backed by Redis and a stream consumer that populates it automatically from the caracal.sessions.revoke Redis stream. Use them to replace InMemoryRevocationStore in any multi-replica deployment.

  • TypeScript: @caracalai/revocation-redis
  • Python: caracalai-revocation-redis
# TypeScript
npm install @caracalai/revocation-redis
# Python
pip install caracalai-revocation-redis

TypeScript — @caracalai/revocation-redis

import { RedisRevocationStore } from '@caracalai/revocation-redis';

interface RedisRevocationStoreOptions {
  keyPrefix?: string;    // Redis key prefix; default: 'caracal:revoked:sessions:'
  defaultTtlMs?: number; // Default TTL on markRevoked; default: 86_400_000 (24 hours)
  failClosed?: boolean;  // On Redis error, treat SID as revoked; default: true
}

class RedisRevocationStore implements RevocationStore {
  constructor(redis: RedisRevocationClient, opts?: RedisRevocationStoreOptions)
  async isRevoked(sid: string): Promise<boolean>
  async markRevoked(sid: string, ttlMs?: number): Promise<void>
}

RedisRevocationClient is a minimal interface. Any Redis client that implements get and set (with PX TTL support) satisfies it — ioredis and the official redis npm package both qualify.

interface RedisRevocationClient {
  get(key: string): Promise<string | null>
  set(key: string, value: string, mode: 'PX', ttlMs: number): Promise<unknown>
  // xgroup, xreadgroup, xack are required only for RedisRevocationConsumer
  xgroup?(...args: string[]): Promise<unknown>
  xreadgroup?(...args: (string | number)[]): Promise<RedisStreamResult | null>
  xack?(stream: string, group: string, id: string): Promise<unknown>
}

failClosed: true (the default) means that if isRevoked() encounters a Redis error, it returns true — the session is treated as revoked and the request is rejected. This prevents serving revoked sessions during Redis outages. Set failClosed: false only if your availability requirements outweigh the risk of briefly serving a revoked session during a Redis failure.

Constants:

export const REVOCATION_STREAM = 'caracal.sessions.revoke'
export const DEFAULT_REVOCATION_TTL_MS = 86_400_000 // 24 hours

The consumer subscribes to the caracal.sessions.revoke Redis stream and calls markRevoked() for each message it processes.

import { RedisRevocationConsumer } from '@caracalai/revocation-redis';

interface RedisRevocationConsumerOptions {
  consumer: string;           // Unique consumer name per replica; REQUIRED
  stream?: string;            // Stream name; default: 'caracal.sessions.revoke'
  group?: string;             // Consumer group name; default: 'resource-revocation'
  batchSize?: number;         // Messages per poll; default: 50
  blockMs?: number;           // XREADGROUP BLOCK duration ms; default: 0 (non-blocking)
  streamHmacKey?: Buffer;     // HMAC key for _sig field verification
  requireSignature?: boolean; // Default: true when streamHmacKey is set, false otherwise
}

class RedisRevocationConsumer {
  constructor(
    redis: RedisRevocationClient,
    store: RedisRevocationStore,
    opts: RedisRevocationConsumerOptions,
  )
  async ensureGroup(): Promise<void>
  async pollOnce(): Promise<number> // Returns count of messages processed
}

ensureGroup() creates the consumer group on the stream with XGROUP CREATE ... MKSTREAM. It ignores the BUSYGROUP error if the group already exists — safe to call on every startup.

pollOnce() calls XREADGROUP to fetch up to batchSize messages, calls markRevoked() for each message containing a session_id field, and calls XACK for each message (including messages with invalid or missing signatures). Returns the number of messages processed.

Stream message format:

Messages on caracal.sessions.revoke are flat Redis stream entries with at minimum:

session_id <sid>
_sig <hmac_hex> # present when the Gateway has streamHmacKey configured

HMAC verification:

When streamHmacKey is provided, the consumer verifies the _sig field in each message using HMAC-SHA256 with a timing-safe comparison. The HMAC payload is constructed from the stream name and the sorted message fields. Messages that fail signature verification are skipped (not revoked) but are still acknowledged. Messages missing _sig when requireSignature: true are also skipped and acknowledged.

Python: caracalai-revocation-redis


from caracalai_revocation_redis import RedisRevocationStore

class RedisRevocationStore:
    def __init__(
        self,
        redis: RedisClient,
        key_prefix: str = 'caracal:revoked:sessions:',
        default_ttl_ms: int = 86_400_000,
        fail_closed: bool = True,
    ) -> None: ...
    def is_revoked(self, sid: str) -> bool: ...
    def mark_revoked(self, sid: str, ttl_ms: int | None = None) -> None: ...

RedisClient is a structural protocol — any Redis client with get(key) and set(key, value, px=ttl_ms) satisfies it. redis-py satisfies this protocol.
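
To make the structural protocol and the fail-closed default more concrete, here is a minimal, self-contained sketch. It does not import the package and is not its implementation: `RedisLike`, `FakeRedis`, `BrokenRedis`, and `SketchRevocationStore` are hypothetical names used only for illustration, and the stored value `"1"` is an assumption.

```python
from typing import Dict, Optional, Protocol


class RedisLike(Protocol):
    """Hypothetical protocol: the two calls the text says a client must support."""

    def get(self, key: str) -> Optional[bytes]: ...
    def set(self, key: str, value: str, px: Optional[int] = None) -> object: ...


class FakeRedis:
    """In-memory stand-in for tests; TTL expiry is ignored."""

    def __init__(self) -> None:
        self.data: Dict[str, bytes] = {}

    def get(self, key: str) -> Optional[bytes]:
        return self.data.get(key)

    def set(self, key: str, value: str, px: Optional[int] = None) -> object:
        self.data[key] = value.encode()
        return True


class BrokenRedis(FakeRedis):
    """Simulates an outage: every read raises."""

    def get(self, key: str) -> Optional[bytes]:
        raise ConnectionError("redis unavailable")


class SketchRevocationStore:
    """Illustrative only; mirrors the documented defaults, not the real class."""

    def __init__(
        self,
        redis: RedisLike,
        key_prefix: str = "caracal:revoked:sessions:",
        default_ttl_ms: int = 86_400_000,
        fail_closed: bool = True,
    ) -> None:
        self._redis = redis
        self._prefix = key_prefix
        self._default_ttl_ms = default_ttl_ms
        self._fail_closed = fail_closed

    def is_revoked(self, sid: str) -> bool:
        try:
            return self._redis.get(self._prefix + sid) is not None
        except Exception:
            # Fail closed: report "revoked". Fail open: report "not revoked".
            return self._fail_closed

    def mark_revoked(self, sid: str, ttl_ms: Optional[int] = None) -> None:
        ttl = ttl_ms if ttl_ms is not None else self._default_ttl_ms
        self._redis.set(self._prefix + sid, "1", px=ttl)
```

Because the protocol is structural, `FakeRedis` needs no base class; any object with matching `get` and `set` methods type-checks. A fake like this can also be handy for unit tests that should not depend on a live Redis instance.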


from caracalai_revocation_redis import RedisRevocationConsumer

class RedisRevocationConsumer:
    def __init__(
        self,
        redis: Any,
        store: RedisRevocationStore,
        consumer: str,
        stream: str = 'caracal.sessions.revoke',
        group: str = 'resource-revocation',
        batch_size: int = 50,
        block_ms: int = 0,
        stream_hmac_key: bytes | None = None,
        require_signature: bool | None = None,
    ) -> None: ...
    def ensure_group(self) -> None: ...
    def poll_once(self) -> int: ...

Both ensure_group() and poll_once() are synchronous. Run poll_once() in a background thread or a synchronous worker loop. Do not call it from an async event loop without wrapping it in asyncio.to_thread().
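
For services built on asyncio, one possible way to wrap the synchronous call is sketched below. `StubConsumer` is a hypothetical stand-in for the real consumer (so the snippet runs without Redis), and `poll_forever` and its `interval_s` parameter are illustrative names, not part of the package.

```python
import asyncio
import time


class StubConsumer:
    """Hypothetical stand-in with a blocking, synchronous poll_once()."""

    def __init__(self) -> None:
        self.polls = 0

    def poll_once(self) -> int:
        time.sleep(0.01)  # simulate a blocking Redis round trip
        self.polls += 1
        return 0


async def poll_forever(consumer, interval_s: float, stop: asyncio.Event) -> None:
    """Run the blocking poll in a worker thread so the event loop stays responsive."""
    while not stop.is_set():
        await asyncio.to_thread(consumer.poll_once)
        try:
            # Sleep for the interval, but wake early if stop is set.
            await asyncio.wait_for(stop.wait(), timeout=interval_s)
        except asyncio.TimeoutError:
            pass


async def demo() -> int:
    consumer = StubConsumer()
    stop = asyncio.Event()
    task = asyncio.create_task(poll_forever(consumer, 0.01, stop))
    await asyncio.sleep(0.1)  # the event loop is free to do other work here
    stop.set()
    await task
    return consumer.polls
```

The `stop` event gives the loop a clean shutdown path; `asyncio.to_thread()` requires Python 3.9 or later.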

HMAC signature verification behavior is identical to the TypeScript implementation: _sig is verified with hmac.compare_digest() for timing safety.
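
The verification rules can be sketched as follows. This is an illustration, not the package's code: the text only says the payload is built from the stream name and the sorted message fields, so the exact encoding in `canonical_payload` (newline-joined `key=value` pairs, `_sig` excluded) is an assumption, as are the helper names `sign` and `should_apply`. Field values are assumed to be decoded strings.

```python
import hashlib
import hmac
from typing import Dict, Optional

STREAM_SIG_FIELD = "_sig"


def canonical_payload(stream: str, fields: Dict[str, str]) -> bytes:
    # ASSUMPTION: stream name first, then "key=value" pairs sorted by key,
    # joined with newlines, with _sig excluded. The real encoding may differ.
    pairs = [f"{k}={fields[k]}" for k in sorted(fields) if k != STREAM_SIG_FIELD]
    return "\n".join([stream] + pairs).encode()


def sign(key: bytes, stream: str, fields: Dict[str, str]) -> str:
    """Hex-encoded HMAC-SHA256 over the assumed canonical payload."""
    return hmac.new(key, canonical_payload(stream, fields), hashlib.sha256).hexdigest()


def should_apply(
    fields: Dict[str, str],
    stream: str,
    key: Optional[bytes],
    require_signature: Optional[bool] = None,
) -> bool:
    """Return True if the message should lead to a revocation."""
    if require_signature is None:
        # Documented default: required when a key is set, optional otherwise.
        require_signature = key is not None
    sig = fields.get(STREAM_SIG_FIELD)
    if key is None or sig is None:
        # Nothing to verify: apply only when signatures are optional.
        return not require_signature
    expected = sign(key, stream, fields)
    # Timing-safe comparison, as described in the text.
    return hmac.compare_digest(expected.encode(), sig.encode())
```

In this sketch a `False` result means the message is skipped; per the text, the consumer still acknowledges such messages so they are not redelivered.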

Constants:

REVOCATION_STREAM = 'caracal.sessions.revoke'
DEFAULT_REVOCATION_TTL_MS = 86_400_000 # 24 hours
STREAM_SIG_FIELD = '_sig'

// TypeScript
import { RedisRevocationStore, RedisRevocationConsumer } from '@caracalai/revocation-redis';
import { createClient } from 'redis';

const redis = createClient({ url: process.env.REDIS_URL });
await redis.connect();

const revocations = new RedisRevocationStore(redis, {
  failClosed: true,
  defaultTtlMs: 86_400_000,
});

const consumer = new RedisRevocationConsumer(redis, revocations, {
  consumer: `api-${process.env.INSTANCE_ID ?? 'default'}`,
  stream: 'caracal.sessions.revoke',
  group: 'resource-revocation',
  batchSize: 50,
  streamHmacKey: Buffer.from(process.env.CARACAL_STREAM_HMAC_KEY!, 'hex'),
});
await consumer.ensureGroup();

// Poll in the background at 1-second intervals.
// Log failures and keep polling so one Redis error does not stop the loop.
const poll = (): void => {
  consumer
    .pollOnce()
    .catch((err) => console.error('revocation poll failed', err))
    .finally(() => setTimeout(poll, 1000));
};
poll();

# Python
import logging
import os
import threading
import time

import redis
from caracalai_revocation_redis import RedisRevocationStore, RedisRevocationConsumer

r = redis.Redis.from_url(os.environ['REDIS_URL'])
revocations = RedisRevocationStore(r, fail_closed=True)
consumer = RedisRevocationConsumer(
    redis=r,
    store=revocations,
    consumer=f'api-{os.environ.get("INSTANCE_ID", "default")}',
    stream_hmac_key=bytes.fromhex(os.environ['CARACAL_STREAM_HMAC_KEY']),
)
consumer.ensure_group()

def _poll_loop():
    # block_ms defaults to 0 (non-blocking), so sleep between polls
    # to avoid a busy loop; log failures and keep polling.
    while True:
        try:
            consumer.poll_once()
        except Exception:
            logging.exception('revocation poll failed')
        time.sleep(1)

threading.Thread(target=_poll_loop, daemon=True).start()

Each running replica must use a distinct consumer name within the same group. If two replicas share a consumer name, one may consume and acknowledge messages the other never processes. Use INSTANCE_ID, the pod name, or the hostname to derive a unique name per replica.
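
One way to derive such a name is sketched below. The helper `consumer_name` and its `prefix` and `env` parameters are hypothetical; `INSTANCE_ID` comes from the examples on this page, while relying on `HOSTNAME` to carry the pod name is an assumption about the deployment environment.

```python
import os
import socket
from typing import Mapping


def consumer_name(prefix: str = "api", env: Mapping[str, str] = os.environ) -> str:
    """Build a per-replica consumer name from the first identifier available."""
    # Order of preference: explicit INSTANCE_ID, then HOSTNAME (often the pod
    # name in container platforms), then the OS-reported hostname.
    instance = env.get("INSTANCE_ID") or env.get("HOSTNAME") or socket.gethostname()
    return f"{prefix}-{instance}"
```

Unlike a fixed fallback such as `'default'`, this sketch never hands the same literal name to every replica that lacks `INSTANCE_ID`, although replicas that share a hostname would still collide.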

The Gateway publishes to the caracal.sessions.revoke stream within seconds of a session revocation. Services that run RedisRevocationConsumer pick up the event within one pollOnce() cycle — typically under 5 seconds end-to-end. Services that do not run a consumer continue to accept mandates from revoked sessions until the mandate’s 15-minute TTL expires.