Python Rest Client Schema Registry
Python REST client to interact with the Confluent Schema Registry server and manage Avro and JSON schema resources.
Requirements
python 3.8+
Installation
pip install python-schema-registry-client
If you want the Faust functionality:
pip install python-schema-registry-client[faust]
Usage
from schema_registry.client import SchemaRegistryClient, schema
client = SchemaRegistryClient(url="http://127.0.0.1:8081")
deployment_schema = {
"type": "record",
"namespace": "com.kubertenes",
"name": "AvroDeployment",
"fields": [
{"name": "image", "type": "string"},
{"name": "replicas", "type": "int"},
{"name": "port", "type": "int"},
],
}
avro_schema = schema.AvroSchema(deployment_schema)
schema_id = client.register("test-deployment", avro_schema)
from schema_registry.client import AsyncSchemaRegistryClient, schema
async_client = AsyncSchemaRegistryClient(url="http://127.0.0.1:8081")
deployment_schema = {
"type": "record",
"namespace": "com.kubertenes",
"name": "AvroDeployment",
"fields": [
{"name": "image", "type": "string"},
{"name": "replicas", "type": "int"},
{"name": "port", "type": "int"},
],
}
avro_schema = schema.AvroSchema(deployment_schema)
schema_id = await async_client.register("test-deployment", avro_schema)
from schema_registry.client import SchemaRegistryClient, schema
client = SchemaRegistryClient(url="http://127.0.0.1:8081")
deployment_schema = {
"definitions" : {
"JsonDeployment" : {
"type" : "object",
"required" : ["image", "replicas", "port"],
"properties" : {
"image" : {"type" : "string"},
"replicas" : {"type" : "integer"},
"port" : {"type" : "integer"}
}
}
},
"$ref" : "#/definitions/JsonDeployment"
}
json_schema = schema.JsonSchema(deployment_schema)
schema_id = client.register("test-deployment", json_schema)
from schema_registry.client import AsyncSchemaRegistryClient, schema
async_client = AsyncSchemaRegistryClient(url="http://127.0.0.1:8081")
deployment_schema = {
"definitions" : {
"JsonDeployment" : {
"type" : "object",
"required" : ["image", "replicas", "port"],
"properties" : {
"image" : {"type" : "string"},
"replicas" : {"type" : "integer"},
"port" : {"type" : "integer"}
}
}
},
"$ref" : "#/definitions/JsonDeployment"
}
json_schema = schema.JsonSchema(deployment_schema)
schema_id = await async_client.register("test-deployment", json_schema)
Usage with dataclasses-avroschema
You can generate the Avro schema and JSON schemas directly from a Python class using dataclasses-avroschema and use them in the API to register schemas, check versions, and test compatibility:
import dataclasses
from enum import Enum
import typing
from dataclasses_avroschema import AvroModel
from schema_registry.client import SchemaRegistryClient
client = SchemaRegistryClient(url="http://127.0.0.1:8081")
class ColorEnum(str, Enum):
BLUE = "BLUE"
YELLOW = "YELLOW"
GREEN = "GREEN"
@dataclasses.dataclass
class UserAdvance(AvroModel):
name: str
age: int
pets: typing.List[str] = dataclasses.field(default_factory=lambda: ["dog", "cat"])
accounts: typing.Dict[str, int] = dataclasses.field(default_factory=lambda: {"key": 1})
has_car: bool = False
favorite_colors: ColorEnum = ColorEnum.BLUE
country: str = "Argentina"
address: str = None
subject = "subject"
# register the schema
schema_id = client.register(subject, UserAdvance.avro_schema())
print(schema_id)
# >>> 12
result = client.check_version(subject, UserAdvance.avro_schema())
print(result)
# >>> SchemaVersion(subject='dataclasses-avroschema-subject-2', schema_id=12, schema=1, version={"type":"record" ...')
compatible = client.test_compatibility(subject, UserAdvance.avro_schema())
print(compatible)
# >>> True
Note
You can generate JSON schemas with dataclasses-avroschema by adding the pydantic batteries.
Usage with pydantic for json schemas
You can generate the JSON schema directly from a Python class using pydantic and use it in the API to register schemas, check versions, and test compatibility:
import typing
from enum import Enum
from pydantic import BaseModel
from schema_registry.client import SchemaRegistryClient
client = SchemaRegistryClient(url="http://127.0.0.1:8081")
class ColorEnum(str, Enum):
BLUE = "BLUE"
YELLOW = "YELLOW"
GREEN = "GREEN"
class UserAdvance(BaseModel):
name: str
age: int
pets: typing.List[str] = ["dog", "cat"]
accounts: typing.Dict[str, int] = {"key": 1}
has_car: bool = False
favorite_colors: ColorEnum = ColorEnum.BLUE
country: str = "Argentina"
address: str = None
subject = "subject"
# register the schema
schema_id = client.register(subject, UserAdvance.model_json_schema(), schema_type="JSON")
print(schema_id)
# >>> 12
result = client.check_version(subject, UserAdvance.model_json_schema(), schema_type="JSON")
print(result)
# >>> SchemaVersion(subject='pydantic-jsonschema-subject', schema_id=12, schema=1, version=<schema_registry.client.schema.JsonSchema object at 0x7f40354550a0>)
compatible = client.test_compatibility(subject, UserAdvance.model_json_schema(), schema_type="JSON")
print(compatible)
# >>> True
When to use this library
Usually, we have a situation like this:
So, our producers/consumers have to serialize/deserialize messages every time they send them to or receive them from Kafka topics. In this picture, we can imagine a Faust application receiving messages (encoded with an Avro schema) that we want to deserialize, so we can ask the schema server to do that for us. In this scenario, the MessageSerializer is perfect.
Another use case is an application whose only purpose is to administer Avro schemas (register, update compatibilities, delete old schemas, etc.); for that, the SchemaRegistryClient is perfect.
Development
Poetry is needed to install the dependencies and develop locally
- Install dependencies:
poetry install --all-extras
- Code linting:
./scripts/format
- Run tests:
./scripts/test
For commit messages we use commitizen in order to standardize the commit message format
Note: The tests are run against the Schema Server using docker compose, so you will need Docker and Docker Compose installed.
In a terminal run docker-compose up. Then in a different terminal run the tests:
./scripts/test
All additional args will be passed to pytest, for example:
./scripts/test ./tests/client/
Tests using the python shell
To perform tests using the python shell you can run the project using docker-compose.
- Build:
docker-compose build --build-arg PYTHON_VERSION=$PYTHON_VERSION
- Execute docker-compose up. Then, the schema registry server will run on http://127.0.0.1:8081, and you can interact with it using the SchemaRegistryClient:
  - Use the python interpreter (get a python shell by typing python in your command line)
  - Play with the schema server
from schema_registry.client import SchemaRegistryClient, schema
client = SchemaRegistryClient(url="http://127.0.0.1:8081")
# do some operations with the client...
deployment_schema = {
"type": "record",
"namespace": "com.kubertenes",
"name": "AvroDeployment",
"fields": [
{"name": "image", "type": "string"},
{"name": "replicas", "type": "int"},
{"name": "port", "type": "int"},
],
}
avro_schema = schema.AvroSchema(deployment_schema)
client.register("test-deployment", avro_schema)
# >>>> Out[5]: 1
Then, you can check the schema in your browser by going to the URL http://127.0.0.1:8081/schemas/ids/1