Serialization
Is possible to serialize/deserialize
with the correspondent avro schema generated and the dataclass.
In both cases we can do it with avro
or avro-json
.
Instances serialization
from dataclasses import dataclass
import typing
from dataclasses_avroschema import AvroModel
@dataclass
class Address(AvroModel):
"An Address"
street: str
street_number: int
@dataclass
class User(AvroModel):
"User with multiple Address"
name: str
age: int
addresses: typing.List[Address]
address_data = {
"street": "test",
"street_number": 10,
}
# create an Address instance
address = Address(**address_data)
data_user = {
"name": "john",
"age": 20,
"addresses": [address],
}
# create an User instance
user = User(**data_user)
user.serialize()
# >>> b"\x08john(\x02\x08test\x14\x00"
user.serialize(serialization_type="avro-json")
# >>> b'{"name": "john", "age": 20, "addresses": [{"street": "test", "street_number": 10}]}'
user.to_json()
# python dict >>> {'name': 'john', 'age': 20, 'addresses': [{'street': 'test', 'street_number': 10}]}
(This script is complete, it should run "as is")
Note
For serialization is neccesary to use python dataclasses
Deserialization
Deserialization could take place with an instance dataclass or the dataclass itself. Can return the dict representation or a new class instance.
import typing
import dataclasses
from dataclasses_avroschema import AvroModel
@dataclasses.dataclass
class Address(AvroModel):
"An Address"
street: str
street_number: int
@dataclasses.dataclass
class User(AvroModel):
"User with multiple Address"
name: str
age: int
addresses: typing.List[Address]
avro_binary = b"\x08john(\x02\x08test\x14\x00"
avro_json_binary = b'{"name": "john", "age": 20, "addresses": [{"street": "test", "street_number": 10}]}'
# return a new class instance!!
User.deserialize(avro_binary)
# >>> User(name='john', age=20, addresses=[Address(street='test', street_number=10)])
# return a python dict
User.deserialize(avro_binary, create_instance=False)
# >>> {"name": "john", "age": 20, "addresses": [{"street": "test", "street_number": 10}]}
# return a new class instance!!
User.deserialize(avro_json_binary, serialization_type="avro-json")
# >>> User(name='john', age=20, addresses=[Address(street='test', street_number=10)])
# return a python dict
User.deserialize(avro_json_binary, serialization_type="avro-json", create_instance=False)
# >>> {"name": "john", "age": 20, "addresses": [{"street": "test", "street_number": 10}]}
(This script is complete, it should run "as is")
Deserialization using a different schema
To deserialize data encoded via a different schema, one can pass an optional writer_schema: AvroModel | dict[str, Any]
attribute. It will be used by the fastavros schemaless_reader
.
@dataclass
class User(AvroModel):
name: str
age: int
@dataclass
class UserCompatible(AvroModel):
name: str
age: int
nickname: Optional[str] = None
class Meta:
schema_name = "User"
user_data = {
"name": "G.R. Emlin",
"age": 52,
}
# serialize data with the User schema
>>> serialized_user = User(**user_data).serialize()
# deserialize user using a new, but compatible schema
>>> deserialized_user = UserCompatible.deserialize(serialized_user, writer_schema=User)
(This script is complete, it should run "as is")
Custom Serialization
The serialization/deserialization
process is built over fastavro. If you want to use another library or a different process, you can override the base AvroModel
:
import dataclasses
from dataclasses_avroschema import AvroModel, SerializationType
@dataclasses.dataclass
class MyAvroModel(AvroModel):
...
def serialize(self, serialization_type: SerializationType = "avro") -> bytes:
# Get the schema as a python dict
schema = self.avro_schema_to_python()
# instance as python dict
data = self.asdict()
# call your custom serialization withe the avro schema and the data
return custom_serialization(schema, datam serialization_type=serialization_type)
@classmethod
def deserialize(
cls, data: bytes, serialization_type: SerializationType = "avro", create_instance: bool = True
) -> typing.Union[typing.Dict, "AvroModel"]:
# Get the schema as a python dict
schema = cls.avro_schema_to_python()
# get the python dict with the schema and the data (bytes)
payload = custom_deserialize(data, schema, serialization_type=serialization_type)
if create_instance:
return from_dict(data_class=cls, data=payload, config=Config(check_types=False))
return payload
# and then inherits from your custom AvroModel
@dataclasses.dataclass
class Address(MyAvroModel):
"An Address"
street: str
street_number: int
Encoding for unions with avro-json
When you have an union
and you want to serialize a payload
using avro-json
you will notice that the type
is added to each union
field.
This is needed because after the serialization process you need to know the type
in order to deserialize
:
Do not confuse json with avro-json!!
import typing
import dataclasses
import datetime
import uuid
from dataclasses_avroschema import AvroModel
@dataclasses.dataclass
class UnionSchema(AvroModel):
"Some Unions"
first_union: typing.Union[str, int]
logical_union: typing.Union[datetime.datetime, datetime.date, uuid.uuid4]
my_union = UnionSchema(first_union=10, logical_union=datetime.datetime.now())
event = my_union.serialize(serialization_type="avro-json")
print(event)
# long is added to each field
>>> b'{"first_union": {"long": 10}, "logical_union": {"long": 1647971584847}}'
my_union.deserialize(event, serialization_type="avro-json")
# >>> UnionSchema(first_union=10, logical_union=datetime.datetime(2022, 3, 22, 17, 53, 4, 847000, tzinfo=datetime.timezone.utc))
# bad data
event_2 = b'{"first_union": 10, "logical_union": {"long": 1647971584847}}'
my_union.deserialize(event_2, serialization_type="avro-json")
File ~/Projects/dataclasses-avroschema/.venv/lib/python3.9/site-packages/fastavro/io/json_decoder.py:213, in AvroJSONDecoder.read_index(self)
211 label = "null"
212 else:
--> 213 label, data = self._current[self._key].popitem()
214 self._current[self._key] = data
215 # TODO: Do we need to do this?
AttributeError: 'int' object has no attribute 'popitem'
(This script is complete, it should run "as is")
Utils
The library includes two utils to serialize/deserialize using the fastavro
as backend
Serialize a payload into avro using fastavro
as backend
Attributes:
Name | Type | Description |
---|---|---|
payload |
Dict[str, Any]
|
The payload to serialize |
serialization_type |
SerializationType
|
|
Returns:
Type | Description |
---|---|
bytes
|
bytes encoded in avro format |
Example
from dataclasses_avroschema import serialize
payload = {'event': 'Hello world'}
schema = {
'type': 'record',
'name': 'MyRecord',
'fields': [
{'name': 'event', 'type': 'string', 'default': 'Hello World'}
]
}
event = serialize(payload=payload, schema=schema)
assert event == b'Hello world'
Deserialize an binary event
into a python Dict using fastavro
as backend
Attributes:
Name | Type | Description |
---|---|---|
data |
bytes
|
The event to deserialize |
schema |
bytes
|
Dict[str, Any]: The schema to use for the deserialization |
serialization_type |
SerializationType
|
|
context |
Dict[str, Any] | None
|
Optional extra context to use. Usually in includes an entry with all the extra models defined by the end user by name. Example AvroModel.name: AvroModel |
writer_schema |
Dict[str, Any] | None
|
The schema that was used to write
the event. If it is not provided it is assumed that the event was
written with the |
Returns:
Type | Description |
---|---|
JsonDict
|
The object dezerialized python Dict |
Example
from dataclasses_avroschema import deserialize
event = b'Hello world'
schema = {
'type': 'record',
'name': 'MyRecord',
'fields': [
{'name': 'event', 'type': 'string', 'default': 'Hello World'}
]
}
payload = deserialize(data=event, schema=schema)
assert payload == {'event': 'Hello world'}