Examples
Under the examples folder you can find several examples. The first one uses aiokafka (async) and shows the simplest use case: an AvroModel instance is serialized and sent through Kafka, and the event is then consumed.
The next two examples are sync, using the kafka-python driver; they show avro-json serialization and schema evolution (FULL compatibility).
There is also a Redis example using the walrus Python driver, and a RabbitMQ example using the pika driver.
Kafka examples
import asyncio
import enum
import random
from dataclasses import dataclass
from typing import Optional
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from dataclasses_avroschema import AvroModel
class FavoriteColor(enum.Enum):
    # Closed set of colour choices used by the user model below.
    # Each member's value is the upper-case colour name as a plain string.
    # No docstring is added on purpose: presumably the Avro library inspects
    # this class when building the schema -- confirm before adding one.
    BLUE = "BLUE"
    YELLOW = "YELLOW"
    GREEN = "GREEN"
@dataclass
class UserModel(AvroModel):
    # The one-line docstring below is kept verbatim: it is runtime class
    # metadata that AvroModel may read (presumably as schema documentation --
    # confirm before rewording it).
    "An User"

    # Required fields (no default value).
    name: str
    age: int

    # Fields with defaults, used when the caller does not supply a value.
    favorite_colors: FavoriteColor = FavoriteColor.BLUE
    country: str = "Argentina"
    address: Optional[str] = None

    class Meta:
        # Extra metadata for the model; presumably consumed by AvroModel when
        # it generates the Avro schema -- confirm against the library docs.
        namespace = "User.v1"
        aliases = ["user-v1", "super user"]
async def consume(loop, total_events=10):
    """Consume Kafka events and print each one as a ``UserModel``.

    Subscribes to ``my_topic`` and ``my_other_topic`` on ``localhost:9092`` as
    part of the ``my-group`` consumer group, then prints every received
    message both raw and after ``UserModel.deserialize``.

    Args:
        loop: Event loop handed to ``AIOKafkaConsumer``.
        total_events: Not used by this function; retained for backward
            compatibility of the signature.
    """
    consumer = AIOKafkaConsumer(
        "my_topic",
        "my_other_topic",
        loop=loop,
        bootstrap_servers="localhost:9092",
        group_id="my-group",
    )
    # Get cluster layout and join group `my-group`
    await consumer.start()

    try:
        # Consume messages
        async for msg in consumer:
            print(f"Message received: {msg.value} at {msg.timestamp}")

            user = UserModel.deserialize(msg.value)
            print(f"Message deserialized: {user}")
    except KeyboardInterrupt:
        print("Stoping consumer...")
    finally:
        # Stop the consumer on every exit path (not only KeyboardInterrupt) so
        # a failed deserialization or a cancelled task does not leak it.
        # Will leave consumer group; perform autocommit if enabled.
        await consumer.stop()
async def send(loop, total_events=10):
    """Produce ``total_events`` random ``UserModel`` events to ``my_topic``.

    Each event is a ``UserModel`` with a random name and age, serialized with
    ``UserModel.serialize`` and sent to the broker at ``localhost:9092``.
    The coroutine waits two seconds between events.

    Args:
        loop: Event loop handed to ``AIOKafkaProducer``.
        total_events: Number of events to send.
    """
    producer = AIOKafkaProducer(loop=loop, bootstrap_servers="localhost:9092")
    # Get cluster layout and initial topic/partition leadership information
    await producer.start()

    try:
        for event_number in range(1, total_events + 1):
            # Produce message
            print(f"Sending event number {event_number}")

            user = UserModel(
                name=random.choice(
                    [
                        "Juan",
                        "Peter",
                        "Michael",
                        "Moby",
                        "Kim",
                    ]
                ),
                age=random.randint(1, 50),
            )

            # create the message
            message = user.serialize()

            await producer.send_and_wait("my_topic", message)
            # sleep for 2 seconds
            await asyncio.sleep(2)
    finally:
        # Runs on success and on failure alike, so the producer is never left
        # running if an event fails mid-loop.
        # Wait for all pending messages to be delivered or expire.
        await producer.stop()

    print("Stoping producer...")
def main():
    """Run the producer and the consumer concurrently on one event loop."""

    async def _run():
        # Inside a coroutine the running loop is always available. This avoids
        # calling asyncio.get_event_loop()/asyncio.gather() with no running
        # loop, which is deprecated on recent Python versions.
        loop = asyncio.get_running_loop()
        await asyncio.gather(send(loop), consume(loop))

    asyncio.run(_run())
(This script is complete, it should run "as is")
import enum
import random
from dataclasses import dataclass
from time import sleep
from typing import Optional
from kafka import KafkaConsumer, KafkaProducer
from dataclasses_avroschema import AvroModel
class FavoriteColor(enum.Enum):
    # Closed set of colour choices used by the user model below.
    # Each member's value is the upper-case colour name as a plain string.
    # No docstring is added on purpose: presumably the Avro library inspects
    # this class when building the schema -- confirm before adding one.
    BLUE = "BLUE"
    YELLOW = "YELLOW"
    GREEN = "GREEN"
@dataclass
class UserModel(AvroModel):
    # The one-line docstring below is kept verbatim: it is runtime class
    # metadata that AvroModel may read (presumably as schema documentation --
    # confirm before rewording it).
    "An User"

    # Required fields (no default value).
    name: str
    age: int

    # Fields with defaults, used when the caller does not supply a value.
    favorite_colors: FavoriteColor = FavoriteColor.BLUE
    country: str = "Argentina"
    address: Optional[str] = None

    class Meta:
        # Extra metadata for the model; presumably consumed by AvroModel when
        # it generates the Avro schema -- confirm against the library docs.
        namespace = "User.v1"
        aliases = ["user-v1", "super user"]
def consume():
    """Print every message from ``my_topic``, raw and as a ``UserModel``.

    Connects to ``localhost:9092`` as part of the ``my-group`` consumer group
    and iterates over the consumer; each message value is passed to
    ``UserModel.deserialize``.
    """
    consumer = KafkaConsumer(
        "my_topic", bootstrap_servers="localhost:9092", group_id="my-group"
    )

    try:
        for msg in consumer:
            print(f"Message received: {msg.value} at {msg.timestamp}")

            user = UserModel.deserialize(msg.value)
            print(f"Message deserialized: {user}")

        print("Stoping consumer...")
    finally:
        # Close the consumer on every exit path (normal end, Ctrl-C, or a
        # failed deserialization) instead of leaving the connection open.
        consumer.close()
def send(total_events=10):
    """Produce ``total_events`` random ``UserModel`` events to ``my_topic``.

    Each event is a ``UserModel`` with a random name and age, serialized with
    ``UserModel.serialize`` and sent to the broker at ``localhost:9092``.
    The function sleeps one second between events.

    Args:
        total_events: Number of events to send.
    """
    producer = KafkaProducer(bootstrap_servers="localhost:9092")

    try:
        for event_number in range(1, total_events + 1):
            # Produce message
            print(f"Sending event number {event_number}")

            # create an instance of User v1
            user = UserModel(
                name=random.choice(
                    [
                        "Juan",
                        "Peter",
                        "Michael",
                        "Moby",
                        "Kim",
                    ]
                ),
                age=random.randint(1, 50),
            )

            # create the message
            message = user.serialize()

            producer.send("my_topic", message)
            # sleep for 1 second
            sleep(1)

        # KafkaProducer.send() is asynchronous; wait until everything queued
        # has actually been handed to the broker before announcing the end.
        producer.flush()
        print("Stoping producer...")
    finally:
        # Release the producer's resources even if an event failed mid-loop.
        producer.close()
def main():
    """Publish the sample events, then consume from the topic."""
    # Sequential on purpose: send() runs to completion before consume() starts.
    send()
    consume()
(This script is complete, it should run "as is")
import enum
import random
from dataclasses import dataclass
from time import sleep
from typing import Optional
from kafka import KafkaConsumer, KafkaProducer
from dataclasses_avroschema import AvroModel
class FavoriteColor(enum.Enum):
    # Closed set of colour choices used by the user models below.
    # Each member's value is the upper-case colour name as a plain string.
    # No docstring is added on purpose: presumably the Avro library inspects
    # this class when building the schema -- confirm before adding one.
    BLUE = "BLUE"
    YELLOW = "YELLOW"
    GREEN = "GREEN"
@dataclass
class UserModel(AvroModel):
    # The one-line docstring below is kept verbatim: it is runtime class
    # metadata that AvroModel may read (presumably as schema documentation --
    # confirm before rewording it).
    "An User"

    # Required fields (no default value).
    name: str
    age: int

    # Fields with defaults, used when the caller does not supply a value.
    favorite_colors: FavoriteColor = FavoriteColor.BLUE
    country: str = "Argentina"
    address: Optional[str] = None
    # Extra field that UserModelV2 does not declare.
    testing: bool = False

    class Meta:
        # Extra metadata for the model; presumably consumed by AvroModel when
        # it generates the Avro schema -- confirm against the library docs.
        namespace = "User.v1"
        aliases = ["user-v1", "super user"]
@dataclass
class UserModelV2(AvroModel):
    # The one-line docstring below is kept verbatim: it is runtime class
    # metadata that AvroModel may read (presumably as schema documentation --
    # confirm before rewording it).
    "A User v2"

    # Required fields (no default value).
    name: str
    age: int

    # Fields with defaults; same as UserModel except there is no `testing`.
    favorite_colors: FavoriteColor = FavoriteColor.BLUE
    country: str = "Argentina"
    address: Optional[str] = None

    class Meta:
        # Extra metadata for the model; presumably consumed by AvroModel when
        # it generates the Avro schema -- confirm against the library docs.
        namespace = "User.v2"
def consume():
    """Print every message from ``my_topic``, raw and as a ``UserModelV2``.

    Connects to ``localhost:9092`` as part of the ``my-group`` consumer group
    and iterates over the consumer. The events are written by ``send`` as
    ``UserModel`` and read back here with ``UserModelV2.deserialize``.
    """
    consumer = KafkaConsumer(
        "my_topic", bootstrap_servers="localhost:9092", group_id="my-group"
    )

    try:
        for msg in consumer:
            print(f"Message received: {msg.value} at {msg.timestamp}")

            # create an instance of User v2
            user = UserModelV2.deserialize(msg.value)
            print(f"Message deserialized: {user}")

        print("Stoping consumer...")
    finally:
        # Close the consumer on every exit path (normal end, Ctrl-C, or a
        # failed deserialization) instead of leaving the connection open.
        consumer.close()
def send(total_events=2):
    """Produce ``total_events`` random ``UserModel`` (v1) events to ``my_topic``.

    Each event is a ``UserModel`` with a random name and age; it is printed,
    serialized with ``UserModel.serialize`` and sent to the broker at
    ``localhost:9092``. The function sleeps one second between events.

    Args:
        total_events: Number of events to send.
    """
    producer = KafkaProducer(bootstrap_servers="localhost:9092")

    try:
        for event_number in range(1, total_events + 1):
            # Produce message
            print(f"Sending event number {event_number}")

            # create an instance of User v1
            user = UserModel(
                name=random.choice(
                    [
                        "Juan",
                        "Peter",
                        "Michael",
                        "Moby",
                        "Kim",
                    ]
                ),
                age=random.randint(1, 50),
            )

            print(user)

            # create the message
            message = user.serialize()

            producer.send("my_topic", message)
            # sleep for 1 second
            sleep(1)

        # KafkaProducer.send() is asynchronous; wait until everything queued
        # has actually been handed to the broker before announcing the end.
        producer.flush()
        print("Stoping producer...")
    finally:
        # Release the producer's resources even if an event failed mid-loop.
        producer.close()
def main():
    """Publish the v1 sample events, then consume them from the topic."""
    # Sequential on purpose: send() runs to completion before consume() starts.
    send()
    consume()
(This script is complete, it should run "as is")
Redis examples
Minimal Redis example using Redis streams with the walrus driver.
import enum
import random
from dataclasses import dataclass
from time import sleep
from typing import Optional
from walrus import Database # A subclass of the redis-py Redis client.
from dataclasses_avroschema import AvroModel
class FavoriteColor(enum.Enum):
    # Closed set of colour choices used by the user model below.
    # Each member's value is the upper-case colour name as a plain string.
    # No docstring is added on purpose: presumably the Avro library inspects
    # this class when building the schema -- confirm before adding one.
    BLUE = "BLUE"
    YELLOW = "YELLOW"
    GREEN = "GREEN"
@dataclass
class UserModel(AvroModel):
    # The one-line docstring below is kept verbatim: it is runtime class
    # metadata that AvroModel may read (presumably as schema documentation --
    # confirm before rewording it).
    "An User"

    # Required fields (no default value).
    name: str
    age: int

    # Fields with defaults, used when the caller does not supply a value.
    favorite_colors: FavoriteColor = FavoriteColor.BLUE
    country: str = "Argentina"
    address: Optional[str] = None
    testing: bool = False

    class Meta:
        # Extra metadata for the model; presumably consumed by AvroModel when
        # it generates the Avro schema -- confirm against the library docs.
        namespace = "User.v1"
        aliases = ["user-v1", "super user"]
def consume(consumer_group):
    """Poll ``my_stream`` forever and print each record as a ``UserModel``.

    Args:
        consumer_group: Object exposing the stream as ``my_stream``; ``main``
            passes the consumer group it creates.
    """
    # read new messages in the stream
    while True:
        records = consumer_group.my_stream.read(count=1, block=1000)

        # Each record has the following format
        # [(b'1598545738231-0', {b'message': b'\x06KimT\x00\x12Argentina\x00\x00'})]
        if not records:
            # Nothing was returned by this read; poll again.
            continue

        message_id, message_content = records[0]
        if not message_id:
            continue

        value = message_content[b"message"]
        print(f"Processing message {message_id} with value {value}")
        user = UserModel.deserialize(value)
        print(user)
def produce(consumer_group):
    """Add ten random, serialized ``UserModel`` records to ``my_stream``.

    After the last record is added, a footer is printed and the function
    sleeps for two seconds.

    Args:
        consumer_group: Object exposing the stream as ``my_stream``; ``main``
            passes the consumer group it creates.
    """
    candidate_names = [
        "Juan",
        "Peter",
        "Michael",
        "Moby",
        "Kim",
    ]

    for _ in range(10):
        # create an instance of User v1
        user = UserModel(
            name=random.choice(candidate_names),
            age=random.randint(1, 50),
        )

        payload = user.serialize()
        msgid = consumer_group.my_stream.add({"message": payload})
        print(f"Producing message {msgid}")

    print("Producer finished....")
    print("#" * 80)
    sleep(2)
def main():
    """Set up the stream and consumer group, then produce and consume."""
    stream_key = "my-stream"
    db = Database()
    db.Stream(stream_key)  # Create a new stream instance

    # create the consumer group
    # NOTE(review): produce/consume access the stream as `my_stream`;
    # presumably walrus maps the "my-stream" key to that attribute -- confirm.
    consumer_group = db.consumer_group("my-consumer-group-1", [stream_key])
    consumer_group.create()  # Create the consumer group.
    consumer_group.set_id("$")

    # Sequential on purpose: all records are produced before consuming starts.
    produce(consumer_group)
    consume(consumer_group)
(This script is complete, it should run "as is")
RabbitMQ examples
Minimal RabbitMQ example with the pika driver.
import enum
import random
from dataclasses import dataclass
from typing import Optional
import pika
from dataclasses_avroschema import AvroModel
class FavoriteColor(enum.Enum):
    # Closed set of colour choices used by the user model below.
    # Each member's value is the upper-case colour name as a plain string.
    # No docstring is added on purpose: presumably the Avro library inspects
    # this class when building the schema -- confirm before adding one.
    BLUE = "BLUE"
    YELLOW = "YELLOW"
    GREEN = "GREEN"
@dataclass
class UserModel(AvroModel):
    # The one-line docstring below is kept verbatim: it is runtime class
    # metadata that AvroModel may read (presumably as schema documentation --
    # confirm before rewording it).
    "An User"

    # Required fields (no default value).
    name: str
    age: int

    # Fields with defaults, used when the caller does not supply a value.
    favorite_colors: FavoriteColor = FavoriteColor.BLUE
    country: str = "Argentina"
    address: Optional[str] = None

    class Meta:
        # Extra metadata for the model; presumably consumed by AvroModel when
        # it generates the Avro schema -- confirm against the library docs.
        namespace = "User.v1"
        aliases = ["user-v1", "super user"]
def consume(*, channel, queue_name: str):
    """Consume messages from ``queue_name`` and print each as a ``UserModel``.

    Registers a callback on ``channel`` and then calls
    ``channel.start_consuming()``.

    Args:
        channel: Channel to consume on; ``main`` passes the channel obtained
            from its ``pika.BlockingConnection``.
        queue_name: Name of the queue to read from.
    """

    def callback(ch, method, properties, body):
        # `ch` is the channel the message arrived on (it is not a string, and
        # is named differently from the outer `channel` to avoid shadowing).
        print(f"Message received with body: {body} in channel {method} \n")
        user = UserModel.deserialize(body)
        print(f"Message deserialized: {user} \n")
        # The consumer is registered without auto_ack, so acknowledge each
        # message once it has been processed; otherwise it stays unacked and
        # is delivered again later.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_consume(on_message_callback=callback, queue=queue_name)
    channel.start_consuming()
def produce(*, channel, queue_name: str, total_events: int = 10):
    """Publish ``total_events`` random, serialized ``UserModel`` messages.

    Args:
        channel: Channel used for ``basic_publish``.
        queue_name: Used as the routing key, with an empty exchange name.
        total_events: Number of messages to publish.
    """
    candidate_names = [
        "Juan",
        "Peter",
        "Michael",
        "Moby",
        "Kim",
    ]

    for event_number in range(1, total_events + 1):
        # Produce message
        print(f"Sending event number {event_number}")

        user = UserModel(
            name=random.choice(candidate_names),
            age=random.randint(1, 50),
        )

        # create the message
        message = user.serialize()

        # publish
        channel.basic_publish(exchange="", routing_key=queue_name, body=message)
def main():
    """Connect to RabbitMQ, declare the queue, then produce and consume.

    The connection is closed on every exit path, including an exception or
    Ctrl-C raised while producing or consuming.
    """
    rabbitmq_url = "amqp://guest:guest@localhost:5672/%2f"
    queue_name = "test-stream"

    params = pika.URLParameters(rabbitmq_url)
    connection = pika.BlockingConnection(params)

    try:
        channel = connection.channel()
        channel.queue_declare(queue=queue_name, durable=True)

        produce(channel=channel, queue_name=queue_name)
        consume(channel=channel, queue_name=queue_name)
    finally:
        # Only close a connection that is still open, so a failure that
        # already dropped the connection is not masked by a second error.
        if connection.is_open:
            connection.close()
(This script is complete, it should run "as is")