Skip to content

Commit

Permalink
schema_registry: Require superuser to mutate /mode
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Pope <ben@redpanda.com>
  • Loading branch information
BenPope committed May 13, 2024
1 parent 6f2f894 commit dc8c2e4
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 32 deletions.
43 changes: 43 additions & 0 deletions src/v/pandaproxy/auth_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,47 @@ inline credential_t maybe_authenticate_request(

return user;
}

enum class auth_level {
// Unauthenticated endpoint (not a typo, 'public' is a keyword)
publik = 0,
// Requires authentication (if enabled) but not superuser status
user = 1,
// Requires authentication (if enabled) and superuser status
superuser = 2
};

inline credential_t maybe_authorize_request(
config::rest_authn_method authn_method,
auth_level lvl,
request_authenticator& authenticator,
const ss::http::request& req) {
credential_t user;

if (authn_method != config::rest_authn_method::none) {
// Will throw 400 & 401 if auth fails
auto auth_result = authenticator.authenticate(req);
// Will throw 403 if user enabled HTTP Basic Auth but
// did not give the authorization header.
switch (lvl) {
case auth_level::superuser:
auth_result.require_superuser();
break;
case auth_level::user:
auth_result.require_authenticated();
break;
case auth_level::publik:
auth_result.pass();
break;
}

user = credential_t{
auth_result.get_username(),
auth_result.get_password(),
auth_result.get_sasl_mechanism()};
}

return user;
}

} // namespace pandaproxy
75 changes: 46 additions & 29 deletions src/v/pandaproxy/schema_registry/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ const security::acl_principal principal{

class wrap {
public:
wrap(ss::gate& g, one_shot& os, server::function_handler h)
wrap(ss::gate& g, one_shot& os, auth_level lvl, server::function_handler h)
: _g{g}
, _os{os}
, _auth_level(lvl)
, _h{std::move(h)} {}

ss::future<server::reply_t>
Expand All @@ -66,8 +67,11 @@ class wrap {
rq.service().config().schema_registry_api.value(),
rq.req->get_listener_idx());
try {
rq.user = maybe_authenticate_request(
rq.authn_method, rq.service().authenticator(), *rq.req);
rq.user = maybe_authorize_request(
rq.authn_method,
_auth_level,
rq.service().authenticator(),
*rq.req);
} catch (unauthorized_user_exception& e) {
audit_authn_failure(rq, e.get_username(), e.what());
throw;
Expand Down Expand Up @@ -190,6 +194,7 @@ class wrap {
private:
ss::gate& _g;
one_shot& _os;
auth_level _auth_level;
server::function_handler _h;
};

Expand All @@ -198,106 +203,118 @@ server::routes_t get_schema_registry_routes(ss::gate& gate, one_shot& es) {
routes.api = ss::httpd::schema_registry_json::name;

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::get_config, wrap(gate, es, get_config)});
ss::httpd::schema_registry_json::get_config,
wrap(gate, es, auth_level::user, get_config)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::put_config, wrap(gate, es, put_config)});
ss::httpd::schema_registry_json::put_config,
wrap(gate, es, auth_level::user, put_config)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::get_config_subject,
wrap(gate, es, get_config_subject)});
wrap(gate, es, auth_level::user, get_config_subject)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::put_config_subject,
wrap(gate, es, put_config_subject)});
wrap(gate, es, auth_level::user, put_config_subject)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::delete_config_subject,
wrap(gate, es, delete_config_subject)});
wrap(gate, es, auth_level::user, delete_config_subject)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::get_mode, wrap(gate, es, get_mode)});
ss::httpd::schema_registry_json::get_mode,
wrap(gate, es, auth_level::user, get_mode)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::put_mode, wrap(gate, es, put_mode)});
ss::httpd::schema_registry_json::put_mode,
wrap(gate, es, auth_level::superuser, put_mode)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::get_mode_subject,
wrap(gate, es, get_mode_subject)});
wrap(gate, es, auth_level::user, get_mode_subject)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::put_mode_subject,
wrap(gate, es, put_mode_subject)});
wrap(gate, es, auth_level::superuser, put_mode_subject)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::delete_mode_subject,
wrap(gate, es, delete_mode_subject)});
wrap(gate, es, auth_level::superuser, delete_mode_subject)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::get_schemas_types,
wrap(gate, es, get_schemas_types)});
wrap(gate, es, auth_level::publik, get_schemas_types)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::get_schemas_ids_id,
wrap(gate, es, get_schemas_ids_id)});
wrap(gate, es, auth_level::user, get_schemas_ids_id)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::get_schemas_ids_id_versions,
wrap(gate, es, get_schemas_ids_id_versions)});
wrap(gate, es, auth_level::user, get_schemas_ids_id_versions)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::get_schemas_ids_id_subjects,
wrap(gate, es, get_schemas_ids_id_subjects)});
wrap(gate, es, auth_level::user, get_schemas_ids_id_subjects)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::get_subjects,
wrap(gate, es, get_subjects)});
wrap(gate, es, auth_level::user, get_subjects)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::get_subject_versions,
wrap(gate, es, get_subject_versions)});
wrap(gate, es, auth_level::user, get_subject_versions)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::post_subject,
wrap(gate, es, post_subject)});
wrap(gate, es, auth_level::user, post_subject)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::post_subject_versions,
wrap(gate, es, post_subject_versions)});
wrap(gate, es, auth_level::user, post_subject_versions)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::get_subject_versions_version,
wrap(gate, es, get_subject_versions_version)});
wrap(gate, es, auth_level::user, get_subject_versions_version)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::get_subject_versions_version_schema,
wrap(gate, es, get_subject_versions_version_schema)});
wrap(gate, es, auth_level::user, get_subject_versions_version_schema)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::
get_subject_versions_version_referenced_by,
wrap(gate, es, get_subject_versions_version_referenced_by)});
wrap(
gate,
es,
auth_level::user,
get_subject_versions_version_referenced_by)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::
get_subject_versions_version_referenced_by_deprecated,
wrap(gate, es, get_subject_versions_version_referenced_by)});
wrap(
gate,
es,
auth_level::user,
get_subject_versions_version_referenced_by)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::delete_subject,
wrap(gate, es, delete_subject)});
wrap(gate, es, auth_level::user, delete_subject)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::delete_subject_version,
wrap(gate, es, delete_subject_version)});
wrap(gate, es, auth_level::user, delete_subject_version)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::compatibility_subject_version,
wrap(gate, es, compatibility_subject_version)});
wrap(gate, es, auth_level::user, compatibility_subject_version)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::schema_registry_status_ready,
wrap(gate, es, status_ready)});
wrap(gate, es, auth_level::publik, status_ready)});

return routes;
}
Expand Down
56 changes: 53 additions & 3 deletions tests/rptest/tests/schema_registry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1936,6 +1936,7 @@ def __init__(self, context):

schema_registry_config = SchemaRegistryConfig()
schema_registry_config.authn_method = 'http_basic'
schema_registry_config.mode_mutability = True

super(SchemaRegistryBasicAuthTest,
self).__init__(context,
Expand All @@ -1951,7 +1952,7 @@ def __init__(self, context):
self.public_auth = (public_user.username, public_user.password)

def _init_users(self):
admin = Admin(self.redpanda, auth=self.super_auth)
admin = Admin(self.redpanda)
admin.create_user(username=self.user.username,
password=self.user.password,
algorithm=self.user.mechanism)
Expand Down Expand Up @@ -2176,17 +2177,66 @@ def test_config(self):
@cluster(num_nodes=3)
def test_mode(self):
"""
Smoketest get_mode endpoint
Smoketest mode endpoints
"""
self._init_users()

self.logger.debug("Get initial global mode")
result_raw = self._get_mode(auth=self.public_auth)
assert result_raw.json()['error_code'] == 40101

self.logger.debug("Get initial global mode")
result_raw = self._get_mode(auth=self.user_auth)
assert result_raw.json()["mode"] == "READWRITE"

result_raw = self._get_mode(auth=self.super_auth)
assert result_raw.json()["mode"] == "READWRITE"

self.logger.debug("Set global mode")
result_raw = self._set_mode(data=json.dumps({"mode": "READONLY"}),
auth=self.public_auth)
assert result_raw.json()['error_code'] == 40101

result_raw = self._set_mode(data=json.dumps({"mode": "READONLY"}),
auth=self.user_auth)
assert result_raw.json()['error_code'] == 403

result_raw = self._set_mode(data=json.dumps({"mode": "READONLY"}),
auth=self.super_auth)
assert result_raw.json()["mode"] == "READONLY"

sub = "test-sub"
self.logger.debug("Set subject mode")
result_raw = self._set_mode_subject(subject=sub,
data=json.dumps(
{"mode": "READONLY"}),
auth=self.public_auth)
assert result_raw.json()['error_code'] == 40101

result_raw = self._set_mode_subject(subject=sub,
data=json.dumps(
{"mode": "READONLY"}),
auth=self.user_auth)
assert result_raw.json()['error_code'] == 403

result_raw = self._set_mode_subject(subject=sub,
data=json.dumps(
{"mode": "READONLY"}),
auth=self.super_auth)
assert result_raw.json()["mode"] == "READONLY"

self.logger.debug("Delete subject mode")
result_raw = self._delete_mode_subject(subject=sub,
auth=self.public_auth)
assert result_raw.json()['error_code'] == 40101

result_raw = self._delete_mode_subject(subject=sub,
auth=self.user_auth)
assert result_raw.json()['error_code'] == 403

result_raw = self._delete_mode_subject(subject=sub,
auth=self.super_auth)
assert result_raw.json()["mode"] == "READONLY"

@cluster(num_nodes=3)
def test_post_compatibility_subject_version(self):
"""
Expand Down

0 comments on commit dc8c2e4

Please sign in to comment.