From bf4d2978c0855247ff724c755639100dd76f894d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gell=C3=A9rt=20Peresztegi-Nagy?= Date: Thu, 25 Jul 2024 16:02:53 +0100 Subject: [PATCH] dt/schema_registry: smoketests with python client Test schema registry with the confluent python client. (cherry picked from commit 8a3c504eec1a45be045a677bff9848ed6ee4ef4f) --- tests/rptest/tests/schema_registry_test.py | 205 ++++++++++++++++++++- 1 file changed, 203 insertions(+), 2 deletions(-) diff --git a/tests/rptest/tests/schema_registry_test.py b/tests/rptest/tests/schema_registry_test.py index 6d8ac8d8f811e..2b6d340fd34ee 100644 --- a/tests/rptest/tests/schema_registry_test.py +++ b/tests/rptest/tests/schema_registry_test.py @@ -18,7 +18,7 @@ import random import socket -from confluent_kafka.schema_registry import topic_subject_name_strategy, record_subject_name_strategy, topic_record_subject_name_strategy +from confluent_kafka.schema_registry import SchemaRegistryClient, topic_subject_name_strategy, record_subject_name_strategy, topic_record_subject_name_strategy, Schema, SchemaRegistryError, SchemaReference from confluent_kafka.serialization import (MessageField, SerializationContext) from ducktape.mark import parametrize, matrix from ducktape.services.background_thread import BackgroundThreadService @@ -37,7 +37,7 @@ from rptest.tests.cluster_config_test import wait_for_version_status_sync from rptest.tests.pandaproxy_test import User, PandaProxyTLSProvider from rptest.tests.redpanda_test import RedpandaTest -from rptest.util import inject_remote_script, search_logs_with_timeout +from rptest.util import expect_exception, inject_remote_script, search_logs_with_timeout class SchemaIdValidationMode(str, Enum): @@ -100,6 +100,15 @@ def get_subject_name(sns: str, topic: str, field: MessageField, Simple id = 1; }""" +well_known_proto_def = """ +syntax = "proto3"; + +import "google/protobuf/timestamp.proto"; + +message Test3 { + google.protobuf.Timestamp timestamp = 1; +}""" + json_number_schema_def = '{"type": "number"}' log_config = LoggingConfig('info', @@ -3424,3 +3433,195 @@ def test_license_nag(self, mode): wait_until(self._has_license_nag, timeout_sec=self.LICENSE_CHECK_INTERVAL_SEC * 2, err_msg="License nag failed to appear") + + +class SchemaRegistryConfluentClient(SchemaRegistryEndpoints): + """ + Test schema registry with the confluent python client + """ + def __init__(self, context, **kwargs): + super(SchemaRegistryConfluentClient, self).__init__(context, **kwargs) + + self.sr_client = SchemaRegistryClient({'url': self._base_uri()}) + + @cluster(num_nodes=3) + @matrix(normalize_schemas=[True, False]) + def test_register_get_lookup_schema(self, normalize_schemas): + """ + Verify that the register, get and lookup schema methods work + """ + + test_subject = "topic_1-key" + schema1 = Schema(schema1_def, "AVRO") + + result = self.sr_client.register_schema( + test_subject, schema1, normalize_schemas=normalize_schemas) + assert result == 1, f"Result: {result}" + + result = self.sr_client.get_schema(1) + assert result == schema1, f"Result: {result}" + + result = self.sr_client.lookup_schema( + test_subject, schema1, normalize_schemas=normalize_schemas) + assert result.schema_id == 1, f"Result: {result}" + + # TODO: we currently always normalize schemas. Once redpanda supports + # the normize flag, this test should pass + # with expect_exception(SchemaRegistryError, lambda e: True): + # self.sr_client.lookup_schema( + # test_subject, schema1, normalize_schemas=not normalize_schemas) + + @cluster(num_nodes=3) + def test_versions(self): + """ + Verify that the version endpoints work with the confluent client + """ + + test_subject = "topic_1-key" + schema1 = Schema(schema1_def, "AVRO") + schema2 = Schema(schema2_def, "AVRO") + + result = self.sr_client.register_schema(test_subject, schema1) + assert result == 1, f"Result: {result}" + + result = self.sr_client.get_latest_version(test_subject) + assert result.schema_id == 1, f"Result: {result}" + + result = self.sr_client.register_schema(test_subject, schema2) + assert result == 2, f"Result: {result}" + + result = self.sr_client.get_latest_version(test_subject) + assert result.schema_id == 2, f"Result: {result}" + + result = self.sr_client.get_version(test_subject, 1) + assert result.schema == schema1, f"Result: {result}" + + result = self.sr_client.get_version(test_subject, 2) + assert result.schema == schema2, f"Result: {result}" + + result = self.sr_client.get_versions(test_subject) + assert result == [1, 2], f"Result: {result}" + + result = self.sr_client.delete_version(test_subject, 2) + assert result == 2, f"Result: {result}" + + result = self.sr_client.get_latest_version(test_subject) + assert result.schema_id == 1, f"Result: {result}" + + result = self.sr_client.get_versions(test_subject) + assert result == [1], f"Result: {result}" + + with expect_exception(SchemaRegistryError, lambda e: True): + self.sr_client.get_version(test_subject, 2) + + @cluster(num_nodes=3) + def test_set_get_compatibility(self): + """ + Verify that setting and getting the compatibility level works with the confluent client + """ + + levels = [ + "BACKWARD", "BACKWARD_TRANSITIVE", "FORWARD", "FORWARD_TRANSITIVE", + "FULL", "FULL_TRANSITIVE", "NONE" + ] + + for level in levels: + result = self.sr_client.set_compatibility(level=level) + assert result['compatibility'] == level, f"Result: {result}" + + result = self.sr_client.get_compatibility() + assert result == level, f"Result: {result}" + + test_subject = "topic_1-key" + schema1 = Schema(schema1_def, "AVRO") + + result = self.sr_client.register_schema(test_subject, schema1) + assert result == 1, f"Result: {result}" + + result = self.sr_client.set_compatibility(test_subject, + level=level) + assert result['compatibility'] == level, f"Result: {result}" + + result = self.sr_client.get_compatibility(test_subject) + assert result == level, f"Result: {result}" + + @cluster(num_nodes=3) + @matrix(permanent=[True, False]) + def test_delete_subject(self, permanent): + """ + Verify that soft and hard deleting a subject works with the confluent client + """ + + test_subject = "topic_1-key" + schema1 = Schema(schema1_def, "AVRO") + + result = self.sr_client.register_schema(test_subject, schema1) + assert result == 1, f"Result: {result}" + + result = self.sr_client.get_schema(1) + assert result == schema1, f"Result: {result}" + + result = self.sr_client.delete_subject(test_subject, + permanent=permanent) + assert result == [1], f"Result: {result}" + + result = self.sr_client.get_subjects() + assert len(result) == 0, f"Result: {result}" + + @cluster(num_nodes=3) + def test_test_compatible(self): + """ + Verify that the test_compatible method of the confluent client works + """ + + test_subject = "topic_1-key" + schema1 = Schema(schema1_def, "AVRO") + schema2 = Schema(schema2_def, "AVRO") + schema3 = Schema(schema3_def, "AVRO") + + result = self.sr_client.register_schema(test_subject, schema1) + assert result == 1, f"Result: {result}" + + result = self.sr_client.test_compatibility(test_subject, schema2) + assert result == True, f"Result: {result}" + + result = self.sr_client.test_compatibility(test_subject, schema3) + assert result == False, f"Result: {result}" + + @cluster(num_nodes=3) + def test_references(self): + """ + Verify that reference handling works with the confluent client + """ + + simple_subject = "topic_1-key" + simple_schema = Schema(simple_proto_def, "PROTOBUF") + + result = self.sr_client.register_schema(simple_subject, simple_schema) + assert result == 1, f"Result: {result}" + + imported_subject = "topic_2-key" + imported_schema = Schema( + imported_proto_def, + "PROTOBUF", + references=[SchemaReference("simple", simple_subject, 1)]) + + result = self.sr_client.register_schema(imported_subject, + imported_schema) + assert result == 2, f"Result: {result}" + + well_known_subject = "topic_3-key" + well_known_schema = Schema(well_known_proto_def, "PROTOBUF") + + result = self.sr_client.register_schema(well_known_subject, + well_known_schema) + assert result == 3, f"Result: {result}" + + result = self.sr_client.get_schema(1) + assert result == simple_schema, f"Result: {result}" + + result = self.sr_client.get_schema(2) + assert result == imported_schema, f"Result: {result}" + + result = self.sr_client.get_schema(3) + assert result == well_known_schema, f"Result: {result}"