
Python Rest Client Schema Registry


A Python REST client to interact with a Confluent Schema Registry server and manage Avro and JSON schema resources.

Requirements

python 3.8+

Installation

pip install python-schema-registry-client

If you want the Faust functionality:

pip install python-schema-registry-client[faust]

Usage

Trivial Usage with Avro
from schema_registry.client import SchemaRegistryClient, schema

client = SchemaRegistryClient(url="http://127.0.0.1:8081")

deployment_schema = {
    "type": "record",
    "namespace": "com.kubertenes",
    "name": "AvroDeployment",
    "fields": [
        {"name": "image", "type": "string"},
        {"name": "replicas", "type": "int"},
        {"name": "port", "type": "int"},
    ],
}

avro_schema = schema.AvroSchema(deployment_schema)

schema_id = client.register("test-deployment", avro_schema)
Trivial Async Usage with Avro
from schema_registry.client import AsyncSchemaRegistryClient, schema

async_client = AsyncSchemaRegistryClient(url="http://127.0.0.1:8081")

deployment_schema = {
    "type": "record",
    "namespace": "com.kubertenes",
    "name": "AvroDeployment",
    "fields": [
        {"name": "image", "type": "string"},
        {"name": "replicas", "type": "int"},
        {"name": "port", "type": "int"},
    ],
}

avro_schema = schema.AvroSchema(deployment_schema)

schema_id = await async_client.register("test-deployment", avro_schema)
Trivial Usage with JSON schemas
from schema_registry.client import SchemaRegistryClient, schema

client = SchemaRegistryClient(url="http://127.0.0.1:8081")

deployment_schema = {
    "definitions" : {
        "JsonDeployment" : {
            "type" : "object",
            "required" : ["image", "replicas", "port"],
            "properties" : {
                "image" :       {"type" : "string"},
                "replicas" :    {"type" : "integer"},
                "port" :        {"type" : "integer"}
            }
        }
    },
    "$ref" : "#/definitions/JsonDeployment"
}

json_schema = schema.JsonSchema(deployment_schema)

schema_id = client.register("test-deployment", json_schema)
Trivial Async Usage with JSON schemas
from schema_registry.client import AsyncSchemaRegistryClient, schema

async_client = AsyncSchemaRegistryClient(url="http://127.0.0.1:8081")

deployment_schema = {
    "definitions" : {
        "JsonDeployment" : {
            "type" : "object",
            "required" : ["image", "replicas", "port"],
            "properties" : {
                "image" :       {"type" : "string"},
                "replicas" :    {"type" : "integer"},
                "port" :        {"type" : "integer"}
            }
        }
    },
    "$ref" : "#/definitions/JsonDeployment"
}

json_schema = schema.JsonSchema(deployment_schema)

schema_id = await async_client.register("test-deployment", json_schema)

Usage with dataclasses-avroschema

You can generate the Avro and JSON schemas directly from a Python class using dataclasses-avroschema and use them with the API to register schemas, check versions and test compatibility:

Trivial Usage with dataclasses-avroschema
import dataclasses
from enum import Enum
import typing

from dataclasses_avroschema import AvroModel
from schema_registry.client import SchemaRegistryClient

client = SchemaRegistryClient(url="http://127.0.0.1:8081")


class ColorEnum(str, Enum):
    BLUE = "BLUE"
    YELLOW = "YELLOW"
    GREEN = "GREEN"


@dataclasses.dataclass
class UserAdvance(AvroModel):
    name: str
    age: int
    pets: typing.List[str] = dataclasses.field(default_factory=lambda: ["dog", "cat"])
    accounts: typing.Dict[str, int] = dataclasses.field(default_factory=lambda: {"key": 1})
    has_car: bool = False
    favorite_colors: ColorEnum = ColorEnum.BLUE
    country: str = "Argentina"
    address: typing.Optional[str] = None

subject = "subject"

# register the schema
schema_id = client.register(subject, UserAdvance.avro_schema())

print(schema_id)
# >>> 12

result = client.check_version(subject, UserAdvance.avro_schema())
print(result)
# >>> SchemaVersion(subject='dataclasses-avroschema-subject-2', schema_id=12, schema=1, version={"type": "record", ...})

compatible = client.test_compatibility(subject, UserAdvance.avro_schema())
print(compatible)
# >>> True

Note

You can also generate JSON schemas with dataclasses-avroschema by adding the pydantic batteries, as sketched below.
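
A minimal sketch of that combination; it assumes the pydantic extra exposes AvroBaseModel (the exact module path has moved between dataclasses-avroschema releases, so check its documentation):

import typing

# assumed import path; some releases exposed AvroBaseModel from dataclasses_avroschema.avrodantic
from dataclasses_avroschema.pydantic import AvroBaseModel
from schema_registry.client import SchemaRegistryClient

client = SchemaRegistryClient(url="http://127.0.0.1:8081")


class User(AvroBaseModel):
    name: str
    age: int
    pets: typing.List[str] = ["dog", "cat"]


# the same model can produce both schema flavors
client.register("user-avro", User.avro_schema())
client.register("user-json", User.model_json_schema(), schema_type="JSON")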

Usage with pydantic for json schemas

You can generate the JSON schema directly from a Python class using pydantic and use it with the API to register schemas, check versions and test compatibility:

Trivial Usage with pydantic
import typing
from enum import Enum

from pydantic import BaseModel
from schema_registry.client import SchemaRegistryClient

client = SchemaRegistryClient(url="http://127.0.0.1:8081")

class ColorEnum(str, Enum):
    BLUE = "BLUE"
    YELLOW = "YELLOW"
    GREEN = "GREEN"


class UserAdvance(BaseModel):
    name: str
    age: int
    pets: typing.List[str] = ["dog", "cat"]
    accounts: typing.Dict[str, int] = {"key": 1}
    has_car: bool = False
    favorite_colors: ColorEnum = ColorEnum.BLUE
    country: str = "Argentina"
    address: typing.Optional[str] = None

subject = "subject"

# register the schema
schema_id = client.register(subject, UserAdvance.model_json_schema(), schema_type="JSON")

print(schema_id)
# >>> 12

result = client.check_version(subject, UserAdvance.model_json_schema(), schema_type="JSON")
print(result)
# >>> SchemaVersion(subject='pydantic-jsonschema-subject', schema_id=12, schema=1, version=<schema_registry.client.schema.JsonSchema object at 0x7f40354550a0>)

compatible = client.test_compatibility(subject, UserAdvance.model_json_schema(), schema_type="JSON")
print(compatible)
# >>> True

When to use this library

Usually, we have a situation like this:

Confluent Architecture

So, our producers/consumers have to serialize/deserialize messages every time they send to or receive from Kafka topics. In this picture, we can imagine a Faust application receiving messages encoded with an Avro schema; to deserialize them, we can ask the schema server to do it for us. In this scenario, the MessageSerializer is perfect.
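
As an illustration, a minimal sketch of that flow is shown below. It assumes the Avro serializer shipped with recent releases (AvroMessageSerializer with encode_record_with_schema and decode_message); older releases exposed a MessageSerializer class instead, so check the serializers documentation for your version.

from schema_registry.client import SchemaRegistryClient, schema
from schema_registry.serializers import AvroMessageSerializer  # assumed import path

client = SchemaRegistryClient(url="http://127.0.0.1:8081")
message_serializer = AvroMessageSerializer(client)

user_schema = schema.AvroSchema({
    "type": "record",
    "namespace": "com.example",
    "name": "AvroUser",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"},
    ],
})

user_record = {"name": "juan", "age": 26}

# registers the schema under the subject (if needed) and prepends the schema id to the payload
message_encoded = message_serializer.encode_record_with_schema("users", user_schema, user_record)

# on the consumer side, the embedded schema id is used to fetch the schema and decode the message
message_decoded = message_serializer.decode_message(message_encoded)
print(message_decoded)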

There is also the use case where we want an application only to administer Avro schemas (register schemas, update compatibility levels, delete old schemas, etc.); for that, the SchemaRegistryClient is perfect.
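
For example, a small administration script could look like the sketch below; it assumes the client's helper methods get_subjects, update_compatibility, get_compatibility and delete_subject, so check the client API reference for the exact signatures in your version.

from schema_registry.client import SchemaRegistryClient

client = SchemaRegistryClient(url="http://127.0.0.1:8081")

# list the subjects that are already registered
print(client.get_subjects())

# tighten the compatibility level for a single subject
client.update_compatibility("FULL", subject="test-deployment")
print(client.get_compatibility(subject="test-deployment"))

# retire a subject that is no longer needed
client.delete_subject("test-deployment")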

Development

Poetry is needed to install the dependencies and develop locally.

  1. Install dependencies: poetry install --all-extras
  2. Code linting: ./scripts/format
  3. Run tests: ./scripts/test

For commit messages we use commitizen in order to standardize the commit message conventions.

Note: The tests are run against the Schema Server using docker compose, so you will need Docker and Docker Compose installed.

In a terminal run docker-compose up. Then in a different terminal run the tests:

./scripts/test

All additional args will be passed to pytest, for example:

./scripts/test ./tests/client/

Tests using the Python shell

To perform tests using the Python shell, you can run the project with docker-compose.

  1. Build: docker-compose build --build-arg PYTHON_VERSION=$PYTHON_VERSION
  2. Execute docker-compose up. The Schema Registry server will then run on http://127.0.0.1:8081 and you can interact with it using the SchemaRegistryClient.
  3. Use the Python interpreter (get a Python shell by typing python in your command line)
  4. Play with the schema server
from schema_registry.client import SchemaRegistryClient, schema

client = SchemaRegistryClient(url="http://127.0.0.1:8081")

# do some operations with the client...
deployment_schema = {
    "type": "record",
    "namespace": "com.kubertenes",
    "name": "AvroDeployment",
    "fields": [
        {"name": "image", "type": "string"},
        {"name": "replicas", "type": "int"},
        {"name": "port", "type": "int"},
    ],
}

avro_schema = schema.AvroSchema(deployment_schema)
client.register("test-deployment", avro_schema)
# >>>> Out[5]: 1

Then, you can check the schema in your browser by going to http://127.0.0.1:8081/schemas/ids/1.
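
You can also fetch it from the same Python shell; a minimal sketch, assuming the client exposes a get_by_id method:

# still in the same shell session, reusing the client created above
registered_schema = client.get_by_id(1)
print(registered_schema)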