
Examples

Under the examples folder you can find five different examples: one with aiokafka (async) showing the simplest use case, where an AvroModel instance is serialized, sent through Kafka, and the event is consumed; two sync examples using the kafka-python driver, which show avro-json serialization and schema evolution (FULL compatibility); a Redis streams example using the walrus driver; and a RabbitMQ example using the pika driver.
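Every example follows the same pattern: the producer serializes an AvroModel instance and the consumer deserializes the raw bytes back into the model. A minimal sketch of that round trip, with no broker involved (the avro-json variant assumes the serialization_type keyword documented for AvroModel):

import dataclasses

from dataclasses_avroschema import AvroModel


@dataclasses.dataclass
class User(AvroModel):
    name: str
    age: int


user = User(name="Kim", age=30)

# binary avro is the default encoding; avro-json is also supported
event = user.serialize()
event_json = user.serialize(serialization_type="avro-json")

# both encodings deserialize back into a model instance
assert User.deserialize(event) == user
assert User.deserialize(event_json, serialization_type="avro-json") == user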

Kafka examples

aiokafka example
import asyncio
import enum
import random
from dataclasses import dataclass
from typing import Optional

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer

from dataclasses_avroschema import AvroModel


class FavoriteColor(enum.Enum):
    BLUE = "BLUE"
    YELLOW = "YELLOW"
    GREEN = "GREEN"


@dataclass
class UserModel(AvroModel):
    "An User"

    name: str
    age: int
    favorite_colors: FavoriteColor = FavoriteColor.BLUE
    country: str = "Argentina"
    address: Optional[str] = None

    class Meta:
        namespace = "User.v1"
        aliases = ["user-v1", "super user"]


async def consume(loop):
    consumer = AIOKafkaConsumer(
        "my_topic",
        "my_other_topic",
        loop=loop,
        bootstrap_servers="localhost:9092",
        group_id="my-group",
    )
    # Get cluster layout and join group `my-group`
    await consumer.start()
    run_consumer = True

    while run_consumer:
        try:
            # Consume messages
            async for msg in consumer:
                print(f"Message received: {msg.value} at {msg.timestamp}")

                user = UserModel.deserialize(msg.value)
                print(f"Message deserialized: {user}")
        except KeyboardInterrupt:
            # Will leave consumer group; perform autocommit if enabled.
            await consumer.stop()
            print("Stoping consumer...")
            run_consumer = False


async def send(loop, total_events=10):
    producer = AIOKafkaProducer(loop=loop, bootstrap_servers="localhost:9092")
    # Get cluster layout and initial topic/partition leadership information
    await producer.start()

    for event_number in range(1, total_events + 1):
        # Produce message
        print(f"Sending event number {event_number}")

        user = UserModel(
            name=random.choice(
                [
                    "Juan",
                    "Peter",
                    "Michael",
                    "Moby",
                    "Kim",
                ]
            ),
            age=random.randint(1, 50),
        )

        # create the message
        message = user.serialize()

        await producer.send_and_wait("my_topic", message)
        # sleep for 2 seconds
        await asyncio.sleep(2)
    # Wait for all pending messages to be delivered or expire.
    await producer.stop()
    print("Stopping producer...")


def main():
    loop = asyncio.get_event_loop()
    tasks = asyncio.gather(send(loop), consume(loop))
    loop.run_until_complete(tasks)


if __name__ == "__main__":
    main()

(This script is complete, it should run "as is")
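Note that this entry point targets older releases: aiokafka 0.8 removed the loop argument from the client constructors, and asyncio.get_event_loop() is deprecated on recent Python. A sketch of the equivalent entry point for newer versions, assuming send and consume are rewritten without the loop parameter:

async def async_main():
    # the clients pick up the running event loop automatically
    await asyncio.gather(send(), consume())


if __name__ == "__main__":
    asyncio.run(async_main())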

kafka-python example
import enum
import random
from dataclasses import dataclass
from time import sleep
from typing import Optional

from kafka import KafkaConsumer, KafkaProducer

from dataclasses_avroschema import AvroModel


class FavoriteColor(enum.Enum):
    BLUE = "BLUE"
    YELLOW = "YELLOW"
    GREEN = "GREEN"


@dataclass
class UserModel(AvroModel):
    "An User"

    name: str
    age: int
    favorite_colors: FavoriteColor = FavoriteColor.BLUE
    country: str = "Argentina"
    address: Optional[str] = None

    class Meta:
        namespace = "User.v1"
        aliases = ["user-v1", "super user"]


def consume():
    consumer = KafkaConsumer(
        "my_topic", bootstrap_servers="localhost:9092", group_id="my-group"
    )

    for msg in consumer:
        print(f"Message received: {msg.value} at {msg.timestamp}")

        user = UserModel.deserialize(msg.value)
        print(f"Message deserialized: {user}")

    print("Stoping consumer...")


def send(total_events=10):
    producer = KafkaProducer(bootstrap_servers="localhost:9092")

    for event_number in range(1, total_events + 1):
        # Produce message
        print(f"Sending event number {event_number}")

        # create an instance of User v1
        user = UserModel(
            name=random.choice(
                [
                    "Juan",
                    "Peter",
                    "Michael",
                    "Moby",
                    "Kim",
                ]
            ),
            age=random.randint(1, 50),
        )

        # create the message
        message = user.serialize()

        producer.send("my_topic", message)
        # sleep for 1 second between events
        sleep(1)

    print("Stoping producer...")


def main():
    send()
    consume()


if __name__ == "__main__":
    main()

(This script is complete, it should run "as is")
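The introduction mentions avro-json serialization; the example above uses the default binary encoding, but switching is a one-line change on each side (again assuming the serialization_type keyword):

# producer side: encode the payload as avro-json
message = user.serialize(serialization_type="avro-json")

# consumer side: decode it with the same serialization type
user = UserModel.deserialize(msg.value, serialization_type="avro-json")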

schema evolution example
import enum
import random
from dataclasses import dataclass
from time import sleep
from typing import Optional

from kafka import KafkaConsumer, KafkaProducer

from dataclasses_avroschema import AvroModel


class FavoriteColor(enum.Enum):
    BLUE = "BLUE"
    YELLOW = "YELLOW"
    GREEN = "GREEN"


@dataclass
class UserModel(AvroModel):
    "An User"

    name: str
    age: int
    favorite_colors: FavoriteColor = FavoriteColor.BLUE
    country: str = "Argentina"
    address: Optional[str] = None
    testing: bool = False

    class Meta:
        namespace = "User.v1"
        aliases = ["user-v1", "super user"]


@dataclass
class UserModelV2(AvroModel):
    "A User v2"

    name: str
    age: int
    favorite_colors: FavoriteColor = FavoriteColor.BLUE
    country: str = "Argentina"
    address: Optional[str] = None

    class Meta:
        namespace = "User.v2"


def consume():
    consumer = KafkaConsumer(
        "my_topic", bootstrap_servers="localhost:9092", group_id="my-group"
    )

    for msg in consumer:
        print(f"Message received: {msg.value} at {msg.timestamp}")

        # create an instance of User v2
        user = UserModelV2.deserialize(msg.value)
        print(f"Message deserialized: {user}")

    print("Stoping consumer...")


def send(total_events=2):
    producer = KafkaProducer(bootstrap_servers="localhost:9092")

    for event_number in range(1, total_events + 1):
        # Produce message
        print(f"Sending event number {event_number}")

        # create an instance of User v1
        user = UserModel(
            name=random.choice(
                [
                    "Juan",
                    "Peter",
                    "Michael",
                    "Moby",
                    "Kim",
                ]
            ),
            age=random.randint(1, 50),
        )

        print(user)

        # create the message
        message = user.serialize()

        producer.send("my_topic", message)
        # sleep for 1 second between events
        sleep(1)

    print("Stoping producer...")


def main():
    send()
    consume()


if __name__ == "__main__":
    main()

(This script is complete, it should run "as is")
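The evolution works because UserModel (v1) only appends a field with a default (testing), so a v1 payload can still be read as a v2 record; that is what FULL compatibility guarantees here. The round trip the consumer performs can be checked locally, without Kafka:

# serialize with the v1 model, deserialize with the v2 model
user_v1 = UserModel(name="Kim", age=30)
user_v2 = UserModelV2.deserialize(user_v1.serialize())
print(user_v2)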

Redis examples

Minimal Redis example using Redis streams with the walrus driver.

redis streams example
import enum
import random
from dataclasses import dataclass
from time import sleep
from typing import Optional

from walrus import Database  # A subclass of the redis-py Redis client.

from dataclasses_avroschema import AvroModel


class FavoriteColor(enum.Enum):
    BLUE = "BLUE"
    YELLOW = "YELLOW"
    GREEN = "GREEN"


@dataclass
class UserModel(AvroModel):
    "An User"

    name: str
    age: int
    favorite_colors: FavoriteColor = FavoriteColor.BLUE
    country: str = "Argentina"
    address: Optional[str] = None
    testing: bool = False

    class Meta:
        namespace = "User.v1"
        aliases = ["user-v1", "super user"]


def consume(consumer_group):
    # read new messages in the stream

    while True:
        result = consumer_group.my_stream.read(count=1, block=1000)
        # Each record has the following format:
        # [(b'1598545738231-0', {b'message': b'\x06KimT\x00\x12Argentina\x00\x00'})]

        if result:
            message_id, message_content = result[0]

            if message_id:
                value = message_content[b"message"]
                print(f"Processing message {message_id} with value {value}")
                user = UserModel.deserialize(value)
                print(user)


def produce(consumer_group):
    for i in range(10):
        # create an instance of User v1
        user = UserModel(
            name=random.choice(
                [
                    "Juan",
                    "Peter",
                    "Michael",
                    "Moby",
                    "Kim",
                ]
            ),
            age=random.randint(1, 50),
        )

        msgid = consumer_group.my_stream.add({"message": user.serialize()})
        print(f"Producing message {msgid}")

    print("Producer finished....")
    print("#" * 80)
    sleep(2)


def main():
    db = Database()
    stream_name = "my-stream"
    db.Stream(stream_name)  # Create a new stream instance

    # create the consumer group
    consumer_group = db.consumer_group("my-consumer-group-1", [stream_name])
    consumer_group.create()  # Create the consumer group.
    consumer_group.set_id("$")

    produce(consumer_group)
    consume(consumer_group)


if __name__ == "__main__":
    main()

(This script is complete, it should run "as is")
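One caveat: the consumer above reads through a consumer group but never acknowledges the messages, so they remain in the group's pending entry list. A possible fix inside the consume loop, assuming walrus exposes ack() on the consumer-group stream (its wrapper around Redis XACK):

if message_id:
    value = message_content[b"message"]
    user = UserModel.deserialize(value)
    print(user)

    # mark the message as processed so it leaves the pending entry list
    consumer_group.my_stream.ack(message_id)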

RabbitMQ examples

Minimal RabbitMQ example with the pika driver.

rabbitmq example
import enum
import random
from dataclasses import dataclass
from typing import Optional

import pika

from dataclasses_avroschema import AvroModel


class FavoriteColor(enum.Enum):
    BLUE = "BLUE"
    YELLOW = "YELLOW"
    GREEN = "GREEN"


@dataclass
class UserModel(AvroModel):
    "An User"

    name: str
    age: int
    favorite_colors: FavoriteColor = FavoriteColor.BLUE
    country: str = "Argentina"
    address: Optional[str] = None

    class Meta:
        namespace = "User.v1"
        aliases = ["user-v1", "super user"]


def consume(*, channel, queue_name: str):
    def callback(channel, method, properties, body):
        print(f"Message received with body: {body} on routing key {method.routing_key} \n")
        user = UserModel.deserialize(body)
        print(f"Message deserialized: {user} \n")
        # acknowledge the delivery so the message is removed from the queue
        channel.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_consume(on_message_callback=callback, queue=queue_name)
    channel.start_consuming()


def produce(*, channel, queue_name: str, total_events: int = 10):
    for event_number in range(1, total_events + 1):
        # Produce message
        print(f"Sending event number {event_number}")

        user = UserModel(
            name=random.choice(
                [
                    "Juan",
                    "Peter",
                    "Michael",
                    "Moby",
                    "Kim",
                ]
            ),
            age=random.randint(1, 50),
        )

        # create the message
        message = user.serialize()

        # publish
        channel.basic_publish(exchange="", routing_key=queue_name, body=message)


def main():
    rabbitmq_url = "amqp://guest:guest@localhost:5672/%2f"
    queue_name = "test-stream"
    params = pika.URLParameters(rabbitmq_url)

    connection = pika.BlockingConnection(params)
    channel = connection.channel()
    channel.queue_declare(queue=queue_name, durable=True)

    produce(channel=channel, queue_name=queue_name)
    consume(channel=channel, queue_name=queue_name)

    connection.close()


if __name__ == "__main__":
    main()

(This script is complete, it should run "as is")