[v24.2.x] CORE-3189 dt/schema_registry: smoketests with python client #22524

Merged
205 changes: 203 additions & 2 deletions tests/rptest/tests/schema_registry_test.py
@@ -18,7 +18,7 @@
import random
import socket

from confluent_kafka.schema_registry import topic_subject_name_strategy, record_subject_name_strategy, topic_record_subject_name_strategy
from confluent_kafka.schema_registry import SchemaRegistryClient, topic_subject_name_strategy, record_subject_name_strategy, topic_record_subject_name_strategy, Schema, SchemaRegistryError, SchemaReference
from confluent_kafka.serialization import (MessageField, SerializationContext)
from ducktape.mark import parametrize, matrix
from ducktape.services.background_thread import BackgroundThreadService
@@ -37,7 +37,7 @@
from rptest.tests.cluster_config_test import wait_for_version_status_sync
from rptest.tests.pandaproxy_test import User, PandaProxyTLSProvider
from rptest.tests.redpanda_test import RedpandaTest
from rptest.util import inject_remote_script, search_logs_with_timeout
from rptest.util import expect_exception, inject_remote_script, search_logs_with_timeout


class SchemaIdValidationMode(str, Enum):
@@ -100,6 +100,15 @@ def get_subject_name(sns: str, topic: str, field: MessageField,
Simple id = 1;
}"""

well_known_proto_def = """
syntax = "proto3";

import "google/protobuf/timestamp.proto";

message Test3 {
google.protobuf.Timestamp timestamp = 1;
}"""

json_number_schema_def = '{"type": "number"}'

log_config = LoggingConfig('info',
@@ -3424,3 +3433,195 @@ def test_license_nag(self, mode):
wait_until(self._has_license_nag,
timeout_sec=self.LICENSE_CHECK_INTERVAL_SEC * 2,
err_msg="License nag failed to appear")


class SchemaRegistryConfluentClient(SchemaRegistryEndpoints):
"""
    Test the schema registry with the Confluent Python client
"""
def __init__(self, context, **kwargs):
super(SchemaRegistryConfluentClient, self).__init__(context, **kwargs)

self.sr_client = SchemaRegistryClient({'url': self._base_uri()})
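        # All tests below drive the Redpanda schema registry through this
        # confluent-kafka client, pointed at the registry's base URI.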

@cluster(num_nodes=3)
@matrix(normalize_schemas=[True, False])
def test_register_get_lookup_schema(self, normalize_schemas):
"""
Verify that the register, get and lookup schema methods work
"""

test_subject = "topic_1-key"
schema1 = Schema(schema1_def, "AVRO")

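        # register_schema returns the assigned schema id; get_schema fetches the
        # schema back by id, and lookup_schema resolves it under the subject.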
result = self.sr_client.register_schema(
test_subject, schema1, normalize_schemas=normalize_schemas)
assert result == 1, f"Result: {result}"

result = self.sr_client.get_schema(1)
assert result == schema1, f"Result: {result}"

result = self.sr_client.lookup_schema(
test_subject, schema1, normalize_schemas=normalize_schemas)
assert result.schema_id == 1, f"Result: {result}"

        # TODO: Redpanda currently always normalizes schemas. Once it supports
        # the normalize flag, this check can be enabled:
        # with expect_exception(SchemaRegistryError, lambda e: True):
        #     self.sr_client.lookup_schema(
        #         test_subject, schema1, normalize_schemas=not normalize_schemas)

@cluster(num_nodes=3)
def test_versions(self):
"""
Verify that the version endpoints work with the confluent client
"""

test_subject = "topic_1-key"
schema1 = Schema(schema1_def, "AVRO")
schema2 = Schema(schema2_def, "AVRO")

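        # Register two versions under the same subject and exercise the
        # version endpoints against both.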
result = self.sr_client.register_schema(test_subject, schema1)
assert result == 1, f"Result: {result}"

result = self.sr_client.get_latest_version(test_subject)
assert result.schema_id == 1, f"Result: {result}"

result = self.sr_client.register_schema(test_subject, schema2)
assert result == 2, f"Result: {result}"

result = self.sr_client.get_latest_version(test_subject)
assert result.schema_id == 2, f"Result: {result}"

result = self.sr_client.get_version(test_subject, 1)
assert result.schema == schema1, f"Result: {result}"

result = self.sr_client.get_version(test_subject, 2)
assert result.schema == schema2, f"Result: {result}"

result = self.sr_client.get_versions(test_subject)
assert result == [1, 2], f"Result: {result}"

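        # Deleting the latest version should make version 1 the latest again
        # and drop version 2 from the subject's version list.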
result = self.sr_client.delete_version(test_subject, 2)
assert result == 2, f"Result: {result}"

result = self.sr_client.get_latest_version(test_subject)
assert result.schema_id == 1, f"Result: {result}"

result = self.sr_client.get_versions(test_subject)
assert result == [1], f"Result: {result}"

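        # Fetching the deleted version is expected to raise SchemaRegistryError.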
with expect_exception(SchemaRegistryError, lambda e: True):
self.sr_client.get_version(test_subject, 2)

@cluster(num_nodes=3)
def test_set_get_compatibility(self):
"""
Verify that setting and getting the compatibility level works with the confluent client
"""

levels = [
"BACKWARD", "BACKWARD_TRANSITIVE", "FORWARD", "FORWARD_TRANSITIVE",
"FULL", "FULL_TRANSITIVE", "NONE"
]
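        # Exercise every compatibility level, first globally and then per subject.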

for level in levels:
result = self.sr_client.set_compatibility(level=level)
assert result['compatibility'] == level, f"Result: {result}"

result = self.sr_client.get_compatibility()
assert result == level, f"Result: {result}"

test_subject = "topic_1-key"
schema1 = Schema(schema1_def, "AVRO")

result = self.sr_client.register_schema(test_subject, schema1)
assert result == 1, f"Result: {result}"

result = self.sr_client.set_compatibility(test_subject,
level=level)
assert result['compatibility'] == level, f"Result: {result}"

result = self.sr_client.get_compatibility(test_subject)
assert result == level, f"Result: {result}"

@cluster(num_nodes=3)
@matrix(permanent=[True, False])
def test_delete_subject(self, permanent):
"""
Verify that soft and hard deleting a subject works with the confluent client
"""

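        # Both soft (permanent=False) and hard (permanent=True) deletes should
        # return the deleted versions and remove the subject from the registry.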
test_subject = "topic_1-key"
schema1 = Schema(schema1_def, "AVRO")

result = self.sr_client.register_schema(test_subject, schema1)
assert result == 1, f"Result: {result}"

result = self.sr_client.get_schema(1)
assert result == schema1, f"Result: {result}"

result = self.sr_client.delete_subject(test_subject,
permanent=permanent)
assert result == [1], f"Result: {result}"

result = self.sr_client.get_subjects()
assert len(result) == 0, f"Result: {result}"

@cluster(num_nodes=3)
def test_test_compatible(self):
"""
        Verify that the test_compatibility method of the confluent client works
"""

test_subject = "topic_1-key"
schema1 = Schema(schema1_def, "AVRO")
schema2 = Schema(schema2_def, "AVRO")
schema3 = Schema(schema3_def, "AVRO")

result = self.sr_client.register_schema(test_subject, schema1)
assert result == 1, f"Result: {result}"

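        # schema2 should be compatible with the registered schema1; schema3 should not be.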
result = self.sr_client.test_compatibility(test_subject, schema2)
        assert result is True, f"Result: {result}"

result = self.sr_client.test_compatibility(test_subject, schema3)
        assert result is False, f"Result: {result}"

@cluster(num_nodes=3)
def test_references(self):
"""
Verify that reference handling works with the confluent client
"""

simple_subject = "topic_1-key"
simple_schema = Schema(simple_proto_def, "PROTOBUF")

result = self.sr_client.register_schema(simple_subject, simple_schema)
assert result == 1, f"Result: {result}"

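        # The SchemaReference resolves the schema's "simple" import to version 1
        # of the already-registered simple_subject.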
imported_subject = "topic_2-key"
imported_schema = Schema(
imported_proto_def,
"PROTOBUF",
references=[SchemaReference("simple", simple_subject, 1)])

result = self.sr_client.register_schema(imported_subject,
imported_schema)
assert result == 2, f"Result: {result}"

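        # google.protobuf.Timestamp is a well-known type, so the schema can be
        # registered without declaring a reference for it.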
well_known_subject = "topic_3-key"
well_known_schema = Schema(well_known_proto_def, "PROTOBUF")

result = self.sr_client.register_schema(well_known_subject,
well_known_schema)
assert result == 3, f"Result: {result}"

result = self.sr_client.get_schema(1)
assert result == simple_schema, f"Result: {result}"

result = self.sr_client.get_schema(2)
assert result == imported_schema, f"Result: {result}"

result = self.sr_client.get_schema(3)
assert result == well_known_schema, f"Result: {result}"