Skip to content

Quick Start

Producer

import asyncio
from redis_stream_queue import StreamClient, StreamProducer

async def main():
    """Ensure the consumer group exists, push one order, and print its ID.

    The client is closed in a ``finally`` block so it is released even when
    ``ensure_group()`` or ``push()`` raises.
    """
    client = StreamClient(host="localhost")
    try:
        producer = StreamProducer(client=client, stream="orders", group="order_workers")

        await producer.ensure_group()           # idempotent — safe to call on every startup
        msg_id = await producer.push({"order_id": 1})
        print(f"pushed: {msg_id}")
    finally:
        # Runs on success and on failure alike, so the client never leaks.
        await client.close()

asyncio.run(main())

Consumer

import asyncio
from redis_stream_queue import StreamClient, StreamConsumer, ConsumerConfig

async def handle(messages):
    """Print every message in the batch and return the IDs to ACK.

    Every ID in the returned list is ACKed. To leave a message in the PEL for
    retry, leave its ID out of the list — but always return a list.
    """
    for message in messages:
        print(f"processing: {message.data}")

    # This example acknowledges the whole batch.
    ack_ids = [message.id for message in messages]
    return ack_ids

async def on_dlq(msg, reason):
    """Report a message that was routed to the dead-letter queue.

    Prints the given ``reason`` followed by the message payload.
    """
    payload = msg.data
    print(f"DLQ [{reason}]: {payload}")

async def main():
    """Run a consumer on the "orders" stream until it is stopped.

    Batches go to ``handle``; messages routed to the DLQ are reported through
    ``on_dlq``. The client is closed in a ``finally`` block so it is released
    when the run loop ends, whether by cancellation or by an error.
    """
    client = StreamClient(host="localhost")
    config = ConsumerConfig(
        group="order_workers",
        dlq_stream="orders_dlq",
        batch_size=100,
        block_ms=5_000,
        max_deliveries=3,
    )
    consumer = StreamConsumer(
        client=client,
        stream="orders",
        config=config,
        handler=handle,
        dlq_handler=on_dlq,
    )
    try:
        await consumer.run()    # infinite loop; Ctrl-C / CancelledError to stop
    finally:
        # run() only returns by raising, so cleanup has to live in `finally`.
        await client.close()

asyncio.run(main())

Handler contract

Return a list of message IDs to ACK. Return [] to ACK nothing — the messages stay in the Pending Entries List (PEL) and are retried. Never return None — that triggers a warning and nothing is ACKed.

Consumer Throughput Metrics

consumer.metrics() is non-blocking and makes no Redis calls — safe to poll from any monitoring loop or health endpoint.

import asyncio
from redis_stream_queue import StreamClient, StreamConsumer, ConsumerConfig

async def monitor(consumer):
    """Print a one-line throughput report for ``consumer`` every 5 seconds.

    Loops forever; cancel the enclosing task to stop it.
    """
    while True:
        snapshot = consumer.metrics()
        report = (
            f"in={snapshot.tps_in:.1f} msg/s  out={snapshot.tps_out:.1f} msg/s  total={snapshot.tps_total:.1f} msg/s  "
            f"avg={snapshot.avg_tps:.1f} msg/s  "
            f"read={snapshot.total_read}  acked={snapshot.total_acked}  "
            f"dlq={snapshot.total_dlq}  errors={snapshot.total_errors}  "
            f"uptime={snapshot.uptime_secs:.0f}s"
        )
        print(report)
        await asyncio.sleep(5)

async def main():
    """Run the consumer and the metrics monitor concurrently.

    ``handle`` is the batch handler coroutine; it is not defined in this
    snippet and must be defined or imported before ``main()`` runs. The client
    is closed in a ``finally`` block so it is released when either task stops
    or raises.
    """
    client = StreamClient(host="localhost")
    config = ConsumerConfig(group="order_workers")
    consumer = StreamConsumer(client=client, stream="orders", config=config, handler=handle)

    try:
        await asyncio.gather(consumer.run(), monitor(consumer))
    finally:
        # Both coroutines loop forever, so gather() only ends by raising.
        await client.close()
| Field | Type | Description |
|---|---|---|
| tps_in | float | Reads/sec — XREADGROUP + XAUTOCLAIM, sliding 60 s window |
| tps_out | float | Acked/sec — sliding 60 s window |
| tps_total | float | tps_in + tps_out |
| avg_tps | float | total_acked / uptime_secs since first message |
| total_read | int | Messages pulled from stream (new + reclaimed via XAUTOCLAIM) |
| total_acked | int | Successfully processed and ACKed by handler |
| total_dlq | int | Routed to DLQ (decode_error + max_deliveries combined) |
| total_errors | int | Handler exceptions — message stays in PEL for retry |
| uptime_secs | float | Seconds since first message was processed |

Producer Throughput Metrics

# Take a snapshot of this producer's throughput counters and print it on one line.
m = producer.metrics()
print(
    f"push={m.tps:.1f} msg/s  avg={m.avg_tps:.1f} msg/s  "
    f"total={m.total_pushed}  uptime={m.uptime_secs:.0f}s"
)
| Field | Type | Description |
|---|---|---|
| total_pushed | int | Messages pushed since instance creation |
| tps | float | Pushed/sec — sliding 60 s window |
| avg_tps | float | total_pushed / uptime_secs since first push |
| uptime_secs | float | Seconds since first push |

Process-Wide Metrics (Multiple Instances)

Each StreamConsumer and StreamProducer auto-registers in a process-level weakref registry on creation. Dead instances are evicted automatically by GC.

# Multiple consumers in the same asyncio event loop.
# Illustrative fragment: `client`, the configs, and the handlers are assumed to
# exist already, and the `await` below must run inside a coroutine.
c1 = StreamConsumer(client=client, stream="orders", config=cfg_a, handler=handle_a)
c2 = StreamConsumer(client=client, stream="payments", config=cfg_b, handler=handle_b)
c3 = StreamConsumer(client=client, stream="events", config=cfg_c, handler=handle_c)

# run() loops until stopped, so this gather() does not return while the
# consumers are running — do the monitoring from a separate task.
await asyncio.gather(c1.run(), c2.run(), c3.run())

# From a monitoring task running concurrently:
# iterate the metrics of the StreamConsumer instances registered in this process.
for m in StreamConsumer.all_metrics():
    print(f"in={m.tps_in:.1f}  out={m.tps_out:.1f}  acked={m.total_acked}")

# Multiple producers — same idea, via StreamProducer.all_metrics()
p1 = StreamProducer(client=client, stream="orders")
p2 = StreamProducer(client=client, stream="payments")

for m in StreamProducer.all_metrics():
    print(f"tps={m.tps:.1f}  pushed={m.total_pushed}")

Multi-pod scope

all_metrics() is in-process only — it sees instances in this pod, not other pods. For cross-pod aggregation, expose metrics via a health endpoint and scrape with Prometheus or similar.

Stream / Group Monitoring

from redis_stream_queue import StreamClient, ConsumerGroup

async def main():
    """Print stats, a health verdict, and pending entries for the "orders" group."""
    # NOTE(review): the client is not closed in the visible part of this
    # example; confirm whether `await client.close()` belongs at the end.
    client = StreamClient(host="localhost")
    cg = ConsumerGroup(client, stream="orders", group="order_workers")

    # Stream-level stats (requires Redis calls)
    stats = await cg.stats(dlq_stream="orders_dlq")
    print(f"length={stats.stream_length}  lag={stats.lag}  pel={stats.group_pel_size}")
    # One line per consumer in the group: name, pending count, idle time.
    for c in stats.consumers:
        print(f"  consumer={c.name}  pending={c.pending}  idle={c.idle_ms}ms")

    # Health check — the result is read by key ("healthy", "issues")
    health = await cg.health_check(max_lag=1_000, max_idle_ms=60_000)
    print(f"healthy={health['healthy']}  issues={health['issues']}")

    # Inspect stuck messages (asks for up to 50 entries — presumably a cap; confirm)
    pending = await cg.pending_details(count=50)
    for entry in pending:
        print(f"  {entry.id}  consumer={entry.consumer}  deliveries={entry.delivery_count}")