Skip to content

Commit

Permalink
Merge pull request #15228 from michael-redpanda/issues/15226
Browse files Browse the repository at this point in the history
audit: Disable auditing in recovery mode
  • Loading branch information
piyushredpanda authored Nov 30, 2023
2 parents c680258 + dea71ed commit 6f49d28
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 18 deletions.
38 changes: 30 additions & 8 deletions src/v/security/audit/audit_log_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "kafka/protocol/produce.h"
#include "kafka/protocol/schemata/produce_response.h"
#include "kafka/server/handlers/topics/types.h"
#include "model/namespace.h"
#include "security/acl.h"
#include "security/audit/client_probe.h"
#include "security/audit/logger.h"
Expand Down Expand Up @@ -662,6 +663,10 @@ void audit_log_manager::set_enabled_events() {
"Unknown event_type observed");
}

bool audit_log_manager::recovery_mode_enabled() noexcept {
return config::node().recovery_mode_enabled.value();
}

audit_log_manager::audit_log_manager(
cluster::controller* controller, kafka::client::configuration& client_config)
: _audit_enabled(config::shard_local_cfg().audit_enabled.bind())
Expand Down Expand Up @@ -741,6 +746,12 @@ bool audit_log_manager::is_audit_event_enabled(event_type event_type) const {
}

ss::future<> audit_log_manager::start() {
if (recovery_mode_enabled()) {
vlog(
adtlog.warn,
"Redpanda is operating in recovery mode. Auditing is disabled!");
co_return;
}
_probe = std::make_unique<audit_probe>();
_probe->setup_metrics([this] {
return static_cast<double>(pending_events())
Expand Down Expand Up @@ -896,7 +907,7 @@ ss::future<> audit_log_manager::drain() {

std::optional<audit_log_manager::audit_event_passthrough>
audit_log_manager::should_enqueue_audit_event() const {
if (!_audit_enabled()) {
if (recovery_mode_enabled() || !_audit_enabled()) {
return std::make_optional(audit_event_passthrough::yes);
}
if (_as.abort_requested()) {
Expand Down Expand Up @@ -926,27 +937,35 @@ audit_log_manager::should_enqueue_audit_event() const {
}

std::optional<audit_log_manager::audit_event_passthrough>
audit_log_manager::should_enqueue_audit_event(event_type type) const {
if (!is_audit_event_enabled(type)) {
audit_log_manager::should_enqueue_audit_event(
event_type type, ignore_enabled_events ignore_events) const {
if (
ignore_events == ignore_enabled_events::no
&& !is_audit_event_enabled(type)) {
return std::make_optional(audit_event_passthrough::yes);
}
return should_enqueue_audit_event();
}

std::optional<audit_log_manager::audit_event_passthrough>
audit_log_manager::should_enqueue_audit_event(
event_type type, const security::acl_principal& principal) const {
event_type type,
const security::acl_principal& principal,
ignore_enabled_events ignore_events) const {
if (_audit_excluded_principals.contains(principal)) {
return std::make_optional(audit_event_passthrough::yes);
}

return should_enqueue_audit_event(type);
return should_enqueue_audit_event(type, ignore_events);
}

std::optional<audit_log_manager::audit_event_passthrough>
audit_log_manager::should_enqueue_audit_event(
kafka::api_key key, const security::acl_principal& principal) const {
return should_enqueue_audit_event(kafka_api_to_event_type(key), principal);
kafka::api_key key,
const security::acl_principal& principal,
ignore_enabled_events ignore_events) const {
return should_enqueue_audit_event(
kafka_api_to_event_type(key), principal, ignore_events);
}

std::optional<audit_log_manager::audit_event_passthrough>
Expand All @@ -968,11 +987,14 @@ audit_log_manager::should_enqueue_audit_event(
kafka::api_key key,
const security::acl_principal& principal,
const model::topic& t) const {
auto ignore_events = ignore_enabled_events::no;
if (_audit_excluded_topics.contains(t)) {
return std::make_optional(audit_event_passthrough::yes);
} else if (t == model::kafka_audit_logging_topic) {
ignore_events = ignore_enabled_events::yes;
}

return should_enqueue_audit_event(key, principal);
return should_enqueue_audit_event(key, principal, ignore_events);
}

} // namespace security::audit
23 changes: 15 additions & 8 deletions src/v/security/audit/audit_log_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ class audit_log_manager
if constexpr (std::is_same_v<T, model::topic>) {
if (auto val = should_enqueue_audit_event(
api, result.principal, resource_name);
val.has_value()
&& resource_name != model::kafka_audit_logging_topic) {
val.has_value()) {
return (bool)*val;
}
} else {
Expand Down Expand Up @@ -133,8 +132,7 @@ class audit_log_manager
if constexpr (std::is_same_v<T, model::topic>) {
if (auto val = should_enqueue_audit_event(
api, result.principal, resource_name);
val.has_value()
&& resource_name != model::kafka_audit_logging_topic) {
val.has_value()) {
return (bool)*val;
}
} else {
Expand Down Expand Up @@ -220,17 +218,24 @@ class audit_log_manager
bool report_redpanda_app_event(is_started);

private:
using ignore_enabled_events
= ss::bool_class<struct ignore_enabled_events_tag>;
/// The following methods return nullopt in the case the event should
/// be audited, otherwise the optional is filled with the value representing
/// whether it could not be enqueued due to error or due to the event
/// not having attributes of desired trackable events
std::optional<audit_event_passthrough> should_enqueue_audit_event() const;
std::optional<audit_event_passthrough>
should_enqueue_audit_event(event_type) const;
std::optional<audit_event_passthrough> should_enqueue_audit_event(
event_type, const security::acl_principal&) const;
event_type,
ignore_enabled_events ignore_events = ignore_enabled_events::no) const;
std::optional<audit_event_passthrough> should_enqueue_audit_event(
event_type,
const security::acl_principal&,
ignore_enabled_events ignore_events = ignore_enabled_events::no) const;
std::optional<audit_event_passthrough> should_enqueue_audit_event(
kafka::api_key, const security::acl_principal&) const;
kafka::api_key,
const security::acl_principal&,
ignore_enabled_events ignore_events = ignore_enabled_events::no) const;
std::optional<audit_event_passthrough>
should_enqueue_audit_event(event_type, const security::audit::user&) const;
std::optional<audit_event_passthrough>
Expand Down Expand Up @@ -265,6 +270,8 @@ class audit_log_manager
return result;
}

static bool recovery_mode_enabled() noexcept;

private:
class audit_msg {
public:
Expand Down
59 changes: 57 additions & 2 deletions tests/rptest/tests/audit_log_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,13 @@ def modify_audit_excluded_principals(self, principals: [str]):
"""
self._modify_cluster_config({'audit_excluded_principals': principals})

def change_max_buffer_size_per_shard(self, new_size: int):
"""
Modifies the audit_queue_max_buffer_size_per_shard configuration
"""
self._modify_cluster_config(
{'audit_queue_max_buffer_size_per_shard': new_size})

def modify_node_config(self, node, update_fn, skip_readiness_check=True):
"""Modifies the current node configuration, restarts the node for
changes to take effect
Expand Down Expand Up @@ -555,7 +562,7 @@ def ingest(self, records):
)
return
self.next_offset_ingest = len(records)
new_records = [json.loads(msg['value']) for msg in records]
new_records = [json.loads(msg['value']) for msg in new_records]
self.logger.info(f"Ingested: {len(new_records)} records")
self.logger.debug(f'Ingested records:')
for rec in new_records:
Expand Down Expand Up @@ -670,6 +677,54 @@ def test_drain_on_audit_disabled(self):
lambda record_count: record_count == 3,
"One stop event observed for shutdown node")

@cluster(num_nodes=5)
def test_recovery_mode(self):
"""
Tests that audit logging does not start when in recovery mode
"""

# Expect to find the audit system to come up
_ = self.find_matching_record(
partial(AuditLogTestsAppLifecycle.is_lifecycle_match,
"Audit System", True),
lambda record_count: record_count == 3,
"Single redpanda audit start event per node")
# Change goes into effect next restart
self.change_max_buffer_size_per_shard(1)
self.modify_audit_event_types(['admin', 'authenticate'])

# Restart and ensure we see the error message
self.redpanda.restart_nodes(
self.redpanda.nodes,
override_cfg_params={"recovery_mode_enabled": True})
wait_until(lambda: self.redpanda.search_log_any(
'Redpanda is operating in recovery mode. Auditing is disabled!'),
timeout_sec=30,
backoff_sec=2,
err_msg="Did not find expected log statement")

# Execute a few Admin API calls that would be normally audited
# If everything is working, these should return true with
# no issue
for _ in range(0, 10):
_ = self.admin.get_features()

# Change goes into effect next restart
self.change_max_buffer_size_per_shard(1024 * 1024)
self.modify_audit_event_types([])
self.redpanda.restart_nodes(
self.redpanda.nodes,
override_cfg_params={"recovery_mode_enabled": False})
# Now we should see it 6 times, 3 times for initial boot, and 3 more times for this latest
# boot. Seeing >6 would mean auditing somehow worked while in recovery mode
records = self.find_matching_record(
partial(AuditLogTestsAppLifecycle.is_lifecycle_match,
"Audit System", True),
lambda record_count: record_count >= 6,
"Single redpanda audit start event per node")
assert len(
records) == 6, f'Expected 6 start up records, found {len(records)}'


class AuditLogTestAdminApi(AuditLogTestBase):
"""Validates that audit logs are generated from admin API
Expand Down Expand Up @@ -1857,7 +1912,7 @@ def match_authn_user(user, svc_name, result, record):
_ = self.find_matching_record(
lambda record: match_authn_user(self.username, self.
sr_audit_svc_name, 1, record),
lambda record_count: record_count > 1, 'authn attempt in sr')
lambda record_count: record_count == 1, 'authn attempt in sr')

@cluster(num_nodes=5)
def test_sr_audit_bad_authn(self):
Expand Down

0 comments on commit 6f49d28

Please sign in to comment.