diff --git a/src/v/pandaproxy/api/api-doc/schema_registry.json b/src/v/pandaproxy/api/api-doc/schema_registry.json index 4fde928d77ea..bbf96809050f 100644 --- a/src/v/pandaproxy/api/api-doc/schema_registry.json +++ b/src/v/pandaproxy/api/api-doc/schema_registry.json @@ -185,17 +185,210 @@ "application/json" ], "parameters": [], - "produces": ["application/vnd.schemaregistry.v1+json"], + "produces": [ + "application/vnd.schemaregistry.v1+json", + "application/vnd.schemaregistry+json", + "application/json" + ], "responses": { "200": { "description": "OK", "schema": { - "type": "object", - "properties": { - "mode": { - "type": "string" - } - } + "$ref": "#/definitions/mode" + } + }, + "500": { + "description": "Internal Server error", + "schema": { + "$ref": "#/definitions/error_body" + } + } + } + }, + "put": { + "summary": "Set the global mode.", + "operationId": "put_mode", + "consumes": [ + "application/vnd.schemaregistry.v1+json", + "application/vnd.schemaregistry+json", + "application/json" + ], + "parameters": [ + { + "name": "mode", + "in": "body", + "schema": { + "$ref": "#/definitions/mode" + } + } + ], + "produces": [ + "application/vnd.schemaregistry.v1+json", + "application/vnd.schemaregistry+json", + "application/json" + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/mode" + } + }, + "422": { + "description": "Unprocessable Entity", + "schema": { + "$ref": "#/definitions/error_body" + } + }, + "500": { + "description": "Internal Server error", + "schema": { + "$ref": "#/definitions/error_body" + } + } + } + } + }, + "/mode/{subject}": { + "get": { + "summary": "Get the mode for a subject.", + "operationId": "get_mode_subject", + "consumes": [ + "application/vnd.schemaregistry.v1+json", + "application/vnd.schemaregistry+json", + "application/json" + ], + "parameters": [ + { + "name": "subject", + "description": "The subject to get the mode for.", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "defaultToGlobal", + "description": "If true, return the global mode if the subject doesn't have a mode set.", + "in": "query", + "required": false, + "type": "boolean" + } + ], + "produces": [ + "application/vnd.schemaregistry.v1+json", + "application/vnd.schemaregistry+json", + "application/json" + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/mode" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/error_body" + } + }, + "500": { + "description": "Internal Server error", + "schema": { + "$ref": "#/definitions/error_body" + } + } + } + }, + "put": { + "summary": "Set the mode for a subject.", + "operationId": "put_mode_subject", + "consumes": [ + "application/vnd.schemaregistry.v1+json", + "application/vnd.schemaregistry+json", + "application/json" + ], + "parameters": [ + { + "name": "subject", + "description": "The subject to set the mode for.", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "mode", + "in": "body", + "schema": { + "$ref": "#/definitions/mode" + } + } + ], + "produces": [ + "application/vnd.schemaregistry.v1+json", + "application/vnd.schemaregistry+json", + "application/json" + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/mode" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/error_body" + } + }, + "422": { + "description": "Unprocessable Entity", + "schema": { + "$ref": "#/definitions/error_body" + } + 
}, + "500": { + "description": "Internal Server error", + "schema": { + "$ref": "#/definitions/error_body" + } + } + } + }, + "delete": { + "summary": "Delete the mode for a subject.", + "operationId": "delete_mode_subject", + "consumes": [ + "application/vnd.schemaregistry.v1+json", + "application/vnd.schemaregistry+json", + "application/json" + ], + "parameters": [ + { + "name": "subject", + "description": "The subject to delete the mode for.", + "in": "path", + "required": true, + "type": "string" + } + ], + "produces": [ + "application/vnd.schemaregistry.v1+json", + "application/vnd.schemaregistry+json", + "application/json" + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/mode" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/error_body" } }, "500": { @@ -423,7 +616,7 @@ }, "/subjects/{subject}": { "post": { - "summary": "Check if a schema is already registred for the subject.", + "summary": "Check if a schema is already registered for the subject.", "operationId": "post_subject", "consumes": [ "application/vnd.schemaregistry.v1+json", diff --git a/src/v/pandaproxy/api/api-doc/schema_registry_definitions.def.json b/src/v/pandaproxy/api/api-doc/schema_registry_definitions.def.json index 51aaa223c517..ac3ce8de8d38 100644 --- a/src/v/pandaproxy/api/api-doc/schema_registry_definitions.def.json +++ b/src/v/pandaproxy/api/api-doc/schema_registry_definitions.def.json @@ -89,5 +89,17 @@ "compatibility": { "type": "string" } + } + }, + "mode": { + "type": "object", + "properties": { + "mode": { + "type": "string", + "enum": [ + "READWRITE", + "READONLY" + ] + }, } } \ No newline at end of file diff --git a/src/v/pandaproxy/api/api-doc/schema_registry_header.json b/src/v/pandaproxy/api/api-doc/schema_registry_header.json index 6f807a01b416..5245535773be 100644 --- a/src/v/pandaproxy/api/api-doc/schema_registry_header.json +++ b/src/v/pandaproxy/api/api-doc/schema_registry_header.json @@ -2,7 +2,7 @@ "swagger": "2.0", "info": { "title": "Pandaproxy Schema Registry", - "version": "1.0.3" + "version": "1.0.4" }, "host": "{{Host}}", "basePath": "/", diff --git a/src/v/pandaproxy/auth_utils.h b/src/v/pandaproxy/auth_utils.h index 3102d73dda65..5d752f909b2c 100644 --- a/src/v/pandaproxy/auth_utils.h +++ b/src/v/pandaproxy/auth_utils.h @@ -39,4 +39,47 @@ inline credential_t maybe_authenticate_request( return user; } + +enum class auth_level { + // Unauthenticated endpoint (not a typo, 'public' is a keyword) + publik = 0, + // Requires authentication (if enabled) but not superuser status + user = 1, + // Requires authentication (if enabled) and superuser status + superuser = 2 +}; + +inline credential_t maybe_authorize_request( + config::rest_authn_method authn_method, + auth_level lvl, + request_authenticator& authenticator, + const ss::http::request& req) { + credential_t user; + + if (authn_method != config::rest_authn_method::none) { + // Will throw 400 & 401 if auth fails + auto auth_result = authenticator.authenticate(req); + // Will throw 403 if user enabled HTTP Basic Auth but + // did not give the authorization header. 
+ switch (lvl) { + case auth_level::superuser: + auth_result.require_superuser(); + break; + case auth_level::user: + auth_result.require_authenticated(); + break; + case auth_level::publik: + auth_result.pass(); + break; + } + + user = credential_t{ + auth_result.get_username(), + auth_result.get_password(), + auth_result.get_sasl_mechanism()}; + } + + return user; +} + } // namespace pandaproxy diff --git a/src/v/pandaproxy/error.cc b/src/v/pandaproxy/error.cc index 33a4c074b709..aa119d91513e 100644 --- a/src/v/pandaproxy/error.cc +++ b/src/v/pandaproxy/error.cc @@ -129,6 +129,8 @@ struct reply_error_category final : std::error_category { return "subject_version_not_deleted"; case reply_error_code::compatibility_not_found: return "compatibility_not_found"; + case reply_error_code::mode_not_found: + return "mode_not_found"; case reply_error_code::serialization_error: return "serialization_error"; case reply_error_code::consumer_already_exists: @@ -140,6 +142,8 @@ struct reply_error_category final : std::error_category { return "Invalid schema version"; case reply_error_code::compatibility_level_invalid: return "Invalid compatibility level"; + case reply_error_code::mode_invalid: + return "Invalid mode"; case reply_error_code::subject_version_operaton_not_permitted: return "Overwrite new schema is not permitted."; case reply_error_code::subject_version_has_references: diff --git a/src/v/pandaproxy/error.h b/src/v/pandaproxy/error.h index 0caba1ef0eaa..1216eae492c4 100644 --- a/src/v/pandaproxy/error.h +++ b/src/v/pandaproxy/error.h @@ -78,11 +78,13 @@ enum class reply_error_code : uint16_t { subject_version_soft_deleted = 40406, subject_version_not_deleted = 40407, compatibility_not_found = 40408, + mode_not_found = 40409, serialization_error = 40801, consumer_already_exists = 40902, schema_empty = 42201, schema_version_invalid = 42202, compatibility_level_invalid = 42203, + mode_invalid = 42204, subject_version_operaton_not_permitted = 42205, subject_version_has_references = 42206, subject_version_schema_id_already_exists = 42207, diff --git a/src/v/pandaproxy/schema_registry/api.cc b/src/v/pandaproxy/schema_registry/api.cc index 1b19bbc5e84f..092996683d91 100644 --- a/src/v/pandaproxy/schema_registry/api.cc +++ b/src/v/pandaproxy/schema_registry/api.cc @@ -45,7 +45,7 @@ api::~api() noexcept = default; ss::future<> api::start() { _store = std::make_unique(); - co_await _store->start(_sg); + co_await _store->start(is_mutable(_cfg.mode_mutability), _sg); co_await _schema_id_validation_probe.start(); co_await _schema_id_validation_probe.invoke_on_all( &schema_id_validation_probe::setup_metrics); diff --git a/src/v/pandaproxy/schema_registry/configuration.cc b/src/v/pandaproxy/schema_registry/configuration.cc index 62bf5729b23e..68c05f207865 100644 --- a/src/v/pandaproxy/schema_registry/configuration.cc +++ b/src/v/pandaproxy/schema_registry/configuration.cc @@ -32,6 +32,7 @@ configuration::configuration() {}, {}, config::endpoint_tls_config::validate_many) + , mode_mutability(*this, "mode_mutability", "Allow modifying mode", {}, false) , schema_registry_replication_factor( *this, "schema_registry_replication_factor", diff --git a/src/v/pandaproxy/schema_registry/configuration.h b/src/v/pandaproxy/schema_registry/configuration.h index b946a066270d..182582e85303 100644 --- a/src/v/pandaproxy/schema_registry/configuration.h +++ b/src/v/pandaproxy/schema_registry/configuration.h @@ -26,6 +26,7 @@ struct configuration final : public config::config_store { schema_registry_api; 
config::one_or_many_property schema_registry_api_tls; + config::property mode_mutability; config::property> schema_registry_replication_factor; config::property api_doc_dir; }; diff --git a/src/v/pandaproxy/schema_registry/error.cc b/src/v/pandaproxy/schema_registry/error.cc index 9fed38cd0985..70e7d797e2e7 100644 --- a/src/v/pandaproxy/schema_registry/error.cc +++ b/src/v/pandaproxy/schema_registry/error.cc @@ -53,6 +53,8 @@ struct error_category final : std::error_category { case error_code::compatibility_not_found: return "Subject does not have subject-level compatibility " "configured"; + case error_code::mode_not_found: + return "Subject does not have subject-level mode configured"; case error_code::subject_version_operaton_not_permitted: return "Overwrite new schema is not permitted."; case error_code::subject_version_has_references: @@ -69,6 +71,8 @@ struct error_category final : std::error_category { return "Invalid compatibility level. Valid values are NONE, " "BACKWARD, FORWARD, FULL, BACKWARD_TRANSITIVE, " "FORWARD_TRANSITIVE, and FULL_TRANSITIVE"; + case error_code::mode_invalid: + return "Invalid mode. Valid values are READWRITE, READONLY"; } return "(unrecognized error)"; } @@ -93,6 +97,8 @@ struct error_category final : std::error_category { return reply_error_code::subject_version_not_deleted; // 40407 case error_code::compatibility_not_found: return reply_error_code::compatibility_not_found; // 40408 + case error_code::mode_not_found: + return reply_error_code::mode_not_found; // 40409 case error_code::subject_schema_invalid: return reply_error_code::internal_server_error; // 500 case error_code::write_collision: @@ -117,6 +123,8 @@ struct error_category final : std::error_category { return reply_error_code::zookeeper_error; // 50001 case error_code::compatibility_level_invalid: return reply_error_code::compatibility_level_invalid; // 42203 + case error_code::mode_invalid: + return reply_error_code::mode_invalid; // 42204 } return {}; } diff --git a/src/v/pandaproxy/schema_registry/error.h b/src/v/pandaproxy/schema_registry/error.h index 33afa3cefcda..9cbdb87e5a47 100644 --- a/src/v/pandaproxy/schema_registry/error.h +++ b/src/v/pandaproxy/schema_registry/error.h @@ -29,6 +29,7 @@ enum class error_code { subject_version_soft_deleted, subject_version_not_deleted, compatibility_not_found, + mode_not_found, subject_version_operaton_not_permitted, subject_version_has_references, subject_version_schema_id_already_exists, @@ -36,6 +37,7 @@ enum class error_code { write_collision, topic_parse_error, compatibility_level_invalid, + mode_invalid, }; std::error_code make_error_code(error_code); diff --git a/src/v/pandaproxy/schema_registry/errors.h b/src/v/pandaproxy/schema_registry/errors.h index 62baed14f84a..5ae5bbb587dc 100644 --- a/src/v/pandaproxy/schema_registry/errors.h +++ b/src/v/pandaproxy/schema_registry/errors.h @@ -20,6 +20,11 @@ namespace pandaproxy::schema_registry { +/// \brief error_info stores an error_code and custom message. +/// +/// This class is useful for transporting via an outcome::result +/// and automatic conversion to an `exception`. +/// See `outcome_throw_as_system_error_with_payload`. 
class error_info { public: error_info() = default; @@ -69,6 +74,13 @@ inline error_info not_found(const subject& sub) { fmt::format("Subject '{}' not found.", sub())}; } +inline error_info not_found(const subject& sub, mode) { + return error_info{ + error_code::mode_not_found, + fmt::format( + "Subject '{}' does not have subject-level mode configured.", sub())}; +} + inline error_info not_found(const subject&, schema_version id) { return error_info{ error_code::subject_version_not_found, @@ -156,4 +168,24 @@ inline error_info compatibility_not_found(const subject& sub) { sub())}; } +inline error_info mode_not_found(const subject& sub) { + return error_info{ + error_code::mode_not_found, + fmt::format( + "Subject '{}' does not have subject-level mode configured", sub())}; +} + +inline error_info mode_not_readwrite(const subject& sub) { + return error_info{ + error_code::subject_version_operaton_not_permitted, + fmt::format("Subject {} is not in read-write mode", sub())}; +} + +inline error_info mode_is_readonly(const std::optional& sub) { + return error_info{ + error_code::subject_version_operaton_not_permitted, + fmt::format( + "Subject {} is in read-only mode", sub.value_or(subject{"null"}))}; +} + } // namespace pandaproxy::schema_registry diff --git a/src/v/pandaproxy/schema_registry/handlers.cc b/src/v/pandaproxy/schema_registry/handlers.cc index 54c89bebbc12..8187dde1c77f 100644 --- a/src/v/pandaproxy/schema_registry/handlers.cc +++ b/src/v/pandaproxy/schema_registry/handlers.cc @@ -21,6 +21,7 @@ #include "pandaproxy/schema_registry/requests/get_schemas_ids_id.h" #include "pandaproxy/schema_registry/requests/get_schemas_ids_id_versions.h" #include "pandaproxy/schema_registry/requests/get_subject_versions_version.h" +#include "pandaproxy/schema_registry/requests/mode.h" #include "pandaproxy/schema_registry/requests/post_subject_versions.h" #include "pandaproxy/schema_registry/types.h" #include "pandaproxy/server.h" @@ -182,6 +183,7 @@ delete_config_subject(server::request_t rq, server::reply_t rp) { // ensure we see latest writes co_await rq.service().writer().read_sync(); + co_await rq.service().writer().check_mutable(sub); compatibility_level lvl{}; try { @@ -206,9 +208,95 @@ ss::future get_mode(server::request_t rq, server::reply_t rp) { parse_accept_header(rq, rp); rq.req.reset(); - rp.rep->write_body("json", ss::sstring{R"({ - "mode": "READWRITE" -})"}); + // Ensure we see latest writes + co_await rq.service().writer().read_sync(); + + auto res = co_await rq.service().schema_store().get_mode(); + + auto json_rslt = ppj::rjson_serialize(mode_req_rep{.mode = res}); + rp.rep->write_body("json", json_rslt); + co_return rp; +} + +ss::future put_mode(server::request_t rq, server::reply_t rp) { + parse_content_type_header(rq); + parse_accept_header(rq, rp); + auto frc = parse::query_param>(*rq.req, "force") + .value_or(force::no); + auto res = ppj::rjson_parse(rq.req->content.data(), mode_handler<>{}); + rq.req.reset(); + + co_await rq.service().writer().write_mode(std::nullopt, res.mode, frc); + + auto json_rslt = ppj::rjson_serialize(res); + rp.rep->write_body("json", json_rslt); + co_return rp; +} + +ss::future +get_mode_subject(server::request_t rq, server::reply_t rp) { + parse_accept_header(rq, rp); + auto sub = parse::request_param(*rq.req, "subject"); + auto fallback = parse::query_param>( + *rq.req, "defaultToGlobal") + .value_or(default_to_global::no); + rq.req.reset(); + + // Ensure we see latest writes + co_await rq.service().writer().read_sync(); + + auto res = co_await 
rq.service().schema_store().get_mode(sub, fallback); + + auto json_rslt = ppj::rjson_serialize(mode_req_rep{.mode = res}); + rp.rep->write_body("json", json_rslt); + co_return rp; +} + +ss::future +put_mode_subject(server::request_t rq, server::reply_t rp) { + parse_content_type_header(rq); + parse_accept_header(rq, rp); + auto frc = parse::query_param>(*rq.req, "force") + .value_or(force::no); + auto sub = parse::request_param(*rq.req, "subject"); + auto res = ppj::rjson_parse(rq.req->content.data(), mode_handler<>{}); + rq.req.reset(); + + // Ensure we see latest writes + co_await rq.service().writer().read_sync(); + co_await rq.service().writer().write_mode(sub, res.mode, frc); + + auto json_rslt = ppj::rjson_serialize(res); + rp.rep->write_body("json", json_rslt); + co_return rp; +} + +ss::future +delete_mode_subject(server::request_t rq, server::reply_t rp) { + parse_accept_header(rq, rp); + auto sub = parse::request_param(*rq.req, "subject"); + + rq.req.reset(); + + // ensure we see latest writes + co_await rq.service().writer().read_sync(); + + mode m{}; + try { + m = co_await rq.service().schema_store().get_mode( + sub, default_to_global::no); + } catch (const exception& e) { + if (e.code() == error_code::mode_not_found) { + // Upstream compatibility: return 40401 instead of 40409 + throw as_exception(not_found(sub)); + } + throw; + } + + co_await rq.service().writer().delete_mode(sub); + + auto json_rslt = ppj::rjson_serialize(mode_req_rep{.mode = m}); + rp.rep->write_body("json", json_rslt); co_return rp; } diff --git a/src/v/pandaproxy/schema_registry/handlers.h b/src/v/pandaproxy/schema_registry/handlers.h index 9d16fdd7aa5b..876e7240fee1 100644 --- a/src/v/pandaproxy/schema_registry/handlers.h +++ b/src/v/pandaproxy/schema_registry/handlers.h @@ -37,6 +37,18 @@ ss::future::reply_t> delete_config_subject( ss::future::reply_t> get_mode(ctx_server::request_t rq, ctx_server::reply_t rp); +ss::future::reply_t> +put_mode(ctx_server::request_t rq, ctx_server::reply_t rp); + +ss::future::reply_t> get_mode_subject( + ctx_server::request_t rq, ctx_server::reply_t rp); + +ss::future::reply_t> put_mode_subject( + ctx_server::request_t rq, ctx_server::reply_t rp); + +ss::future::reply_t> delete_mode_subject( + ctx_server::request_t rq, ctx_server::reply_t rp); + ss::future::reply_t> get_schemas_types( ctx_server::request_t rq, ctx_server::reply_t rp); diff --git a/src/v/pandaproxy/schema_registry/requests/mode.h b/src/v/pandaproxy/schema_registry/requests/mode.h new file mode 100644 index 000000000000..d5159a623a74 --- /dev/null +++ b/src/v/pandaproxy/schema_registry/requests/mode.h @@ -0,0 +1,89 @@ +/* + * Copyright 2021 Redpanda Data, Inc. 
+ * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once + +#include "json/types.h" +#include "pandaproxy/json/rjson_parse.h" +#include "pandaproxy/json/rjson_util.h" +#include "pandaproxy/schema_registry/errors.h" +#include "pandaproxy/schema_registry/types.h" + +namespace pandaproxy::schema_registry { + +struct mode_req_rep { + static constexpr std::string_view field_name = "mode"; + mode mode{mode::read_write}; +}; + +template> +class mode_handler : public json::base_handler { + enum class state { + empty = 0, + object, + mode, + }; + state _state = state::empty; + +public: + using Ch = typename json::base_handler::Ch; + using rjson_parse_result = mode_req_rep; + rjson_parse_result result; + + explicit mode_handler() + : json::base_handler{json::serialization_format::none} + , result() {} + + bool Key(const Ch* str, ::json::SizeType len, bool) { + auto sv = std::string_view{str, len}; + if (_state == state::object && sv == mode_req_rep::field_name) { + _state = state::mode; + return true; + } + return false; + } + + bool String(const Ch* str, ::json::SizeType len, bool) { + auto sv = std::string_view{str, len}; + if (_state == state::mode) { + auto s = from_string_view(sv); + if (s.has_value() && s.value() != mode::import) { + result.mode = *s; + _state = state::object; + } else { + auto code = error_code::mode_invalid; + throw as_exception( + error_info{code, make_error_code(code).message()}); + } + return s.has_value(); + } + return false; + } + + bool StartObject() { + return std::exchange(_state, state::object) == state::empty; + } + + bool EndObject(::json::SizeType) { + return std::exchange(_state, state::empty) == state::object; + } +}; + +inline void rjson_serialize( + ::json::Writer<::json::StringBuffer>& w, + const schema_registry::mode_req_rep& res) { + w.StartObject(); + w.Key(mode_req_rep::field_name.data()); + ::json::rjson_serialize(w, to_string_view(res.mode)); + w.EndObject(); +} + +} // namespace pandaproxy::schema_registry diff --git a/src/v/pandaproxy/schema_registry/seq_writer.cc b/src/v/pandaproxy/schema_registry/seq_writer.cc index 0f010b3a1b6e..e5d66ab47bcd 100644 --- a/src/v/pandaproxy/schema_registry/seq_writer.cc +++ b/src/v/pandaproxy/schema_registry/seq_writer.cc @@ -13,6 +13,7 @@ #include "base/vlog.h" #include "kafka/client/client_fetch_batch_reader.h" #include "pandaproxy/logger.h" +#include "pandaproxy/schema_registry/error.h" #include "pandaproxy/schema_registry/errors.h" #include "pandaproxy/schema_registry/exceptions.h" #include "pandaproxy/schema_registry/sharded_store.h" @@ -82,8 +83,13 @@ struct batch_builder : public storage::record_batch_builder { auto key = config_key{.seq{s.seq}, .node{s.node}, .sub{sub}}; add_raw_kv(to_json_iobuf(std::move(key)), std::nullopt); } break; - default: + case seq_marker_key_type::mode: { + auto key = mode_key{.seq{s.seq}, .node{s.node}, .sub{sub}}; + add_raw_kv(to_json_iobuf(std::move(key)), std::nullopt); + } break; + case seq_marker_key_type::invalid: vassert(false, "Unknown key type"); + break; } } @@ -109,6 +115,15 @@ ss::future<> seq_writer::read_sync() { co_await wait_for(max_offset - model::offset{1}); } +ss::future<> seq_writer::check_mutable(std::optional const& sub) { + auto mode = sub ? 
co_await _store.get_mode(*sub, default_to_global::yes) + : co_await _store.get_mode(); + if (mode == mode::read_only) { + throw as_exception(mode_is_readonly(sub)); + } + co_return; +} + ss::future<> seq_writer::wait_for(model::offset offset) { return container().invoke_on(0, _smp_opts, [offset](seq_writer& seq) { if (auto waiters = seq._wait_for_sem.waiters(); waiters != 0) { @@ -197,6 +212,8 @@ void seq_writer::advance_offset_inner(model::offset offset) { ss::future> seq_writer::do_write_subject_version( subject_schema schema, model::offset write_at) { + co_await check_mutable(schema.schema.sub()); + // Check if store already contains this data: if // so, we do no I/O and return the schema ID. auto projected = co_await _store.project_ids(schema).handle_exception( @@ -261,6 +278,8 @@ ss::future> seq_writer::do_write_config( to_string_view(compat), write_at); + co_await check_mutable(sub); + try { // Check for no-op case compatibility_level existing; @@ -301,6 +320,8 @@ ss::future seq_writer::write_config( ss::future> seq_writer::do_delete_config(subject sub) { vlog(plog.debug, "delete config sub={}", sub); + co_await check_mutable(sub); + try { co_await _store.get_compatibility(sub, default_to_global::no); } catch (const exception&) { @@ -326,9 +347,83 @@ ss::future seq_writer::delete_config(subject sub) { }); } +ss::future> seq_writer::do_write_mode( + std::optional sub, mode m, force f, model::offset write_at) { + vlog( + plog.debug, + "write_mode sub={} mode={} force={} offset={}", + sub, + to_string_view(m), + f, + write_at); + + _store.check_mode_mutability(force::no); + + try { + // Check for no-op case + mode existing = sub ? co_await _store.get_mode( + sub.value(), default_to_global::no) + : co_await _store.get_mode(); + if (existing == m) { + co_return false; + } + } catch (const exception& e) { + if (e.code() != error_code::mode_not_found) { + throw; + } + } + + batch_builder rb(write_at, sub); + rb( + mode_key{.seq{write_at}, .node{_node_id}, .sub{sub}}, + mode_value{.mode = m}); + + if (co_await produce_and_apply(write_at, std::move(rb).build())) { + co_return true; + } else { + // Pass up a None, our caller's cue to retry + co_return std::nullopt; + } +} + +ss::future +seq_writer::write_mode(std::optional sub, mode mode, force f) { + return sequenced_write( + [sub{std::move(sub)}, mode, f](model::offset write_at, seq_writer& seq) { + return seq.do_write_mode(sub, mode, f, write_at); + }); +} + +ss::future> +seq_writer::do_delete_mode(subject sub, model::offset write_at) { + vlog(plog.debug, "delete mode sub={} offset={}", sub, write_at); + + // Report an error if the mode isn't registered + co_await _store.get_mode(sub, default_to_global::no); + _store.check_mode_mutability(force::no); + + batch_builder rb{write_at, sub}; + rb(co_await _store.get_subject_mode_written_at(sub)); + if (co_await produce_and_apply(std::nullopt, std::move(rb).build())) { + co_return true; + } else { + // Pass up a None, our caller's cue to retry + co_return std::nullopt; + } +} + +ss::future seq_writer::delete_mode(subject sub) { + return sequenced_write( + [sub{std::move(sub)}](model::offset write_at, seq_writer& seq) { + return seq.do_delete_mode(sub, write_at); + }); +} + /// Impermanent delete: update a version with is_deleted=true ss::future> seq_writer::do_delete_subject_version( subject sub, schema_version version, model::offset write_at) { + co_await check_mutable(sub); + if (co_await _store.is_referenced(sub, version)) { throw as_exception(has_references(sub, version)); } @@ -367,6 +462,8 
@@ seq_writer::delete_subject_version(subject sub, schema_version version) { ss::future>> seq_writer::do_delete_subject_impermanent(subject sub, model::offset write_at) { + co_await check_mutable(sub); + // Grab the versions before they're gone. auto versions = co_await _store.get_versions(sub, include_deleted::no); @@ -391,10 +488,20 @@ seq_writer::do_delete_subject_impermanent(subject sub, model::offset write_at) { delete_subject_key{.seq{write_at}, .node{_node_id}, .sub{sub}}, delete_subject_value{.sub{sub}}); - auto conf = co_await ss::coroutine::as_future( - _store.get_subject_config_written_at(sub)); - if (!conf.failed()) { - rb(conf.get()); + try { + rb(co_await _store.get_subject_mode_written_at(sub)); + } catch (exception const& e) { + if (e.code() != error_code::subject_not_found) { + throw; + } + } + + try { + rb(co_await _store.get_subject_config_written_at(sub)); + } catch (exception const& e) { + if (e.code() != error_code::subject_not_found) { + throw; + } } if (co_await produce_and_apply(write_at, std::move(rb).build())) { @@ -435,6 +542,9 @@ seq_writer::delete_subject_permanent_inner( /// Check for whether our victim is already soft-deleted happens /// within these store functions (will throw a 404-equivalent if so) vlog(plog.debug, "delete_subject_permanent sub={}", sub); + + co_await check_mutable(sub); + if (version.has_value()) { // Check version first to see if the version exists sequences = co_await _store.get_subject_version_written_at( diff --git a/src/v/pandaproxy/schema_registry/seq_writer.h b/src/v/pandaproxy/schema_registry/seq_writer.h index 35dd09214f28..7de53e6b5f65 100644 --- a/src/v/pandaproxy/schema_registry/seq_writer.h +++ b/src/v/pandaproxy/schema_registry/seq_writer.h @@ -40,6 +40,9 @@ class seq_writer final : public ss::peering_sharded_service { ss::future<> read_sync(); + // Throws 42205 if the subject cannot be modified + ss::future<> check_mutable(std::optional const& sub); + // API for readers: notify us when they have read and applied an offset ss::future<> advance_offset(model::offset offset); @@ -50,6 +53,10 @@ class seq_writer final : public ss::peering_sharded_service { ss::future delete_config(subject sub); + ss::future write_mode(std::optional sub, mode m, force f); + + ss::future delete_mode(subject sub); + ss::future delete_subject_version(subject sub, schema_version version); @@ -79,6 +86,12 @@ class seq_writer final : public ss::peering_sharded_service { ss::future> do_delete_config(subject sub); + ss::future> do_write_mode( + std::optional sub, mode m, force f, model::offset write_at); + + ss::future> + do_delete_mode(subject sub, model::offset write_at); + ss::future> do_delete_subject_version( subject sub, schema_version version, model::offset write_at); diff --git a/src/v/pandaproxy/schema_registry/service.cc b/src/v/pandaproxy/schema_registry/service.cc index 6b7e3e7b9840..d04b215e9ed9 100644 --- a/src/v/pandaproxy/schema_registry/service.cc +++ b/src/v/pandaproxy/schema_registry/service.cc @@ -55,9 +55,10 @@ const security::acl_principal principal{ class wrap { public: - wrap(ss::gate& g, one_shot& os, server::function_handler h) + wrap(ss::gate& g, one_shot& os, auth_level lvl, server::function_handler h) : _g{g} , _os{os} + , _auth_level(lvl) , _h{std::move(h)} {} ss::future @@ -66,8 +67,11 @@ class wrap { rq.service().config().schema_registry_api.value(), rq.req->get_listener_idx()); try { - rq.user = maybe_authenticate_request( - rq.authn_method, rq.service().authenticator(), *rq.req); + rq.user = maybe_authorize_request( + 
rq.authn_method, + _auth_level, + rq.service().authenticator(), + *rq.req); } catch (unauthorized_user_exception& e) { audit_authn_failure(rq, e.get_username(), e.what()); throw; @@ -190,6 +194,7 @@ class wrap { private: ss::gate& _g; one_shot& _os; + auth_level _auth_level; server::function_handler _h; }; @@ -198,91 +203,118 @@ server::routes_t get_schema_registry_routes(ss::gate& gate, one_shot& es) { routes.api = ss::httpd::schema_registry_json::name; routes.routes.emplace_back(server::route_t{ - ss::httpd::schema_registry_json::get_config, wrap(gate, es, get_config)}); + ss::httpd::schema_registry_json::get_config, + wrap(gate, es, auth_level::user, get_config)}); routes.routes.emplace_back(server::route_t{ - ss::httpd::schema_registry_json::put_config, wrap(gate, es, put_config)}); + ss::httpd::schema_registry_json::put_config, + wrap(gate, es, auth_level::user, put_config)}); routes.routes.emplace_back(server::route_t{ ss::httpd::schema_registry_json::get_config_subject, - wrap(gate, es, get_config_subject)}); + wrap(gate, es, auth_level::user, get_config_subject)}); routes.routes.emplace_back(server::route_t{ ss::httpd::schema_registry_json::put_config_subject, - wrap(gate, es, put_config_subject)}); + wrap(gate, es, auth_level::user, put_config_subject)}); routes.routes.emplace_back(server::route_t{ ss::httpd::schema_registry_json::delete_config_subject, - wrap(gate, es, delete_config_subject)}); + wrap(gate, es, auth_level::user, delete_config_subject)}); routes.routes.emplace_back(server::route_t{ - ss::httpd::schema_registry_json::get_mode, wrap(gate, es, get_mode)}); + ss::httpd::schema_registry_json::get_mode, + wrap(gate, es, auth_level::user, get_mode)}); + + routes.routes.emplace_back(server::route_t{ + ss::httpd::schema_registry_json::put_mode, + wrap(gate, es, auth_level::superuser, put_mode)}); + + routes.routes.emplace_back(server::route_t{ + ss::httpd::schema_registry_json::get_mode_subject, + wrap(gate, es, auth_level::user, get_mode_subject)}); + + routes.routes.emplace_back(server::route_t{ + ss::httpd::schema_registry_json::put_mode_subject, + wrap(gate, es, auth_level::superuser, put_mode_subject)}); + + routes.routes.emplace_back(server::route_t{ + ss::httpd::schema_registry_json::delete_mode_subject, + wrap(gate, es, auth_level::superuser, delete_mode_subject)}); routes.routes.emplace_back(server::route_t{ ss::httpd::schema_registry_json::get_schemas_types, - wrap(gate, es, get_schemas_types)}); + wrap(gate, es, auth_level::publik, get_schemas_types)}); routes.routes.emplace_back(server::route_t{ ss::httpd::schema_registry_json::get_schemas_ids_id, - wrap(gate, es, get_schemas_ids_id)}); + wrap(gate, es, auth_level::user, get_schemas_ids_id)}); routes.routes.emplace_back(server::route_t{ ss::httpd::schema_registry_json::get_schemas_ids_id_versions, - wrap(gate, es, get_schemas_ids_id_versions)}); + wrap(gate, es, auth_level::user, get_schemas_ids_id_versions)}); routes.routes.emplace_back(server::route_t{ ss::httpd::schema_registry_json::get_schemas_ids_id_subjects, - wrap(gate, es, get_schemas_ids_id_subjects)}); + wrap(gate, es, auth_level::user, get_schemas_ids_id_subjects)}); routes.routes.emplace_back(server::route_t{ ss::httpd::schema_registry_json::get_subjects, - wrap(gate, es, get_subjects)}); + wrap(gate, es, auth_level::user, get_subjects)}); routes.routes.emplace_back(server::route_t{ ss::httpd::schema_registry_json::get_subject_versions, - wrap(gate, es, get_subject_versions)}); + wrap(gate, es, auth_level::user, get_subject_versions)}); 
routes.routes.emplace_back(server::route_t{ ss::httpd::schema_registry_json::post_subject, - wrap(gate, es, post_subject)}); + wrap(gate, es, auth_level::user, post_subject)}); routes.routes.emplace_back(server::route_t{ ss::httpd::schema_registry_json::post_subject_versions, - wrap(gate, es, post_subject_versions)}); + wrap(gate, es, auth_level::user, post_subject_versions)}); routes.routes.emplace_back(server::route_t{ ss::httpd::schema_registry_json::get_subject_versions_version, - wrap(gate, es, get_subject_versions_version)}); + wrap(gate, es, auth_level::user, get_subject_versions_version)}); routes.routes.emplace_back(server::route_t{ ss::httpd::schema_registry_json::get_subject_versions_version_schema, - wrap(gate, es, get_subject_versions_version_schema)}); + wrap(gate, es, auth_level::user, get_subject_versions_version_schema)}); routes.routes.emplace_back(server::route_t{ ss::httpd::schema_registry_json:: get_subject_versions_version_referenced_by, - wrap(gate, es, get_subject_versions_version_referenced_by)}); + wrap( + gate, + es, + auth_level::user, + get_subject_versions_version_referenced_by)}); routes.routes.emplace_back(server::route_t{ ss::httpd::schema_registry_json:: get_subject_versions_version_referenced_by_deprecated, - wrap(gate, es, get_subject_versions_version_referenced_by)}); + wrap( + gate, + es, + auth_level::user, + get_subject_versions_version_referenced_by)}); routes.routes.emplace_back(server::route_t{ ss::httpd::schema_registry_json::delete_subject, - wrap(gate, es, delete_subject)}); + wrap(gate, es, auth_level::user, delete_subject)}); routes.routes.emplace_back(server::route_t{ ss::httpd::schema_registry_json::delete_subject_version, - wrap(gate, es, delete_subject_version)}); + wrap(gate, es, auth_level::user, delete_subject_version)}); routes.routes.emplace_back(server::route_t{ ss::httpd::schema_registry_json::compatibility_subject_version, - wrap(gate, es, compatibility_subject_version)}); + wrap(gate, es, auth_level::user, compatibility_subject_version)}); routes.routes.emplace_back(server::route_t{ ss::httpd::schema_registry_json::schema_registry_status_ready, - wrap(gate, es, status_ready)}); + wrap(gate, es, auth_level::publik, status_ready)}); return routes; } diff --git a/src/v/pandaproxy/schema_registry/sharded_store.cc b/src/v/pandaproxy/schema_registry/sharded_store.cc index 9ee3c3481218..bb94277f1f97 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.cc +++ b/src/v/pandaproxy/schema_registry/sharded_store.cc @@ -66,9 +66,9 @@ constexpr auto set_accumulator = } // namespace -ss::future<> sharded_store::start(ss::smp_service_group sg) { +ss::future<> sharded_store::start(is_mutable mut, ss::smp_service_group sg) { _smp_opts = ss::smp_submit_to_options{sg}; - return _store.start(); + return _store.start(mut); } ss::future<> sharded_store::stop() { return _store.stop(); } @@ -364,6 +364,11 @@ sharded_store::get_subjects(include_deleted inc_del) { co_return co_await _store.map_reduce0(map, subjects{}, reduce); } +ss::future sharded_store::has_subjects(include_deleted inc_del) { + auto map = [inc_del](store& s) { return s.has_subjects(inc_del); }; + return _store.map_reduce0(map, false, std::logical_or<>{}); +} + ss::future> sharded_store::get_versions(subject sub, include_deleted inc_del) { auto sub_shard{shard_for(sub)}; @@ -474,6 +479,15 @@ sharded_store::get_subject_config_written_at(subject sub) { }); } +ss::future> +sharded_store::get_subject_mode_written_at(subject sub) { + auto sub_shard{shard_for(sub)}; + co_return co_await 
_store.invoke_on( + sub_shard, _smp_opts, [sub{std::move(sub)}](store& s) { + return s.store::get_subject_mode_written_at(sub).value(); + }); +} + ss::future> sharded_store::get_subject_version_written_at(subject sub, schema_version ver) { auto sub_shard{shard_for(sub)}; @@ -492,6 +506,43 @@ ss::future sharded_store::delete_subject_version( }); } +ss::future sharded_store::get_mode() { + co_return _store.local().get_mode().value(); +} + +ss::future +sharded_store::get_mode(subject sub, default_to_global fallback) { + auto sub_shard{shard_for(sub)}; + co_return co_await _store.invoke_on( + sub_shard, [sub{std::move(sub)}, fallback](store& s) { + return s.get_mode(sub, fallback).value(); + }); +} + +ss::future sharded_store::set_mode(mode m, force f) { + auto map = [m, f](store& s) { return s.set_mode(m, f).value(); }; + auto reduce = std::logical_and<>{}; + co_return co_await _store.map_reduce0(map, true, reduce); +} + +ss::future +sharded_store::set_mode(seq_marker marker, subject sub, mode m, force f) { + auto sub_shard{shard_for(sub)}; + co_return co_await _store.invoke_on( + sub_shard, _smp_opts, [marker, sub{std::move(sub)}, m, f](store& s) { + return s.set_mode(marker, sub, m, f).value(); + }); +} + +ss::future +sharded_store::clear_mode(seq_marker marker, subject sub, force f) { + auto sub_shard{shard_for(sub)}; + co_return co_await _store.invoke_on( + sub_shard, _smp_opts, [marker, sub{std::move(sub)}, f](store& s) { + return s.clear_mode(marker, sub, f).value(); + }); +} + ss::future sharded_store::get_compatibility() { co_return _store.local().get_compatibility().value(); } @@ -678,6 +729,10 @@ ss::future sharded_store::is_compatible( co_return is_compat; } +void sharded_store::check_mode_mutability(force f) const { + _store.local().check_mode_mutability(f).value(); +} + ss::future sharded_store::has_version( const subject& sub, schema_id id, include_deleted i) { auto sub_shard{shard_for(sub)}; diff --git a/src/v/pandaproxy/schema_registry/sharded_store.h b/src/v/pandaproxy/schema_registry/sharded_store.h index 16b236f2fcf4..ed4b7b57d4d7 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.h +++ b/src/v/pandaproxy/schema_registry/sharded_store.h @@ -24,7 +24,7 @@ class store; /// subject or schema_id class sharded_store { public: - ss::future<> start(ss::smp_service_group sg); + ss::future<> start(is_mutable mut, ss::smp_service_group sg); ss::future<> stop(); ///\brief Make the canonical form of the schema @@ -87,6 +87,9 @@ class sharded_store { ///\brief Return a list of subjects. ss::future> get_subjects(include_deleted inc_del); + ///\brief Return whether there are any subjects. + ss::future has_subjects(include_deleted inc_del); + ///\brief Return a list of versions and associated schema_id. ss::future> get_versions(subject sub, include_deleted inc_del); @@ -115,6 +118,11 @@ class sharded_store { ss::future> get_subject_config_written_at(subject sub); + ///\brief Get sequence number history of subject mode. Subject need + /// not be soft-deleted first + ss::future> + get_subject_mode_written_at(subject sub); + ///\brief Get sequence number history (errors out if not soft-deleted) ss::future> get_subject_version_written_at(subject sub, schema_version version); @@ -124,6 +132,24 @@ class sharded_store { ss::future delete_subject_version( subject sub, schema_version version, force f = force::no); + ///\brief Get the global mode. + ss::future get_mode(); + + ///\brief Get the mode for a subject, or fallback to global. 
+ ss::future get_mode(subject sub, default_to_global fallback); + + ///\brief Set the global mode. + /// \param force Override checks, always apply action + ss::future set_mode(mode m, force f); + + ///\brief Set the mode for a subject. + /// \param force Override checks, always apply action + ss::future set_mode(seq_marker marker, subject sub, mode m, force f); + + ///\brief Clear the mode for a subject. + /// \param force Override checks, always apply action + ss::future clear_mode(seq_marker marker, subject sub, force f); + ///\brief Get the global compatibility level. ss::future get_compatibility(); @@ -151,6 +177,9 @@ class sharded_store { ss::future has_version(const subject&, schema_id, include_deleted); + //// \brief Throw if the store is not mutable + void check_mode_mutability(force f) const; + private: ss::future upsert_schema(schema_id id, canonical_schema_definition def); diff --git a/src/v/pandaproxy/schema_registry/storage.h b/src/v/pandaproxy/schema_registry/storage.h index 09d6f36168dd..cdc60bf16833 100644 --- a/src/v/pandaproxy/schema_registry/storage.h +++ b/src/v/pandaproxy/schema_registry/storage.h @@ -36,7 +36,8 @@ namespace pandaproxy::schema_registry { using topic_key_magic = named_type; -enum class topic_key_type { noop = 0, schema, config, delete_subject }; +enum class topic_key_type { noop = 0, schema, config, mode, delete_subject }; + constexpr std::string_view to_string_view(topic_key_type kt) { switch (kt) { case topic_key_type::noop: @@ -45,6 +46,8 @@ constexpr std::string_view to_string_view(topic_key_type kt) { return "SCHEMA"; case topic_key_type::config: return "CONFIG"; + case topic_key_type::mode: + return "MODE"; case topic_key_type::delete_subject: return "DELETE_SUBJECT"; } @@ -57,6 +60,7 @@ from_string_view(std::string_view sv) { .match(to_string_view(topic_key_type::noop), topic_key_type::noop) .match(to_string_view(topic_key_type::schema), topic_key_type::schema) .match(to_string_view(topic_key_type::config), topic_key_type::config) + .match(to_string_view(topic_key_type::mode), topic_key_type::mode) .match( to_string_view(topic_key_type::delete_subject), topic_key_type::delete_subject) @@ -846,6 +850,241 @@ class config_value_handler : public json::base_handler { } }; +struct mode_key { + static constexpr topic_key_type keytype{topic_key_type::mode}; + std::optional seq; + std::optional node; + std::optional sub; + topic_key_magic magic{0}; + + friend bool operator==(const mode_key&, const mode_key&) = default; + + friend std::ostream& operator<<(std::ostream& os, const mode_key& v) { + if (v.seq.has_value() && v.node.has_value()) { + fmt::print( + os, + "seq: {} node: {} keytype: {}, subject: {}, magic: {}", + *v.seq, + *v.node, + to_string_view(v.keytype), + v.sub.value_or(invalid_subject), + v.magic); + } else { + fmt::print( + os, + "unsequenced keytype: {}, subject: {}, magic: {}", + to_string_view(v.keytype), + v.sub.value_or(invalid_subject), + v.magic); + } + return os; + } +}; + +inline void rjson_serialize( + ::json::Writer<::json::StringBuffer>& w, + const schema_registry::mode_key& key) { + w.StartObject(); + w.Key("keytype"); + ::json::rjson_serialize(w, to_string_view(key.keytype)); + w.Key("subject"); + if (key.sub) { + ::json::rjson_serialize(w, key.sub.value()); + } else { + w.Null(); + } + w.Key("magic"); + ::json::rjson_serialize(w, key.magic); + if (key.seq.has_value()) { + w.Key("seq"); + ::json::rjson_serialize(w, *key.seq); + } + if (key.node.has_value()) { + w.Key("node"); + ::json::rjson_serialize(w, *key.node); + } + 
w.EndObject(); +} + +template> +class mode_key_handler : public json::base_handler { + enum class state { + empty = 0, + object, + keytype, + seq, + node, + subject, + magic, + }; + state _state = state::empty; + +public: + using Ch = typename json::base_handler::Ch; + using rjson_parse_result = mode_key; + rjson_parse_result result; + + mode_key_handler() + : json::base_handler{json::serialization_format::none} {} + + bool Key(const Ch* str, ::json::SizeType len, bool) { + auto sv = std::string_view{str, len}; + std::optional s{string_switch>(sv) + .match("keytype", state::keytype) + .match("seq", state::seq) + .match("node", state::node) + .match("subject", state::subject) + .match("magic", state::magic) + .default_match(std::nullopt)}; + return s.has_value() && std::exchange(_state, *s) == state::object; + } + + bool Uint(int i) { + switch (_state) { + case state::magic: { + result.magic = topic_key_magic{i}; + _state = state::object; + return true; + } + case state::seq: { + result.seq = model::offset{i}; + _state = state::object; + return true; + } + case state::node: { + result.node = model::node_id{i}; + _state = state::object; + return true; + } + case state::empty: + case state::subject: + case state::keytype: + case state::object: + return false; + } + return false; + } + + bool String(const Ch* str, ::json::SizeType len, bool) { + auto sv = std::string_view{str, len}; + switch (_state) { + case state::keytype: { + auto kt = from_string_view(sv); + _state = state::object; + return kt == result.keytype; + } + case state::subject: { + result.sub = subject{ss::sstring{sv}}; + _state = state::object; + return true; + } + case state::empty: + case state::seq: + case state::node: + case state::object: + case state::magic: + return false; + } + return false; + } + + bool Null() { + // The subject, and only the subject, is nullable. 
+ return std::exchange(_state, state::object) == state::subject; + } + + bool StartObject() { + return std::exchange(_state, state::object) == state::empty; + } + + bool EndObject(::json::SizeType) { + return result.seq.has_value() == result.node.has_value() + && std::exchange(_state, state::empty) == state::object; + } +}; + +struct mode_value { + mode mode{mode::read_write}; + std::optional sub; + + friend bool operator==(const mode_value&, const mode_value&) = default; + + friend std::ostream& operator<<(std::ostream& os, const mode_value& v) { + if (v.sub.has_value()) { + fmt::print(os, "subject: {}, ", v.sub.value()); + } + fmt::print(os, "mode: {}", to_string_view(v.mode)); + + return os; + } +}; + +inline void rjson_serialize( + ::json::Writer<::json::StringBuffer>& w, + const schema_registry::mode_value& val) { + w.StartObject(); + if (val.sub.has_value()) { + w.Key("subject"); + ::json::rjson_serialize(w, val.sub.value()); + } + w.Key("mode"); + ::json::rjson_serialize(w, to_string_view(val.mode)); + w.EndObject(); +} + +template> +class mode_value_handler : public json::base_handler { + enum class state { + empty = 0, + object, + mode, + subject, + }; + state _state = state::empty; + +public: + using Ch = typename json::base_handler::Ch; + using rjson_parse_result = mode_value; + rjson_parse_result result; + + mode_value_handler() + : json::base_handler{json::serialization_format::none} {} + + bool Key(const Ch* str, ::json::SizeType len, bool) { + auto sv = std::string_view{str, len}; + std::optional s{string_switch>(sv) + .match("mode", state::mode) + .match("subject", state::subject) + .default_match(std::nullopt)}; + return s.has_value() && std::exchange(_state, *s) == state::object; + } + + bool String(const Ch* str, ::json::SizeType len, bool) { + auto sv = std::string_view{str, len}; + if (_state == state::mode) { + auto s = from_string_view(sv); + if (s.has_value()) { + result.mode = *s; + _state = state::object; + } + return s.has_value(); + } else if (_state == state::subject) { + result.sub.emplace(sv); + _state = state::object; + return true; + } + return false; + } + + bool StartObject() { + return std::exchange(_state, state::object) == state::empty; + } + + bool EndObject(::json::SizeType) { + return std::exchange(_state, state::empty) == state::object; + } +}; + struct delete_subject_key { static constexpr topic_key_type keytype{topic_key_type::delete_subject}; std::optional seq; @@ -1188,7 +1427,18 @@ struct consume_to_store { val); break; } - case topic_key_type::delete_subject: + case topic_key_type::mode: { + std::optional val; + if (!record.value().empty()) { + auto value = record.release_value(); + val.emplace( + from_json_iobuf>(std::move(value))); + } + co_await apply( + offset, from_json_iobuf>(std::move(key)), val); + break; + } + case topic_key_type::delete_subject: { std::optional val; if (!record.value().empty()) { val.emplace(from_json_iobuf>( @@ -1201,6 +1451,7 @@ struct consume_to_store { std::move(val)); break; } + } co_await _sequencer.advance_offset(offset); } @@ -1331,6 +1582,61 @@ struct consume_to_store { } } + ss::future<> + apply(model::offset offset, mode_key key, std::optional val) { + // Drop out-of-sequence messages + // + // Check seq if it was provided, otherwise assume 3rdparty + // compatibility, which can't collide. 
+ if (val && key.seq.has_value() && offset != key.seq) { + vlog( + plog.debug, + "Ignoring out of order {} (at offset {})", + key, + offset); + co_return; + } + + if (key.magic != 0) { + throw exception( + error_code::topic_parse_error, + fmt::format("Unexpected magic: {}", key)); + } + try { + vlog(plog.debug, "Applying: {}", key); + if (key.sub.has_value()) { + if (!val.has_value()) { + co_await _store.clear_mode( + seq_marker{ + .seq = key.seq, + .node = key.node, + .version{invalid_schema_version}, // Not applicable + .key_type = seq_marker_key_type::mode}, + *key.sub, + force::yes); + } else { + co_await _store.set_mode( + seq_marker{ + .seq = key.seq, + .node = key.node, + .version{invalid_schema_version}, // Not applicable + .key_type = seq_marker_key_type::mode}, + *key.sub, + val->mode, + force::yes); + } + } else if (val.has_value()) { + co_await _store.set_mode(val->mode, force::yes); + } else { + vlog( + plog.warn, + "Tried to apply mode with neither subject nor value"); + } + } catch (const exception& e) { + vlog(plog.debug, "Error replaying: {}: {}", key, e); + } + } + ss::future<> apply( model::offset offset, delete_subject_key key, @@ -1379,6 +1685,7 @@ struct consume_to_store { vlog(plog.debug, "Error replaying: {}: {}", key, e); } } + void end_of_stream() {} sharded_store& _store; seq_writer& _sequencer; diff --git a/src/v/pandaproxy/schema_registry/store.h b/src/v/pandaproxy/schema_registry/store.h index 2c9262f91b0f..906f1c19937e 100644 --- a/src/v/pandaproxy/schema_registry/store.h +++ b/src/v/pandaproxy/schema_registry/store.h @@ -58,6 +58,11 @@ class store { public: using schema_id_set = absl::btree_set; + explicit store() = default; + + explicit store(is_mutable mut) + : _mutable(mut) {} + struct insert_result { schema_version version; schema_id id; @@ -185,6 +190,15 @@ class store { return res; } + ///\brief Return if there are subjects. + bool has_subjects(include_deleted inc_del) const { + return absl::c_any_of(_subjects, [inc_del](auto const& sub) { + return absl::c_any_of( + sub.second.versions, + [inc_del](auto const& v) { return inc_del || !v.deleted; }); + }); + } + ///\brief Return a list of versions and associated schema_id. result> get_versions(const subject& sub, include_deleted inc_del) const { @@ -273,6 +287,34 @@ class store { return result; } + /// \brief Return the seq_marker write history of a subject, but only + /// mode_keys + /// + /// \return A vector (possibly empty) + result> + get_subject_mode_written_at(const subject& sub) const { + auto sub_it = BOOST_OUTCOME_TRYX( + get_subject_iter(sub, include_deleted::yes)); + + // This should never happen (how can a record get into the + // store without an originating sequenced record?), but return + // an error instead of vasserting out. + if (sub_it->second.written_at.empty()) { + return not_found(sub); + } + + std::vector result; + std::copy_if( + sub_it->second.written_at.begin(), + sub_it->second.written_at.end(), + std::back_inserter(result), + [](const auto& sm) { + return sm.key_type == seq_marker_key_type::mode; + }); + + return result; + } + /// \brief Return the seq_marker write history of a version. /// /// \return A vector with at least one element @@ -463,6 +505,46 @@ class store { return true; } + ///\brief Get the global mode. + result get_mode() const { return _mode; } + + ///\brief Get the mode for a subject, or fallback to global. 
+ result + get_mode(const subject& sub, default_to_global fallback) const { + auto sub_it = get_subject_iter(sub, include_deleted::yes); + if (sub_it && (sub_it.assume_value())->second.mode.has_value()) { + return (sub_it.assume_value())->second.mode.value(); + } else if (fallback) { + return _mode; + } + return mode_not_found(sub); + } + + ///\brief Set the global mode. + result set_mode(mode m, force f) { + BOOST_OUTCOME_TRYX(check_mode_mutability(f)); + return std::exchange(_mode, m) != m; + } + + ///\brief Set the mode for a subject. + result + set_mode(seq_marker marker, const subject& sub, mode m, force f) { + BOOST_OUTCOME_TRYX(check_mode_mutability(f)); + auto& sub_entry = _subjects[sub]; + sub_entry.written_at.push_back(marker); + return std::exchange(sub_entry.mode, m) != m; + } + + ///\brief Clear the mode for a subject. + result + clear_mode(const seq_marker& marker, const subject& sub, force f) { + BOOST_OUTCOME_TRYX(check_mode_mutability(f)); + auto sub_it = BOOST_OUTCOME_TRYX( + get_subject_iter(sub, include_deleted::yes)); + std::erase(sub_it->second.written_at, marker); + return std::exchange(sub_it->second.mode, std::nullopt) != std::nullopt; + } + ///\brief Get the global compatibility level. result get_compatibility() const { return _compatibility; @@ -598,6 +680,16 @@ class store { return !found; } + //// \brief Return error if the store is not mutable + result check_mode_mutability(force f) const { + if (!_mutable && !f) { + return error_info{ + error_code::subject_version_operaton_not_permitted, + "Mode changes are not allowed"}; + } + return outcome::success(); + } + private: struct schema_entry { explicit schema_entry(canonical_schema_definition definition) @@ -608,6 +700,7 @@ class store { struct subject_entry { std::optional compatibility; + std::optional mode; std::vector versions; is_deleted deleted{false}; @@ -672,6 +765,8 @@ class store { schema_map _schemas; subject_map _subjects; compatibility_level _compatibility{compatibility_level::backward}; + mode _mode{mode::read_write}; + is_mutable _mutable{is_mutable::no}; }; } // namespace pandaproxy::schema_registry diff --git a/src/v/pandaproxy/schema_registry/test/compatibility_3rdparty.cc b/src/v/pandaproxy/schema_registry/test/compatibility_3rdparty.cc index 060882aa5e78..e66acb390e98 100644 --- a/src/v/pandaproxy/schema_registry/test/compatibility_3rdparty.cc +++ b/src/v/pandaproxy/schema_registry/test/compatibility_3rdparty.cc @@ -22,21 +22,33 @@ #include #include -#include +#include #include namespace pps = pandaproxy::schema_registry; +namespace pandaproxy::schema_registry { + +std::ostream& operator<<(std::ostream& os, mode m) { + return os << to_string_view(m); +} + +} // namespace pandaproxy::schema_registry + inline model::record_batch make_record_batch( - std::string_view key, std::string_view val, model::offset base_offset) { + std::string_view key, + std::optional val, + model::offset base_offset) { storage::record_batch_builder rb{ model::record_batch_type::raft_data, base_offset}; iobuf key_buf; - iobuf val_buf; + std::optional val_buf; key_buf.append(key.data(), key.size()); - val_buf.append(val.data(), val.size()); - + if (val) { + val_buf = iobuf(); + val_buf->append(val->data(), val->size()); + } rb.add_raw_kv(std::move(key_buf), std::move(val_buf)); return std::move(rb).build(); } @@ -46,6 +58,12 @@ constexpr std::string_view config_key_0{ constexpr std::string_view config_value_0{ R"({"compatibilityLevel":"BACKWARD"})"}; +constexpr std::string_view 
mode_key_0{R"({"keytype":"MODE","magic":0})"}; +constexpr std::string_view mode_key_sub_0{ + R"({"keytype":"MODE","subject":"subject_0","magic":0})"}; +constexpr std::string_view mode_value_rw{R"({"mode":"READWRITE"})"}; +constexpr std::string_view mode_value_ro{R"({"mode":"READONLY"})"}; + constexpr std::string_view schema_key_0{ R"({"keytype":"SCHEMA","subject":"subject_0","version":1,"magic":1})"}; constexpr std::string_view schema_value_0{ @@ -58,7 +76,7 @@ constexpr std::string_view del_sub_value_0{ SEASTAR_THREAD_TEST_CASE(test_consume_to_store_3rdparty) { pps::sharded_store s; - s.start(ss::default_smp_service_group()).get(); + s.start(pps::is_mutable::yes, ss::default_smp_service_group()).get(); auto stop_store = ss::defer([&s]() { s.stop().get(); }); // This kafka client will not be used by the sequencer @@ -112,4 +130,75 @@ SEASTAR_THREAD_TEST_CASE(test_consume_to_store_3rdparty) { [](const pps::exception& e) { return e.code() == pps::error_code::subject_not_found; }); + + // perm delete sub version + BOOST_REQUIRE_NO_THROW( + c(make_record_batch(schema_key_0, std::nullopt, base_offset++)).get()); + + // Test mode default + BOOST_REQUIRE_EQUAL(c._store.get_mode().get(), pps::mode::read_write); + + // Test mode READONLY + BOOST_REQUIRE_NO_THROW( + c(make_record_batch(mode_key_0, mode_value_ro, base_offset++)).get()); + BOOST_REQUIRE_EQUAL(c._store.get_mode().get(), pps::mode::read_only); + + // Test mode no subject, no fallback + BOOST_REQUIRE_EXCEPTION( + c._store.get_mode(pps::subject{"subject_0"}, pps::default_to_global::no) + .get(), + pps::exception, + [](const pps::exception& e) { + return e.code() == pps::error_code::mode_not_found; + }); + + // Test mode no subject, with fallback + BOOST_REQUIRE_EQUAL( + c._store.get_mode(pps::subject{"subject_0"}, pps::default_to_global::yes) + .get(), + pps ::mode::read_only); + + // test mode READWRITE + BOOST_REQUIRE_NO_THROW( + c(make_record_batch(mode_key_0, mode_value_rw, base_offset++)).get()); + BOOST_REQUIRE_EQUAL(c._store.get_mode().get(), pps::mode::read_write); + + // test mode subject override + BOOST_REQUIRE_NO_THROW( + c(make_record_batch(mode_key_sub_0, mode_value_ro, base_offset++)).get()); + BOOST_REQUIRE_EQUAL( + c._store.get_mode(pps::subject{"subject_0"}, pps::default_to_global::no) + .get(), + pps::mode::read_only); + + // test subject is not found + BOOST_REQUIRE_EXCEPTION( + c._store.get_versions(pps::subject{"subject_0"}, pps::include_deleted::no) + .get(), + pps::exception, + [](const pps::exception& e) { + return e.code() == pps::error_code::subject_not_found; + }); + + // test subject is not found + BOOST_REQUIRE_EXCEPTION( + c._store + .get_versions(pps::subject{"subject_0"}, pps::include_deleted::yes) + .get(), + pps::exception, + [](const pps::exception& e) { + return e.code() == pps::error_code::subject_not_found; + }); + + // clear mode subject override + c(make_record_batch(mode_key_sub_0, std::nullopt, base_offset++)).get(); + BOOST_REQUIRE_EXCEPTION( + c._store.get_mode(pps::subject{"subject_0"}, pps::default_to_global::no) + .get(), + pps::exception, + [](const pps::exception& e) { + return e.code() == pps::error_code::mode_not_found; + }); + + // Add a subject version } diff --git a/src/v/pandaproxy/schema_registry/test/compatibility_protobuf.cc b/src/v/pandaproxy/schema_registry/test/compatibility_protobuf.cc index 13166cfc5315..ac5e5ceb79e3 100644 --- a/src/v/pandaproxy/schema_registry/test/compatibility_protobuf.cc +++ b/src/v/pandaproxy/schema_registry/test/compatibility_protobuf.cc @@ -26,7 +26,8 
@@ namespace pps = pp::schema_registry; struct simple_sharded_store { simple_sharded_store() { - store.start(ss::default_smp_service_group()).get(); + store.start(pps::is_mutable::yes, ss::default_smp_service_group()) + .get(); } ~simple_sharded_store() { store.stop().get(); } simple_sharded_store(const simple_sharded_store&) = delete; diff --git a/src/v/pandaproxy/schema_registry/test/compatibility_store.cc b/src/v/pandaproxy/schema_registry/test/compatibility_store.cc index f62f911cde3d..25f5ebc594fc 100644 --- a/src/v/pandaproxy/schema_registry/test/compatibility_store.cc +++ b/src/v/pandaproxy/schema_registry/test/compatibility_store.cc @@ -24,7 +24,7 @@ SEASTAR_THREAD_TEST_CASE(test_avro_basic_backwards_store_compat) { // used to read the data written in the previous schema. pps::sharded_store s; - s.start(ss::default_smp_service_group()).get(); + s.start(pps::is_mutable::yes, ss::default_smp_service_group()).get(); auto stop_store = ss::defer([&s]() { s.stop().get(); }); pps::seq_marker dummy_marker; diff --git a/src/v/pandaproxy/schema_registry/test/consume_to_store.cc b/src/v/pandaproxy/schema_registry/test/consume_to_store.cc index 3c5e3e4ffa3f..8d351efdff19 100644 --- a/src/v/pandaproxy/schema_registry/test/consume_to_store.cc +++ b/src/v/pandaproxy/schema_registry/test/consume_to_store.cc @@ -83,7 +83,7 @@ inline model::record_batch make_delete_subject_permanently_batch( SEASTAR_THREAD_TEST_CASE(test_consume_to_store) { pps::sharded_store s; - s.start(ss::default_smp_service_group()).get(); + s.start(pps::is_mutable::yes, ss::default_smp_service_group()).get(); auto stop_store = ss::defer([&s]() { s.stop().get(); }); // This kafka client will not be used by the sequencer @@ -202,7 +202,7 @@ model::record_batch as_record_batch(Key key) { SEASTAR_THREAD_TEST_CASE(test_consume_to_store_after_compaction) { pps::sharded_store s; - s.start(ss::default_smp_service_group()).get(); + s.start(pps::is_mutable::no, ss::default_smp_service_group()).get(); auto stop_store = ss::defer([&s]() { s.stop().get(); }); // This kafka client will not be used by the sequencer diff --git a/src/v/pandaproxy/schema_registry/test/sharded_store.cc b/src/v/pandaproxy/schema_registry/test/sharded_store.cc index 2c2dbc7a67c2..7621cc1e0077 100644 --- a/src/v/pandaproxy/schema_registry/test/sharded_store.cc +++ b/src/v/pandaproxy/schema_registry/test/sharded_store.cc @@ -25,7 +25,7 @@ namespace pps = pp::schema_registry; SEASTAR_THREAD_TEST_CASE(test_sharded_store_referenced_by) { pps::sharded_store store; - store.start(ss::default_smp_service_group()).get(); + store.start(pps::is_mutable::yes, ss::default_smp_service_group()).get(); auto stop_store = ss::defer([&store]() { store.stop().get(); }); const pps::schema_version ver1{1}; @@ -97,7 +97,7 @@ SEASTAR_THREAD_TEST_CASE(test_sharded_store_referenced_by) { SEASTAR_THREAD_TEST_CASE(test_sharded_store_find_unordered) { pps::sharded_store store; - store.start(ss::default_smp_service_group()).get(); + store.start(pps::is_mutable::no, ss::default_smp_service_group()).get(); auto stop_store = ss::defer([&store]() { store.stop().get(); }); pps::unparsed_schema array_unsanitized{ diff --git a/src/v/pandaproxy/schema_registry/types.h b/src/v/pandaproxy/schema_registry/types.h index 9d53e4bd1881..f1655311288a 100644 --- a/src/v/pandaproxy/schema_registry/types.h +++ b/src/v/pandaproxy/schema_registry/types.h @@ -28,6 +28,7 @@ namespace pandaproxy::schema_registry { +using is_mutable = ss::bool_class; using permanent_delete = ss::bool_class; using include_deleted = 
ss::bool_class; using is_deleted = ss::bool_class; @@ -38,6 +39,28 @@ template<typename E> std::enable_if_t<std::is_enum_v<E>, std::optional<E>> from_string_view(std::string_view); +enum class mode { import = 0, read_only, read_write }; + +constexpr std::string_view to_string_view(mode e) { + switch (e) { + case mode::import: + return "IMPORT"; + case mode::read_only: + return "READONLY"; + case mode::read_write: + return "READWRITE"; + } + return "{invalid}"; +} +template<> +constexpr std::optional<mode> from_string_view<mode>(std::string_view sv) { + return string_switch<std::optional<mode>>(sv) + .match(to_string_view(mode::import), mode::import) + .match(to_string_view(mode::read_only), mode::read_only) + .match(to_string_view(mode::read_write), mode::read_write) + .default_match(std::nullopt); +} + enum class schema_type { avro = 0, json, protobuf }; constexpr std::string_view to_string_view(schema_type e) { @@ -289,22 +312,28 @@ struct subject_version { }; // Very similar to topic_key_type, separate to avoid intermingling storage code -enum class seq_marker_key_type { invalid = 0, schema, delete_subject, config }; +enum class seq_marker_key_type { + invalid = 0, + schema, + delete_subject, + config, + mode +}; constexpr std::string_view to_string_view(seq_marker_key_type v) { switch (v) { case seq_marker_key_type::schema: return "schema"; - break; case seq_marker_key_type::delete_subject: return "delete_subject"; - break; case seq_marker_key_type::config: return "config"; + case seq_marker_key_type::mode: + return "mode"; + case seq_marker_key_type::invalid: break; - default: - return "invalid"; } + return "invalid"; } // Record the sequence+node where updates were made to a subject, diff --git a/tests/rptest/services/redpanda.py b/tests/rptest/services/redpanda.py index 03f0751f882c..652c3929bb7f 100644 --- a/tests/rptest/services/redpanda.py +++ b/tests/rptest/services/redpanda.py @@ -861,6 +861,8 @@ class SchemaRegistryConfig(TlsConfig): SR_TLS_CLIENT_KEY_FILE = "/etc/redpanda/sr_client.key" SR_TLS_CLIENT_CRT_FILE = "/etc/redpanda/sr_client.crt" + mode_mutability = False + def __init__(self): super(SchemaRegistryConfig, self).__init__() diff --git a/tests/rptest/services/templates/redpanda.yaml b/tests/rptest/services/templates/redpanda.yaml index ff3708d67e17..ef7500f4b57b 100644 --- a/tests/rptest/services/templates/redpanda.yaml +++ b/tests/rptest/services/templates/redpanda.yaml @@ -132,6 +132,7 @@ schema_registry: {% endif %} api_doc_dir: {{root}}/usr/share/redpanda/proxy-api-doc + mode_mutability: {{ schema_registry_config.mode_mutability }} {% if schema_registry_config.truststore_file is not none %} schema_registry_api_tls: diff --git a/tests/rptest/tests/schema_registry_test.py b/tests/rptest/tests/schema_registry_test.py index d88e7600aa54..e0ef4051b415 100644 --- a/tests/rptest/tests/schema_registry_test.py +++ b/tests/rptest/tests/schema_registry_test.py @@ -32,6 +32,7 @@ from rptest.services.admin import Admin from rptest.services.cluster import cluster from rptest.services.redpanda import DEFAULT_LOG_ALLOW_LIST, MetricsEndpoint, ResourceSettings, SecurityConfig, LoggingConfig, PandaproxyConfig, SchemaRegistryConfig +from rptest.services.redpanda_types import SaslCredentials from rptest.services.serde_client import SerdeClient from rptest.tests.cluster_config_test import wait_for_version_status_sync from rptest.tests.pandaproxy_test import User, PandaProxyTLSProvider @@ -310,6 +311,49 @@ def _delete_config_subject(self, def _get_mode(self, headers=HTTP_GET_HEADERS, **kwargs): return self._request("GET", "mode", headers=headers,
**kwargs) + def _set_mode(self, + data, + force=False, + headers=HTTP_POST_HEADERS, + **kwargs): + return self._request("PUT", + f"mode{'?force=true' if force else ''}", + headers=headers, + data=data, + **kwargs) + + def _get_mode_subject(self, + subject, + fallback=False, + headers=HTTP_GET_HEADERS, + **kwargs): + return self._request( + "GET", + f"mode/{subject}{'?defaultToGlobal=true' if fallback else ''}", + headers=headers, + **kwargs) + + def _set_mode_subject(self, + subject, + data, + force=False, + headers=HTTP_POST_HEADERS, + **kwargs): + return self._request("PUT", + f"mode/{subject}{'?force=true' if force else ''}", + headers=headers, + data=data, + **kwargs) + + def _delete_mode_subject(self, + subject, + headers=HTTP_POST_HEADERS, + **kwargs): + return self._request("DELETE", + f"mode/{subject}", + headers=headers, + **kwargs) + def _get_schemas_types(self, headers=HTTP_GET_HEADERS, tls_enabled: bool = False, @@ -985,15 +1029,6 @@ def test_config(self): assert result_raw.json( )["message"] == f"Subject 'foo-key' not found.", f"{json.dumps(result_raw.json(), indent=1)}" - @cluster(num_nodes=3) - def test_mode(self): - """ - Smoketest get_mode endpoint - """ - self.logger.debug("Get initial global mode") - result_raw = self._get_mode() - assert result_raw.json()["mode"] == "READWRITE" - @cluster(num_nodes=3) def test_post_compatibility_subject_version(self): """ @@ -1785,13 +1820,311 @@ def check_each_schema(subject: str, schemas: list[str], subjects[subject]["subject_versions"]) +class SchemaRegistryModeNotMutableTest(SchemaRegistryEndpoints): + """ + Test that mode cannot be mutated when mode_mutability=False. + """ + def __init__(self, context, **kwargs): + self.schema_registry_config = SchemaRegistryConfig() + self.schema_registry_config.mode_mutability = False + + super(SchemaRegistryEndpoints, self).__init__( + context, + schema_registry_config=self.schema_registry_config, + **kwargs) + + @cluster(num_nodes=3) + def test_mode_immutable(self): + + subject = f"{create_topic_names(1)[0]}-key" + + result_raw = self._get_mode() + assert result_raw.status_code == 200 + assert result_raw.json()["mode"] == "READWRITE" + + result_raw = self._set_mode(data=json.dumps({"mode": "INVALID"})) + assert result_raw.status_code == 422 + assert result_raw.json()["error_code"] == 42204 + + result_raw = self._set_mode(data=json.dumps({"mode": "READONLY"})) + assert result_raw.status_code == 422 + assert result_raw.json()["error_code"] == 42205 + assert result_raw.json()["message"] == "Mode changes are not allowed" + + # Check that setting it to the same value is still refused + result_raw = self._set_mode(data=json.dumps({"mode": "READWRITE"})) + assert result_raw.status_code == 422 + assert result_raw.json()["error_code"] == 42205 + assert result_raw.json()["message"] == "Mode changes are not allowed" + + result_raw = self._set_mode_subject(subject=subject, + data=json.dumps( + {"mode": "READONLY"})) + assert result_raw.status_code == 422 + assert result_raw.json()["error_code"] == 42205 + assert result_raw.json()["message"] == "Mode changes are not allowed" + + result_raw = self._delete_mode_subject(subject=subject) + assert result_raw.status_code == 404 + assert result_raw.json()["error_code"] == 40401 + assert result_raw.json( + )["message"] == f"Subject '{subject}' not found." + + +class SchemaRegistryModeMutableTest(SchemaRegistryEndpoints): + """ + Test schema registry mode against a redpanda cluster. 
+ """ + def __init__(self, context, **kwargs): + self.schema_registry_config = SchemaRegistryConfig() + self.schema_registry_config.mode_mutability = True + super(SchemaRegistryEndpoints, self).__init__( + context, + schema_registry_config=self.schema_registry_config, + **kwargs) + + @cluster(num_nodes=3) + def test_mode(self): + """ + Smoketest mode endpoints + """ + subject = f"{create_topic_names(1)[0]}-key" + not_subject = f"{create_topic_names(1)[0]}-key" + + self.logger.debug("Get initial global mode") + result_raw = self._get_mode() + assert result_raw.status_code == 200 + assert result_raw.json()["mode"] == "READWRITE" + + self.logger.debug("Set invalid global mode") + result_raw = self._set_mode(data=json.dumps({"mode": "INVALID"})) + assert result_raw.status_code == 422 + assert result_raw.json()["error_code"] == 42204 + + self.logger.debug("Set global mode") + result_raw = self._set_mode(data=json.dumps({"mode": "READONLY"})) + assert result_raw.status_code == 200 + assert result_raw.json()["mode"] == "READONLY" + + self.logger.debug("Get global mode") + result_raw = self._get_mode() + assert result_raw.status_code == 200 + assert result_raw.json()["mode"] == "READONLY" + + self.logger.debug("Get mode for non-existant subject") + result_raw = self._get_mode_subject(subject=not_subject) + assert result_raw.status_code == 404 + assert result_raw.json()["error_code"] == 40409 + + self.logger.debug("Get mode for non-existant subject, with fallback") + result_raw = self._get_mode_subject(subject=not_subject, fallback=True) + assert result_raw.status_code == 200 + assert result_raw.json()["mode"] == "READONLY" + + self.logger.debug("Set mode for non-existant subject (allowed)") + result_raw = self._set_mode_subject(subject=subject, + data=json.dumps( + {"mode": "READWRITE"})) + assert result_raw.status_code == 200 + assert result_raw.json()["mode"] == "READWRITE" + + self.logger.debug("Set invalid subject mode") + result_raw = self._set_mode_subject(subject="test-sub", + data=json.dumps( + {"mode": "INVALID"})) + assert result_raw.status_code == 422 + assert result_raw.json()["error_code"] == 42204 + + self.logger.debug("Get mode for non-existant subject") + result_raw = self._get_mode_subject(subject=subject, fallback=False) + assert result_raw.status_code == 200 + assert result_raw.json()["mode"] == "READWRITE" + + self.logger.debug("Delete mode for non-existant subject") + result_raw = self._delete_mode_subject(subject=subject) + assert result_raw.status_code == 200 + assert result_raw.json()["mode"] == "READWRITE" + + self.logger.debug("Get mode for non-existant subject") + result_raw = self._get_mode_subject(subject=subject, fallback=False) + assert result_raw.status_code == 404 + assert result_raw.json()["error_code"] == 40409 + + self.logger.debug("Set global mode to READWRITE") + result_raw = self._set_mode(data=json.dumps({"mode": "READWRITE"})) + assert result_raw.status_code == 200 + + self.logger.debug("Add a schema") + result_raw = self._post_subjects_subject_versions( + subject=subject, data=json.dumps({"schema": schema1_def})) + assert result_raw.status_code == requests.codes.ok + + self.logger.debug("Set global mode to IMPORT") + result_raw = self._set_mode(data=json.dumps({"mode": "IMPORT"})) + assert result_raw.status_code == 422 + assert result_raw.json()["error_code"] == 42204 + assert result_raw.json( + )["message"] == "Invalid mode. 
Valid values are READWRITE, READONLY" + + self.logger.debug("Set subject mode to IMPORT") + result_raw = self._set_mode_subject(subject="test-sub", + data=json.dumps({"mode": + "IMPORT"})) + assert result_raw.status_code == 422 + assert result_raw.json()["error_code"] == 42204 + assert result_raw.json( + )["message"] == "Invalid mode. Valid values are READWRITE, READONLY" + + @cluster(num_nodes=3) + def test_mode_readonly(self): + """ + Test endpoints when in READONLY + """ + ro_subject = f"ro-{create_topic_names(1)[0]}-key" + rw_subject = f"rw-{create_topic_names(1)[0]}-key" + + schema1 = json.dumps({"schema": schema1_def}) + schema2 = json.dumps({"schema": schema2_def}) + + self.logger.info("Posting schema 1 as ro_subject key") + result_raw = self._post_subjects_subject_versions( + subject=ro_subject, data=json.dumps({"schema": schema1_def})) + assert result_raw.status_code == requests.codes.ok + + self.logger.debug("Set global mode to readonly") + result_raw = self._set_mode(data=json.dumps({"mode": "READONLY"})) + assert result_raw.status_code == 200 + assert result_raw.json()["mode"] == "READONLY" + + self.logger.debug("Override mode for rw_subject") + result_raw = self._set_mode_subject(subject=rw_subject, + data=json.dumps( + {"mode": "READWRITE"})) + assert result_raw.status_code == 200 + assert result_raw.json()["mode"] == "READWRITE" + + self.logger.info("Posting schema 1 as rw_subject key") + result_raw = self._post_subjects_subject_versions(subject=rw_subject, + data=schema1) + assert result_raw.status_code == requests.codes.ok + + # mode + result_raw = self._get_mode() + assert result_raw.status_code == 200 + + for sub in [ro_subject, rw_subject]: + result_raw = self._get_mode_subject(subject=sub, fallback=True) + assert result_raw.status_code == 200 + + # config + result_raw = self._get_config() + assert result_raw.status_code == 200 + + for sub in [ro_subject, rw_subject]: + result_raw = self._get_config_subject(subject=sub, fallback=True) + assert result_raw.status_code == 200 + + # This is the default, check that setting it to the default/existing is failure, not quiet success + compat_back = json.dumps({"compatibility": "BACKWARD"}) + result_raw = self._set_config(data=compat_back) + assert result_raw.status_code == 422 + assert result_raw.json()["error_code"] == 42205 + assert result_raw.json( + )["message"] == "Subject null is in read-only mode" + + result_raw = self._set_config_subject(subject=ro_subject, + data=compat_back) + assert result_raw.status_code == 422 + assert result_raw.json()["error_code"] == 42205 + assert result_raw.json( + )["message"] == f"Subject {ro_subject} is in read-only mode" + + result_raw = self._set_config_subject(subject=rw_subject, + data=compat_back) + assert result_raw.status_code == 200 + + # The config doesn't exist, but the mode is checked first + result_raw = self._delete_config_subject(subject=ro_subject) + assert result_raw.status_code == 422 + assert result_raw.json()["error_code"] == 42205 + assert result_raw.json( + )["message"] == f"Subject {ro_subject} is in read-only mode" + + result_raw = self._delete_config_subject(subject=rw_subject) + assert result_raw.status_code == 200 + + # subjects + result_raw = self._get_subjects() + assert result_raw.status_code == 200 + + for sub in [ro_subject, rw_subject]: + result_raw = self._get_subjects_subject_versions(subject=sub) + assert result_raw.status_code == 200 + + result_raw = self._get_subjects_subject_versions_version( + subject=sub, version=1) + assert result_raw.status_code == 
200 + + result_raw = self._get_subjects_subject_versions_version_referenced_by( + subject=sub, version=1) + assert result_raw.status_code == 200 + + self.logger.info("Checking for schema 1 as subject key") + result_raw = self._post_subjects_subject(subject=sub, data=schema1) + assert result_raw.status_code == requests.codes.ok + assert result_raw.json()["id"] == 1 + assert result_raw.json()["version"] == 1 + + self.logger.info("Checking for schema 1 as subject key") + result_raw = self._post_subjects_subject_versions(subject=sub, + data=schema1) + assert result_raw.status_code == requests.codes.ok + assert result_raw.json()["id"] == 1 + + self.logger.info("Checking schema 2 as subject key") + result_raw = self._post_subjects_subject(subject=sub, data=schema2) + assert result_raw.status_code == 404 + assert result_raw.json()["error_code"] == 40403 + assert result_raw.json()["message"] == f"Schema not found" + + self.logger.info("Posting schema 2 as ro_subject key") + result_raw = self._post_subjects_subject_versions(subject=ro_subject, + data=schema2) + assert result_raw.status_code == 422 + assert result_raw.json()["error_code"] == 42205 + assert result_raw.json( + )["message"] == f"Subject {ro_subject} is in read-only mode" + + self.logger.info("Posting schema 2 as rw_subject key") + result_raw = self._post_subjects_subject_versions(subject=rw_subject, + data=schema2) + assert result_raw.status_code == 200 + + # compatibility + for sub in [ro_subject, rw_subject]: + result_raw = self._post_compatibility_subject_version(subject=sub, + version=1, + data=schema2) + assert result_raw.status_code == 200 + + # schemas + result_raw = self._get_schemas_types() + assert result_raw.status_code == 200 + + result_raw = self._get_schemas_ids_id(id=1) + assert result_raw.status_code == 200 + + result_raw = self._get_schemas_ids_id_subjects(id=1) + assert result_raw.status_code == 200 + + result_raw = self._get_schemas_ids_id_versions(id=1) + assert result_raw.status_code == 200 + + class SchemaRegistryBasicAuthTest(SchemaRegistryEndpoints): """ Test schema registry against a redpanda cluster with HTTP Basic Auth enabled. 
""" - username = 'red' - password = 'panda' - def __init__(self, context): security = SecurityConfig() security.enable_sasl = True @@ -1799,26 +2132,39 @@ def __init__(self, context): schema_registry_config = SchemaRegistryConfig() schema_registry_config.authn_method = 'http_basic' + schema_registry_config.mode_mutability = True super(SchemaRegistryBasicAuthTest, self).__init__(context, security=security, schema_registry_config=schema_registry_config) + superuser = self.redpanda.SUPERUSER_CREDENTIALS + self.user = SaslCredentials('user', 'panda', superuser.mechanism) + public_user = SaslCredentials('red', 'panda', superuser.mechanism) + + self.super_auth = (superuser.username, superuser.password) + self.user_auth = (self.user.username, self.user.password) + self.public_auth = (public_user.username, public_user.password) + + def _init_users(self): + admin = Admin(self.redpanda) + admin.create_user(username=self.user.username, + password=self.user.password, + algorithm=self.user.mechanism) + @cluster(num_nodes=3) def test_schemas_types(self): """ Verify the schema registry returns the supported types """ - result_raw = self._get_schemas_types(auth=(self.username, - self.password)) - assert result_raw.json()['error_code'] == 40101 + self._init_users() - super_username, super_password, _ = self.redpanda.SUPERUSER_CREDENTIALS + result_raw = self._get_schemas_types(auth=self.public_auth) + assert result_raw.json()['error_code'] == 40101 self.logger.debug(f"Request schema types with default accept header") - result_raw = self._get_schemas_types(auth=(super_username, - super_password)) + result_raw = self._get_schemas_types(auth=self.super_auth) assert result_raw.status_code == requests.codes.ok result = result_raw.json() assert set(result) == {"PROTOBUF", "AVRO"} @@ -1828,7 +2174,7 @@ def test_get_schema_id_versions(self): """ Verify schema versions """ - super_username, super_password, _ = self.redpanda.SUPERUSER_CREDENTIALS + self._init_users() topic = create_topic_names(1)[0] subject = f"{topic}-key" @@ -1836,23 +2182,20 @@ def test_get_schema_id_versions(self): schema_1_data = json.dumps({"schema": schema1_def}) self.logger.debug("Posting schema 1 as a subject key") - result_raw = self._post_subjects_subject_versions( - subject=subject, - data=schema_1_data, - auth=(super_username, super_password)) + result_raw = self._post_subjects_subject_versions(subject=subject, + data=schema_1_data, + auth=self.super_auth) self.logger.debug(result_raw) assert result_raw.status_code == requests.codes.ok assert result_raw.json()["id"] == 1 self.logger.debug("Checking schema 1 versions") result_raw = self._get_schemas_ids_id_versions(id=1, - auth=(self.username, - self.password)) + auth=self.public_auth) assert result_raw.json()['error_code'] == 40101 result_raw = self._get_schemas_ids_id_versions(id=1, - auth=(super_username, - super_password)) + auth=self.super_auth) assert result_raw.status_code == requests.codes.ok assert result_raw.json() == [{"subject": subject, "version": 1}] @@ -1861,70 +2204,57 @@ def test_post_subjects_subject_versions(self): """ Verify posting a schema """ + self._init_users() topic = create_topic_names(1)[0] schema_1_data = json.dumps({"schema": schema1_def}) result_raw = self._post_subjects_subject_versions( - subject=f"{topic}-key", - data=schema_1_data, - auth=(self.username, self.password)) + subject=f"{topic}-key", data=schema_1_data, auth=self.public_auth) assert result_raw.json()['error_code'] == 40101 - super_username, super_password, _ = 
self.redpanda.SUPERUSER_CREDENTIALS - self.logger.debug("Posting schema 1 as a subject key") result_raw = self._post_subjects_subject_versions( - subject=f"{topic}-key", - data=schema_1_data, - auth=(super_username, super_password)) + subject=f"{topic}-key", data=schema_1_data, auth=self.super_auth) self.logger.debug(result_raw) assert result_raw.status_code == requests.codes.ok assert result_raw.json()["id"] == 1 self.logger.debug("Get subjects") - result_raw = self._get_subjects(auth=(self.username, self.password)) + result_raw = self._get_subjects(auth=self.public_auth) assert result_raw.json()['error_code'] == 40101 - result_raw = self._get_subjects(auth=(super_username, super_password)) + result_raw = self._get_subjects(auth=self.super_auth) assert result_raw.json() == [f"{topic}-key"] self.logger.debug("Get schema versions for subject key") result_raw = self._get_subjects_subject_versions( - subject=f"{topic}-key", auth=(self.username, self.password)) + subject=f"{topic}-key", auth=self.public_auth) assert result_raw.json()['error_code'] == 40101 result_raw = self._get_subjects_subject_versions( - subject=f"{topic}-key", auth=(super_username, super_password)) + subject=f"{topic}-key", auth=self.super_auth) assert result_raw.status_code == requests.codes.ok assert result_raw.json() == [1] self.logger.debug("Get latest schema version for subject key") result_raw = self._get_subjects_subject_versions_version( - subject=f"{topic}-key", - version="latest", - auth=(self.username, self.password)) + subject=f"{topic}-key", version="latest", auth=self.public_auth) assert result_raw.json()['error_code'] == 40101 result_raw = self._get_subjects_subject_versions_version( - subject=f"{topic}-key", - version="latest", - auth=(super_username, super_password)) + subject=f"{topic}-key", version="latest", auth=self.super_auth) assert result_raw.status_code == requests.codes.ok result = result_raw.json() assert result["subject"] == f"{topic}-key" assert result["version"] == 1 self.logger.debug("Get schema version 1") - result_raw = self._get_schemas_ids_id(id=1, - auth=(self.username, - self.password)) + result_raw = self._get_schemas_ids_id(id=1, auth=self.public_auth) assert result_raw.json()['error_code'] == 40101 - result_raw = self._get_schemas_ids_id(id=1, - auth=(super_username, - super_password)) + result_raw = self._get_schemas_ids_id(id=1, auth=self.super_auth) assert result_raw.status_code == requests.codes.ok @cluster(num_nodes=3) @@ -1932,33 +2262,32 @@ def test_post_subjects_subject(self): """ Verify posting a schema """ + self._init_users() topic = create_topic_names(1)[0] subject = f"{topic}-key" - super_username, super_password, _ = self.redpanda.SUPERUSER_CREDENTIALS - self.logger.info("Posting schema 1 as a subject key") result_raw = self._post_subjects_subject_versions( subject=subject, data=json.dumps({"schema": schema1_def}), - auth=(super_username, super_password)) + auth=self.super_auth) self.logger.info(result_raw) self.logger.info(result_raw.content) assert result_raw.status_code == requests.codes.ok assert result_raw.json()["id"] == 1 - result_raw = self._post_subjects_subject( - subject=subject, - data=json.dumps({"schema": schema1_def}), - auth=(self.username, self.password)) + result_raw = self._post_subjects_subject(subject=subject, + data=json.dumps( + {"schema": schema1_def}), + auth=self.public_auth) assert result_raw.json()['error_code'] == 40101 self.logger.info("Posting existing schema should be success") - result_raw = self._post_subjects_subject( - 
subject=subject, - data=json.dumps({"schema": schema1_def}), - auth=(super_username, super_password)) + result_raw = self._post_subjects_subject(subject=subject, + data=json.dumps( + {"schema": schema1_def}), + auth=self.super_auth) self.logger.info(result_raw) self.logger.info(result_raw.content) assert result_raw.status_code == requests.codes.ok @@ -1973,24 +2302,24 @@ def test_config(self): """ Smoketest config endpoints """ - super_username, super_password, _ = self.redpanda.SUPERUSER_CREDENTIALS + self._init_users() self.logger.debug("Get initial global config") - result_raw = self._get_config(auth=(self.username, self.password)) + result_raw = self._get_config(auth=self.public_auth) assert result_raw.json()['error_code'] == 40101 - result_raw = self._get_config(auth=(super_username, super_password)) + result_raw = self._get_config(auth=self.super_auth) assert result_raw.json()["compatibilityLevel"] == "BACKWARD" self.logger.debug("Set global config") result_raw = self._set_config(data=json.dumps( {"compatibility": "FULL"}), - auth=(self.username, self.password)) + auth=self.public_auth) assert result_raw.json()['error_code'] == 40101 result_raw = self._set_config(data=json.dumps( {"compatibility": "FULL"}), - auth=(super_username, super_password)) + auth=self.super_auth) assert result_raw.json()["compatibility"] == "FULL" schema_1_data = json.dumps({"schema": schema1_def}) @@ -1999,102 +2328,141 @@ def test_config(self): self.logger.debug("Posting schema 1 as a subject key") result_raw = self._post_subjects_subject_versions( - subject=f"{topic}-key", - data=schema_1_data, - auth=(super_username, super_password)) + subject=f"{topic}-key", data=schema_1_data, auth=self.super_auth) self.logger.debug("Set subject config") self.logger.debug("Set subject config") result_raw = self._set_config_subject( subject=f"{topic}-key", data=json.dumps({"compatibility": "BACKWARD_TRANSITIVE"}), - auth=(self.username, self.password)) + auth=self.public_auth) assert result_raw.json()['error_code'] == 40101 result_raw = self._set_config_subject( subject=f"{topic}-key", data=json.dumps({"compatibility": "BACKWARD_TRANSITIVE"}), - auth=(super_username, super_password)) + auth=self.super_auth) assert result_raw.status_code == requests.codes.ok assert result_raw.json()["compatibility"] == "BACKWARD_TRANSITIVE" self.logger.debug("Get subject config - should be overriden") result_raw = self._get_config_subject(subject=f"{topic}-key", - auth=(self.username, - self.password)) + auth=self.public_auth) assert result_raw.json()['error_code'] == 40101 result_raw = self._get_config_subject(subject=f"{topic}-key", - auth=(super_username, - super_password)) + auth=self.super_auth) assert result_raw.json()["compatibilityLevel"] == "BACKWARD_TRANSITIVE" - global_config = self._get_config(auth=(super_username, - super_password)).json() + global_config = self._get_config(auth=self.super_auth).json() old_config = result_raw.json() result_raw = self._delete_config_subject(subject=f"{topic}-key", - auth=(super_username, - super_password)) + auth=self.super_auth) assert result_raw.json( )["compatibilityLevel"] == old_config["compatibilityLevel"] #, f"{json.dumps(result_raw.json(), indent=1)}, {json.dumps(global_config, indent=1)}" result_raw = self._get_config_subject(subject=f"{topic}-key", fallback=True, - auth=(super_username, - super_password)) + auth=self.super_auth) assert result_raw.json( )["compatibilityLevel"] == global_config["compatibilityLevel"] @cluster(num_nodes=3) def test_mode(self): """ - Smoketest get_mode 
endpoint + Smoketest mode endpoints """ - result_raw = self._get_mode(auth=(self.username, self.password)) + self._init_users() + + self.logger.debug("Get initial global mode") + result_raw = self._get_mode(auth=self.public_auth) assert result_raw.json()['error_code'] == 40101 - super_username, super_password, _ = self.redpanda.SUPERUSER_CREDENTIALS + result_raw = self._get_mode(auth=self.user_auth) + assert result_raw.json()["mode"] == "READWRITE" - self.logger.debug("Get initial global mode") - result_raw = self._get_mode(auth=(super_username, super_password)) + result_raw = self._get_mode(auth=self.super_auth) assert result_raw.json()["mode"] == "READWRITE" + self.logger.debug("Set global mode") + result_raw = self._set_mode(data=json.dumps({"mode": "READONLY"}), + auth=self.public_auth) + assert result_raw.json()['error_code'] == 40101 + + result_raw = self._set_mode(data=json.dumps({"mode": "READONLY"}), + auth=self.user_auth) + assert result_raw.json()['error_code'] == 403 + + result_raw = self._set_mode(data=json.dumps({"mode": "READONLY"}), + auth=self.super_auth) + assert result_raw.json()["mode"] == "READONLY" + + sub = "test-sub" + self.logger.debug("Set subject mode") + result_raw = self._set_mode_subject(subject=sub, + data=json.dumps( + {"mode": "READONLY"}), + auth=self.public_auth) + assert result_raw.json()['error_code'] == 40101 + + result_raw = self._set_mode_subject(subject=sub, + data=json.dumps( + {"mode": "READONLY"}), + auth=self.user_auth) + assert result_raw.json()['error_code'] == 403 + + result_raw = self._set_mode_subject(subject=sub, + data=json.dumps( + {"mode": "READONLY"}), + auth=self.super_auth) + assert result_raw.json()["mode"] == "READONLY" + + self.logger.debug("Delete subject mode") + result_raw = self._delete_mode_subject(subject=sub, + auth=self.public_auth) + assert result_raw.json()['error_code'] == 40101 + + result_raw = self._delete_mode_subject(subject=sub, + auth=self.user_auth) + assert result_raw.json()['error_code'] == 403 + + result_raw = self._delete_mode_subject(subject=sub, + auth=self.super_auth) + assert result_raw.json()["mode"] == "READONLY" + @cluster(num_nodes=3) def test_post_compatibility_subject_version(self): """ Verify compatibility """ + self._init_users() topic = create_topic_names(1)[0] self.logger.debug(f"Register a schema against a subject") schema_1_data = json.dumps({"schema": schema1_def}) - super_username, super_password, _ = self.redpanda.SUPERUSER_CREDENTIALS - self.logger.debug("Posting schema 1 as a subject key") result_raw = self._post_subjects_subject_versions( - subject=f"{topic}-key", - data=schema_1_data, - auth=(super_username, super_password)) + subject=f"{topic}-key", data=schema_1_data, auth=self.super_auth) self.logger.debug(result_raw) assert result_raw.status_code == requests.codes.ok self.logger.debug("Set subject config - NONE") - result_raw = self._set_config_subject( - subject=f"{topic}-key", - data=json.dumps({"compatibility": "NONE"}), - auth=(super_username, super_password)) + result_raw = self._set_config_subject(subject=f"{topic}-key", + data=json.dumps( + {"compatibility": "NONE"}), + auth=self.super_auth) assert result_raw.status_code == requests.codes.ok result_raw = self._post_compatibility_subject_version( subject=f"{topic}-key", version=1, data=schema_1_data, - auth=(self.username, self.password)) + auth=self.public_auth) assert result_raw.json()['error_code'] == 40101 self.logger.debug("Check compatibility none, no default") @@ -2102,7 +2470,7 @@ def 
test_post_compatibility_subject_version(self): subject=f"{topic}-key", version=1, data=schema_1_data, - auth=(super_username, super_password)) + auth=self.super_auth) assert result_raw.status_code == requests.codes.ok assert result_raw.json()["is_compatible"] == True @@ -2111,43 +2479,38 @@ def test_delete_subject(self): """ Verify delete subject """ + self._init_users() topic = create_topic_names(1)[0] self.logger.debug(f"Register a schema against a subject") schema_1_data = json.dumps({"schema": schema1_def}) - super_username, super_password, _ = self.redpanda.SUPERUSER_CREDENTIALS - self.logger.debug("Posting schema 1 as a subject key") result_raw = self._post_subjects_subject_versions( - subject=f"{topic}-key", - data=schema_1_data, - auth=(super_username, super_password)) + subject=f"{topic}-key", data=schema_1_data, auth=self.super_auth) self.logger.debug(result_raw) assert result_raw.status_code == requests.codes.ok self.logger.debug("Soft delete subject") result_raw = self._delete_subject(subject=f"{topic}-key", - auth=(self.username, self.password)) + auth=self.public_auth) assert result_raw.json()['error_code'] == 40101 result_raw = self._delete_subject(subject=f"{topic}-key", - auth=(super_username, - super_password)) + auth=self.super_auth) assert result_raw.status_code == requests.codes.ok assert result_raw.json() == [1] self.logger.debug("Permanently delete subject") result_raw = self._delete_subject(subject=f"{topic}-key", permanent=True, - auth=(self.username, self.password)) + auth=self.public_auth) assert result_raw.json()['error_code'] == 40101 result_raw = self._delete_subject(subject=f"{topic}-key", permanent=True, - auth=(super_username, - super_password)) + auth=self.super_auth) self.logger.debug(result_raw) assert result_raw.status_code == requests.codes.ok @@ -2156,40 +2519,35 @@ def test_delete_subject_version(self): """ Verify delete subject version """ + self._init_users() topic = create_topic_names(1)[0] self.logger.debug(f"Register a schema against a subject") schema_1_data = json.dumps({"schema": schema1_def}) - super_username, super_password, _ = self.redpanda.SUPERUSER_CREDENTIALS - self.logger.debug("Posting schema 1 as a subject key") result_raw = self._post_subjects_subject_versions( - subject=f"{topic}-key", - data=schema_1_data, - auth=(super_username, super_password)) + subject=f"{topic}-key", data=schema_1_data, auth=self.super_auth) self.logger.debug(result_raw) assert result_raw.status_code == requests.codes.ok self.logger.debug("Set subject config - NONE") - result_raw = self._set_config_subject( - subject=f"{topic}-key", - data=json.dumps({"compatibility": "NONE"}), - auth=(super_username, super_password)) + result_raw = self._set_config_subject(subject=f"{topic}-key", + data=json.dumps( + {"compatibility": "NONE"}), + auth=self.super_auth) assert result_raw.status_code == requests.codes.ok self.logger.debug("Soft delete version 1") result_raw = self._delete_subject_version(subject=f"{topic}-key", version=1, - auth=(self.username, - self.password)) + auth=self.public_auth) assert result_raw.json()['error_code'] == 40101 result_raw = self._delete_subject_version(subject=f"{topic}-key", version=1, - auth=(super_username, - super_password)) + auth=self.super_auth) self.logger.debug(result_raw) assert result_raw.status_code == requests.codes.ok @@ -2197,15 +2555,13 @@ def test_delete_subject_version(self): result_raw = self._delete_subject_version(subject=f"{topic}-key", version=1, permanent=True, - auth=(self.username, - self.password)) + 
auth=self.public_auth) assert result_raw.json()['error_code'] == 40101 result_raw = self._delete_subject_version(subject=f"{topic}-key", version=1, permanent=True, - auth=(super_username, - super_password)) + auth=self.super_auth) self.logger.debug(result_raw) assert result_raw.status_code == requests.codes.ok @@ -2214,8 +2570,7 @@ def test_protobuf(self): """ Verify basic protobuf functionality """ - - super_username, super_password, _ = self.redpanda.SUPERUSER_CREDENTIALS + self._init_users() self.logger.info("Posting failed schema should be 422") result_raw = self._post_subjects_subject_versions( @@ -2224,19 +2579,20 @@ def test_protobuf(self): "schema": imported_proto_def, "schemaType": "PROTOBUF" }), - auth=(super_username, super_password)) + auth=self.super_auth) self.logger.info(result_raw) self.logger.info(result_raw.content) assert result_raw.status_code == requests.codes.unprocessable_entity self.logger.info("Posting simple as a subject key") - result_raw = self._post_subjects_subject_versions( - subject="simple", - data=json.dumps({ - "schema": simple_proto_def, - "schemaType": "PROTOBUF" - }), - auth=(super_username, super_password)) + result_raw = self._post_subjects_subject_versions(subject="simple", + data=json.dumps({ + "schema": + simple_proto_def, + "schemaType": + "PROTOBUF" + }), + auth=self.super_auth) self.logger.info(result_raw) self.logger.info(result_raw.content) assert result_raw.status_code == requests.codes.ok @@ -2256,7 +2612,7 @@ def test_protobuf(self): "version": 1 }] }), - auth=(super_username, super_password)) + auth=self.super_auth) self.logger.info(result_raw) self.logger.info(result_raw.content) assert result_raw.status_code == requests.codes.ok @@ -2265,7 +2621,7 @@ def test_protobuf(self): result_raw = self._request("GET", f"subjects/simple/versions/1/schema", headers=HTTP_GET_HEADERS, - auth=(super_username, super_password)) + auth=self.super_auth) self.logger.info(result_raw) assert result_raw.status_code == requests.codes.ok assert result_raw.text.strip() == simple_proto_def.strip() @@ -2273,7 +2629,7 @@ def test_protobuf(self): result_raw = self._request("GET", f"schemas/ids/1", headers=HTTP_GET_HEADERS, - auth=(super_username, super_password)) + auth=self.super_auth) self.logger.info(result_raw) assert result_raw.status_code == requests.codes.ok result = result_raw.json() @@ -2282,11 +2638,11 @@ def test_protobuf(self): # Regular user should fail result_raw = self._get_subjects_subject_versions_version_referenced_by( - "simple", 1, auth=(self.username, self.password)) + "simple", 1, auth=self.public_auth) assert result_raw.json()['error_code'] == 40101 result_raw = self._get_subjects_subject_versions_version_referenced_by( - "simple", 1, auth=(super_username, super_password)) + "simple", 1, auth=self.super_auth) self.logger.info(result_raw) assert result_raw.status_code == requests.codes.ok assert result_raw.json() == [2]
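
As a quick illustration of the new API surface (not part of the patch itself), the sketch below exercises the mode endpoints with plain requests. It assumes a local Redpanda whose schema registry listens on localhost:8081 and is started with mode_mutability: true; the paths, payloads, query parameters, and error codes come from the diff above, while the host, port, and subject names are illustrative.

# Hypothetical usage sketch for the mode endpoints added above; not part of the patch.
# Assumes a local Redpanda schema registry on localhost:8081 started with
# mode_mutability: true. Subject names are illustrative.
import json

import requests

BASE = "http://localhost:8081"
HEADERS = {
    "Accept": "application/vnd.schemaregistry.v1+json",
    "Content-Type": "application/vnd.schemaregistry.v1+json",
}


def get_mode(subject=None, fallback=False):
    """GET /mode or /mode/{subject}, optionally falling back to the global mode."""
    path = f"{BASE}/mode" if subject is None else f"{BASE}/mode/{subject}"
    params = {"defaultToGlobal": "true"} if fallback else None
    return requests.get(path, headers=HEADERS, params=params)


def set_mode(mode, subject=None):
    """PUT /mode or /mode/{subject} with a {"mode": ...} body."""
    path = f"{BASE}/mode" if subject is None else f"{BASE}/mode/{subject}"
    return requests.put(path, headers=HEADERS, data=json.dumps({"mode": mode}))


if __name__ == "__main__":
    # The global mode defaults to READWRITE.
    print(get_mode().json())

    # Flip the global mode to READONLY, then override one subject back to READWRITE.
    # With mode_mutability: false these PUTs would fail with error_code 42205.
    print(set_mode("READONLY").json())
    print(set_mode("READWRITE", subject="demo-key").json())

    # A subject without an override returns error_code 40409 unless defaultToGlobal=true.
    print(get_mode(subject="other-key").json())
    print(get_mode(subject="other-key", fallback=True).json())

    # Remove the per-subject override again; the response echoes the deleted mode.
    print(requests.delete(f"{BASE}/mode/demo-key", headers=HEADERS).json())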