gcx/backend/migrations/024_create_outbox.sql

46 lines
2.4 KiB
SQL

-- 024: Transactional Outbox table (Outbox Pattern for guaranteed Kafka delivery)
-- Every service writes domain events to this table in the SAME transaction as the business data.
-- A separate relay process (OutboxRelay) polls this table and publishes to Kafka.
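-- This guarantees at-least-once delivery: no event is lost, but the relay may publish
-- the same event more than once (e.g. if it crashes after publishing to Kafka but before
-- marking the row as published), so consumers must deduplicate.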
--
-- Idempotency: consumers use (aggregate_id + event_id) as idempotency key.
-- Events expire after 24h (idempotency window).
-- Outbox table: one row per domain event to be relayed to Kafka.
-- Constraint names below are spelled out explicitly; they are the same names
-- PostgreSQL would generate for the equivalent inline column constraints.
CREATE TABLE IF NOT EXISTS outbox (
    -- Event ID, generated by the database when the writer does not supply one
    id              UUID          DEFAULT gen_random_uuid(),

    -- Kind of business entity, e.g. 'User', 'Coupon', 'Order', 'Trade'
    aggregate_type  VARCHAR(100)  NOT NULL,
    -- ID of the business entity
    aggregate_id    UUID          NOT NULL,
    -- Event name, e.g. 'user.registered', 'trade.matched'
    event_type      VARCHAR(100)  NOT NULL,

    -- Kafka topic name
    topic           VARCHAR(100)  NOT NULL,
    -- Kafka partition key (for ordering); optional
    partition_key   VARCHAR(100),

    -- Event payload
    payload         JSONB         NOT NULL,
    -- Additional headers (traceId, source, etc.); defaults to an empty object, NULL is allowed
    headers         JSONB         DEFAULT '{}',

    -- Delivery state; restricted to the values listed in outbox_status_check
    status          VARCHAR(20)   NOT NULL DEFAULT 'pending',
    -- Retry bookkeeping (presumably maintained by the relay process; confirm)
    retry_count     SMALLINT      NOT NULL DEFAULT 0,
    max_retries     SMALLINT      NOT NULL DEFAULT 5,
    -- Nullable; presumably stamped when the relay publishes the event (confirm)
    published_at    TIMESTAMPTZ,

    -- Defaults to 24 hours after insertion (the idempotency window)
    expires_at      TIMESTAMPTZ   NOT NULL DEFAULT (NOW() + INTERVAL '24 hours'),
    created_at      TIMESTAMPTZ   NOT NULL DEFAULT NOW(),

    CONSTRAINT outbox_pkey PRIMARY KEY (id),
    CONSTRAINT outbox_status_check CHECK (status IN ('pending', 'published', 'failed'))
);
-- Indexes on outbox. IF NOT EXISTS keeps this migration re-runnable, matching the
-- CREATE TABLE IF NOT EXISTS statements in this file; without it a second run fails
-- with "relation already exists". Note that IF NOT EXISTS checks the index name only,
-- not its definition.

-- Index for the relay poller: find pending events efficiently.
-- Partial index, so only rows with status = 'pending' are indexed.
CREATE INDEX IF NOT EXISTS idx_outbox_status_created
    ON outbox (status, created_at)
    WHERE status = 'pending';

-- Index for lookups by business entity (e.g. idempotency lookups)
CREATE INDEX IF NOT EXISTS idx_outbox_aggregate
    ON outbox (aggregate_type, aggregate_id);

-- Index for cleanup of expired events; covers published rows only
CREATE INDEX IF NOT EXISTS idx_outbox_expires
    ON outbox (expires_at)
    WHERE status = 'published';
-- Idempotency tracking: consumers record processed event IDs here.
-- The primary key is (event_id, consumer_group) rather than event_id alone, so that
-- several consumer groups can each record the same event independently. With a
-- single-column key, the second group to process an event would hit a primary-key
-- conflict (or wrongly conclude the event was already handled).
-- event_id stays the leading key column, so lookups by event_id alone can still use
-- the primary-key index.
-- NOTE(review): writers that use ON CONFLICT (event_id) must switch to
-- ON CONFLICT (event_id, consumer_group). Databases where this table already exists
-- are not altered by CREATE TABLE IF NOT EXISTS and need a follow-up migration.
CREATE TABLE IF NOT EXISTS processed_events (
    event_id        UUID          NOT NULL,
    consumer_group  VARCHAR(100)  NOT NULL,
    processed_at    TIMESTAMPTZ   NOT NULL DEFAULT NOW(),
    -- Defaults to 24 hours after insertion (the idempotency window)
    expires_at      TIMESTAMPTZ   NOT NULL DEFAULT (NOW() + INTERVAL '24 hours'),

    CONSTRAINT processed_events_pkey PRIMARY KEY (event_id, consumer_group)
);
-- Indexes on processed_events. IF NOT EXISTS keeps this migration re-runnable,
-- matching the CREATE TABLE IF NOT EXISTS statements in this file; without it a
-- second run fails with "relation already exists".

-- Lookups by consumer group
CREATE INDEX IF NOT EXISTS idx_processed_events_consumer
    ON processed_events (consumer_group);

-- Lookups by expiry time (e.g. for removing expired rows)
CREATE INDEX IF NOT EXISTS idx_processed_events_expires
    ON processed_events (expires_at);
-- Cleanup job: remove expired outbox entries and processed_events (run daily)
-- This keeps the tables lean while maintaining the 24h idempotency window.