Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Ranger): Pull policies from the Ranger Service and update using resources policies #1388

Merged
merged 11 commits into from
Mar 17, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/common/replication.codes.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ MAKE_EVENT_CODE_RPC(RPC_CM_START_MANUAL_COMPACT, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_QUERY_MANUAL_COMPACT_STATUS, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_GET_MAX_REPLICA_COUNT, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_SET_MAX_REPLICA_COUNT, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_CM_GET_RANGER_POLICY, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_USE_RANGER_ACCESS_CONTROL, TASK_PRIORITY_COMMON)
#undef CURRENT_THREAD_POOL

#define CURRENT_THREAD_POOL THREAD_POOL_META_STATE
Expand Down
73 changes: 48 additions & 25 deletions src/runtime/ranger/ranger_resource_policy_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ DSN_DEFINE_uint32(security,
"access control policy from "
"Ranger service.");
DSN_DEFINE_string(ranger, ranger_service_url, "", "Apache Ranger service url.");
DSN_DEFINE_string(ranger, ranger_service_name, "", "use policy name.");
DSN_DEFINE_string(ranger,
ranger_service_name,
"",
"The name of the policies defined in the Ranger service.");

#define RETURN_ERR_IF_MISSING_MEMBER(obj, member) \
do { \
Expand Down Expand Up @@ -122,7 +125,7 @@ const std::map<std::string, access_type> kAccessTypeMaping({{"READ", access_type
{"CONTROL", access_type::kControl}});
} // anonymous namespace

std::chrono::milliseconds load_ranger_policy_retry_delay_ms(10000);
const std::chrono::milliseconds kLoadRangerPolicyRetryDelayMs(10000);

ranger_resource_policy_manager::ranger_resource_policy_manager(
dsn::replication::meta_service *meta_svc)
Expand Down Expand Up @@ -226,9 +229,12 @@ dsn::error_code ranger_resource_policy_manager::update_policies_from_ranger_serv
// for the newly created table, its app envs must be empty. This needs to be executed
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
// periodically to update the table's app envs, regardless of whether the Ranger policy is
// updated or not.
CHECK_EQ_MSG(dsn::ERR_OK, sync_policies_to_app_envs(), "Sync policies to app envs failed.");
LOG_DEBUG("Sync policies to app envs succeeded.");
return dsn::ERR_OK;
err_code = sync_policies_to_app_envs();
if (err_code == dsn::ERR_OK) {
LOG_DEBUG("Sync policies to app envs succeeded.");
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
return dsn::ERR_OK;
}
ERR_LOG_AND_RETURN_NOT_OK(err_code, "Sync policies to app envs failed.");
}
ERR_LOG_AND_RETURN_NOT_OK(err_code, "Parse Ranger policies failed.");

Expand Down Expand Up @@ -431,21 +437,23 @@ void ranger_resource_policy_manager::start_to_dump_and_sync_policies()
{
LOG_DEBUG("Start to create Ranger policy meta root on remote storage.");
dsn::task_ptr sync_task = dsn::tasking::create_task(
LPC_CM_GET_RANGER_POLICY, &_tracker, [this]() { dump_and_sync_policies(); });
LPC_USE_RANGER_ACCESS_CONTROL, &_tracker, [this]() { dump_and_sync_policies(); });
_meta_svc->get_remote_storage()->create_node(
_ranger_policy_meta_root, LPC_CM_GET_RANGER_POLICY, [this, sync_task](dsn::error_code err) {
_ranger_policy_meta_root,
LPC_USE_RANGER_ACCESS_CONTROL,
[this, sync_task](dsn::error_code err) {
if (err == dsn::ERR_OK || err == dsn::ERR_NODE_ALREADY_EXIST) {
LOG_DEBUG("Create Ranger policy meta root succeed.");
sync_task->enqueue();
return;
}
CHECK_EQ(err, dsn::ERR_TIMEOUT);
LOG_ERROR("Create Ranger policy meta root timeout, try it later.");
dsn::tasking::enqueue(LPC_CM_GET_RANGER_POLICY,
LOG_ERROR("Create Ranger policy meta root timeout, retry later.");
dsn::tasking::enqueue(LPC_USE_RANGER_ACCESS_CONTROL,
&_tracker,
[this]() { start_to_dump_and_sync_policies(); },
0,
load_ranger_policy_retry_delay_ms);
kLoadRangerPolicyRetryDelayMs);
});
}

Expand All @@ -459,26 +467,32 @@ void ranger_resource_policy_manager::dump_and_sync_policies()
update_cached_policies();
LOG_DEBUG("Update using resources policies succeed.");

CHECK_EQ_MSG(dsn::ERR_OK, sync_policies_to_app_envs(), "Sync policies to app envs failed.");
LOG_DEBUG("Sync policies to app envs succeeded.");
if (dsn::ERR_OK == sync_policies_to_app_envs()) {
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
LOG_ERROR("Sync policies to app envs failed.");
} else {
LOG_DEBUG("Sync policies to app envs succeeded.");
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
}
}

void ranger_resource_policy_manager::dump_policies_to_remote_storage()
{
dsn::blob value = json::json_forwarder<all_resource_policies>::encode(_all_resource_policies);
_meta_svc->get_remote_storage()->set_data(
_ranger_policy_meta_root, value, LPC_CM_GET_RANGER_POLICY, [this](dsn::error_code e) {
_ranger_policy_meta_root, value, LPC_USE_RANGER_ACCESS_CONTROL, [this](dsn::error_code e) {
if (e == dsn::ERR_OK) {
LOG_DEBUG("Dump Ranger policies to remote storage succeed.");
return;
}
CHECK_EQ_MSG(e, dsn::ERR_TIMEOUT, "Dump Ranger policies to remote storage failed.");
LOG_ERROR("Dump Ranger policies to remote storage timeout, retry later.");
dsn::tasking::enqueue(LPC_CM_GET_RANGER_POLICY,
&_tracker,
[this]() { dump_policies_to_remote_storage(); },
0,
load_ranger_policy_retry_delay_ms);
if (e == dsn::ERR_TIMEOUT) {
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
LOG_ERROR("Dump Ranger policies to remote storage timeout, retry later.");
dsn::tasking::enqueue(LPC_USE_RANGER_ACCESS_CONTROL,
&_tracker,
[this]() { dump_policies_to_remote_storage(); },
0,
kLoadRangerPolicyRetryDelayMs);
return;
}
LOG_ERROR("Dump Ranger policies to remote storage failed.");
});
}

Expand Down Expand Up @@ -525,7 +539,7 @@ dsn::error_code ranger_resource_policy_manager::sync_policies_to_app_envs()
req->__set_app_name(app.app_name);
req->__set_keys(
{dsn::replication::replica_envs::REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES});
bool has_match_policy = false;
bool is_policy_matched = false;
for (const auto &policy : table_policies->second) {
if (policy.database_names.count(database_name) == 0) {
continue;
Expand All @@ -534,23 +548,24 @@ dsn::error_code ranger_resource_policy_manager::sync_policies_to_app_envs()
// if table name does not conform to the naming rules(database_name.table_name),
// database is defined by "*" in ranger for acl matching
if (policy.table_names.count("*") != 0 || policy.table_names.count(table_name) != 0) {
has_match_policy = true;
is_policy_matched = true;
req->__set_op(dsn::replication::app_env_operation::type::APP_ENV_OP_SET);
req->__set_values(
{json::json_forwarder<acl_policies>::encode(policy.policies).to_string()});

dsn::replication::update_app_env_rpc rpc(std::move(req), LPC_CM_GET_RANGER_POLICY);
dsn::replication::update_app_env_rpc rpc(std::move(req),
LPC_USE_RANGER_ACCESS_CONTROL);
_meta_svc->get_server_state()->set_app_envs(rpc);
ERR_LOG_AND_RETURN_NOT_OK(rpc.response().err, "set_app_envs failed.");
break;
}
}

// There is no matched policy, clear app Ranger policy
if (!has_match_policy) {
if (!is_policy_matched) {
req->__set_op(dsn::replication::app_env_operation::type::APP_ENV_OP_DEL);

dsn::replication::update_app_env_rpc rpc(std::move(req), LPC_CM_GET_RANGER_POLICY);
dsn::replication::update_app_env_rpc rpc(std::move(req), LPC_USE_RANGER_ACCESS_CONTROL);
_meta_svc->get_server_state()->del_app_envs(rpc);
ERR_LOG_AND_RETURN_NOT_OK(rpc.response().err, "del_app_envs failed.");
}
Expand All @@ -569,5 +584,13 @@ std::string get_database_name_from_app_name(const std::string &app_name)
return prefix;
}

std::string get_table_name_from_app_name(const std::string &app_name)
{
std::string database_name = get_database_name_from_app_name(app_name);
if (database_name.empty()) {
return app_name;
}
return app_name.substr(database_name.size() + 1);
}
} // namespace ranger
} // namespace dsn
4 changes: 4 additions & 0 deletions src/runtime/ranger/ranger_resource_policy_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,5 +140,9 @@ class ranger_resource_policy_manager
// "{database_name}.{table_name}", use "." to split database name and table name.
// Return an empty string if 'app_name' is not a valid Ranger rule table name.
std::string get_database_name_from_app_name(const std::string &app_name);
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved

// Try to get the table_name of 'app_name'.
// Return 'app_name' if 'app_name' is not a valid Ranger rule table name.
std::string get_table_name_from_app_name(const std::string &app_name);
} // namespace ranger
} // namespace dsn
41 changes: 41 additions & 0 deletions src/runtime/test/ranger_resource_policy_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,5 +215,46 @@ TEST(ranger_resource_policy_manager_test, ranger_resource_policy_serialized_test
EXPECT_EQ(test.expected_result, actual_result);
}
}

TEST(ranger_resource_policy_manager_test, get_database_name_from_app_name_test)
{
struct test_case
{
std::string app_name;
std::string expected_result;
} tests[] = {{"", ""},
{".", ""},
{"...", ""},
{"database_name.", "database_name"},
{".table_name", ""},
{"app_name", ""},
{"database_name.table_name", "database_name"},
{"a.b.c", "a"}};
for (const auto &test : tests) {
auto actual_result = get_database_name_from_app_name(test.app_name);
EXPECT_EQ(test.expected_result, actual_result);
}
}

TEST(ranger_resource_policy_manager_test, get_table_name_from_app_name_test)
{
struct test_case
{
std::string app_name;
std::string expected_result;
} tests[] = {{"", ""},
{".", "."},
{"...", "..."},
{"database_name.", ""},
{".table_name", ".table_name"},
{"app_name", "app_name"},
{"database_name.table_name", "table_name"},
{"a.b.c", "b.c"}};
for (const auto &test : tests) {
auto actual_result = get_table_name_from_app_name(test.app_name);
EXPECT_EQ(test.expected_result, actual_result);
}
}

} // namespace ranger
} // namespace dsn