Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CORE-60] Schema Registry: Support /mode #17952

Merged
merged 18 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 201 additions & 8 deletions src/v/pandaproxy/api/api-doc/schema_registry.json
Original file line number Diff line number Diff line change
Expand Up @@ -185,17 +185,210 @@
"application/json"
],
"parameters": [],
"produces": ["application/vnd.schemaregistry.v1+json"],
"produces": [
"application/vnd.schemaregistry.v1+json",
"application/vnd.schemaregistry+json",
"application/json"
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "object",
"properties": {
"mode": {
"type": "string"
}
}
"$ref": "#/definitions/mode"
}
},
"500": {
"description": "Internal Server error",
"schema": {
"$ref": "#/definitions/error_body"
}
}
}
},
"put": {
"summary": "Set the global mode.",
"operationId": "put_mode",
"consumes": [
"application/vnd.schemaregistry.v1+json",
"application/vnd.schemaregistry+json",
"application/json"
],
"parameters": [
{
"name": "mode",
"in": "body",
"schema": {
"$ref": "#/definitions/mode"
}
}
],
"produces": [
"application/vnd.schemaregistry.v1+json",
"application/vnd.schemaregistry+json",
"application/json"
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/mode"
}
},
"422": {
"description": "Unprocessable Entity",
"schema": {
"$ref": "#/definitions/error_body"
}
},
"500": {
"description": "Internal Server error",
"schema": {
"$ref": "#/definitions/error_body"
}
}
}
}
},
"/mode/{subject}": {
"get": {
"summary": "Get the mode for a subject.",
"operationId": "get_mode_subject",
"consumes": [
"application/vnd.schemaregistry.v1+json",
"application/vnd.schemaregistry+json",
"application/json"
],
"parameters": [
{
"name": "subject",
"description": "The subject to get the mode for.",
"in": "path",
"required": true,
"type": "string"
},
{
"name": "defaultToGlobal",
Copy link

@kbatuigas kbatuigas May 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the description for the query param

query the mode for a particular subject, regardless of whether it has a specific override

and the endpoint (without the query param)

If there is no specific mode set for the subject an error shall be returned.

Does defaultToGlobal mean, if there is no specific mode set for the subject, return the global mode instead? I wasn't sure what "specific override" was referring to

Copy link
Member Author

@BenPope BenPope May 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the description for the query param

query the mode for a particular subject, regardless of whether it has a specific override

and the endpoint (without the query param)

If there is no specific mode set for the subject an error shall be returned.

Does defaultToGlobal mean, if there is no specific mode set for the subject, return the global mode instead? I wasn't sure what "specific override" was referring to

Yeah, so the global mode is in force for all subjects, unless a subject has a mode set, in which case that will override whatever is global, for the subject.

To query whether a subject has a mode set, either don't specify defaultToGlobal=true, or specify defaultToGlobal=false, and detect the success or error.

But if you're just querying what mode is in force, specify defaultToGlobal=true

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kbatuigas I added some short descriptions for the parameters.

"description": "If true, return the global mode if the subject doesn't have a mode set.",
"in": "query",
"required": false,
"type": "boolean"
}
],
"produces": [
"application/vnd.schemaregistry.v1+json",
"application/vnd.schemaregistry+json",
"application/json"
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/mode"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/error_body"
}
},
"500": {
"description": "Internal Server error",
"schema": {
"$ref": "#/definitions/error_body"
}
}
}
},
"put": {
"summary": "Set the mode for a subject.",
"operationId": "put_mode_subject",
"consumes": [
"application/vnd.schemaregistry.v1+json",
"application/vnd.schemaregistry+json",
"application/json"
],
"parameters": [
{
"name": "subject",
"description": "The subject to set the mode for.",
"in": "path",
"required": true,
"type": "string"
},
{
"name": "mode",
"in": "body",
"schema": {
"$ref": "#/definitions/mode"
}
}
],
"produces": [
"application/vnd.schemaregistry.v1+json",
"application/vnd.schemaregistry+json",
"application/json"
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/mode"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/error_body"
}
},
"422": {
"description": "Unprocessable Entity",
"schema": {
"$ref": "#/definitions/error_body"
}
},
"500": {
"description": "Internal Server error",
"schema": {
"$ref": "#/definitions/error_body"
}
}
}
},
"delete": {
"summary": "Delete the mode for a subject.",
"operationId": "delete_mode_subject",
"consumes": [
"application/vnd.schemaregistry.v1+json",
"application/vnd.schemaregistry+json",
"application/json"
],
"parameters": [
{
"name": "subject",
"description": "The subject to delete the mode for.",
"in": "path",
"required": true,
"type": "string"
}
],
"produces": [
"application/vnd.schemaregistry.v1+json",
"application/vnd.schemaregistry+json",
"application/json"
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/mode"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/error_body"
}
},
"500": {
Expand Down Expand Up @@ -423,7 +616,7 @@
},
"/subjects/{subject}": {
"post": {
"summary": "Check if a schema is already registred for the subject.",
"summary": "Check if a schema is already registered for the subject.",
oleiman marked this conversation as resolved.
Show resolved Hide resolved
"operationId": "post_subject",
"consumes": [
"application/vnd.schemaregistry.v1+json",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,5 +89,17 @@
"compatibility": {
"type": "string"
}
}
},
"mode": {
"type": "object",
"properties": {
"mode": {
"type": "string",
"enum": [
"READWRITE",
"READONLY"
]
},
}
}
2 changes: 1 addition & 1 deletion src/v/pandaproxy/api/api-doc/schema_registry_header.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"swagger": "2.0",
"info": {
"title": "Pandaproxy Schema Registry",
"version": "1.0.3"
"version": "1.0.4"
},
"host": "{{Host}}",
"basePath": "/",
Expand Down
43 changes: 43 additions & 0 deletions src/v/pandaproxy/auth_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,47 @@ inline credential_t maybe_authenticate_request(

return user;
}

enum class auth_level {
// Unauthenticated endpoint (not a typo, 'public' is a keyword)
publik = 0,
// Requires authentication (if enabled) but not superuser status
user = 1,
// Requires authentication (if enabled) and superuser status
superuser = 2
};

inline credential_t maybe_authorize_request(
config::rest_authn_method authn_method,
auth_level lvl,
request_authenticator& authenticator,
const ss::http::request& req) {
credential_t user;

if (authn_method != config::rest_authn_method::none) {
// Will throw 400 & 401 if auth fails
auto auth_result = authenticator.authenticate(req);
// Will throw 403 if user enabled HTTP Basic Auth but
// did not give the authorization header.
switch (lvl) {
case auth_level::superuser:
auth_result.require_superuser();
break;
case auth_level::user:
auth_result.require_authenticated();
break;
case auth_level::publik:
auth_result.pass();
break;
}

user = credential_t{
auth_result.get_username(),
auth_result.get_password(),
auth_result.get_sasl_mechanism()};
}

return user;
}

} // namespace pandaproxy
4 changes: 4 additions & 0 deletions src/v/pandaproxy/error.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ struct reply_error_category final : std::error_category {
return "subject_version_not_deleted";
case reply_error_code::compatibility_not_found:
return "compatibility_not_found";
case reply_error_code::mode_not_found:
return "mode_not_found";
case reply_error_code::serialization_error:
return "serialization_error";
case reply_error_code::consumer_already_exists:
Expand All @@ -140,6 +142,8 @@ struct reply_error_category final : std::error_category {
return "Invalid schema version";
case reply_error_code::compatibility_level_invalid:
return "Invalid compatibility level";
case reply_error_code::mode_invalid:
return "Invalid mode";
case reply_error_code::subject_version_operaton_not_permitted:
return "Overwrite new schema is not permitted.";
case reply_error_code::subject_version_has_references:
Expand Down
2 changes: 2 additions & 0 deletions src/v/pandaproxy/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,13 @@ enum class reply_error_code : uint16_t {
subject_version_soft_deleted = 40406,
subject_version_not_deleted = 40407,
compatibility_not_found = 40408,
mode_not_found = 40409,
serialization_error = 40801,
consumer_already_exists = 40902,
schema_empty = 42201,
schema_version_invalid = 42202,
compatibility_level_invalid = 42203,
mode_invalid = 42204,
subject_version_operaton_not_permitted = 42205,
subject_version_has_references = 42206,
subject_version_schema_id_already_exists = 42207,
Expand Down
2 changes: 1 addition & 1 deletion src/v/pandaproxy/schema_registry/api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ api::~api() noexcept = default;

ss::future<> api::start() {
_store = std::make_unique<sharded_store>();
co_await _store->start(_sg);
co_await _store->start(is_mutable(_cfg.mode_mutability), _sg);
co_await _schema_id_validation_probe.start();
co_await _schema_id_validation_probe.invoke_on_all(
&schema_id_validation_probe::setup_metrics);
Expand Down
1 change: 1 addition & 0 deletions src/v/pandaproxy/schema_registry/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ configuration::configuration()
{},
{},
config::endpoint_tls_config::validate_many)
, mode_mutability(*this, "mode_mutability", "Allow modifying mode", {}, false)
, schema_registry_replication_factor(
*this,
"schema_registry_replication_factor",
Expand Down
1 change: 1 addition & 0 deletions src/v/pandaproxy/schema_registry/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ struct configuration final : public config::config_store {
schema_registry_api;
config::one_or_many_property<config::endpoint_tls_config>
schema_registry_api_tls;
config::property<bool> mode_mutability;
config::property<std::optional<int16_t>> schema_registry_replication_factor;
config::property<ss::sstring> api_doc_dir;
};
Expand Down
8 changes: 8 additions & 0 deletions src/v/pandaproxy/schema_registry/error.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ struct error_category final : std::error_category {
case error_code::compatibility_not_found:
return "Subject does not have subject-level compatibility "
"configured";
case error_code::mode_not_found:
return "Subject does not have subject-level mode configured";
case error_code::subject_version_operaton_not_permitted:
return "Overwrite new schema is not permitted.";
case error_code::subject_version_has_references:
Expand All @@ -69,6 +71,8 @@ struct error_category final : std::error_category {
return "Invalid compatibility level. Valid values are NONE, "
"BACKWARD, FORWARD, FULL, BACKWARD_TRANSITIVE, "
"FORWARD_TRANSITIVE, and FULL_TRANSITIVE";
case error_code::mode_invalid:
return "Invalid mode. Valid values are READWRITE, READONLY";
}
return "(unrecognized error)";
}
Expand All @@ -93,6 +97,8 @@ struct error_category final : std::error_category {
return reply_error_code::subject_version_not_deleted; // 40407
case error_code::compatibility_not_found:
return reply_error_code::compatibility_not_found; // 40408
case error_code::mode_not_found:
return reply_error_code::mode_not_found; // 40409
case error_code::subject_schema_invalid:
return reply_error_code::internal_server_error; // 500
case error_code::write_collision:
Expand All @@ -117,6 +123,8 @@ struct error_category final : std::error_category {
return reply_error_code::zookeeper_error; // 50001
case error_code::compatibility_level_invalid:
return reply_error_code::compatibility_level_invalid; // 42203
case error_code::mode_invalid:
return reply_error_code::mode_invalid; // 42204
}
return {};
}
Expand Down
Loading
Loading