Quick Start
Producer
import asyncio
from redis_stream_queue import StreamClient, StreamProducer
async def main():
    """Push one order onto the ``orders`` stream and print the ID returned by ``push``."""
    client = StreamClient(host="localhost")
    try:
        producer = StreamProducer(client=client, stream="orders", group="order_workers")
        await producer.ensure_group()  # idempotent — safe to call on every startup
        msg_id = await producer.push({"order_id": 1})
        print(f"pushed: {msg_id}")
    finally:
        # Release the connection even when ensure_group() or push() raises.
        await client.close()


if __name__ == "__main__":
    asyncio.run(main())
Consumer
import asyncio
from redis_stream_queue import StreamClient, StreamConsumer, ConsumerConfig
async def handle(messages):
    """Process one batch of messages and return the IDs to ACK.

    Every ID in the returned list is ACKed. Leave a message's ID out of the
    list to keep that message in the PEL.
    """
    for msg in messages:
        print(f"processing: {msg.data}")
    # This example ACKs the whole batch.
    return [m.id for m in messages]
async def on_dlq(msg, reason):
    """Print the payload of a message passed to the DLQ handler, tagged with ``reason``."""
    print(f"DLQ [{reason}]: {msg.data}")
async def main():
    """Consume the ``orders`` stream as group ``order_workers`` until stopped."""
    client = StreamClient(host="localhost")
    config = ConsumerConfig(
        group="order_workers",
        dlq_stream="orders_dlq",
        batch_size=100,
        block_ms=5_000,
        max_deliveries=3,
    )
    consumer = StreamConsumer(
        client=client,
        stream="orders",
        config=config,
        handler=handle,
        dlq_handler=on_dlq,
    )
    try:
        await consumer.run()  # infinite loop; Ctrl-C / CancelledError to stop
    finally:
        # run() only returns by being stopped, so close the client on the way out.
        await client.close()


if __name__ == "__main__":
    asyncio.run(main())
Handler contract:
Return a list of message IDs to ACK. Return `[]` to ACK nothing (the messages stay in the PEL, the Pending Entries List, for retry). Never return `None` — that triggers a warning and nothing is ACKed.
Consumer Throughput Metrics
consumer.metrics() is non-blocking and makes no Redis calls — safe to poll from any monitoring loop or health endpoint.
import asyncio
from redis_stream_queue import StreamClient, StreamConsumer, ConsumerConfig
async def monitor(consumer):
    """Print a one-line metrics summary for ``consumer`` every 5 seconds, forever."""
    while True:
        m = consumer.metrics()
        print(
            f"in={m.tps_in:.1f} msg/s out={m.tps_out:.1f} msg/s total={m.tps_total:.1f} msg/s "
            f"avg={m.avg_tps:.1f} msg/s "
            f"read={m.total_read} acked={m.total_acked} "
            f"dlq={m.total_dlq} errors={m.total_errors} "
            f"uptime={m.uptime_secs:.0f}s"
        )
        await asyncio.sleep(5)
async def main():
    """Run the consumer and the metrics monitor side by side in one event loop."""
    client = StreamClient(host="localhost")
    config = ConsumerConfig(group="order_workers")
    # `handle` is the batch handler defined in the Consumer quick-start example.
    consumer = StreamConsumer(client=client, stream="orders", config=config, handler=handle)
    try:
        await asyncio.gather(consumer.run(), monitor(consumer))
    finally:
        # Both coroutines loop forever; close the client when they are stopped.
        await client.close()
| Field | Type | Description |
|---|---|---|
| `tps_in` | `float` | Reads/sec — XREADGROUP + XAUTOCLAIM, sliding 60 s window |
| `tps_out` | `float` | Acked/sec — sliding 60 s window |
| `tps_total` | `float` | `tps_in + tps_out` |
| `avg_tps` | `float` | `total_acked / uptime_secs` since first message |
| `total_read` | `int` | Messages pulled from stream (new + reclaimed via XAUTOCLAIM) |
| `total_acked` | `int` | Successfully processed and ACKed by handler |
| `total_dlq` | `int` | Routed to DLQ (`decode_error` + `max_deliveries` combined) |
| `total_errors` | `int` | Handler exceptions — message stays in PEL for retry |
| `uptime_secs` | `float` | Seconds since first message was processed |
Producer Throughput Metrics
# Take a metrics snapshot from the producer and print it on a single line.
m = producer.metrics()
print(
    f"push={m.tps:.1f} msg/s avg={m.avg_tps:.1f} msg/s "
    f"total={m.total_pushed} uptime={m.uptime_secs:.0f}s"
)
| Field | Type | Description |
|---|---|---|
| `total_pushed` | `int` | Messages pushed since instance creation |
| `tps` | `float` | Pushed/sec — sliding 60 s window |
| `avg_tps` | `float` | `total_pushed / uptime_secs` since first push |
| `uptime_secs` | `float` | Seconds since first push |
Process-Wide Metrics (Multiple Instances)
Each StreamConsumer and StreamProducer auto-registers in a process-level weakref registry on creation. Dead instances are evicted automatically by GC.
# Multiple consumers in same asyncio event loop
c1 = StreamConsumer(client=client, stream="orders", config=cfg_a, handler=handle_a)
c2 = StreamConsumer(client=client, stream="payments", config=cfg_b, handler=handle_b)
c3 = StreamConsumer(client=client, stream="events", config=cfg_c, handler=handle_c)
await asyncio.gather(c1.run(), c2.run(), c3.run())
# From a monitoring task running concurrently:
for m in StreamConsumer.all_metrics():
print(f"in={m.tps_in:.1f} out={m.tps_out:.1f} acked={m.total_acked}")
# Multiple producers
p1 = StreamProducer(client=client, stream="orders")
p2 = StreamProducer(client=client, stream="payments")
for m in StreamProducer.all_metrics():
print(f"tps={m.tps:.1f} pushed={m.total_pushed}")
Multi-pod scope
all_metrics() is in-process only — it sees instances in this pod, not other pods.
For cross-pod aggregation, expose metrics via a health endpoint and scrape with Prometheus or similar.
Stream / Group Monitoring
from redis_stream_queue import StreamClient, ConsumerGroup
async def main():
    """Print stream stats, a health check and pending entries for ``orders`` / ``order_workers``."""
    client = StreamClient(host="localhost")
    try:
        cg = ConsumerGroup(client, stream="orders", group="order_workers")

        # Stream-level stats (requires Redis calls)
        stats = await cg.stats(dlq_stream="orders_dlq")
        print(f"length={stats.stream_length} lag={stats.lag} pel={stats.group_pel_size}")
        for c in stats.consumers:
            print(f" consumer={c.name} pending={c.pending} idle={c.idle_ms}ms")

        # Health check
        health = await cg.health_check(max_lag=1_000, max_idle_ms=60_000)
        print(f"healthy={health['healthy']} issues={health['issues']}")

        # Inspect stuck messages
        pending = await cg.pending_details(count=50)
        for entry in pending:
            print(f" {entry.id} consumer={entry.consumer} deliveries={entry.delivery_count}")
    finally:
        # Release the connection even if one of the Redis calls above raises.
        await client.close()