
API Reference

StreamClient

redis_stream_queue.StreamClient

Async Redis Stream client with singleton connection pool per (host, port, db). Supports single-node, cluster, and URL connection modes.

autoclaim async

autoclaim(
    stream: str,
    group: str,
    consumer: str,
    min_idle_ms: int,
    count: int,
    max_passes: int | None = None,
) -> list[StreamMessage]

Reclaim idle PEL entries via XAUTOCLAIM.

Follows the cursor until the full PEL is swept (the cursor returns "0-0"). max_passes caps the number of XAUTOCLAIM calls per invocation; None means no cap. Stall detection (the cursor does not advance between calls) guards against misbehaving Redis forks.
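The cursor-following behaviour can be sketched without a Redis server. This is a minimal illustration, not the library's implementation: `sweep_pel` and the `claim_page` callable are hypothetical names, and `claim_page` stands in for a single XAUTOCLAIM call that returns the next cursor and the claimed entry IDs.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional, Tuple

# Hypothetical stand-in for one XAUTOCLAIM call: takes a start cursor and
# returns (next_cursor, claimed_entry_ids).
ClaimPage = Callable[[str], Awaitable[Tuple[str, List[str]]]]


async def sweep_pel(claim_page: ClaimPage, max_passes: Optional[int] = None) -> List[str]:
    """Follow the cursor until it returns "0-0", the pass cap is hit, or it stalls."""
    cursor = "0-0"
    claimed: List[str] = []
    passes = 0
    while max_passes is None or passes < max_passes:
        next_cursor, ids = await claim_page(cursor)
        claimed.extend(ids)
        passes += 1
        if next_cursor == "0-0":
            break  # full PEL swept
        if next_cursor == cursor:
            break  # stall: the cursor did not advance
        cursor = next_cursor
    return claimed
```

With max_passes set, a single invocation stops early and the remainder of the PEL is left for a later call.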

StreamProducer

redis_stream_queue.StreamProducer

Pushes messages onto a Redis Stream.

Safe to use from multiple pods simultaneously — Redis XADD is atomic. Call metrics() for this instance or all_metrics() for all live instances in this process.

push async

push(data: dict) -> str

Serialize and publish one message. Returns the Redis stream entry ID.

push_many async

push_many(data: list[dict]) -> list[str]

Serialize and publish multiple messages via pipeline (one round-trip). Returns list of entry IDs.

ensure_group async

ensure_group(group: str | None = None) -> None

Idempotently create the consumer group for this stream.

all_metrics classmethod

all_metrics() -> list[ProducerMetrics]

Return metrics for all live StreamProducer instances in this process.

metrics

metrics() -> ProducerMetrics

Return current push counters and TPS (non-blocking, no Redis calls).

StreamConsumer

redis_stream_queue.StreamConsumer

Async Redis Stream consumer using consumer groups.

Lifecycle per iteration (run_once):

1. Ensure the consumer group exists (idempotent; no-op after the first call).
2. XREADGROUP ">": fetch new messages, pass them to the handler, ACK the returned IDs.
3. XAUTOCLAIM: reclaim orphaned messages from crashed consumers and re-process them.
4. Poison-pill sweep: messages exceeding max_deliveries go to the DLQ via RetryHandler.

handler(messages) must return the list of IDs it successfully processed. Messages whose IDs are NOT returned stay in PEL for reclaim recovery.
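A handler that honours this contract might look like the sketch below. `Msg` is a hypothetical stand-in for StreamMessage (the real field names are not documented here), the `order_id` check is an invented validation rule, and writing the handler as a coroutine is an assumption.

```python
import asyncio
from dataclasses import dataclass
from typing import List


@dataclass
class Msg:
    """Hypothetical stand-in for StreamMessage; the real fields may differ."""
    id: str
    data: dict


async def handler(messages: List[Msg]) -> List[str]:
    """Return only the IDs that were processed successfully."""
    done: List[str] = []
    for msg in messages:
        try:
            if "order_id" not in msg.data:  # illustrative validation rule
                raise ValueError("missing order_id")
            done.append(msg.id)
        except ValueError:
            # Not returned, so the entry stays in the PEL for a later reclaim.
            continue
    return done
```

Catching failures per message, rather than around the whole batch, keeps one bad entry from blocking acknowledgement of the rest.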

Safe for multiple pods: each pod gets a unique worker_name; consumer groups distribute messages and XAUTOCLAIM provides crash recovery across pods.

Call metrics() for this instance or all_metrics() for all live instances in this process.

all_metrics classmethod

all_metrics() -> list[ConsumerMetrics]

Return metrics for all live StreamConsumer instances in this process.

stop

stop() -> None

Signal the consumer to exit after the current iteration.

metrics

metrics() -> ConsumerMetrics

Return current throughput counters and TPS (non-blocking, no Redis calls).

run async

run() -> None

Infinite loop. Catches all exceptions and restarts. Exits on stop() or CancelledError.
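The loop shape described above can be sketched as follows. `LoopSketch` is a hypothetical class, not the library's StreamConsumer; the restart delay and the choice to re-raise CancelledError are assumptions.

```python
import asyncio


class LoopSketch:
    """Hypothetical stand-in showing the loop shape, not the library's class."""

    def __init__(self, iteration, restart_delay: float = 0.0):
        self._iteration = iteration          # coroutine function for one pass
        self._restart_delay = restart_delay  # assumed back-off, not documented
        self._stopping = False
        self.errors = 0

    def stop(self) -> None:
        self._stopping = True  # checked before the next iteration starts

    async def run(self) -> None:
        while not self._stopping:
            try:
                await self._iteration()
            except asyncio.CancelledError:
                raise  # cancellation ends the loop
            except Exception:
                self.errors += 1  # count the failure, then restart
                await asyncio.sleep(self._restart_delay)
```

Because stop() only sets a flag, an iteration that is already in flight finishes before the loop exits.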

run_once async

run_once() -> None

Execute one full consumer iteration.

ConsumerConfig

redis_stream_queue.ConsumerConfig dataclass

Configuration for a stream consumer.

ConsumerGroup

redis_stream_queue.ConsumerGroup

Manages and inspects a Redis Stream consumer group.

Provides group creation (idempotent), stats, health checks, and pending entry inspection.

Group existence is tracked in a class-level registry keyed by (pool_key, stream, group), so multiple instances targeting the same group share the cache. Call reset() to force re-creation (e.g. after external FLUSHALL or XGROUP DESTROY).
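The shared cache can be pictured as a class-level set. The sketch below is illustrative only: `GroupRegistrySketch` is a hypothetical class, the string used for pool_key is an assumed format, and a counter replaces the real XGROUP CREATE call.

```python
from typing import Set, Tuple


class GroupRegistrySketch:
    """Hypothetical stand-in; only the caching idea mirrors the text above."""

    _known: Set[Tuple[str, str, str]] = set()  # shared by all instances

    def __init__(self, pool_key: str, stream: str, group: str):
        self._key = (pool_key, stream, group)
        self.create_calls = 0

    def ensure(self) -> None:
        if self._key in self._known:
            return  # another instance already created this group
        self.create_calls += 1  # the real client would issue XGROUP CREATE here
        self._known.add(self._key)

    def reset(self) -> None:
        self._known.discard(self._key)  # next ensure() creates the group again
```

Two instances built with the same key share one entry, so only the first ensure() does any work until reset() is called.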

ensure async

ensure(
    dlq_stream: str | None = None,
    dlq_group: str | None = None,
) -> None

Create consumer group (and DLQ group) if not already known. No-op otherwise.

reset

reset() -> None

Remove this group from the known registry — next ensure() will re-create it.

stats async

stats(dlq_stream: str | None = None) -> StreamStats

Return stream length, lag, PEL size, and per-consumer info.

health_check async

health_check(
    max_lag: int = 10000,
    max_idle_ms: int = 60000,
    dlq_stream: str | None = None,
) -> dict

Return health status dict with issue descriptions.
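How the two thresholds could translate into issue descriptions is sketched below. Everything except the parameter names max_lag and max_idle_ms is an assumption: `evaluate_health` is a hypothetical function, the `healthy` and `issues` keys are invented, and applying max_idle_ms to the oldest pending entry is a guess at the intended check.

```python
def evaluate_health(
    lag: int,
    oldest_pending_idle_ms: int,
    max_lag: int = 10000,
    max_idle_ms: int = 60000,
) -> dict:
    """Compare observed values with the thresholds and describe each breach."""
    issues = []
    if lag > max_lag:
        issues.append(f"lag {lag} exceeds max_lag {max_lag}")
    if oldest_pending_idle_ms > max_idle_ms:
        issues.append(
            f"pending entry idle for {oldest_pending_idle_ms} ms "
            f"exceeds max_idle_ms {max_idle_ms}"
        )
    # Hypothetical result shape; the library's actual keys are not documented here.
    return {"healthy": not issues, "issues": issues}
```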

pending_details async

pending_details(count: int = 100) -> list[PendingEntry]

Return details for up to count pending (unacknowledged) entries.

RetryHandler

redis_stream_queue.RetryHandler

Detects poison pills (messages exceeding max_deliveries) and routes them to DLQ.

Integrates with StreamConsumer to offload all retry/DLQ concerns from the main loop.

handle_poison_pills async

handle_poison_pills() -> int

Find messages in PEL that exceeded max_deliveries, route to DLQ, then ACK. Returns count of poison pills processed. Called once per consumer loop iteration.
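The selection step of the sweep reduces to a filter on delivery counts. In this sketch `Pending` is a hypothetical stand-in for PendingEntry and `delivery_count` is an assumed field name; routing to the DLQ and the ACK are omitted.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Pending:
    """Hypothetical stand-in for PendingEntry; real field names may differ."""
    id: str
    delivery_count: int


def find_poison_pills(pending: List[Pending], max_deliveries: int) -> List[str]:
    """Return IDs whose delivery count is strictly greater than max_deliveries."""
    return [p.id for p in pending if p.delivery_count > max_deliveries]
```

An entry delivered exactly max_deliveries times is still eligible for another attempt under this reading of "exceeded".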

send_to_dlq async

send_to_dlq(msg: StreamMessage, reason: str) -> None

Route a single message to the DLQ handler. Swallows handler errors.


Data Classes

StreamMessage

redis_stream_queue.StreamMessage dataclass

PendingEntry

redis_stream_queue.PendingEntry dataclass

ConsumerMetrics

redis_stream_queue.ConsumerMetrics dataclass

Runtime throughput and error counters for a StreamConsumer.

ProducerMetrics

redis_stream_queue.ProducerMetrics dataclass

Runtime throughput counters for a StreamProducer.

StreamStats

redis_stream_queue.StreamStats dataclass

ConsumerInfo

redis_stream_queue.ConsumerInfo dataclass


Serializers

Serializer Protocol

redis_stream_queue.Serializer

Bases: Protocol

JsonSerializer

redis_stream_queue.JsonSerializer

MsgpackSerializer

redis_stream_queue.MsgpackSerializer

PickleSerializer

redis_stream_queue.PickleSerializer


Type Aliases

DLQHandler

redis_stream_queue.DLQHandler module-attribute

DLQHandler = Callable[[StreamMessage, str], Awaitable[None]]
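A callable matching this alias is any coroutine function that takes a message and a reason string. The sketch below assumes a stand-in `Msg` dataclass in place of StreamMessage; `record_dead_letter` and `send_to_dlq_sketch` are hypothetical names, the latter mirroring the error-swallowing behaviour described for send_to_dlq.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, List, Tuple


@dataclass
class Msg:
    """Hypothetical stand-in for StreamMessage; the real fields may differ."""
    id: str
    data: dict


# Same shape as the DLQHandler alias, with Msg substituted for StreamMessage.
DLQHandlerSketch = Callable[[Msg, str], Awaitable[None]]

dead_letters: List[Tuple[str, str]] = []


async def record_dead_letter(msg: Msg, reason: str) -> None:
    """Example handler: keep the entry ID and reason in memory."""
    dead_letters.append((msg.id, reason))


async def send_to_dlq_sketch(handler: DLQHandlerSketch, msg: Msg, reason: str) -> None:
    """Call the handler and swallow its errors so the caller keeps running."""
    try:
        await handler(msg, reason)
    except Exception:
        pass  # a failing DLQ handler must not break the consumer loop
```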