Skip to content

Consumer Loop Internals

Four-Step Lifecycle

Each run_once() call executes four steps:

1. ensure()           — XGROUP CREATE mkstream; no-op if group known in class-level registry.
                        Registry entry removed on NOGROUP — re-creation runs on next iteration.

2. XREADGROUP ">"     — fetch new undelivered messages
   ├─ decode error    → dlq_handler(msg, "decode_error") + XACK (no retry)
   ├─ handler(msgs)   → XACK returned IDs; total_acked += n; tps_out tracker updated
   ├─ tps_in          updated with len(raw_messages)
   ├─ handler → None  → warning logged; no XACK (treat as explicit "ACK nothing")
   └─ unacked IDs     stay in PEL for XAUTOCLAIM recovery

3. XAUTOCLAIM cursor loop — reclaims msgs idle > min_idle_claim_ms
   ├─ follows cursor until Redis returns "0-0" (full PEL swept) or stall detected
   ├─ max_claim_passes caps iterations if set; None = unlimited (default)
   ├─ tps_in          updated with len(claimed) per batch
   └─ reclaimed msgs  → same handler → XACK; tps_out updated

4. XPENDING sweep     — find entries with delivery_count >= max_deliveries
   ├─ no dlq_handler  → warning logged with IDs; still ACKed (message cleared)
   ├─ msg missing from stream (XDEL'd) → warning logged; still ACKed
   └─ poison pills    → dlq_handler(msg, "max_deliveries") + batched XACK

run() wraps run_once() in an infinite loop:

├─ CancelledError  → re-raised immediately (clean shutdown)
├─ NOGROUP error   → registry entry cleared; sleep 1s; re-enter loop
└─ any other error → logged; sleep 1s; re-enter loop

Metrics Updated Per Iteration

  • total_read — incremented on every XREADGROUP / XAUTOCLAIM batch
  • tps_in — sliding tracker records every read batch (new + reclaimed)
  • total_acked — incremented after each successful handler → XACK
  • tps_out — sliding tracker records every ack batch
  • total_dlq — incremented for decode errors + poison pill ACKs
  • total_errors — incremented for handler exceptions

Sequence Diagrams

1. Producer: Push Message

uml diagram

2. Consumer: Normal Message Processing

uml diagram

3. Crash Recovery: XAUTOCLAIM Cursor Loop

uml diagram

4. Decode Error → Immediate DLQ

uml diagram

5. Poison Pill → DLQ After Max Deliveries

uml diagram

6. NOGROUP Auto-Recovery

uml diagram

7. Full run_once() Lifecycle with Metrics

uml diagram