How to use it with Faust

This section describes how to integrate this library with Faust.

Schemas, Custom Codecs and Serializers

Because we want to be sure that the messages we encode are valid, we can use Avro or JSON schemas. Introduction to Schemas in Apache Kafka with the Confluent Schema Registry is a good post to get started with schemas. Avro and JSON can both be used to define the data schema for a record's value. This schema describes the fields allowed in the value, along with their data types.

In order to use Avro or JSON schemas with Faust, we need to define a custom codec and a custom serializer that can talk to the Schema Registry; for that we will use the MessageSerializer.

To serialize Avro schemas we use the FaustSerializer, and to serialize JSON schemas we use the FaustJsonSerializer.

For our demonstration, let's imagine that we have the following Avro schema:

{
    "type": "record",
    "namespace": "com.example",
    "name": "AvroUsers",
    "fields": [
        {"name": "first_name", "type": "string"},
        {"name": "last_name", "type": "string"}
    ]
}

Let's register the custom codec:

Trivial Usage
# codecs.codec.py
from schema_registry.client import SchemaRegistryClient, schema
from schema_registry.serializers.faust import FaustSerializer

from your_project import settings  # assumed: a settings module exposing SCHEMA_REGISTRY_URL

# create an instance of the `SchemaRegistryClient`
client = SchemaRegistryClient(url=settings.SCHEMA_REGISTRY_URL)

# schema that we want to use. For this example we
# are using a dict, but this schema could be located in a file called avro_user_schema.avsc
avro_user_schema = schema.AvroSchema({
    "type": "record",
    "namespace": "com.example",
    "name": "AvroUsers",
    "fields": [
        {"name": "first_name", "type": "string"},
        {"name": "last_name", "type": "string"}
    ]
})

avro_user_serializer = FaustSerializer(client, "users", avro_user_schema)

# function used to register the codec
def avro_user_codec():
    return avro_user_serializer

and add the following code to setup.py to tell Faust where to find the custom codecs.

setup.py
setup(
    ...
    entry_points={
        'console_scripts': [
            'example = example.app:main',
        ],
        'faust.codecs': [
            'avro_users = example.codecs.avro:avro_user_codec',
        ],
    },
)

or, if you are using poetry and a pyproject.toml, you can add the following to pyproject.toml to tell Faust where to find the custom codecs:

pyproject.toml
[tool.poetry.scripts]
example = "example.app:main"

[tool.poetry.plugins."faust.codecs"]
avro_users = "example.codecs.avro:avro_user_codec"
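If you just want to experiment locally, you can also register the codec at runtime instead of through packaging metadata. Here is a minimal sketch using faust's codec registry; the app.py location and the example.codecs.codec module path are assumptions, adjust them to your layout:

app.py
# runtime registration sketch (module paths assumed)
from faust.serializers import codecs

from example.codecs.codec import avro_user_serializer

# make the serializer available under the name used by the model's `serializer` option
codecs.register('avro_users', avro_user_serializer)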

Now the final step is to integrate the Faust model with the avro_users codec we just registered.

user.models.py
import faust


class UserModel(faust.Record, serializer='avro_users'):
    first_name: str
    last_name: str

Now our application is able to send and receive messages using Avro schemas! :-)

application
import logging

from your_project.app import app
from .codecs.codec import avro_user_serializer
from .models import UserModel

users_topic = app.topic('avro_users', partitions=1, value_type=UserModel)

logger = logging.getLogger(__name__)


@app.agent(users_topic)
async def users(users):
    async for user in users:
        logger.info("Event received in topic avro_users")
        logger.info(f"First Name: {user.first_name}, last name {user.last_name}")


@app.timer(5.0, on_leader=True)
async def publish_users():
    logger.info('PUBLISHING ON LEADER FOR USERS APP!')
    user = {"first_name": "foo", "last_name": "bar"}
    await users.send(value=user, value_serializer=avro_user_serializer)

The full example is here.
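To try the application out, start a worker with the faust command line tool (the module path example.app is an assumption; point -A at the module that defines your app instance):

faust -A example.app worker -l info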

Usage with dataclasses-avroschema for avro schemas

You can also use this functionality with dataclasses-avroschema, and then you won't have to provide the Avro schema yourself. The only thing you need to do is inherit from the AvroRecord class and use its methods:

user.models.py
from dataclasses_avroschema.faust import AvroRecord


class UserModel(AvroRecord, serializer='avro_users'):
    first_name: str
    last_name: str


# codecs.codec.py
from schema_registry.client import SchemaRegistryClient
from schema_registry.serializers.faust import FaustSerializer

from users.models import UserModel

from your_project import settings  # assumed: a settings module exposing SCHEMA_REGISTRY_URL

# create an instance of the `SchemaRegistryClient`
client = SchemaRegistryClient(url=settings.SCHEMA_REGISTRY_URL)

# using the method avro_schema to get the avro schema representation
avro_user_serializer = FaustSerializer(client, "users", UserModel.avro_schema())

# function used to register the codec
def avro_user_codec():
    return avro_user_serializer
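If you want to inspect the schema that dataclasses-avroschema generates, here is a minimal sketch; avro_schema() returns the generated Avro schema as a JSON string:

from users.models import UserModel

# print the Avro schema derived from the class definition above
print(UserModel.avro_schema())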

Usage with pydantic for json schemas

You can also use this functionality with pydantic, and then you won't have to provide the JSON schema yourself. The only thing you need to do is inherit from pydantic's BaseModel class and use its methods:

users.models.py
import faust

from pydantic import BaseModel


class UserModel(faust.Record, BaseModel, serializer='json_users'):
    first_name: str
    last_name: str


# codecs.codec.py
from schema_registry.client import SchemaRegistryClient
from schema_registry.serializers.faust import FaustJsonSerializer

from users.models import UserModel

from your_project import settings  # assumed: a settings module exposing SCHEMA_REGISTRY_URL

# create an instance of the `SchemaRegistryClient`
client = SchemaRegistryClient(url=settings.SCHEMA_REGISTRY_URL)

# using the method model_json_schema to get the json schema representation
json_user_serializer = FaustJsonSerializer(client, "users", UserModel.model_json_schema())

# function used to register the codec
def json_user_codec():
    return json_user_serializer
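As with the Avro codec, the json_users codec has to be registered so Faust can resolve the serializer name. For example, with poetry (the module path users.codecs.codec is an assumption; adjust it to your project layout):

pyproject.toml
[tool.poetry.plugins."faust.codecs"]
json_users = "users.codecs.codec:json_user_codec"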