Skip to content

Commit

Permalink
schema_registry: Support mode - readonly
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Pope <ben@redpanda.com>
  • Loading branch information
BenPope committed May 13, 2024
1 parent dc8c2e4 commit dd6c831
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 0 deletions.
7 changes: 7 additions & 0 deletions src/v/pandaproxy/schema_registry/errors.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,4 +181,11 @@ inline error_info mode_not_readwrite(const subject& sub) {
fmt::format("Subject {} is not in read-write mode", sub())};
}

inline error_info mode_is_readonly(const std::optional<subject>& sub) {
return error_info{
error_code::subject_version_operaton_not_permitted,
fmt::format(
"Subject {} is in read-only mode", sub.value_or(subject{"null"}))};
}

} // namespace pandaproxy::schema_registry
1 change: 1 addition & 0 deletions src/v/pandaproxy/schema_registry/handlers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ delete_config_subject(server::request_t rq, server::reply_t rp) {

// ensure we see latest writes
co_await rq.service().writer().read_sync();
co_await rq.service().writer().check_mutable(sub);

compatibility_level lvl{};
try {
Expand Down
22 changes: 22 additions & 0 deletions src/v/pandaproxy/schema_registry/seq_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,15 @@ ss::future<> seq_writer::read_sync() {
co_await wait_for(max_offset - model::offset{1});
}

ss::future<> seq_writer::check_mutable(std::optional<subject> const& sub) {
auto mode = sub ? co_await _store.get_mode(*sub, default_to_global::yes)
: co_await _store.get_mode();
if (mode == mode::read_only) {
throw as_exception(mode_is_readonly(sub));
}
co_return;
}

ss::future<> seq_writer::wait_for(model::offset offset) {
return container().invoke_on(0, _smp_opts, [offset](seq_writer& seq) {
if (auto waiters = seq._wait_for_sem.waiters(); waiters != 0) {
Expand Down Expand Up @@ -203,6 +212,8 @@ void seq_writer::advance_offset_inner(model::offset offset) {

ss::future<std::optional<schema_id>> seq_writer::do_write_subject_version(
subject_schema schema, model::offset write_at) {
co_await check_mutable(schema.schema.sub());

// Check if store already contains this data: if
// so, we do no I/O and return the schema ID.
auto projected = co_await _store.project_ids(schema).handle_exception(
Expand Down Expand Up @@ -267,6 +278,8 @@ ss::future<std::optional<bool>> seq_writer::do_write_config(
to_string_view(compat),
write_at);

co_await check_mutable(sub);

try {
// Check for no-op case
compatibility_level existing;
Expand Down Expand Up @@ -307,6 +320,8 @@ ss::future<bool> seq_writer::write_config(
ss::future<std::optional<bool>> seq_writer::do_delete_config(subject sub) {
vlog(plog.debug, "delete config sub={}", sub);

co_await check_mutable(sub);

try {
co_await _store.get_compatibility(sub, default_to_global::no);
} catch (const exception&) {
Expand Down Expand Up @@ -407,6 +422,8 @@ ss::future<bool> seq_writer::delete_mode(subject sub) {
/// Impermanent delete: update a version with is_deleted=true
ss::future<std::optional<bool>> seq_writer::do_delete_subject_version(
subject sub, schema_version version, model::offset write_at) {
co_await check_mutable(sub);

if (co_await _store.is_referenced(sub, version)) {
throw as_exception(has_references(sub, version));
}
Expand Down Expand Up @@ -445,6 +462,8 @@ seq_writer::delete_subject_version(subject sub, schema_version version) {

ss::future<std::optional<std::vector<schema_version>>>
seq_writer::do_delete_subject_impermanent(subject sub, model::offset write_at) {
co_await check_mutable(sub);

// Grab the versions before they're gone.
auto versions = co_await _store.get_versions(sub, include_deleted::no);

Expand Down Expand Up @@ -523,6 +542,9 @@ seq_writer::delete_subject_permanent_inner(
/// Check for whether our victim is already soft-deleted happens
/// within these store functions (will throw a 404-equivalent if so)
vlog(plog.debug, "delete_subject_permanent sub={}", sub);

co_await check_mutable(sub);

if (version.has_value()) {
// Check version first to see if the version exists
sequences = co_await _store.get_subject_version_written_at(
Expand Down
3 changes: 3 additions & 0 deletions src/v/pandaproxy/schema_registry/seq_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ class seq_writer final : public ss::peering_sharded_service<seq_writer> {

ss::future<> read_sync();

// Throws 42205 if the subject cannot be modified
ss::future<> check_mutable(std::optional<subject> const& sub);

// API for readers: notify us when they have read and applied an offset
ss::future<> advance_offset(model::offset offset);

Expand Down
145 changes: 145 additions & 0 deletions tests/rptest/tests/schema_registry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1924,6 +1924,151 @@ def test_mode(self):
assert result_raw.json(
)["message"] == "Invalid mode. Valid values are READWRITE, READONLY"

@cluster(num_nodes=3)
def test_mode_readonly(self):
"""
Test endpoints when in READONLY
"""
ro_subject = f"ro-{create_topic_names(1)[0]}-key"
rw_subject = f"rw-{create_topic_names(1)[0]}-key"

schema1 = json.dumps({"schema": schema1_def})
schema2 = json.dumps({"schema": schema2_def})

self.logger.info("Posting schema 1 as ro_subject key")
result_raw = self._post_subjects_subject_versions(
subject=ro_subject, data=json.dumps({"schema": schema1_def}))
assert result_raw.status_code == requests.codes.ok

self.logger.debug("Set global mode to readonly")
result_raw = self._set_mode(data=json.dumps({"mode": "READONLY"}))
assert result_raw.status_code == 200
assert result_raw.json()["mode"] == "READONLY"

self.logger.debug("Override mode for rw_subject")
result_raw = self._set_mode_subject(subject=rw_subject,
data=json.dumps(
{"mode": "READWRITE"}))
assert result_raw.status_code == 200
assert result_raw.json()["mode"] == "READWRITE"

self.logger.info("Posting schema 1 as rw_subject key")
result_raw = self._post_subjects_subject_versions(subject=rw_subject,
data=schema1)
assert result_raw.status_code == requests.codes.ok

# mode
result_raw = self._get_mode()
assert result_raw.status_code == 200

for sub in [ro_subject, rw_subject]:
result_raw = self._get_mode_subject(subject=sub, fallback=True)
assert result_raw.status_code == 200

# config
result_raw = self._get_config()
assert result_raw.status_code == 200

for sub in [ro_subject, rw_subject]:
result_raw = self._get_config_subject(subject=sub, fallback=True)
assert result_raw.status_code == 200

# This is the default, check that setting it to the default/existing is failure, not quiet success
compat_back = json.dumps({"compatibility": "BACKWARD"})
result_raw = self._set_config(data=compat_back)
assert result_raw.status_code == 422
assert result_raw.json()["error_code"] == 42205
assert result_raw.json(
)["message"] == "Subject null is in read-only mode"

result_raw = self._set_config_subject(subject=ro_subject,
data=compat_back)
assert result_raw.status_code == 422
assert result_raw.json()["error_code"] == 42205
assert result_raw.json(
)["message"] == f"Subject {ro_subject} is in read-only mode"

result_raw = self._set_config_subject(subject=rw_subject,
data=compat_back)
assert result_raw.status_code == 200

# The config doesn't exist, but the mode is checked first
result_raw = self._delete_config_subject(subject=ro_subject)
assert result_raw.status_code == 422
assert result_raw.json()["error_code"] == 42205
assert result_raw.json(
)["message"] == f"Subject {ro_subject} is in read-only mode"

result_raw = self._delete_config_subject(subject=rw_subject)
assert result_raw.status_code == 200

# subjects
result_raw = self._get_subjects()
assert result_raw.status_code == 200

for sub in [ro_subject, rw_subject]:
result_raw = self._get_subjects_subject_versions(subject=sub)
assert result_raw.status_code == 200

result_raw = self._get_subjects_subject_versions_version(
subject=sub, version=1)
assert result_raw.status_code == 200

result_raw = self._get_subjects_subject_versions_version_referenced_by(
subject=sub, version=1)
assert result_raw.status_code == 200

self.logger.info("Checking for schema 1 as subject key")
result_raw = self._post_subjects_subject(subject=sub, data=schema1)
assert result_raw.status_code == requests.codes.ok
assert result_raw.json()["id"] == 1
assert result_raw.json()["version"] == 1

self.logger.info("Checking for schema 1 as subject key")
result_raw = self._post_subjects_subject_versions(subject=sub,
data=schema1)
assert result_raw.status_code == requests.codes.ok
assert result_raw.json()["id"] == 1

self.logger.info("Checking schema 2 as subject key")
result_raw = self._post_subjects_subject(subject=sub, data=schema2)
assert result_raw.status_code == 404
assert result_raw.json()["error_code"] == 40403
assert result_raw.json()["message"] == f"Schema not found"

self.logger.info("Posting schema 2 as ro_subject key")
result_raw = self._post_subjects_subject_versions(subject=ro_subject,
data=schema2)
assert result_raw.status_code == 422
assert result_raw.json()["error_code"] == 42205
assert result_raw.json(
)["message"] == f"Subject {ro_subject} is in read-only mode"

self.logger.info("Posting schema 2 as rw_subject key")
result_raw = self._post_subjects_subject_versions(subject=rw_subject,
data=schema2)
assert result_raw.status_code == 200

# compatibility
for sub in [ro_subject, rw_subject]:
result_raw = self._post_compatibility_subject_version(subject=sub,
version=1,
data=schema2)
assert result_raw.status_code == 200

# schemas
result_raw = self._get_schemas_types()
assert result_raw.status_code == 200

result_raw = self._get_schemas_ids_id(id=1)
assert result_raw.status_code == 200

result_raw = self._get_schemas_ids_id_subjects(id=1)
assert result_raw.status_code == 200

result_raw = self._get_schemas_ids_id_versions(id=1)
assert result_raw.status_code == 200


class SchemaRegistryBasicAuthTest(SchemaRegistryEndpoints):
"""
Expand Down

0 comments on commit dd6c831

Please sign in to comment.