Message Serializer

Class that serializes and deserializes messages. It uses the SchemaRegistryClient to fetch the Avro schemas it needs to process messages. This is the class your application interacts with.

Usage:

from schema_registry.client import SchemaRegistryClient, schema
from schema_registry.serializers import MessageSerializer

client = SchemaRegistryClient("http://127.0.0.1:8080")

message_serializer = MessageSerializer(client)

# Let's imagine that we have the following schema.
avro_user_schema = schema.AvroSchema({
    "type": "record",
    "namespace": "com.example",
    "name": "AvroUsers",
    "fields": [
        {"name": "first_name", "type": "string"},
        {"name": "last_name", "type": "string"},
        {"name": "age", "type": "int"},
    ],
})

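# We want to encode the user_record with avro_user_schema.
# The `await` calls below must run inside a coroutine
# (for example, one started with asyncio.run).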
user_record = {
    "first_name": "my_first_name",
    "last_name": "my_last_name",
    "age": 20,
}

message_encoded = await message_serializer.encode_record_with_schema(
    "user", avro_user_schema, user_record)

# The encoded message starts with a 5-byte header (1 magic byte plus a
# 4-byte schema ID), so it is always longer than 5 bytes.
assert len(message_encoded) > 5
assert isinstance(message_encoded, bytes)

# now decode the message
message_decoded = await message_serializer.decode_message(message_encoded)
assert message_decoded == user_record

# Now try to encode a bad record: "age" is a string, but the schema expects an int
bad_record = {
    "first_name": "my_first_name",
    "last_name": "my_last_name",
    "age": "my_age"
}

await message_serializer.encode_record_with_schema(
    "user", avro_user_schema, bad_record)
# results in an error:
#   TypeError: unsupported operand type(s) for <<: 'str' and 'int'
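
The TypeError above is consistent with how Avro writes an int: the value is zigzag-encoded with bit shifts and then stored as a variable-length integer, so a string fails at the first shift. The following is a minimal standalone sketch of that encoding, not the library's own code; `encode_avro_int` is a hypothetical helper name used only for illustration.

```python
def encode_avro_int(value):
    """Zigzag + varint encoding, as Avro uses for int and long values."""
    # Zigzag maps small negative and positive numbers to small unsigned ones.
    # A str here raises TypeError, because str does not support `<<`.
    zigzag = (value << 1) ^ (value >> 63)
    out = bytearray()
    # Emit 7 bits per byte, setting the high bit while more bytes follow.
    while zigzag & ~0x7F:
        out.append((zigzag & 0x7F) | 0x80)
        zigzag >>= 7
    out.append(zigzag)
    return bytes(out)


print(encode_avro_int(20))  # b'(' (the single byte 0x28)

try:
    encode_avro_int("my_age")
except TypeError as exc:
    # unsupported operand type(s) for <<: 'str' and 'int'
    print(exc)
```

Validating the record against the schema before encoding would surface a clearer error than the low-level TypeError; whether the serializer does this is not covered here.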

Class and Methods:

MessageSerializer
    Args:
        schemaregistry_client (schema_registry.client.SchemaRegistryClient): HTTP client for the Schema Registry

Encode a record with a schema:

async def encode_record_with_schema(subject, schema, record, is_key=False):
    """
    Args:
        subject (str): Subject name
        schema (avro.schema.RecordSchema): Avro Schema
        record (dict): An object to serialize
        is_key (bool): If the record is a key

    Returns:
        bytes: Encoded record with schema ID as bytes
    """

Encode a record with a schema id:

async def encode_record_with_schema_id(schema_id, record, is_key=False):
    """
    Args:
        schema_id (int): ID of the schema in the Schema Registry
        record (dict): An object to serialize
        is_key (bool): If the record is a key

    Returns:
        bytes: Encoded record with schema ID as bytes
    """
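
To illustrate what "encoded record with schema ID" means, here is a minimal sketch of the framing step, assuming the Confluent wire format (a zero magic byte, then the schema ID as a 4-byte big-endian integer, then the Avro payload). `frame_payload` is a hypothetical helper, not part of the library, and the payload is assumed to be Avro-encoded already.

```python
import struct

MAGIC_BYTE = 0  # first byte of the Confluent wire format


def frame_payload(schema_id, avro_payload):
    """Prefix an already Avro-encoded payload with the 5-byte header."""
    # ">bI": big-endian, 1 signed byte (magic) + 4-byte unsigned schema ID.
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_payload


framed = frame_payload(42, b"\x28")
print(len(framed))  # 6: 5 header bytes + 1 payload byte
print(framed[:5])   # b'\x00\x00\x00\x00*'
```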

Decode a previously encoded message:

async def decode_message(message, is_key=False):
    """
    Args:
        message (str|bytes or None): message key or value to be decoded
        is_key (bool): If the message is a key

    Returns:
        dict: Decoded message contents.
    """
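
The first step of decoding can be sketched as splitting the header from the payload. This is a standalone illustration that assumes the Confluent wire format and handles only bytes input; `split_message` and its error messages are hypothetical and may differ from what the library does. The schema ID it returns is what a decoder would use to fetch the writer schema from the registry.

```python
import struct

MAGIC_BYTE = 0
HEADER_SIZE = 5  # 1 magic byte + 4-byte schema ID


def split_message(message):
    """Return (schema_id, avro_payload), or None for a missing message."""
    if message is None:
        return None
    if len(message) <= HEADER_SIZE:
        raise ValueError("message is too small to decode")
    magic, schema_id = struct.unpack(">bI", message[:HEADER_SIZE])
    if magic != MAGIC_BYTE:
        raise ValueError("message does not start with the magic byte")
    return schema_id, message[HEADER_SIZE:]


print(split_message(b"\x00\x00\x00\x00\x2a\x28"))  # (42, b'(')
```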