Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v24.1.x] [CORE-60] Schema Registry: Support /mode #18623

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 201 additions & 8 deletions src/v/pandaproxy/api/api-doc/schema_registry.json
Original file line number Diff line number Diff line change
Expand Up @@ -185,17 +185,210 @@
"application/json"
],
"parameters": [],
"produces": ["application/vnd.schemaregistry.v1+json"],
"produces": [
"application/vnd.schemaregistry.v1+json",
"application/vnd.schemaregistry+json",
"application/json"
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "object",
"properties": {
"mode": {
"type": "string"
}
}
"$ref": "#/definitions/mode"
}
},
"500": {
"description": "Internal Server error",
"schema": {
"$ref": "#/definitions/error_body"
}
}
}
},
"put": {
"summary": "Set the global mode.",
"operationId": "put_mode",
"consumes": [
"application/vnd.schemaregistry.v1+json",
"application/vnd.schemaregistry+json",
"application/json"
],
"parameters": [
{
"name": "mode",
"in": "body",
"schema": {
"$ref": "#/definitions/mode"
}
}
],
"produces": [
"application/vnd.schemaregistry.v1+json",
"application/vnd.schemaregistry+json",
"application/json"
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/mode"
}
},
"422": {
"description": "Unprocessable Entity",
"schema": {
"$ref": "#/definitions/error_body"
}
},
"500": {
"description": "Internal Server error",
"schema": {
"$ref": "#/definitions/error_body"
}
}
}
}
},
"/mode/{subject}": {
"get": {
"summary": "Get the mode for a subject.",
"operationId": "get_mode_subject",
"consumes": [
"application/vnd.schemaregistry.v1+json",
"application/vnd.schemaregistry+json",
"application/json"
],
"parameters": [
{
"name": "subject",
"description": "The subject to get the mode for.",
"in": "path",
"required": true,
"type": "string"
},
{
"name": "defaultToGlobal",
"description": "If true, return the global mode if the subject doesn't have a mode set.",
"in": "query",
"required": false,
"type": "boolean"
}
],
"produces": [
"application/vnd.schemaregistry.v1+json",
"application/vnd.schemaregistry+json",
"application/json"
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/mode"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/error_body"
}
},
"500": {
"description": "Internal Server error",
"schema": {
"$ref": "#/definitions/error_body"
}
}
}
},
"put": {
"summary": "Set the mode for a subject.",
"operationId": "put_mode_subject",
"consumes": [
"application/vnd.schemaregistry.v1+json",
"application/vnd.schemaregistry+json",
"application/json"
],
"parameters": [
{
"name": "subject",
"description": "The subject to set the mode for.",
"in": "path",
"required": true,
"type": "string"
},
{
"name": "mode",
"in": "body",
"schema": {
"$ref": "#/definitions/mode"
}
}
],
"produces": [
"application/vnd.schemaregistry.v1+json",
"application/vnd.schemaregistry+json",
"application/json"
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/mode"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/error_body"
}
},
"422": {
"description": "Unprocessable Entity",
"schema": {
"$ref": "#/definitions/error_body"
}
},
"500": {
"description": "Internal Server error",
"schema": {
"$ref": "#/definitions/error_body"
}
}
}
},
"delete": {
"summary": "Delete the mode for a subject.",
"operationId": "delete_mode_subject",
"consumes": [
"application/vnd.schemaregistry.v1+json",
"application/vnd.schemaregistry+json",
"application/json"
],
"parameters": [
{
"name": "subject",
"description": "The subject to delete the mode for.",
"in": "path",
"required": true,
"type": "string"
}
],
"produces": [
"application/vnd.schemaregistry.v1+json",
"application/vnd.schemaregistry+json",
"application/json"
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/mode"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/error_body"
}
},
"500": {
Expand Down Expand Up @@ -423,7 +616,7 @@
},
"/subjects/{subject}": {
"post": {
"summary": "Check if a schema is already registred for the subject.",
"summary": "Check if a schema is already registered for the subject.",
"operationId": "post_subject",
"consumes": [
"application/vnd.schemaregistry.v1+json",
Expand Down
12 changes: 12 additions & 0 deletions src/v/pandaproxy/api/api-doc/schema_registry_definitions.def.json
Original file line number Diff line number Diff line change
Expand Up @@ -89,5 +89,17 @@
"compatibility": {
"type": "string"
}
}
},
"mode": {
"type": "object",
"properties": {
"mode": {
"type": "string",
"enum": [
"READWRITE",
"READONLY"
]
},
}
}
2 changes: 1 addition & 1 deletion src/v/pandaproxy/api/api-doc/schema_registry_header.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"swagger": "2.0",
"info": {
"title": "Pandaproxy Schema Registry",
"version": "1.0.3"
"version": "1.0.4"
},
"host": "{{Host}}",
"basePath": "/",
Expand Down
43 changes: 43 additions & 0 deletions src/v/pandaproxy/auth_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,47 @@ inline credential_t maybe_authenticate_request(

return user;
}

enum class auth_level {
// Unauthenticated endpoint (not a typo, 'public' is a keyword)
publik = 0,
// Requires authentication (if enabled) but not superuser status
user = 1,
// Requires authentication (if enabled) and superuser status
superuser = 2
};

inline credential_t maybe_authorize_request(
config::rest_authn_method authn_method,
auth_level lvl,
request_authenticator& authenticator,
const ss::http::request& req) {
credential_t user;

if (authn_method != config::rest_authn_method::none) {
// Will throw 400 & 401 if auth fails
auto auth_result = authenticator.authenticate(req);
// Will throw 403 if user enabled HTTP Basic Auth but
// did not give the authorization header.
switch (lvl) {
case auth_level::superuser:
auth_result.require_superuser();
break;
case auth_level::user:
auth_result.require_authenticated();
break;
case auth_level::publik:
auth_result.pass();
break;
}

user = credential_t{
auth_result.get_username(),
auth_result.get_password(),
auth_result.get_sasl_mechanism()};
}

return user;
}

} // namespace pandaproxy
4 changes: 4 additions & 0 deletions src/v/pandaproxy/error.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ struct reply_error_category final : std::error_category {
return "subject_version_not_deleted";
case reply_error_code::compatibility_not_found:
return "compatibility_not_found";
case reply_error_code::mode_not_found:
return "mode_not_found";
case reply_error_code::serialization_error:
return "serialization_error";
case reply_error_code::consumer_already_exists:
Expand All @@ -140,6 +142,8 @@ struct reply_error_category final : std::error_category {
return "Invalid schema version";
case reply_error_code::compatibility_level_invalid:
return "Invalid compatibility level";
case reply_error_code::mode_invalid:
return "Invalid mode";
case reply_error_code::subject_version_operaton_not_permitted:
return "Overwrite new schema is not permitted.";
case reply_error_code::subject_version_has_references:
Expand Down
2 changes: 2 additions & 0 deletions src/v/pandaproxy/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,13 @@ enum class reply_error_code : uint16_t {
subject_version_soft_deleted = 40406,
subject_version_not_deleted = 40407,
compatibility_not_found = 40408,
mode_not_found = 40409,
serialization_error = 40801,
consumer_already_exists = 40902,
schema_empty = 42201,
schema_version_invalid = 42202,
compatibility_level_invalid = 42203,
mode_invalid = 42204,
subject_version_operaton_not_permitted = 42205,
subject_version_has_references = 42206,
subject_version_schema_id_already_exists = 42207,
Expand Down
2 changes: 1 addition & 1 deletion src/v/pandaproxy/schema_registry/api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ api::~api() noexcept = default;

ss::future<> api::start() {
_store = std::make_unique<sharded_store>();
co_await _store->start(_sg);
co_await _store->start(is_mutable(_cfg.mode_mutability), _sg);
co_await _schema_id_validation_probe.start();
co_await _schema_id_validation_probe.invoke_on_all(
&schema_id_validation_probe::setup_metrics);
Expand Down
1 change: 1 addition & 0 deletions src/v/pandaproxy/schema_registry/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ configuration::configuration()
{},
{},
config::endpoint_tls_config::validate_many)
, mode_mutability(*this, "mode_mutability", "Allow modifying mode", {}, false)
, schema_registry_replication_factor(
*this,
"schema_registry_replication_factor",
Expand Down
1 change: 1 addition & 0 deletions src/v/pandaproxy/schema_registry/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ struct configuration final : public config::config_store {
schema_registry_api;
config::one_or_many_property<config::endpoint_tls_config>
schema_registry_api_tls;
config::property<bool> mode_mutability;
config::property<std::optional<int16_t>> schema_registry_replication_factor;
config::property<ss::sstring> api_doc_dir;
};
Expand Down
8 changes: 8 additions & 0 deletions src/v/pandaproxy/schema_registry/error.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ struct error_category final : std::error_category {
case error_code::compatibility_not_found:
return "Subject does not have subject-level compatibility "
"configured";
case error_code::mode_not_found:
return "Subject does not have subject-level mode configured";
case error_code::subject_version_operaton_not_permitted:
return "Overwrite new schema is not permitted.";
case error_code::subject_version_has_references:
Expand All @@ -69,6 +71,8 @@ struct error_category final : std::error_category {
return "Invalid compatibility level. Valid values are NONE, "
"BACKWARD, FORWARD, FULL, BACKWARD_TRANSITIVE, "
"FORWARD_TRANSITIVE, and FULL_TRANSITIVE";
case error_code::mode_invalid:
return "Invalid mode. Valid values are READWRITE, READONLY";
}
return "(unrecognized error)";
}
Expand All @@ -93,6 +97,8 @@ struct error_category final : std::error_category {
return reply_error_code::subject_version_not_deleted; // 40407
case error_code::compatibility_not_found:
return reply_error_code::compatibility_not_found; // 40408
case error_code::mode_not_found:
return reply_error_code::mode_not_found; // 40409
case error_code::subject_schema_invalid:
return reply_error_code::internal_server_error; // 500
case error_code::write_collision:
Expand All @@ -117,6 +123,8 @@ struct error_category final : std::error_category {
return reply_error_code::zookeeper_error; // 50001
case error_code::compatibility_level_invalid:
return reply_error_code::compatibility_level_invalid; // 42203
case error_code::mode_invalid:
return reply_error_code::mode_invalid; // 42204
}
return {};
}
Expand Down
Loading
Loading