[Backport into 5.16] Implement log replication prefix filtering (#8087) #8103

Merged
merged 1 commit on May 30, 2024
src/server/bg_services/replication_log_parser.js (48 changes: 29 additions & 19 deletions)
@@ -39,6 +39,7 @@ async function get_log_candidates(source_bucket_id, rule_id, replication_config,

async function get_aws_log_candidates(source_bucket_id, rule_id, replication_config, candidates_limit, sync_deletions) {
const aws_log_replication_info = replication_config.log_replication_info.aws_log_replication_info;
+    const obj_prefix_filter = _get_obj_prefix_filter_for_rule(rule_id, replication_config);
const { logs_bucket, prefix } = aws_log_replication_info.logs_location;
const s3 = _get_source_bucket_aws_connection(source_bucket_id, aws_log_replication_info);
let log_object_continuation_token = _get_log_object_continuation_token_for_rule(rule_id, replication_config);
@@ -60,7 +61,7 @@
}

const next_log_data = await _aws_get_next_log(s3, logs_bucket, next_log_entry.Contents[0].Key);
-        aws_parse_log_object(logs, next_log_data, sync_deletions);
+        aws_parse_log_object(logs, next_log_data, sync_deletions, obj_prefix_filter);

dbg.log1("get_aws_log_candidates: parsed logs ", logs);

@@ -85,7 +86,7 @@ async function get_azure_log_candidates(source_bucket_id, rule_id, replication_c
const namespace_resource = source_bucket.namespace.write_resource.resource;
const src_storage_account = namespace_resource.connection.access_key;
const src_container_name = namespace_resource.connection.target_bucket;
-    const prefix = replication_config.log_replication_info.azure_log_replication_info.prefix || '';
+    const obj_prefix_filter = _get_obj_prefix_filter_for_rule(rule_id, replication_config) || '';
const { logs_query_client, monitor_workspace_id } = _get_source_bucket_azure_connection(source_bucket_id);
let candidates;

@@ -126,7 +127,7 @@ async function get_azure_log_candidates(source_bucket_id, rule_id, replication_c
| project Time=_TimeReceived, Action=substring(Category, 7), Key=ObjectKey
| sort by Time asc
| where Action == "Write" or Action == "Delete"
-        | where Key startswith "/${src_storage_account.unwrap()}/${src_container_name}/${prefix}"
+        | where Key startswith "/${src_storage_account.unwrap()}/${src_container_name}/${obj_prefix_filter}"
| where Key !contains "test-delete-non-existing-"
| parse Key with * "/" StorageAccount "/" Container "/" Key
| project Time, Action, Key`;
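
Note for reviewers: on the Azure path the per-rule prefix is pushed down into the Log Analytics (KQL) query itself via the startswith clause, rather than being filtered after parsing. A minimal sketch of the rendered clause, using made-up account, container, and prefix values (and a plain string instead of the wrapped SensitiveString the real code unwraps):

    // All values below are invented for illustration.
    const src_storage_account = 'myaccount';
    const src_container_name = 'mycontainer';
    const obj_prefix_filter = 'images/';
    const clause = `| where Key startswith "/${src_storage_account}/${src_container_name}/${obj_prefix_filter}"`;
    console.log(clause);
    // -> | where Key startswith "/myaccount/mycontainer/images/"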
@@ -281,30 +282,33 @@ async function _aws_get_next_log(s3, bucket, key) {
* @param {nb.ReplicationLogs} logs - Log array
* @param {*} log_object - AWS log object
* @param {boolean} sync_deletions - Whether deletions should be synced or not
+ * @param {string} obj_prefix_filter - Object prefix filter
*/
-function aws_parse_log_object(logs, log_object, sync_deletions) {
+function aws_parse_log_object(logs, log_object, sync_deletions, obj_prefix_filter) {
const log_string = log_object.Body.toString();
const log_array = log_string.split("\n");

for (const line of log_array) {
if (line !== '') {
const log = _parse_aws_log_entry(line);
if (log.operation) {
-                if (log.operation.includes('PUT.OBJECT') || log.operation.includes('POST.OBJECT')) {
-                    logs.push({
-                        key: log.key,
-                        action: 'copy',
-                        time: log.time,
-                    });
-                    dbg.log2('aws_parse_log_object:: key', log.key, 'contain copy (PUT or POST) entry');
-                }
-                if (log.operation.includes('DELETE.OBJECT') && sync_deletions && log.http_status === 204) {
-                    logs.push({
-                        key: log.key,
-                        action: 'delete',
-                        time: log.time,
-                    });
-                    dbg.log2('aws_parse_log_object:: key', log.key, 'contain delete (DELETE) entry');
+                if (obj_prefix_filter === undefined || log.key?.startsWith(obj_prefix_filter)) {
+                    if (log.operation.includes('PUT.OBJECT') || log.operation.includes('POST.OBJECT')) {
+                        logs.push({
+                            key: log.key,
+                            action: 'copy',
+                            time: log.time,
+                        });
+                        dbg.log2('aws_parse_log_object:: key', log.key, 'contain copy (PUT or POST) entry');
+                    }
+                    if (log.operation.includes('DELETE.OBJECT') && sync_deletions && log.http_status === 204) {
+                        logs.push({
+                            key: log.key,
+                            action: 'delete',
+                            time: log.time,
+                        });
+                        dbg.log2('aws_parse_log_object:: key', log.key, 'contain delete (DELETE) entry');
+                    }
}
}
}
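
On the AWS path the filter is applied in-process while each access-log line is parsed: an entry is considered only when no filter is configured (obj_prefix_filter === undefined) or its key starts with the filter. An empty string therefore matches every key, which is what the updated tests below rely on. A standalone sketch of that predicate (extracted for illustration, not the module code itself):

    const matches_prefix = (key, obj_prefix_filter) =>
        obj_prefix_filter === undefined || key?.startsWith(obj_prefix_filter);

    console.log(matches_prefix('images/cat.jpg', 'images/')); // true  -> candidate
    console.log(matches_prefix('logs/app.log', 'images/'));   // false -> skipped
    console.log(matches_prefix('anything', undefined));       // true  -> no filter configured
    console.log(matches_prefix('anything', ''));              // true  -> empty prefix matches all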
@@ -537,6 +541,12 @@ function _get_log_object_continuation_token_for_rule(rule_id, replication_config
return replication_rule?.rule_log_status?.log_marker?.continuation_token;
}

+function _get_obj_prefix_filter_for_rule(rule_id, replication_config) {
+    dbg.log1('_get_obj_prefix_filter_for_rule: ', rule_id, 'replication_config: ', replication_config);
+    const replication_rule = replication_config.rules.find(rule => rule.rule_id === rule_id);
+    return replication_rule?.filter?.prefix;
+}
+
// EXPORTS
exports.get_log_candidates = get_log_candidates;
exports.aws_parse_log_object = aws_parse_log_object;
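The new _get_obj_prefix_filter_for_rule helper resolves the prefix from the matching rule's filter.prefix, so a rule without a filter yields undefined and disables filtering for that rule. A sketch of the lookup against a hypothetical config fragment (the shape is assumed for illustration, apart from rule_id and filter.prefix, which appear in the diff):

    const replication_config = {
        rules: [
            { rule_id: 'rule-1', filter: { prefix: 'images/' } },
            { rule_id: 'rule-2' }, // no filter -> undefined, nothing is skipped
        ],
    };

    const get_prefix = (rule_id, config) =>
        config.rules.find(rule => rule.rule_id === rule_id)?.filter?.prefix;

    console.log(get_prefix('rule-1', replication_config)); // 'images/'
    console.log(get_prefix('rule-2', replication_config)); // undefined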
src/server/system_services/replication_store.js (4 changes: 2 additions & 2 deletions)
@@ -144,8 +144,8 @@ class ReplicationStore {
dbg.log1('find_log_based_replication_rules: ');
const replications = await this._replicationconfigs.find({ deleted: null });
const reduced_replications = _.filter(
-            replications, repl =>
-            repl.log_replication_info?.azure_log_replication_info || repl.log_replication_info?.aws_log_replication_info
+            replications, repl => repl.log_replication_info?.endpoint_type ||
+                repl.log_replication_info?.aws_log_replication_info
);
// TODO: Further transformation of the data can be done here - refer to find_rules_updated_longest_time_ago
dbg.log1('find_log_based_replication_rules: ', reduced_replications);
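The store-side predicate now recognizes a log-based replication config by its endpoint_type (or by AWS-specific info) instead of keying on the Azure info object specifically. A rough sketch of the filter's effect on some made-up configs (lodash is already what the surrounding code uses):

    const _ = require('lodash');

    // Sample configs invented for illustration.
    const replications = [
        { log_replication_info: { endpoint_type: 'AZURE' } },
        { log_replication_info: { aws_log_replication_info: {} } },
        { log_replication_info: {} }, // not log-based, filtered out
    ];

    const reduced = _.filter(replications, repl =>
        repl.log_replication_info?.endpoint_type ||
        repl.log_replication_info?.aws_log_replication_info);

    console.log(reduced.length); // 2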
@@ -30,7 +30,7 @@ describe('AWS S3 server log parsing tests', () => {
aaa test.bucket [13/Feb/2023:15:08:28 +0000] 1.1.1.1 arn:aws:iam::111:user/user AAA BATCH.DELETE.OBJECT text.txt - 204 - - 1 - - - - - AAA SigV4 ECDHE-RSA-AES128-GCM-SHA256 AuthHeader s3.us-east-2.amazonaws.com TLSv1.2 - -
` };
const action_dictionary = { 'test': 'delete', 'test.js': 'delete', 'code2': 'copy', 'test2': 'delete', 'testfile.js': 'delete', 'empty': 'copy', 'text.txt': 'delete' };
-        log_parser.aws_parse_log_object(logs, example_log, true);
+        log_parser.aws_parse_log_object(logs, example_log, true, '');
// Make sure the test doesn't pass in case the parsing fails
expect(logs.length).toEqual(Object.keys(action_dictionary).length);
// Make sure all expected actions are mapped to the appropriate keys
@@ -39,7 +39,7 @@
});
// Test with sync_deletions set to false
logs.length = 0;
-        log_parser.aws_parse_log_object(logs, example_log, false);
+        log_parser.aws_parse_log_object(logs, example_log, false, '');
// Delete all action_dictionary keys whose value is delete
Object.keys(action_dictionary).forEach(key => {
if (action_dictionary[key] === 'delete') {
@@ -56,7 +56,7 @@
aaa test.bucket [13/Feb/2023:09:08:28 +0000] 1.1.1.1 arn:aws:iam::111:user/user AAA BATCH.DELETE.OBJECT other_obj - 204 - - 1 - - - - - AAA SigV4 ECDHE-RSA-AES128-GCM-SHA256 AuthHeader s3.us-east-2.amazonaws.com TLSv1.2 - -
aaa test.bucket [13/Feb/2023:09:08:56 +0000] 0.0.0.0 arn:aws:iam::111:user/user AAA REST.PUT.OBJECT test "PUT /test.bucket/test?X-Amz-Security-Token=AAAAAAAAAAAAAAA=20230213T160856Z&X-Amz-AAAAAA HTTP/1.1" 200 - - 1 1 1 "https://s3.console.aws.amazon.com/s3/upload/test.bucket?region=us-east-2" "AAA/5.0 (AAA 1.1; AAA; AAA) AAA/1.1 (KHTML, like Gecko) AAA/1.1 AAA/1.1" - AAAA SigV4 ECDHE-RSA-AES128-GCM-SHA256 QueryString s3.us-east-2.amazonaws.com TLSv1.2 - -
` };
-        log_parser.aws_parse_log_object(logs, example_log, true);
+        log_parser.aws_parse_log_object(logs, example_log, true, '');
const candidates = log_parser.create_candidates(logs);
// DELETE log should be the latest log present inside the candidate, as candidate storing only latest log per key
expect(candidates.test.action).toEqual('delete');
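The updated calls pass an empty prefix, which keeps every key. As a hypothetical follow-up (not part of this PR), a case with a non-empty prefix could assert that only matching keys survive, reusing the describe block's example_log fixture:

    // Hypothetical test sketch; assumes the surrounding describe's example_log.
    it('should only keep keys matching the object prefix filter', () => {
        const logs = [];
        log_parser.aws_parse_log_object(logs, example_log, true, 'test');
        expect(logs.every(log => log.key.startsWith('test'))).toBe(true);
    });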