
redis-stream-queue

Async Python library for Redis Streams consumer groups with built-in crash recovery, DLQ, and monitoring.

Features

  • Producer — push messages from any number of pods; XADD is atomic, no coordination needed
  • Consumer — callback-based read → ACK loop; partial ACK supported
  • Crash recovery — XAUTOCLAIM cursor loop sweeps the full pending entries list (PEL) on each iteration; NOGROUP auto-recovery if the stream is deleted externally
  • Dead-letter queue — decode errors and poison pills (exceeding max_deliveries) routed to DLQ handler
  • Consumer metrics — tps_in / tps_out / tps_total (60 s sliding window), avg TPS, read/acked/DLQ/error counters
  • Producer metrics — push TPS, total pushed, avg TPS, uptime
  • Process-wide aggregation — StreamConsumer.all_metrics() / StreamProducer.all_metrics() collect from all live instances via a weakref registry; zero Redis overhead
  • Stream monitoring — lag, PEL size, per-consumer idle time, health checks via ConsumerGroup
  • Pluggable serializers — JSON (default), msgpack, pickle — or bring your own
  • Redis Cluster — from_cluster() and from_url() factory methods
  • Multi-pod safe — unique worker names auto-generated per pod ({group}_{hostname}_{rand4})
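
The crash-recovery item above mentions an XAUTOCLAIM cursor loop. As a rough illustration of that pattern (not the library's actual code), the sketch below follows the cursor until Redis reports `0-0`, which marks the end of the pending entries list. The function name `sweep_pending` and its parameters are hypothetical; the `xautoclaim` call assumes a client with the same keyword arguments as redis-py's `redis.asyncio.Redis.xautoclaim`.

```python
from typing import Any


async def sweep_pending(
    client: Any,            # assumed: an object with redis-py's async xautoclaim()
    stream: str,
    group: str,
    consumer: str,
    min_idle_ms: int = 60_000,
    batch: int = 100,
) -> list:
    """Claim every sufficiently idle pending entry by following the cursor."""
    claimed = []
    cursor = "0-0"  # "0-0" starts the scan at the beginning of the PEL
    while True:
        reply = await client.xautoclaim(
            stream, group, consumer, min_idle_ms, start_id=cursor, count=batch
        )
        # Redis 6.2 replies [next_cursor, entries]; Redis 7 appends a third
        # element listing deleted IDs, which this sketch ignores.
        next_cursor, entries = reply[0], reply[1]
        claimed.extend(entries)
        if isinstance(next_cursor, bytes):
            next_cursor = next_cursor.decode()
        if next_cursor == "0-0":  # cursor wrapped around: full PEL scanned
            break
        cursor = next_cursor
    return claimed
```

With a real connection this could be called as `await sweep_pending(redis.asyncio.Redis(), "orders", "workers", "worker-1")`, where the stream, group, and consumer names are placeholders. Entries returned here have been reassigned to the calling consumer and still need to be processed and acknowledged.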

Requirements

  • Python ≥ 3.11
  • Redis ≥ 6.2 (XAUTOCLAIM support)
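
One way to verify these requirements at startup is sketched below. It is an illustration only, not part of the library: `parse_version` and `meets_requirements` are hypothetical helpers, and the Redis version string is assumed to come from the `redis_version` field of the server's `INFO server` reply.

```python
import sys
from itertools import takewhile

MIN_PYTHON = (3, 11)
MIN_REDIS = (6, 2)   # XAUTOCLAIM was introduced in Redis 6.2


def parse_version(text: str) -> tuple:
    """Turn '7.2.4' into (7, 2, 4); stop at the first non-numeric component."""
    parts = []
    for piece in text.strip().split("."):
        digits = "".join(takewhile(str.isdigit, piece))
        if not digits:
            break
        parts.append(int(digits))
    return tuple(parts)


def meets_requirements(redis_version: str, python_version=None) -> bool:
    """Check both minimum versions; python_version defaults to the running interpreter."""
    if python_version is None:
        python_version = sys.version_info[:2]
    python_ok = tuple(python_version) >= MIN_PYTHON
    redis_ok = parse_version(redis_version)[:2] >= MIN_REDIS
    return python_ok and redis_ok
```

With redis-py, the version string could be read as `(await client.info("server"))["redis_version"]` and passed to `meets_requirements`.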