Serializers

Built-in Serializers

| Serializer | Extra | Notes |
| --- | --- | --- |
| JsonSerializer | none | Default; human-readable, broadest compatibility |
| MsgpackSerializer | [msgpack] | Smaller wire size, faster encode/decode |
| PickleSerializer | none | Any Python type; requires same Python version on both ends |

PickleSerializer security warning

Do not use PickleSerializer with untrusted data. Pickle deserialization can execute arbitrary code.
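
To make the risk concrete, here is a minimal sketch that uses only the standard-library `pickle` module, not `PickleSerializer` itself. It shows that loading a pickle calls whatever callable the sender named in the payload. The `Payload` class is hypothetical and the expression it evaluates is deliberately harmless.

```python
import pickle


class Payload:
    """Hypothetical object whose pickle form names a callable to run on load."""

    def __reduce__(self):
        # Tells pickle: "to rebuild this object, call eval('6 * 7')".
        # An attacker could name any importable callable and arguments here.
        return (eval, ("6 * 7",))


blob = pickle.dumps(Payload())

# The receiver only sees bytes. Loading them runs the sender's chosen call
# and returns its result instead of a Payload instance.
result = pickle.loads(blob)
print(result)  # 42
```

The receiving side never has to define `Payload` for this to happen, which is why the only safe rule is to unpickle data from trusted producers only.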

The producer and the consumer must use the same serializer:

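# StreamProducer and StreamConsumer are assumed to be importable from the package root
from redis_stream_queue import MsgpackSerializer, StreamConsumer, StreamProducer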

producer = StreamProducer(..., serializer=MsgpackSerializer())
consumer = StreamConsumer(..., serializer=MsgpackSerializer())
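
The reason for this rule is that the consumer decodes the raw bytes with its own serializer, whatever the producer used. The sketch below illustrates a mismatch with two hypothetical stand-in classes, `JsonLike` and `PickleLike`, built on the standard library. They are not the library's `JsonSerializer` and `PickleSerializer`, only objects with the same `encode`/`decode` shape.

```python
import json
import pickle


class JsonLike:
    """Hypothetical stand-in for a JSON-based serializer."""

    def encode(self, data: dict) -> bytes:
        return json.dumps(data).encode("utf-8")

    def decode(self, raw: bytes) -> dict:
        return json.loads(raw.decode("utf-8"))


class PickleLike:
    """Hypothetical stand-in for a pickle-based serializer."""

    def encode(self, data: dict) -> bytes:
        return pickle.dumps(data)

    def decode(self, raw: bytes) -> dict:
        return pickle.loads(raw)


payload = {"order_id": 17, "status": "paid"}
wire = PickleLike().encode(payload)

# Matching serializers: the payload survives the round trip.
matched = PickleLike().decode(wire)

# Mismatched serializers: the consumer cannot make sense of the bytes.
try:
    JsonLike().decode(wire)
    mismatch_error = None
except ValueError as exc:  # covers UnicodeDecodeError and json.JSONDecodeError
    mismatch_error = type(exc).__name__
```

In a real deployment a mismatch like this would surface on the consumer as a decode failure for every message.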

Decode Failures

A message that fails to decode is routed immediately to dlq_handler(msg, "decode_error") and then ACKed. Corrupt data is not retried, because the same bytes would fail to decode on every attempt.
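
The routing described above can be sketched as a small function. This is an illustration, not the library's consumer loop: `decode_or_dead_letter`, the `(msg_id, raw)` tuple used as the message, and the `ack` callback are all hypothetical names chosen for the example.

```python
import json
from typing import Callable, Optional, Tuple

Message = Tuple[str, bytes]  # hypothetical (message id, raw payload) pair


def decode_or_dead_letter(
    msg: Message,
    decode: Callable[[bytes], dict],
    dlq_handler: Callable[[Message, str], None],
    ack: Callable[[str], None],
) -> Optional[dict]:
    """Return the decoded payload, or dead-letter and ACK the message on failure."""
    msg_id, raw = msg
    try:
        return decode(raw)
    except Exception:
        # No retry: decoding the same bytes again would fail the same way.
        dlq_handler(msg, "decode_error")
        ack(msg_id)  # ACK so the message is not delivered again
        return None


dead_letters = []
acked = []


def on_dead_letter(msg: Message, reason: str) -> None:
    dead_letters.append((msg, reason))


good = decode_or_dead_letter(("1-0", b'{"n": 1}'), json.loads, on_dead_letter, acked.append)
bad = decode_or_dead_letter(("2-0", b'{"n": '), json.loads, on_dead_letter, acked.append)
```

After these two calls, `good` holds the decoded dictionary, while the truncated second message has been handed to the dead-letter handler with the reason `"decode_error"` and acknowledged.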

Custom Serializer

Implement the Serializer protocol:

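import cbor2  # third-party package

from redis_stream_queue import Serializer  # the protocol this class satisfies

class CborSerializer:
    """Provides the encode() and decode() methods that the protocol requires."""

    def encode(self, data: dict) -> bytes:
        return cbor2.dumps(data)

    def decode(self, raw: bytes) -> dict:
        return cbor2.loads(raw)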

# Use it
producer = StreamProducer(..., serializer=CborSerializer())
consumer = StreamConsumer(..., serializer=CborSerializer())
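
For a custom serializer that needs no third-party package, here is a sketch built on the standard library. `SerializerLike` is a hypothetical mirror of the two-method shape shown above, defined locally so the example runs without the library installed. Whether the library's own `Serializer` is a `typing.Protocol` is an assumption. `CompressedJsonSerializer` is likewise an illustrative name.

```python
import json
import zlib
from typing import Protocol, runtime_checkable


@runtime_checkable
class SerializerLike(Protocol):
    """Hypothetical mirror of the encode/decode protocol."""

    def encode(self, data: dict) -> bytes: ...

    def decode(self, raw: bytes) -> dict: ...


class CompressedJsonSerializer:
    """JSON compressed with zlib; it matches the protocol by shape alone."""

    def __init__(self, level: int = 6) -> None:
        self.level = level  # zlib compression level, 0 (none) to 9 (best)

    def encode(self, data: dict) -> bytes:
        text = json.dumps(data, separators=(",", ":"))
        return zlib.compress(text.encode("utf-8"), self.level)

    def decode(self, raw: bytes) -> dict:
        return json.loads(zlib.decompress(raw).decode("utf-8"))


serializer = CompressedJsonSerializer()
message = {"event": "signup", "tags": ["a", "b"], "count": 3}

encoded = serializer.encode(message)
round_tripped = serializer.decode(encoded)

# Structural check: the class has the right methods without subclassing anything.
conforms = isinstance(serializer, SerializerLike)
```

A round-trip check like this is a cheap test to run on any custom serializer before wiring it into a producer and consumer.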