Serialization with Dataclasses Avro Schema
Last year I released the project Dataclasses Avro Schema, whose main goal is to generate avro schemas from python dataclasses. Thanks to this main feature, it is possible to serialize/deserialize python instances using the self-contained avro schemas. For example, we can serialize python instances in order to create events and place them (as binary) in kafka topics or redis streams, and we can also deserialize the events and convert them back into the original python instances. This is a powerful feature, because the data layer for a streaming application is fully covered by this library, meaning that you can use your favorite python kafka driver or python redis driver to build streaming applications without worrying about the data model.
Serialization
Serialization can be done using avro, avro-json or json on a python instance:
```python
from dataclasses import dataclass
import typing

from dataclasses_avroschema import AvroModel


@dataclass
class Address(AvroModel):
    "An Address"
    street: str
    street_number: int


address_data = {
    "street": "test",
    "street_number": 10,
}

# create an Address instance
address = Address(**address_data)

address.serialize()
# >>> b'\x08test\x14'

address.serialize(serialization_type="avro-json")
# >>> b'{"street": "test", "street_number": 10}'

# Get the json from the instance
address.to_json()
# python dict >>> {'street': 'test', 'street_number': 10}
```
Deserialization
Deserialization can be done as well with avro or avro-json. You must know beforehand which one was used to serialize the data:
```python
avro_binary = b'\x08test\x14'  # Address instance serialized with avro
Address.deserialize(avro_binary)  # create a python instance of Address
# >>>> Address(street='test', street_number=10)

avro_json = b'{"street": "test", "street_number": 10}'  # Address instance serialized with avro-json
Address.deserialize(avro_json, serialization_type="avro-json")  # create a python instance of Address
# >>>> Address(street='test', street_number=10)
```
Examples with kafka and redis drivers
You can create simple streaming applications using your favorite python driver, either kafka or redis, and integrate producers and consumers with dataclasses-avroschema. The following is a minimal example using aiokafka:
```python
import asyncio
from dataclasses import dataclass
import random

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from dataclasses_avroschema import AvroModel, types


@dataclass
class UserModel(AvroModel):
    "An User"
    name: str
    age: int
    favorite_colors: types.Enum = types.Enum(["BLUE", "YELLOW", "GREEN"], default="BLUE")
    country: str = "Argentina"
    address: str = None

    class Meta:
        namespace = "User.v1"
        aliases = ["user-v1", "super user"]


async def consume(loop, total_events=10):
    consumer = AIOKafkaConsumer(
        'my_topic', 'my_other_topic',
        loop=loop,
        bootstrap_servers='localhost:9092',
        group_id="my-group")

    # Get cluster layout and join group `my-group`
    await consumer.start()
    run_consumer = True

    while run_consumer:
        try:
            # Consume messages
            async for msg in consumer:
                print(f"Message received: {msg.value} at {msg.timestamp}")
                user = UserModel.deserialize(msg.value)
                print(f"Message deserialized: {user}")
        except KeyboardInterrupt:
            # Will leave consumer group; perform autocommit if enabled.
            await consumer.stop()
            print("Stopping consumer...")
            run_consumer = False


async def send(loop, total_events=10):
    producer = AIOKafkaProducer(
        loop=loop, bootstrap_servers='localhost:9092')

    # Get cluster layout and initial topic/partition leadership information
    await producer.start()

    for event_number in range(1, total_events + 1):
        # Produce message
        print(f"Sending event number {event_number}")

        user = UserModel(
            name=random.choice(["Juan", "Peter", "Michael", "Moby", "Kim"]),
            age=random.randint(1, 50)
        )

        # create the message
        message = user.serialize()

        await producer.send_and_wait("my_topic", message)
        # sleep for 2 seconds
        await asyncio.sleep(2)
    else:
        # Wait for all pending messages to be delivered or expire.
        await producer.stop()
        print("Stopping producer...")


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    tasks = asyncio.gather(send(loop), consume(loop))

    loop.run_until_complete(tasks)
```
Under the examples folder you can find two other kafka examples (sync) using the kafka-python driver, where avro-json serialization and schema evolution (FULL compatibility) are shown.
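To give a rough idea of the sync approach, here is a minimal sketch using kafka-python with avro-json serialization. The topic name, broker address and UserModel fields are placeholders of mine, and the schema evolution part is left out; the complete versions live in the examples folder:

```python
from dataclasses import dataclass

from kafka import KafkaConsumer, KafkaProducer  # kafka-python (sync) driver
from dataclasses_avroschema import AvroModel


@dataclass
class UserModel(AvroModel):
    "An User"
    name: str
    age: int


producer = KafkaProducer(bootstrap_servers="localhost:9092")
consumer = KafkaConsumer(
    "my_topic", bootstrap_servers="localhost:9092", auto_offset_reset="earliest")

# serialize the instance with avro-json instead of avro binary
user = UserModel(name="Juan", age=20)
producer.send("my_topic", user.serialize(serialization_type="avro-json"))
producer.flush()

# consume and rebuild the original python instance
for msg in consumer:
    print(UserModel.deserialize(msg.value, serialization_type="avro-json"))
    break
```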
Also, there are two redis examples using redis streams with walrus and redisgears-py.
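The idea with redis streams is the same: the stream entry carries the avro binary event. A rough sketch with walrus could look like the following (the stream name, connection settings and the "event" field are assumptions on my side, and the exact read helpers may differ; check the examples folder for the complete versions):

```python
from dataclasses import dataclass

from walrus import Database  # walrus wraps redis-py and exposes stream containers
from dataclasses_avroschema import AvroModel


@dataclass
class UserModel(AvroModel):
    "An User"
    name: str
    age: int


db = Database(host="localhost", port=6379)
stream = db.Stream("my-stream")  # redis stream container

# store the event as avro binary inside the stream entry
user = UserModel(name="Juan", age=20)
stream.add({"event": user.serialize()})

# read the entries back and rebuild the python instances
for message_id, data in stream.read():
    print(UserModel.deserialize(data[b"event"]))
```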
Factory and fixtures
Dataclasses Avro Schema also includes a factory feature, so you can quickly generate python instances and use them, for example, to test your data streaming pipelines. Instances can be generated using the fake method.
```python
import typing

from dataclasses_avroschema import AvroModel


class Address(AvroModel):
    "An Address"
    street: str
    street_number: int


class User(AvroModel):
    "User with multiple Address"
    name: str
    age: int
    addresses: typing.List[Address]


Address.fake()
# >>>> Address(street='PxZJILDRgbXyhWrrPWxQ', street_number=2067)

User.fake()
# >>>> User(name='VGSBbOGfSGjkMDnefHIZ', age=8974, addresses=[Address(street='vNpPYgesiHUwwzGcmMiS', street_number=4790)])
```
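For instance, a tiny (hypothetical) test could use a fake instance to check the serialize/deserialize round trip your pipeline relies on, reusing the Address model defined above:

```python
def test_address_roundtrip():
    address = Address.fake()
    event = address.serialize()  # avro binary, ready for a kafka/redis producer

    restored = Address.deserialize(event)
    assert restored.street == address.street
    assert restored.street_number == address.street_number
```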
Conclusion
If you are starting a streaming python application, give Dataclasses Avro Schema a try to cover the data model layer and avoid headaches with the serialization/deserialization process.