
Event Streams and Outbox

Caracal uses Redis Streams as its event bus. State changes that need to propagate across service boundaries — session revocation, policy activation, key rotation, audit events — are published to named streams and consumed by subscriber services. All production stream messages are HMAC-signed. The transactional outbox pattern guarantees that events are published if and only if the database transaction that produced them committed.

| Stream | Producer | Consumer group(s) | Purpose |
| --- | --- | --- | --- |
| caracal.audit.events | STS | audit-ingestor, siem-export | Every OPA evaluation (allow and deny) |
| caracal.audit.events.dlq | Audit service | audit-dlq-observer | Events that failed ingest after max retries |
| caracal.sessions.revoke | Coordinator, API | sts-revocation, gateway-revocation | Session termination and delegation revocation |
| caracal.policy.invalidate | API | opa-engine | Policy set activation, forcing OPA bundle reload |
| caracal.keys.invalidate | API | sts-keys | Zone signing key rotation, flushing STS key cache |
| caracal.agents.lifecycle | Coordinator | coordinator-relay | Agent spawn, suspend, resume, terminate events |
| caracal.invocations.lifecycle | Coordinator | invocations-observer | Agent invocation state transitions |
| caracal.delegations.invalidate | Coordinator | delegations-observer | Delegation edge creation and revocation |
| caracal.providers.ratelimit | API | | Rate limit tracking for credential providers |

At startup the init container runs provision-streams.sh before any service starts. For each stream and consumer group:

redis-cli XGROUP CREATE "${stream}" "${group}" '$' MKSTREAM

MKSTREAM creates the stream if it does not exist. $ positions the group cursor at the end of the stream — each group only receives messages published after it was created. The script is idempotent: BUSYGROUP errors (group already exists) are ignored.

The init container exits with code 0 after all groups are created. Services wait for the init container’s successful completion before starting.
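The idempotency rule can be sketched as follows. This is an illustration in Python rather than the actual shell script, so that it runs without a Redis server. `RedisError`, `FakeRedis`, and `provision` are hypothetical names; the injected `xgroup_create` callable is assumed to issue the XGROUP CREATE command shown above.

```python
class RedisError(Exception):
    """Stand-in for a Redis client error (hypothetical; real clients define their own)."""


def provision(pairs, xgroup_create):
    """Create every (stream, group) pair, treating BUSYGROUP as success.

    `xgroup_create(stream, group)` is assumed to run
    XGROUP CREATE <stream> <group> $ MKSTREAM and raise RedisError on failure.
    Returns the pairs that were newly created on this run.
    """
    created = []
    for stream, group in pairs:
        try:
            xgroup_create(stream, group)
            created.append((stream, group))
        except RedisError as err:
            # BUSYGROUP means the group already exists, so a re-run is harmless.
            if not str(err).startswith("BUSYGROUP"):
                raise
    return created


class FakeRedis:
    """In-memory fake that mimics the BUSYGROUP reply, for illustration only."""

    def __init__(self):
        self.groups = set()

    def xgroup_create(self, stream, group):
        if (stream, group) in self.groups:
            raise RedisError("BUSYGROUP Consumer Group name already exists")
        self.groups.add((stream, group))
```

Running `provision` twice against the same fake creates the groups once and then returns an empty list, while any other error still aborts the run.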


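Calling XADD directly alongside a database transaction creates a dual-write problem: if Redis is unavailable at commit time, the event is lost, and if the XADD succeeds but the transaction then rolls back, subscribers act on a state change that never happened. Caracal solves this with a transactional outbox.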

Write path:

BEGIN;
UPDATE agent_sessions SET status = 'terminated' ...;
INSERT INTO caracal_outbox
    (id, producer, topic, dedupe_key, payload_json, status, available_at)
VALUES (uuid_v7(), 'coordinator', 'caracal.sessions.revoke',
        $dedupe_key, $payload, 'pending', now());
COMMIT;

The event is written to the database in the same transaction as the state change. If the transaction rolls back, both the state change and the event row are rolled back together. If the transaction commits, the event is guaranteed to be eventually published.
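The atomicity claim can be demonstrated with a small, self-contained sketch. It uses SQLite from the Python standard library as a stand-in for Postgres, a trimmed-down schema, and `uuid4` in place of `uuid_v7()`. The helper names (`open_db`, `terminate_session`, `snapshot`) and the `revoke:{session_id}` dedupe key format are assumptions made for illustration.

```python
import json
import sqlite3
import uuid

SCHEMA = """
CREATE TABLE agent_sessions (id TEXT PRIMARY KEY, status TEXT NOT NULL);
CREATE TABLE caracal_outbox (
    id TEXT PRIMARY KEY,
    producer TEXT NOT NULL,
    topic TEXT NOT NULL,
    dedupe_key TEXT NOT NULL,
    payload_json TEXT NOT NULL,
    status TEXT NOT NULL
);
"""


def open_db():
    """Create an in-memory database holding one active session."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    conn.execute("INSERT INTO agent_sessions VALUES ('s1', 'active')")
    conn.commit()
    return conn


def terminate_session(conn, session_id, fail=False):
    """Update the session and enqueue the event in a single transaction."""
    try:
        with conn:  # commits on success, rolls back if the body raises
            conn.execute(
                "UPDATE agent_sessions SET status = 'terminated' WHERE id = ?",
                (session_id,),
            )
            conn.execute(
                "INSERT INTO caracal_outbox VALUES (?, ?, ?, ?, ?, 'pending')",
                (str(uuid.uuid4()), "coordinator", "caracal.sessions.revoke",
                 f"revoke:{session_id}", json.dumps({"session_id": session_id})),
            )
            if fail:
                raise RuntimeError("simulated failure before COMMIT")
    except RuntimeError:
        pass  # the transaction has already been rolled back


def snapshot(conn):
    """Return (session status, number of outbox rows)."""
    status = conn.execute("SELECT status FROM agent_sessions").fetchone()[0]
    queued = conn.execute("SELECT COUNT(*) FROM caracal_outbox").fetchone()[0]
    return status, queued
```

After a failed call the snapshot is still `("active", 0)`; after a successful call it becomes `("terminated", 1)`. The state change and the event row never diverge.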

Publish path (background job):

LOOP every {poll_interval}:
    SELECT id, topic, payload_json, attempts
    FROM caracal_outbox
    WHERE producer = 'coordinator'
      AND status = 'pending'
      AND available_at <= now()
    ORDER BY created_at
    LIMIT {batch_size}
    FOR UPDATE SKIP LOCKED
    FOR EACH row:
        XADD {topic} MAXLEN ~ {maxlen} * {fields from payload_json}
        IF success:     UPDATE status = 'published', published_at = now()
        IF transient:   UPDATE attempts++, available_at += backoff(attempts)
        IF max_retries: UPDATE status = 'dead'

FOR UPDATE SKIP LOCKED prevents two publisher instances from processing the same row simultaneously. dead rows remain in the table for manual inspection.
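The per-row state transitions in the publish loop can be modelled in memory as below. This is a sketch under stated assumptions, not the actual publisher: rows are plain objects instead of locked database rows, `xadd` is an injected callable, and `MAX_RETRIES`, `TransientError`, and `OutboxRow` are hypothetical names.

```python
from dataclasses import dataclass

MAX_RETRIES = 5  # hypothetical value; the real limit is configuration


class TransientError(Exception):
    """Stand-in for a retryable Redis failure (hypothetical)."""


@dataclass
class OutboxRow:
    id: str
    topic: str
    payload: dict
    status: str = "pending"
    attempts: int = 0
    available_at: float = 0.0


def publish_batch(rows, xadd, now, backoff):
    """Apply the success / transient / dead transitions to each due row."""
    for row in rows:
        # Mirrors the WHERE clause: only pending rows that are due.
        if row.status != "pending" or row.available_at > now:
            continue
        try:
            xadd(row.topic, row.payload)
        except TransientError:
            row.attempts += 1
            if row.attempts >= MAX_RETRIES:
                row.status = "dead"  # kept for manual inspection
            else:
                row.available_at += backoff(row.attempts)
            continue
        row.status = "published"
```

A row that publishes cleanly ends as `published`; a row whose `xadd` keeps failing is retried with growing delays and ends as `dead` after `MAX_RETRIES` attempts.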

The dedupe_key column has a unique constraint per (producer, topic, dedupe_key). This prevents the same logical event from being inserted twice under any retry condition.
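A minimal sketch of the deduplication behaviour, again using SQLite as a stand-in for Postgres. The `enqueue` helper and the reduced column set are assumptions; how the real writer reacts to a constraint violation (ignore, or surface an error) is not specified here.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE caracal_outbox (
        producer TEXT NOT NULL,
        topic TEXT NOT NULL,
        dedupe_key TEXT NOT NULL,
        payload_json TEXT NOT NULL,
        UNIQUE (producer, topic, dedupe_key)
    )
    """
)


def enqueue(conn, producer, topic, dedupe_key, payload_json):
    """Insert an outbox row; return False if the logical event already exists."""
    try:
        with conn:  # commit on success, roll back on constraint violation
            conn.execute(
                "INSERT INTO caracal_outbox VALUES (?, ?, ?, ?)",
                (producer, topic, dedupe_key, payload_json),
            )
    except sqlite3.IntegrityError:
        return False
    return True
```

Calling `enqueue` twice with the same (producer, topic, dedupe_key) triple stores one row; the same dedupe key under a different topic is accepted as a separate event.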

Backoff formula (all values in milliseconds): min(baseMs × 2^attempt, 5000) / 2 + random() × 5000 / 2. Maximum delay per retry: 5 seconds.
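The formula translates directly into code. In this sketch `base_ms` is a parameter because the configured base is not stated here, and `rand` is injectable (defaulting to `random.random`, which is uniform in [0, 1)) so the bounds can be checked deterministically.

```python
import random

CAP_MS = 5000  # cap taken from the formula above


def backoff_ms(attempt, base_ms, rand=random.random):
    """Half of the capped exponential delay plus up to CAP_MS / 2 of jitter."""
    capped = min(base_ms * 2 ** attempt, CAP_MS)
    return capped / 2 + rand() * CAP_MS / 2
```

With a hypothetical `base_ms` of 100, attempt 0 yields between 50 ms and 2550 ms. Once the exponential term reaches the cap, every delay falls between 2500 ms and 5000 ms.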


In production, all messages written to Redis streams include an HMAC-SHA256 signature:

key = STREAMS_HMAC_KEY (≥32 bytes, hex-encoded)
input = "{stream_name}\n" + sorted(field_name=field_value pairs)
sig = HMAC-SHA256(key, input)
field = "_sig" → hex(sig) [written into the stream message]

The _sig field is excluded from its own signature. Consumers verify the signature before acting on any message. An unsigned message in production mode is dropped with a warning logged. Consumers in development mode (no STREAMS_HMAC_KEY) skip verification.

This protects against an attacker with Redis write access injecting forged revocations, forged policy invalidations, or spoofed audit events.
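The signing scheme can be sketched as below. One detail is an assumption: the separator used to join the sorted `name=value` pairs is not specified above, so this sketch joins them with a newline. The function names are hypothetical.

```python
import hashlib
import hmac

SIG_FIELD = "_sig"


def canonical_input(stream, fields):
    """Build the signed byte string; the _sig field itself is excluded."""
    pairs = sorted(f"{k}={v}" for k, v in fields.items() if k != SIG_FIELD)
    # Assumption: pairs are newline-joined; the real separator may differ.
    return (stream + "\n" + "\n".join(pairs)).encode("utf-8")


def sign(key, stream, fields):
    """Return a copy of fields with the hex HMAC-SHA256 signature added."""
    digest = hmac.new(key, canonical_input(stream, fields), hashlib.sha256)
    return {**fields, SIG_FIELD: digest.hexdigest()}


def verify(key, stream, fields):
    """Return True only if the message carries a valid signature."""
    sig = fields.get(SIG_FIELD)
    if sig is None:
        return False  # unsigned messages are dropped in production mode
    expected = hmac.new(key, canonical_input(stream, fields), hashlib.sha256)
    # Constant-time comparison avoids leaking how many characters matched.
    return hmac.compare_digest(sig.encode("utf-8"),
                               expected.hexdigest().encode("utf-8"))
```

Because the stream name is part of the signed input, a message copied from one stream to another fails verification, as does any message whose fields were altered after signing. A hex-encoded `STREAMS_HMAC_KEY` would be decoded with `bytes.fromhex` before use.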


The STS publishes to caracal.audit.events asynchronously via the AuditBuffer. Each message carries:

| Field | Content |
| --- | --- |
| id | Audit event UUID |
| data | JSON-encoded audit event |
| sig | HMAC-SHA256 over data under AUDIT_HMAC_KEY |

The audit-ingestor consumer group is processed by the Audit service. It reads batches of 100 messages, verifies the sig field, writes the event to Postgres with an HMAC chain (see Cryptography and Keys), and sends XACK only after the Postgres INSERT commits. Transient failures leave the message in the Pending Entry List for retry. After maxDeliv failed deliveries the message is moved to caracal.audit.events.dlq and acknowledged from the main stream.
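The acknowledge-after-commit rule and the dead-letter hand-off can be sketched as a single decision function. Everything here is illustrative: `FakeStream` records acknowledgements in memory, `insert_event` stands in for the Postgres write and is assumed to commit before returning, and signature verification is omitted for brevity.

```python
from dataclasses import dataclass, field


@dataclass
class FakeStream:
    """In-memory record of XACK calls and DLQ writes (illustration only)."""
    acked: list = field(default_factory=list)
    dlq: list = field(default_factory=list)


def process(msg_id, delivery_count, max_deliveries, insert_event, stream):
    """Return 'acked', 'pending', or 'dead-lettered' for one delivery."""
    try:
        insert_event(msg_id)
    except Exception:
        if delivery_count >= max_deliveries:
            # Move to the DLQ, then acknowledge on the main stream.
            stream.dlq.append(msg_id)
            stream.acked.append(msg_id)
            return "dead-lettered"
        # No XACK: the message stays in the Pending Entry List for retry.
        return "pending"
    # XACK only after the insert has committed.
    stream.acked.append(msg_id)
    return "acked"
```

A message is never acknowledged before it is durable in either Postgres or the DLQ, so a crash at any point leads to redelivery rather than loss.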

The siem-export consumer group allows an external SIEM to consume the same stream independently, without affecting the ingestor’s cursor position.


The caracal.sessions.revoke stream carries session termination events:

Fields:
zone_id → zone identifier
session_id → session SID (the 'sid' claim in mandates)
_sig → HMAC-SHA256 signature

STS consumer (sts-revocation): On each message, calls db.RevokeSession(ctx, zoneID, sessionID) to mark the session as revoked in Postgres. Subsequent exchange requests for that agent_session_id load the revoked status from DB and fail before OPA evaluation.

Gateway consumer (gateway-revocation): Updates an in-memory revocationStore. On every inbound request the Gateway calls revocationStore.IsRevoked(sid) using the sid claim from the mandate. If revoked, the request is rejected with HTTP 401. During active streaming responses, the Gateway checks at every 4 KB chunk boundary and truncates the stream if revocation is detected. Revocation entries are retained for 24 hours and garbage-collected on a 30-minute cycle.
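A minimal sketch of the Gateway side, assuming a simple dictionary-backed store. The class and function names are hypothetical; the 24-hour retention and the 4 KB chunk boundary come from the description above, and the clock is injectable so expiry can be exercised without waiting.

```python
import threading
import time

RETENTION_SECONDS = 24 * 60 * 60  # entries are kept for 24 hours
CHUNK_SIZE = 4096                 # revocation is re-checked every 4 KB


class RevocationStore:
    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._lock = threading.Lock()
        self._revoked_at = {}

    def revoke(self, sid):
        with self._lock:
            self._revoked_at[sid] = self._clock()

    def is_revoked(self, sid):
        with self._lock:
            return sid in self._revoked_at

    def collect(self):
        """Drop entries older than the retention window; return how many."""
        with self._lock:
            cutoff = self._clock() - RETENTION_SECONDS
            expired = [s for s, t in self._revoked_at.items() if t <= cutoff]
            for sid in expired:
                del self._revoked_at[sid]
            return len(expired)


def relay(body, sid, store):
    """Yield the body in 4 KB chunks, stopping as soon as sid is revoked."""
    for offset in range(0, len(body), CHUNK_SIZE):
        if store.is_revoked(sid):
            return  # truncate the stream
        yield body[offset:offset + CHUNK_SIZE]
```

In a real deployment `collect` would run on the 30-minute cycle, and `relay` would wrap the upstream response body rather than an in-memory byte string.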


When an operator activates a policy set via the API, the API publishes to caracal.policy.invalidate:

Fields:
zone_id → zone whose OPA bundle should be reloaded
_sig → HMAC-SHA256 signature

The STS subscribes via the opa-engine consumer group. On receipt it calls opa.Reload(ctx, zoneID), which re-fetches the zone’s policy bundle from Postgres and recompiles it. A 60-second Postgres poll runs as a safety net: even if the stream message is missed, the OPA engine detects the new policy version within one poll cycle.


When the API rotates a zone’s signing key, it publishes to caracal.keys.invalidate:

Fields:
zone_id → zone whose key cache should be flushed
_sig → HMAC-SHA256 signature

The STS subscribes via sts-keys. On receipt it calls KeyCache.Invalidate(zoneID), evicting the in-memory entry. The next token issuance for that zone fetches and decrypts the new signing key from Postgres. Without this stream signal, a rotated key would take up to 15 minutes to take effect (the in-memory key cache TTL).
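The interaction between the cache TTL and the invalidation signal can be sketched as follows. `load_key` is a hypothetical callable standing in for the fetch-and-decrypt step against Postgres, and the clock is injectable for testing.

```python
import time

TTL_SECONDS = 15 * 60  # in-memory key cache TTL


class KeyCache:
    def __init__(self, load_key, clock=time.monotonic):
        self._load_key = load_key
        self._clock = clock
        self._entries = {}  # zone_id -> (key, loaded_at)

    def get(self, zone_id):
        """Return the cached key, reloading it if missing or expired."""
        now = self._clock()
        entry = self._entries.get(zone_id)
        if entry is not None and now - entry[1] < TTL_SECONDS:
            return entry[0]
        key = self._load_key(zone_id)
        self._entries[zone_id] = (key, now)
        return key

    def invalidate(self, zone_id):
        """Evict the entry so the next get() reloads the current key."""
        self._entries.pop(zone_id, None)
```

Without a call to `invalidate`, a rotated key is served from cache until the TTL lapses; with it, the very next `get` picks up the new key.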


All consumers use XREADGROUP with XACK sent only after successful processing — at-least-once delivery. A consumer that crashes between XREADGROUP and XACK leaves messages in the Pending Entry List. A reapLoop goroutine reclaims idle PEL entries after a configurable idle threshold.
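The selection step of such a reap loop might look like the sketch below. It assumes pending entries shaped like the extended XPENDING reply (message ID, consumer name, idle time in milliseconds, delivery count); whether the real reapLoop uses XPENDING with XCLAIM or XAUTOCLAIM is not stated here, and `reapable` is a hypothetical name.

```python
def reapable(pending, idle_threshold_ms):
    """Return IDs of pending entries idle for at least the threshold.

    `pending` is a list of (message_id, consumer, idle_ms, delivery_count)
    tuples, mirroring the extended XPENDING reply.
    """
    return [
        message_id
        for message_id, _consumer, idle_ms, _deliveries in pending
        if idle_ms >= idle_threshold_ms
    ]
```

The returned IDs would then be claimed by a live consumer and processed again, which is why handlers need to tolerate seeing the same message more than once.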

The STS assigns each consumer instance a unique ID as {hostname}-{pid}. Multiple STS replicas can consume the same consumer group; Redis distributes messages across active consumers within the group.