Consumer Loop Internals
Four-Step Lifecycle
Each run_once() call executes four steps:
1. ensure() — XGROUP CREATE mkstream; no-op if group known in class-level registry.
Registry entry removed on NOGROUP — re-creation runs on next iteration.
2. XREADGROUP ">" — fetch new undelivered messages
├─ decode error → dlq_handler(msg, "decode_error") + XACK (no retry)
├─ handler(msgs) → XACK returned IDs; total_acked += n; tps_out tracker updated
├─ tps_in updated with len(raw_messages)
├─ handler → None → warning logged; no XACK (treat as explicit "ACK nothing")
└─ unacked IDs stay in PEL for XAUTOCLAIM recovery
3. XAUTOCLAIM cursor loop — reclaims msgs idle > min_idle_claim_ms
├─ follows cursor until Redis returns "0-0" (full PEL swept) or stall detected
├─ max_claim_passes caps iterations if set; None = unlimited (default)
├─ tps_in updated with len(claimed) per batch
└─ reclaimed msgs → same handler → XACK; tps_out updated
4. XPENDING sweep — find entries with delivery_count >= max_deliveries
├─ no dlq_handler → warning logged with IDs; still ACKed (message cleared)
├─ msg missing from stream (XDEL'd) → warning logged; still ACKed
└─ poison pills → dlq_handler(msg, "max_deliveries") + batched XACK
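The cursor loop in step 3 can be sketched as follows. This is a minimal illustration, not the library's code: `sweep_pel` and `claim_page` are hypothetical names, `claim_page` stands in for one XAUTOCLAIM call, and "stall" is assumed here to mean that Redis returned the same cursor it was given.

```python
from typing import Callable, Dict, List, Optional, Tuple

Message = Tuple[str, Dict[str, str]]                     # (stream ID, fields)
ClaimPage = Callable[[str], Tuple[str, List[Message]]]   # cursor -> (next cursor, batch)


def sweep_pel(claim_page: ClaimPage,
              max_claim_passes: Optional[int] = None) -> List[Message]:
    """Follow the XAUTOCLAIM cursor until "0-0", a stall, or the pass cap."""
    cursor = "0-0"
    claimed: List[Message] = []
    passes = 0
    # None means unlimited passes, matching the documented default.
    while max_claim_passes is None or passes < max_claim_passes:
        next_cursor, batch = claim_page(cursor)
        claimed.extend(batch)
        passes += 1
        if next_cursor == "0-0":
            break  # full PEL swept
        if next_cursor == cursor:
            break  # assumed stall condition: the cursor did not advance
        cursor = next_cursor
    return claimed
```

In a real consumer each batch would go to the handler and be ACKed before the next page is requested; the sketch collects the batches only to keep the loop logic visible.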
run() wraps run_once() in an infinite loop:
├─ CancelledError → re-raised immediately (clean shutdown)
├─ NOGROUP error → registry entry cleared; sleep 1s; re-enter loop
└─ any other error → logged; sleep 1s; re-enter loop
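A rough sketch of that outer loop, assuming an asyncio-based consumer. `run_forever`, `forget_group`, and `retry_delay` are hypothetical names, and spotting NOGROUP by inspecting the error message is an assumption about how the error surfaces, not a documented detail.

```python
import asyncio
import logging
from typing import Awaitable, Callable

log = logging.getLogger("consumer")


async def run_forever(run_once: Callable[[], Awaitable[None]],
                      forget_group: Callable[[], None],
                      retry_delay: float = 1.0) -> None:
    """Call run_once() repeatedly; back off and retry on errors."""
    while True:
        try:
            await run_once()
        except asyncio.CancelledError:
            # Clean shutdown: never swallow cancellation.
            raise
        except Exception as exc:
            if "NOGROUP" in str(exc):
                # Drop the registry entry so the next iteration re-creates the group.
                forget_group()
            else:
                log.exception("run_once failed")
            await asyncio.sleep(retry_delay)
```

Since Python 3.8 `CancelledError` derives from `BaseException`, so `except Exception` would not catch it anyway; the explicit re-raise documents the intent and keeps the behaviour the same on older interpreters.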
Metrics Updated Per Iteration
- total_read: incremented on every XREADGROUP / XAUTOCLAIM batch
- tps_in: sliding tracker records every read batch (new + reclaimed)
- total_acked: incremented after each successful handler → XACK
- tps_out: sliding tracker records every ack batch
- total_dlq: incremented for decode errors + poison pill ACKs
- total_errors: incremented for handler exceptions
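The source does not describe how the sliding trackers behind tps_in and tps_out are built. One plausible shape, shown purely as an illustration, is a fixed time window of (timestamp, count) pairs; `SlidingRate`, `window_s`, and the injectable `clock` are all hypothetical.

```python
import time
from collections import deque
from typing import Callable, Deque, Tuple


class SlidingRate:
    """Average events per second over the last `window_s` seconds."""

    def __init__(self, window_s: float = 10.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.window_s = window_s
        self._clock = clock
        self._events: Deque[Tuple[float, int]] = deque()

    def record(self, n: int) -> None:
        """Record a batch of n messages at the current time."""
        now = self._clock()
        self._events.append((now, n))
        self._evict(now)

    def rate(self) -> float:
        """Return messages per second across the window."""
        self._evict(self._clock())
        return sum(n for _, n in self._events) / self.window_s

    def _evict(self, now: float) -> None:
        # Drop batches that have aged out of the window.
        cutoff = now - self.window_s
        while self._events and self._events[0][0] <= cutoff:
            self._events.popleft()
```

Under this sketch, a consumer would call `record(len(batch))` on one tracker after each read and on another after each XACK, then expose `rate()` as tps_in and tps_out.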