API Reference¶
StreamClient¶
redis_stream_queue.StreamClient
¶
Async Redis Stream client with singleton connection pool per (host, port, db). Supports single-node, cluster, and URL connection modes.
autoclaim
async
¶
autoclaim(
stream: str,
group: str,
consumer: str,
min_idle_ms: int,
count: int,
max_passes: int | None = None,
) -> list[StreamMessage]
Reclaim idle PEL entries via XAUTOCLAIM.
Follows the cursor until the full PEL is swept (cursor returns "0-0"). max_passes caps the number of XAUTOCLAIM calls per invocation; None = unlimited. Stall detection (cursor unchanged) guards against misbehaving Redis forks.
StreamProducer¶
redis_stream_queue.StreamProducer
¶
Pushes messages onto a Redis Stream.
Safe to use from multiple pods simultaneously — Redis XADD is atomic. Call metrics() for this instance or all_metrics() for all live instances in this process.
push
async
¶
Serialize and publish one message. Returns the Redis stream entry ID.
push_many
async
¶
Serialize and publish multiple messages via pipeline (one round-trip). Returns list of entry IDs.
ensure_group
async
¶
Idempotently create the consumer group for this stream.
all_metrics
classmethod
¶
Return metrics for all live StreamProducer instances in this process.
metrics
¶
Return current push counters and TPS (non-blocking, no Redis calls).
StreamConsumer¶
redis_stream_queue.StreamConsumer
¶
Async Redis Stream consumer using consumer groups.
Lifecycle per iteration (run_once): 1. Ensure consumer group exists (idempotent — no-op after first call) 2. XREADGROUP ">" — fetch new messages, pass to handler, ACK returned IDs 3. XAUTOCLAIM — reclaim orphaned messages from crashed consumers, re-process 4. Poison-pill sweep — messages exceeding max_deliveries go to DLQ via RetryHandler
handler(messages) must return the list of IDs it successfully processed. Messages whose IDs are NOT returned stay in PEL for reclaim recovery.
Safe for multiple pods: each pod gets a unique worker_name; consumer groups distribute messages and XAUTOCLAIM provides crash recovery across pods.
Call metrics() for this instance or all_metrics() for all live instances in this process.
ConsumerConfig¶
redis_stream_queue.ConsumerConfig
dataclass
¶
Configuration for a stream consumer.
ConsumerGroup¶
redis_stream_queue.ConsumerGroup
¶
Manages and inspects a Redis Stream consumer group.
Provides group creation (idempotent), stats, health checks, and pending entry inspection.
Group existence is tracked in a class-level registry keyed by (pool_key, stream, group), so multiple instances targeting the same group share the cache. Call reset() to force re-creation (e.g. after external FLUSHALL or XGROUP DESTROY).
ensure
async
¶
Create consumer group (and DLQ group) if not already known. No-op otherwise.
reset
¶
Remove this group from the known registry — next ensure() will re-create it.
stats
async
¶
Return stream length, lag, PEL size, and per-consumer info.
health_check
async
¶
health_check(
max_lag: int = 10000,
max_idle_ms: int = 60000,
dlq_stream: str | None = None,
) -> dict
Return health status dict with issue descriptions.
pending_details
async
¶
Return details for all pending (unacknowledged) entries.
RetryHandler¶
redis_stream_queue.RetryHandler
¶
Detects poison pills (messages exceeding max_deliveries) and routes them to DLQ.
Integrates with StreamConsumer to offload all retry/DLQ concerns from the main loop.
Data Classes¶
StreamMessage¶
redis_stream_queue.StreamMessage
dataclass
¶
PendingEntry¶
redis_stream_queue.PendingEntry
dataclass
¶
ConsumerMetrics¶
redis_stream_queue.ConsumerMetrics
dataclass
¶
Runtime throughput and error counters for a StreamConsumer.
ProducerMetrics¶
redis_stream_queue.ProducerMetrics
dataclass
¶
Runtime throughput counters for a StreamProducer.
StreamStats¶
redis_stream_queue.StreamStats
dataclass
¶
ConsumerInfo¶
redis_stream_queue.ConsumerInfo
dataclass
¶
Serializers¶
Serializer Protocol¶
redis_stream_queue.Serializer
¶
Bases: Protocol