Skip to content

Serializers

To serialize and deserialize messages you can use AvroMessageSerializer and JsonMessageSerializer. They use the SchemaRegistryClient to fetch Avro and JSON schemas in order to process messages.

To run the following examples, start the services with `docker-compose up`; the schema registry server will then be available at http://127.0.0.1:8081.

Warning

The AvroMessageSerializer uses the same wire protocol as Confluent, meaning that each event contains the schema id in its payload. If you produce an event with the AvroMessageSerializer, you have to consume it with the AvroMessageSerializer as well; otherwise you have to implement the parser on the consumer side.

schema_registry.serializers.AvroMessageSerializer

Bases: MessageSerializer

AvroMessageSerializer to serialize and deserialize messages.

Example

from schema_registry.client import SchemaRegistryClient, schema
from schema_registry.serializers import AvroMessageSerializer


# Requires a schema registry listening on http://127.0.0.1:8081 (e.g. via docker-compose up).
client = SchemaRegistryClient("http://127.0.0.1:8081")
avro_message_serializer = AvroMessageSerializer(client)

avro_user_schema = schema.AvroSchema({
    "type": "record",
    "namespace": "com.example",
    "name": "AvroUsers",
    "fields": [
        {"name": "first_name", "type": "string"},
        {"name": "last_name", "type": "string"},
        {"name": "age", "type": "int"},

    ],
})

# We want to encode the user_record with avro_user_schema
user_record = {
    "first_name": "my_first_name",
    "last_name": "my_last_name",
    "age": 20,
}

# Encode the record (registers/looks up the schema under the "user" subject)
message_encoded = avro_message_serializer.encode_record_with_schema(
    "user", avro_user_schema, user_record)

# The encoded message reserves 5 bytes for the schema_id header,
# so any message carrying a record is longer than that.
assert len(message_encoded) > 5
assert isinstance(message_encoded, bytes)

# Decode the message; the schema id in the header identifies the writer schema
message_decoded = avro_message_serializer.decode_message(message_encoded)
assert message_decoded == user_record

# Now if we send a bad record ("age" is a string, but the schema expects an int)
bad_record = {
    "first_name": "my_first_name",
    "last_name": "my_last_name",
    "age": "my_age"
}

# This call is expected to raise instead of returning bytes.
avro_message_serializer.encode_record_with_schema(
    "user", avro_user_schema, bad_record)

# >>> TypeError: an integer is required on field age

Parameters:

Name Type Description Default
schemaregistry_client SchemaRegistryClient

Http Client

required
reader_schema Optional[AvroSchema]

Specify a schema to decode the message

None
return_record_name bool

If the record name should be returned

False
Source code in schema_registry/serializers/message_serializer.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
class AvroMessageSerializer(MessageSerializer):
    """Serialize and deserialize Avro messages backed by a schema registry.

    Records are written with the registered Avro schema, and the resulting
    message carries the schema id so it can be decoded later.

    !!! Example
        ```python
        from schema_registry.client import SchemaRegistryClient, schema
        from schema_registry.serializers import AvroMessageSerializer


        client = SchemaRegistryClient("http://127.0.0.1:8081")
        avro_message_serializer = AvroMessageSerializer(client)

        avro_user_schema = schema.AvroSchema({
            "type": "record",
            "namespace": "com.example",
            "name": "AvroUsers",
            "fields": [
                {"name": "first_name", "type": "string"},
                {"name": "last_name", "type": "string"},
                {"name": "age", "type": "int"},

            ],
        })

        # We want to encode the user_record with avro_user_schema
        user_record = {
            "first_name": "my_first_name",
            "last_name": "my_last_name",
            "age": 20,
        }

        # Encode the record
        message_encoded = avro_message_serializer.encode_record_with_schema(
            "user", avro_user_schema, user_record)

        # the encoded message reserves 5 bytes for the schema_id header
        assert len(message_encoded) > 5
        assert isinstance(message_encoded, bytes)

        # Decode the message
        message_decoded = avro_message_serializer.decode_message(message_encoded)
        assert message_decoded == user_record

        # Now if we send a bad record
        bad_record = {
            "first_name": "my_first_name",
            "last_name": "my_last_name",
            "age": "my_age"
        }

        avro_message_serializer.encode_record_with_schema(
            "user", avro_user_schema, bad_record)

        # >>> TypeError: an integer is required on field age
        ```

    Args:
        schemaregistry_client: Http Client
        reader_schema: Specify a schema to decode the message
        return_record_name: If the record name should be returned
    """

    @property
    def _serializer_schema_type(self) -> typing.Literal["AVRO", "JSON"]:
        """Schema type handled by this serializer (the Avro constant)."""
        schema_type = utils.AVRO_SCHEMA_TYPE
        return schema_type

    def _get_encoder_func(self, schema: typing.Union[BaseSchema]) -> typing.Callable:
        """Return a function ``(record, fp)`` that writes ``record`` into ``fp``.

        ``schema.schema`` is read each time the returned function runs.
        """

        def avro_encoder_func(record: typing.Any, fp: typing.Any) -> typing.Any:
            return schemaless_writer(fp, schema.schema, record)

        return avro_encoder_func

    def _get_decoder_func(self, payload: ContextStringIO, writer_schema: BaseSchema) -> typing.Callable:
        """Return a function ``(payload)`` that reads one record from a stream.

        The outer ``payload`` argument is not used here; the returned function
        receives the stream to read from as its own ``payload`` argument.
        ``self.reader_schema`` and ``self.return_record_name`` are looked up
        each time the returned function runs.
        """

        def avro_decoder_func(payload: typing.Any) -> typing.Any:
            return schemaless_reader(
                payload,
                writer_schema.schema,
                typing.cast(Schema, self.reader_schema),
                self.return_record_name,
            )

        return avro_decoder_func

schema_registry.serializers.JsonMessageSerializer

Bases: MessageSerializer

JsonMessageSerializer to serialize and deserialize messages.

Example

from schema_registry.client import SchemaRegistryClient, schema
from schema_registry.serializers import JsonMessageSerializer


# Requires a schema registry listening on http://127.0.0.1:8081 (e.g. via docker-compose up).
client = SchemaRegistryClient("http://127.0.0.1:8081")
json_message_serializer = JsonMessageSerializer(client)

json_schema = schema.JsonSchema({
"definitions" : {
    "record:python.test.basic.basic" : {
    "description" : "basic schema for tests",
    "type" : "object",
    "required" : [ "number", "name" ],
    "properties" : {
        "number" : {
        "oneOf" : [ {
            "type" : "integer"
        }, {
            "type" : "null"
        } ]
        },
        "name" : {
        "oneOf" : [ {
            "type" : "string"
        } ]
        }
    }
    }
},
"$ref" : "#/definitions/record:python.test.basic.basic"
})

# Encode the record (it is validated against json_schema before being written)
basic_record = {
    "number": 10,
    "name": "a_name",
}

message_encoded = json_message_serializer.encode_record_with_schema(
    "basic", json_schema, basic_record)

# The encoded message reserves 5 bytes for the schema_id header,
# so any message carrying a record is longer than that.
assert len(message_encoded) > 5
assert isinstance(message_encoded, bytes)

# Decode the message; the decoded object is validated against the writer schema
message_decoded = json_message_serializer.decode_message(message_encoded)
assert message_decoded == basic_record

Parameters:

Name Type Description Default
schemaregistry_client SchemaRegistryClient

Http Client

required
reader_schema Optional[AvroSchema]

Accepted but not used by the JSON serializer; decoded payloads are validated against the writer schema only.

None
return_record_name bool

Accepted but not used by the JSON serializer.

False
Source code in schema_registry/serializers/message_serializer.py
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
class JsonMessageSerializer(MessageSerializer):
    """JsonMessageSerializer to serialize and deserialize messages.

    Records are validated against the JSON schema before they are written,
    and decoded payloads are validated against the writer schema.

    !!! Example

        ```python
        from schema_registry.client import SchemaRegistryClient, schema
        from schema_registry.serializers import JsonMessageSerializer


        client = SchemaRegistryClient("http://127.0.0.1:8081")
        json_message_serializer = JsonMessageSerializer(client)

        json_schema = schema.JsonSchema({
        "definitions" : {
            "record:python.test.basic.basic" : {
            "description" : "basic schema for tests",
            "type" : "object",
            "required" : [ "number", "name" ],
            "properties" : {
                "number" : {
                "oneOf" : [ {
                    "type" : "integer"
                }, {
                    "type" : "null"
                } ]
                },
                "name" : {
                "oneOf" : [ {
                    "type" : "string"
                } ]
                }
            }
            }
        },
        "$ref" : "#/definitions/record:python.test.basic.basic"
        })

        # Encode the record
        basic_record = {
            "number": 10,
            "name": "a_name",
        }

        message_encoded = json_message_serializer.encode_record_with_schema(
            "basic", json_schema, basic_record)

        # the encoded message reserves 5 bytes for the schema_id header
        assert len(message_encoded) > 5
        assert isinstance(message_encoded, bytes)

        # Decode the message
        message_decoded = json_message_serializer.decode_message(message_encoded)
        assert message_decoded == basic_record
        ```

    Args:
        schemaregistry_client: Http Client
        reader_schema: Accepted but not used by the JSON serializer; decoded
            payloads are validated against the writer schema only.
        return_record_name: Accepted but not used by the JSON serializer.
    """

    @property
    def _serializer_schema_type(self) -> typing.Literal["AVRO", "JSON"]:
        """Schema type handled by this serializer (the JSON constant)."""
        return utils.JSON_SCHEMA_TYPE

    def _get_encoder_func(self, schema: BaseSchema) -> typing.Callable:
        """Return a function ``(record, fp)`` that validates and writes ``record``.

        Args:
            schema: Schema whose ``.schema`` attribute is used for validation.

        Returns:
            A callable that validates ``record`` against ``schema.schema`` and
            writes its JSON encoding (as bytes) into ``fp``.
        """

        def json_encoder_func(record: dict, fp: ContextStringIO) -> typing.Any:
            # Validate first so nothing is written for an invalid record.
            validate(record, schema.schema)
            fp.write(json.dumps(record).encode())

        return json_encoder_func

    def _get_decoder_func(self, payload: ContextStringIO, writer_schema: BaseSchema) -> typing.Callable:
        """Return a function ``(payload)`` that parses and validates a JSON body.

        The outer ``payload`` argument is not used here; the returned function
        receives the stream to read from. ``self.reader_schema`` and
        ``self.return_record_name`` are not consulted.

        Args:
            payload: Unused.
            writer_schema: Schema the decoded object is validated against.

        Returns:
            A callable that loads JSON from its stream argument, validates it
            against ``writer_schema.schema`` and returns the object.
        """

        def json_decoder_func(payload: typing.IO) -> typing.Any:
            obj = json.load(payload)
            validate(obj, writer_schema.schema)
            return obj

        return json_decoder_func

Async implementations

JsonMessageSerializer, AvroMessageSerializer and SchemaRegistryClient have asynchronous counterparts — AsyncJsonMessageSerializer, AsyncAvroMessageSerializer and AsyncSchemaRegistryClient — and all of the examples above should work if you replace them with their async variants.

schema_registry.serializers.AsyncAvroMessageSerializer

Bases: AsyncMessageSerializer

AsyncAvroMessageSerializer to serialize and deserialize messages asynchronously.

Parameters:

Name Type Description Default
schemaregistry_client AsyncSchemaRegistryClient

Http Client

required
reader_schema Optional[AvroSchema]

Specify a schema to decode the message

None
return_record_name bool

If the record name should be returned

False
Source code in schema_registry/serializers/message_serializer.py
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
class AsyncAvroMessageSerializer(AsyncMessageSerializer):
    """Asynchronous Avro serializer: encodes and decodes registry-backed Avro messages.

    Args:
        schemaregistry_client: Http Client
        reader_schema: Specify a schema to decode the message
        return_record_name: If the record name should be returned
    """

    @property
    def _serializer_schema_type(self) -> typing.Literal["AVRO", "JSON"]:
        """Schema type handled by this serializer (the Avro constant)."""
        avro_type = utils.AVRO_SCHEMA_TYPE
        return avro_type

    def _get_encoder_func(self, schema: typing.Union[BaseSchema]) -> typing.Callable:
        """Return a function ``(record, fp)`` that writes ``record`` into ``fp``.

        ``schema.schema`` is read each time the returned function runs.
        """

        def encode(record: typing.Any, fp: typing.Any) -> typing.Any:
            return schemaless_writer(fp, schema.schema, record)

        return encode

    def _get_decoder_func(self, payload: ContextStringIO, writer_schema: BaseSchema) -> typing.Callable:
        """Return a function ``(payload)`` that reads one record from a stream.

        The outer ``payload`` argument is not used; the returned function gets
        the stream as its own argument. ``self.reader_schema`` and
        ``self.return_record_name`` are looked up on every call.
        """

        def decode(payload: typing.Any) -> typing.Any:
            reader = typing.cast(Schema, self.reader_schema)
            return schemaless_reader(
                payload,
                writer_schema.schema,
                reader,
                self.return_record_name,
            )

        return decode

schema_registry.serializers.AsyncJsonMessageSerializer

Bases: AsyncMessageSerializer

AsyncJsonMessageSerializer to serialize and deserialize messages asynchronously.

Parameters:

Name Type Description Default
schemaregistry_client AsyncSchemaRegistryClient

Http Client

required
reader_schema Optional[AvroSchema]

Accepted but not used by the JSON serializer; decoded payloads are validated against the writer schema only.

None
return_record_name bool

Accepted but not used by the JSON serializer.

False
Source code in schema_registry/serializers/message_serializer.py
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
class AsyncJsonMessageSerializer(AsyncMessageSerializer):
    """AsyncJsonMessageSerializer to serialize and deserialize messages asynchronously.

    Records are validated against the JSON schema before they are written,
    and decoded payloads are validated against the writer schema.

    Args:
        schemaregistry_client: Http Client
        reader_schema: Accepted but not used by the JSON serializer; decoded
            payloads are validated against the writer schema only.
        return_record_name: Accepted but not used by the JSON serializer.
    """

    @property
    def _serializer_schema_type(self) -> typing.Literal["AVRO", "JSON"]:
        """Schema type handled by this serializer (the JSON constant)."""
        return utils.JSON_SCHEMA_TYPE

    def _get_encoder_func(self, schema: BaseSchema) -> typing.Callable:
        """Return a function ``(record, fp)`` that validates and writes ``record``.

        Args:
            schema: Schema whose ``.schema`` attribute is used for validation.

        Returns:
            A callable that validates ``record`` against ``schema.schema`` and
            writes its JSON encoding (as bytes) into ``fp``.
        """

        def json_encoder_func(record: dict, fp: ContextStringIO) -> typing.Any:
            # Validate first so nothing is written for an invalid record.
            validate(record, schema.schema)
            fp.write(json.dumps(record).encode())

        return json_encoder_func

    def _get_decoder_func(self, payload: ContextStringIO, writer_schema: BaseSchema) -> typing.Callable:
        """Return a function ``(payload)`` that parses and validates a JSON body.

        The outer ``payload`` argument is not used here; the returned function
        receives the stream to read from. ``self.reader_schema`` and
        ``self.return_record_name`` are not consulted.

        Args:
            payload: Unused.
            writer_schema: Schema the decoded object is validated against.

        Returns:
            A callable that loads JSON from its stream argument, validates it
            against ``writer_schema.schema`` and returns the object.
        """

        def json_decoder_func(payload: typing.IO) -> typing.Any:
            obj = json.load(payload)
            validate(obj, writer_schema.schema)
            return obj

        return json_decoder_func