From 30bd125566335fb80d8ddb569957a989c35d6347 Mon Sep 17 00:00:00 2001 From: jackyalbo Date: Sun, 9 Jun 2024 18:13:29 +0300 Subject: [PATCH] PR3 - Persistent logging Signed-off-by: jackyalbo --- config.js | 4 + src/api/bucket_api.js | 15 +-- src/cmd/manage_nsfs.js | 7 ++ src/cmd/nsfs.js | 14 +-- src/endpoint/endpoint.js | 18 +++- src/endpoint/s3/ops/s3_get_bucket_logging.js | 4 +- src/endpoint/s3/s3_bucket_logging.js | 26 +++-- src/endpoint/s3/s3_rest.js | 16 ++- src/manage_nsfs/manage_nsfs_cli_errors.js | 13 ++- src/manage_nsfs/manage_nsfs_cli_responses.js | 9 ++ src/manage_nsfs/manage_nsfs_constants.js | 3 +- src/manage_nsfs/manage_nsfs_events_utils.js | 22 ++++ src/manage_nsfs/manage_nsfs_help_utils.js | 10 +- src/manage_nsfs/manage_nsfs_logging.js | 67 ++++++++++++ src/manage_nsfs/nsfs_schema_utils.js | 17 +++ src/sdk/bucketspace_nb.js | 4 +- src/sdk/object_sdk.js | 6 +- src/server/bg_services/bucket_logs_upload.js | 26 ++++- src/server/system_services/bucket_server.js | 14 +-- .../system_services/schemas/log_schema.js | 38 +++++++ src/test/unit_tests/coretest.js | 2 +- src/test/unit_tests/test_bucketspace_fs.js | 2 - src/test/unit_tests/test_np_bucket_logging.js | 19 +++- src/test/unit_tests/test_s3_ops.js | 41 +++++++ src/util/bucket_logs_utils.js | 101 ++++++++++++++++++ src/util/persistent_logger.js | 10 +- src/util/time_utils.js | 15 +++ 27 files changed, 471 insertions(+), 52 deletions(-) create mode 100644 src/manage_nsfs/manage_nsfs_logging.js create mode 100644 src/server/system_services/schemas/log_schema.js create mode 100644 src/util/bucket_logs_utils.js diff --git a/config.js b/config.js index dde39713d3..5ac118c997 100644 --- a/config.js +++ b/config.js @@ -659,6 +659,10 @@ config.BUCKET_DIFF_FOR_REPLICATION = true; config.BUCKET_LOG_UPLOAD_ENABLED = true; config.BUCKET_LOG_UPLOADER_DELAY = 5 * 60 * 1000; +config.BUCKET_LOG_TYPE = process.env.GUARANTEED_LOGS_PATH ? 
'PERSISTENT' : 'BEST_EFFORT'; +config.PERSISTENT_BUCKET_LOG_DIR = process.env.GUARANTEED_LOGS_PATH; +config.PERSISTENT_BUCKET_LOG_NS = 'bucket_logging'; +config.BUCKET_LOG_CONCURRENCY = 10; /////////////////////////// // KEY ROTATOR // diff --git a/src/api/bucket_api.js b/src/api/bucket_api.js index 81d33a542d..989df3231a 100644 --- a/src/api/bucket_api.js +++ b/src/api/bucket_api.js @@ -782,16 +782,13 @@ module.exports = { method: 'PUT', params: { type: 'object', - required: ['name', 'log_bucket', 'log_prefix'], + required: ['name', 'logging'], properties: { name: { $ref: 'common_api#/definitions/bucket_name' }, - log_bucket: { - $ref: 'common_api#/definitions/bucket_name' - }, - log_prefix: { - type: 'string', + logging: { + $ref: 'common_api#/definitions/bucket_logging' }, }, }, @@ -828,7 +825,11 @@ module.exports = { }, }, reply: { - $ref: 'common_api#/definitions/bucket_logging', + oneOf: [{ + $ref: 'common_api#/definitions/bucket_logging', + }, { + type: 'null' + }] }, auth: { system: ['admin', 'user'] diff --git a/src/cmd/manage_nsfs.js b/src/cmd/manage_nsfs.js index e0b71613eb..9a7ae66669 100644 --- a/src/cmd/manage_nsfs.js +++ b/src/cmd/manage_nsfs.js @@ -15,6 +15,7 @@ const SensitiveString = require('../util/sensitive_string'); const ManageCLIError = require('../manage_nsfs/manage_nsfs_cli_errors').ManageCLIError; const ManageCLIResponse = require('../manage_nsfs/manage_nsfs_cli_responses').ManageCLIResponse; const manage_nsfs_glacier = require('../manage_nsfs/manage_nsfs_glacier'); +const manage_nsfs_logging = require('../manage_nsfs/manage_nsfs_logging'); const nsfs_schema_utils = require('../manage_nsfs/nsfs_schema_utils'); const { print_usage } = require('../manage_nsfs/manage_nsfs_help_utils'); const { TYPES, ACTIONS, CONFIG_SUBDIRS, @@ -74,6 +75,8 @@ async function main(argv = minimist(process.argv.slice(2))) { await whitelist_ips_management(argv); } else if (type === TYPES.GLACIER) { await glacier_management(argv); + } else if (type === TYPES.LOGGING) { + await logging_management(user_input); } else { // we should not get here (we check it before) throw_cli_error(ManageCLIError.InvalidType); @@ -713,5 +716,9 @@ async function manage_glacier_operations(action, argv) { } } +async function logging_management(user_input) { + await manage_nsfs_logging.export_bucket_logging(user_input); +} + exports.main = main; if (require.main === module) main(); diff --git a/src/cmd/nsfs.js b/src/cmd/nsfs.js index 1bd91cfff6..f986224c5d 100644 --- a/src/cmd/nsfs.js +++ b/src/cmd/nsfs.js @@ -2,13 +2,6 @@ 'use strict'; /* eslint-disable complexity */ -require('../util/dotenv').load(); -require('aws-sdk/lib/maintenance_mode_message').suppress = true; - -const dbg = require('../util/debug_module')(__filename); -if (!dbg.get_process_name()) dbg.set_process_name('nsfs'); -dbg.original_console(); - // DO NOT PUT NEW REQUIREMENTS BEFORE SETTING process.env.NC_NSFS_NO_DB_ENV = 'true' // NC nsfs deployments specifying process.env.LOCAL_MD_SERVER=true deployed together with a db // when a system_store object is initialized VaccumAnalyzer is being called once a day. 
@@ -17,6 +10,13 @@ dbg.original_console(); if (process.env.LOCAL_MD_SERVER !== 'true') { process.env.NC_NSFS_NO_DB_ENV = 'true'; } +require('../util/dotenv').load(); +require('aws-sdk/lib/maintenance_mode_message').suppress = true; + +const dbg = require('../util/debug_module')(__filename); +if (!dbg.get_process_name()) dbg.set_process_name('nsfs'); +dbg.original_console(); + const config = require('../../config'); const os = require('os'); diff --git a/src/endpoint/endpoint.js b/src/endpoint/endpoint.js index 37214d3a37..add9836c6b 100755 --- a/src/endpoint/endpoint.js +++ b/src/endpoint/endpoint.js @@ -42,6 +42,7 @@ const endpoint_stats_collector = require('../sdk/endpoint_stats_collector'); const { NamespaceMonitor } = require('../server/bg_services/namespace_monitor'); const { SemaphoreMonitor } = require('../server/bg_services/semaphore_monitor'); const prom_reporting = require('../server/analytic_services/prometheus_reporting'); +const { PersistentLogger } = require('../util/persistent_logger'); const NoobaaEvent = require('../manage_nsfs/manage_nsfs_events_utils').NoobaaEvent; const cluster = /** @type {import('node:cluster').Cluster} */ ( /** @type {unknown} */ (require('node:cluster')) @@ -63,6 +64,7 @@ dbg.log0('endpoint: replacing old umask: ', old_umask.toString(8), 'with new uma * func_sdk?: FuncSDK; * sts_sdk?: StsSDK; * virtual_hosts?: readonly string[]; + * bucket_logger?: PersistentLogger; * }} EndpointRequest */ @@ -97,6 +99,7 @@ async function create_https_server(ssl_cert_info, honorCipherOrder, endpoint_han */ /* eslint-disable max-statements */ async function main(options = {}) { + let bucket_logger; try { // setting process title needed for letting GPFS to identify the noobaa endpoint processes see issue #8039. if (config.ENDPOINT_PROCESS_TITLE) { @@ -127,6 +130,12 @@ async function main(options = {}) { dbg.log0('Configured Virtual Hosts:', virtual_hosts); dbg.log0('Configured Location Info:', location_info); + bucket_logger = config.BUCKET_LOG_TYPE === 'PERSISTENT' && + new PersistentLogger(config.PERSISTENT_BUCKET_LOG_DIR, config.PERSISTENT_BUCKET_LOG_NS, { + locking: 'SHARED', + poll_interval: config.NSFS_GLACIER_LOGS_POLL_INTERVAL, + }); + process.on('warning', e => dbg.warn(e.stack)); let internal_rpc_client; @@ -164,8 +173,8 @@ async function main(options = {}) { init_request_sdk = create_init_request_sdk(rpc, internal_rpc_client, object_io); } - const endpoint_request_handler = create_endpoint_handler(init_request_sdk, virtual_hosts); - const endpoint_request_handler_sts = create_endpoint_handler(init_request_sdk, virtual_hosts, true); + const endpoint_request_handler = create_endpoint_handler(init_request_sdk, virtual_hosts, /*is_sts?*/ false, bucket_logger); + const endpoint_request_handler_sts = create_endpoint_handler(init_request_sdk, virtual_hosts, /*is_sts?*/ true); const ssl_cert_info = await ssl_utils.get_ssl_cert_info('S3', options.nsfs_config_root); const https_server = await create_https_server(ssl_cert_info, true, endpoint_request_handler); @@ -243,6 +252,8 @@ async function main(options = {}) { //noobaa crashed event new NoobaaEvent(NoobaaEvent.ENDPOINT_CRASHED).create_event(undefined, undefined, err); handle_server_error(err); + } finally { + if (bucket_logger) bucket_logger.close(); } } @@ -251,7 +262,7 @@ async function main(options = {}) { * @param {readonly string[]} virtual_hosts * @returns {EndpointHandler} */ -function create_endpoint_handler(init_request_sdk, virtual_hosts, sts) { +function create_endpoint_handler(init_request_sdk, 
virtual_hosts, sts, logger) { const blob_rest_handler = process.env.ENDPOINT_BLOB_ENABLED === 'true' ? blob_rest : unavailable_handler; const lambda_rest_handler = config.DB_TYPE === 'mongodb' ? lambda_rest : unavailable_handler; @@ -259,6 +270,7 @@ function create_endpoint_handler(init_request_sdk, virtual_hosts, sts) { const endpoint_request_handler = (req, res) => { endpoint_utils.prepare_rest_request(req); req.virtual_hosts = virtual_hosts; + if (logger) req.bucket_logger = logger; init_request_sdk(req, res); if (req.url.startsWith('/2015-03-31/functions')) { return lambda_rest_handler(req, res); diff --git a/src/endpoint/s3/ops/s3_get_bucket_logging.js b/src/endpoint/s3/ops/s3_get_bucket_logging.js index 5375c638f6..a58d0b39a1 100644 --- a/src/endpoint/s3/ops/s3_get_bucket_logging.js +++ b/src/endpoint/s3/ops/s3_get_bucket_logging.js @@ -1,6 +1,8 @@ /* Copyright (C) 2016 NooBaa */ 'use strict'; +const SensitiveString = require("../../../util/sensitive_string"); + /** * http://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGETlogging.html */ @@ -11,7 +13,7 @@ async function get_bucket_logging(req) { return { BucketLoggingStatus: logging ? { LoggingEnabled: { - TargetBucket: logging.log_bucket, + TargetBucket: logging.log_bucket instanceof SensitiveString ? logging.log_bucket.unwrap() : logging.log_bucket, TargetPrefix: logging.log_prefix, } } : '' diff --git a/src/endpoint/s3/s3_bucket_logging.js b/src/endpoint/s3/s3_bucket_logging.js index f06771d6ca..e643d9a386 100644 --- a/src/endpoint/s3/s3_bucket_logging.js +++ b/src/endpoint/s3/s3_bucket_logging.js @@ -5,16 +5,16 @@ const dbg = require('../../util/debug_module')(__filename); const http_utils = require('../../util/http_utils'); const dgram = require('node:dgram'); const { Buffer } = require('node:buffer'); - +const config = require('../../../config'); async function send_bucket_op_logs(req, res) { - if (req.params && req.params.bucket) { + if (req.params && req.params.bucket && req.op_name !== 'put_bucket') { const bucket_info = await req.object_sdk.read_bucket_sdk_config_info(req.params.bucket); dbg.log2("read_bucket_sdk_config_info = ", bucket_info); if (is_bucket_logging_enabled(bucket_info)) { dbg.log2("Bucket logging is enabled for Bucket : ", req.params.bucket); - endpoint_bucket_op_logs(req.op_name, req, res, bucket_info); + await endpoint_bucket_op_logs(req.op_name, req, res, bucket_info); } } } @@ -54,15 +54,28 @@ const create_syslog_udp_socket = (() => { })(); -function endpoint_bucket_op_logs(op_name, req, res, source_bucket) { +async function endpoint_bucket_op_logs(op_name, req, res, source_bucket) { // 1 - Get all the information to be logged in a log message. // 2 - Format it and send it to log bucket/syslog. 
const s3_log = get_bucket_log_record(op_name, source_bucket, req, res); dbg.log1("Bucket operation logs = ", s3_log); + switch (config.BUCKET_LOG_TYPE) { + case 'PERSISTENT': { + await req.bucket_logger.append(JSON.stringify(s3_log)); + break; + } + default: { + send_op_logs_to_syslog(req.object_sdk.rpc_client.rpc.router.syslog, s3_log); + } + } + +} + +function send_op_logs_to_syslog(syslog, s3_log) { const buffer = Buffer.from(JSON.stringify(s3_log)); - const {client, port, hostname} = create_syslog_udp_socket(req.object_sdk.rpc_client.rpc.router.syslog); + const {client, port, hostname} = create_syslog_udp_socket(syslog); if (client && port && hostname) { client.send(buffer, port, hostname, err => { if (err) { @@ -72,13 +85,12 @@ function endpoint_bucket_op_logs(op_name, req, res, source_bucket) { } else { dbg.log0(`Could not send bucket logs: client: ${client} port: ${port} hostname:${hostname}`); } - } function get_bucket_log_record(op_name, source_bucket, req, res) { const client_ip = http_utils.parse_client_ip(req); - let status_code; + let status_code = 102; if (res && res.statusCode) { status_code = res.statusCode; } diff --git a/src/endpoint/s3/s3_rest.js b/src/endpoint/s3/s3_rest.js index 11bbf9c609..7eb1b7281f 100755 --- a/src/endpoint/s3/s3_rest.js +++ b/src/endpoint/s3/s3_rest.js @@ -12,6 +12,7 @@ const s3_logging = require('./s3_bucket_logging'); const time_utils = require('../../util/time_utils'); const http_utils = require('../../util/http_utils'); const signature_utils = require('../../util/signature_utils'); +const config = require('../../../config'); const S3_MAX_BODY_LEN = 4 * 1024 * 1024; @@ -68,6 +69,11 @@ async function s3_rest(req, res) { await handle_website_error(req, res, err); } else { handle_error(req, res, err); + try { + await s3_logging.send_bucket_op_logs(req, res); // logging again with error + } catch (err1) { + dbg.error("Could not log bucket operation:", err1); + } } } } @@ -118,7 +124,13 @@ async function handle_request(req, res) { usage_report.s3_usage_info.total_calls += 1; usage_report.s3_usage_info[op_name] = (usage_report.s3_usage_info[op_name] || 0) + 1; - + if (config.BUCKET_LOG_TYPE === 'PERSISTENT') { + try { + await s3_logging.send_bucket_op_logs(req); // logging intention - no result yet + } catch (err) { + dbg.error("Could not log bucket operation:", err); + } + } if (req.query && req.query.versionId) { const caching = await req.object_sdk.read_bucket_sdk_caching_info(req.params.bucket); @@ -151,7 +163,7 @@ async function handle_request(req, res) { http_utils.send_reply(req, res, reply, options); collect_bucket_usage(op, req, res); try { - await s3_logging.send_bucket_op_logs(req); + await s3_logging.send_bucket_op_logs(req, res); // logging again with result } catch (err) { dbg.error("Could not log bucket operation:", err); } diff --git a/src/manage_nsfs/manage_nsfs_cli_errors.js b/src/manage_nsfs/manage_nsfs_cli_errors.js index db1c22ce76..f8b1bc92c4 100644 --- a/src/manage_nsfs/manage_nsfs_cli_errors.js +++ b/src/manage_nsfs/manage_nsfs_cli_errors.js @@ -83,7 +83,7 @@ ManageCLIError.InvalidArgumentType = Object.freeze({ ManageCLIError.InvalidType = Object.freeze({ code: 'InvalidType', - message: 'Invalid type, available types are account, bucket or whitelist', + message: 'Invalid type, available types are account, bucket, logging or whitelist', http_code: 400, }); @@ -157,6 +157,16 @@ ManageCLIError.InvalidMasterKey = Object.freeze({ http_code: 500, }); +////////////////////////////// +//// BUCKET LOGGING ERRORS ///
+////////////////////////////// + +ManageCLIError.LoggingExportFailed = Object.freeze({ + code: 'LoggingExportFailed', + message: 'Logging export attempt failed', + http_code: 500, +}); + //////////////////////// //// ACCOUNT ERRORS //// //////////////////////// @@ -423,6 +433,7 @@ const NSFS_CLI_ERROR_EVENT_MAP = { AccountDeleteForbiddenHasBuckets: NoobaaEvent.ACCOUNT_DELETE_FORBIDDEN, BucketAlreadyExists: NoobaaEvent.BUCKET_ALREADY_EXISTS, BucketSetForbiddenNoBucketOwner: NoobaaEvent.UNAUTHORIZED, + LoggingExportFailed: NoobaaEvent.LOGGING_FAILED, }; exports.ManageCLIError = ManageCLIError; diff --git a/src/manage_nsfs/manage_nsfs_cli_responses.js b/src/manage_nsfs/manage_nsfs_cli_responses.js index 72a913e4d5..789f35bf2b 100644 --- a/src/manage_nsfs/manage_nsfs_cli_responses.js +++ b/src/manage_nsfs/manage_nsfs_cli_responses.js @@ -102,12 +102,21 @@ ManageCLIResponse.BucketList = Object.freeze({ list: {} }); +/////////////////////////////// +// LOGGING RESPONSES /// +/////////////////////////////// +ManageCLIResponse.LoggingExported = Object.freeze({ + code: 'LoggingExported', + status: {} +}); + const NSFS_CLI_SUCCESS_EVENT_MAP = { AccountCreated: NoobaaEvent.ACCOUNT_CREATED, AccountDeleted: NoobaaEvent.ACCOUNT_DELETED, BucketCreated: NoobaaEvent.BUCKET_CREATED, BucketDeleted: NoobaaEvent.BUCKET_DELETE, WhiteListIPUpdated: NoobaaEvent.WHITELIST_UPDATED, + LoggingExported: NoobaaEvent.LOGGING_EXPORTED, }; exports.ManageCLIResponse = ManageCLIResponse; diff --git a/src/manage_nsfs/manage_nsfs_constants.js b/src/manage_nsfs/manage_nsfs_constants.js index 6add8fb916..8e720fd412 100644 --- a/src/manage_nsfs/manage_nsfs_constants.js +++ b/src/manage_nsfs/manage_nsfs_constants.js @@ -6,7 +6,8 @@ const TYPES = { BUCKET: 'bucket', IP_WHITELIST: 'whitelist', GLACIER: 'glacier', - HEALTH: 'health' + HEALTH: 'health', + LOGGING: 'logging', }; const ACTIONS = { diff --git a/src/manage_nsfs/manage_nsfs_events_utils.js b/src/manage_nsfs/manage_nsfs_events_utils.js index af4485e1b0..94b2cf7835 100644 --- a/src/manage_nsfs/manage_nsfs_events_utils.js +++ b/src/manage_nsfs/manage_nsfs_events_utils.js @@ -325,4 +325,26 @@ NoobaaEvent.INVALID_BUCKET_STATE = Object.freeze({ state: 'HEALTHY', }); +NoobaaEvent.LOGGING_EXPORTED = Object.freeze({ + event_code: 'bucket_logging_exported', + entity_type: 'NODE', + event_type: 'INFO', + message: 'Bucket logs were exported to target buckets', + description: 'Bucket logs were successfully exported to target buckets', + scope: 'NODE', + severity: 'INFO', + state: 'HEALTHY', +}); + +NoobaaEvent.LOGGING_FAILED = Object.freeze({ + event_code: 'bucket_logging_export_failed', + entity_type: 'NODE', + event_type: 'ERROR', + message: 'Bucket logging export failed.', + description: 'Bucket logging export failed due to an error', + scope: 'NODE', + severity: 'ERROR', + state: 'DEGRADED', +}); + exports.NoobaaEvent = NoobaaEvent; diff --git a/src/manage_nsfs/manage_nsfs_help_utils.js b/src/manage_nsfs/manage_nsfs_help_utils.js index a04085bdf0..dcc13b495b 100644 --- a/src/manage_nsfs/manage_nsfs_help_utils.js +++ b/src/manage_nsfs/manage_nsfs_help_utils.js @@ -23,7 +23,7 @@ Usage: const ARGUMENTS = ` Arguments: - Set the resource type: account, bucket, or whitelist + Set the resource type: account, bucket, logging, or whitelist Action could be: add, update, list, status, and delete for accounts/buckets `; @@ -59,6 +59,11 @@ Flags: in format: '["127.0.0.1", "192.0.10.0", "3002:0bd6:0000:0000:0000:ee00:0033:6778"]' `; +const LOGGING_FLAGS = ` +logging Use this to upload all the bucket
logs collected in the system to their target buckets + +`; + const GLOBAL_CONFIG_ROOT_ALL_FLAG = ` --config_root (optional) Use configuration files path (default config.NSFS_NC_DEFAULT_CONF_DIR) --config_root_backend (optional) Use the filesystem type in the configuration (default config.NSFS_NC_CONFIG_DIR_BACKEND) @@ -222,6 +227,9 @@ function print_usage(type, action) { case TYPES.IP_WHITELIST: process.stdout.write(WHITELIST_FLAGS.trimStart()); break; + case TYPES.LOGGING: + process.stdout.write(LOGGING_FLAGS.trimStart() + GLOBAL_CONFIG_ROOT_ALL_FLAG.trimStart()); + break; case TYPES.GLACIER: print_help_glacier(action); break; diff --git a/src/manage_nsfs/manage_nsfs_logging.js b/src/manage_nsfs/manage_nsfs_logging.js new file mode 100644 index 0000000000..ad139a656a --- /dev/null +++ b/src/manage_nsfs/manage_nsfs_logging.js @@ -0,0 +1,67 @@ +/* Copyright (C) 2024 NooBaa */ +'use strict'; + +const path = require('path'); +const config = require('../../config'); +const { throw_cli_error, get_config_file_path, get_config_data, write_stdout_response} = require('../manage_nsfs/manage_nsfs_cli_utils'); +const nc_mkm = require('../manage_nsfs/nc_master_key_manager').get_instance(); +const ManageCLIError = require('../manage_nsfs/manage_nsfs_cli_errors').ManageCLIError; +const ManageCLIResponse = require('../manage_nsfs/manage_nsfs_cli_responses').ManageCLIResponse; +const { export_logs_to_target } = require('../util/bucket_logs_utils'); +const native_fs_utils = require('../util/native_fs_utils'); +const http_utils = require('../util/http_utils'); +const AWS = require('aws-sdk'); +const { RpcError } = require('../rpc'); + +const buckets_dir_name = '/buckets'; +const accounts_dir_name = '/accounts'; +let config_root; +let config_root_backend; +let buckets_dir_path; +let accounts_dir_path; + +// This command goes over the entries in the persistent log and moves them to log objects in the target buckets +async function export_bucket_logging(user_input) { + const fs_context = native_fs_utils.get_process_fs_context(); + config_root_backend = user_input.config_root_backend || config.NSFS_NC_CONFIG_DIR_BACKEND; + config_root = user_input.config_root || config.NSFS_NC_CONF_DIR; + buckets_dir_path = path.join(config_root, buckets_dir_name); + accounts_dir_path = path.join(config_root, accounts_dir_name); + const endpoint = `https://127.0.0.1:${config.ENDPOINT_SSL_PORT}`; + const noobaa_con = new AWS.S3({ + endpoint, + s3ForcePathStyle: true, + sslEnabled: false, + httpOptions: { + agent: http_utils.get_unsecured_agent(endpoint) + } + }); + const success = await export_logs_to_target(fs_context, noobaa_con, get_bucket_owner_keys); + if (success) { + write_stdout_response(ManageCLIResponse.LoggingExported); + } else { + throw_cli_error(ManageCLIError.LoggingExportFailed); + } +} + +/** + * returns the bucket owner's access and secret keys + * @param {string} log_bucket_name + * @returns {Promise} + */ +async function get_bucket_owner_keys(log_bucket_name) { + const log_bucket_path = get_config_file_path(buckets_dir_path, log_bucket_name); + let log_bucket_owner; + try { + const log_bucket_config_data = await get_config_data(config_root_backend, log_bucket_path); + log_bucket_owner = log_bucket_config_data.bucket_owner; + } catch (err) { + if (err.code === 'ENOENT') throw new RpcError('NO_SUCH_BUCKET', 'No such bucket: ' + log_bucket_name); + throw err; + } + const owner_path = get_config_file_path(accounts_dir_path, log_bucket_owner); + const owner_config_data = await get_config_data(config_root_backend,
owner_path, true); + return nc_mkm.decrypt_access_keys(owner_config_data); +} + +exports.export_bucket_logging = export_bucket_logging; diff --git a/src/manage_nsfs/nsfs_schema_utils.js b/src/manage_nsfs/nsfs_schema_utils.js index 5514a9d797..607f0c687e 100644 --- a/src/manage_nsfs/nsfs_schema_utils.js +++ b/src/manage_nsfs/nsfs_schema_utils.js @@ -22,6 +22,7 @@ ajv.addSchema(common_api); const bucket_schema = require('../server/system_services/schemas/nsfs_bucket_schema'); const account_schema = require('../server/system_services/schemas/nsfs_account_schema'); const nsfs_config_schema = require('../server/system_services/schemas/nsfs_config_schema'); +const log_schema = require('../server/system_services/schemas/log_schema'); _.each(common_api.definitions, schema => { schema_utils.strictify(schema, { @@ -42,6 +43,7 @@ schema_utils.strictify(nsfs_config_schema, {}); const validate_account = ajv.compile(account_schema); const validate_bucket = ajv.compile(bucket_schema); const validate_nsfs_config = ajv.compile(nsfs_config_schema); +const validate_logging = ajv.compile(log_schema); /** * validate_account_schema validates an account object against the NC NSFS account schema @@ -99,6 +101,20 @@ function warn_invalid_schema(type, invalid_schema, err_msg) { console.warn(`nsfs_schema_utils ${type} ${invalid_schema} is invalid, schema check is disabled, skipping - err=${err_msg}`); } +/** + * validate_log_schema validates a logging object against the NC NSFS log schema + * @param {object} log + */ +function validate_log_schema(log) { + const valid = validate_logging(log); + if (!valid) { + const first_err = validate_logging.errors[0]; + const err_msg = first_err.message ? create_schema_err_msg(first_err) : undefined; + if (config.NC_DISABLE_SCHEMA_CHECK === true) return warn_invalid_schema('logging', log, err_msg); + throw new RpcError('INVALID_SCHEMA', err_msg); + } +} + /** * create_schema_err_msg would use the original error message we got from avj * and adds additional info in case we have it @@ -116,3 +132,4 @@ function create_schema_err_msg(err) { exports.validate_account_schema = validate_account_schema; exports.validate_bucket_schema = validate_bucket_schema; exports.validate_nsfs_config_schema = validate_nsfs_config_schema; +exports.validate_log_schema = validate_log_schema; diff --git a/src/sdk/bucketspace_nb.js b/src/sdk/bucketspace_nb.js index 2567b1fcd0..4e1038fcd7 100644 --- a/src/sdk/bucketspace_nb.js +++ b/src/sdk/bucketspace_nb.js @@ -154,9 +154,9 @@ class BucketSpaceNB { }); } - async get_bucket_logging(req) { + async get_bucket_logging(params) { return this.rpc_client.bucket.get_bucket_logging({ - name: req.params.bucket + name: params.name }); } diff --git a/src/sdk/object_sdk.js b/src/sdk/object_sdk.js index ef6a161b0f..c57efd5a8c 100644 --- a/src/sdk/object_sdk.js +++ b/src/sdk/object_sdk.js @@ -962,7 +962,7 @@ class ObjectSDK { async put_bucket_logging(params) { const bs = this._get_bucketspace(); - return bs.put_bucket_logging(params, this); + return bs.put_bucket_logging(params); } async delete_bucket_logging(params) { @@ -970,9 +970,9 @@ class ObjectSDK { return bs.delete_bucket_logging(params); } - async get_bucket_logging(req) { + async get_bucket_logging(params) { const bs = this._get_bucketspace(); - return bs.get_bucket_logging(req); + return bs.get_bucket_logging(params); } /////////////////////// diff --git a/src/server/bg_services/bucket_logs_upload.js b/src/server/bg_services/bucket_logs_upload.js index a1b9183e7f..1035d26d94 100644 --- 
a/src/server/bg_services/bucket_logs_upload.js +++ b/src/server/bg_services/bucket_logs_upload.js @@ -7,6 +7,9 @@ const system_utils = require('../utils/system_utils'); const config = require('../../../config'); const cloud_utils = require('../../util/cloud_utils'); const fs = require('fs'); +const { export_logs_to_target } = require('../../util/bucket_logs_utils'); +const { get_process_fs_context } = require('../../util/native_fs_utils'); +const RpcError = require('../../rpc/rpc_error'); const BUCKET_LOGS_PATH = '/log/noobaa_bucket_logs/'; @@ -35,7 +38,17 @@ class BucketLogUploader { if (!this.noobaa_connection) { throw new Error('noobaa endpoint connection is not started yet...'); } - await this.get_and_upload_bucket_log(); + if (config.BUCKET_LOG_TYPE === 'PERSISTENT') { + const fs_context = get_process_fs_context(); + const success = await export_logs_to_target(fs_context, this.noobaa_connection, this.get_bucket_owner_keys); + if (success) { + dbg.log0('Logs were uploaded successfully to their target buckets'); + } else { + dbg.error('Logs upload failed - will retry in the next cycle'); + } + } else { + await this.get_and_upload_bucket_log(); + } } async run_batch() { @@ -176,6 +189,17 @@ class BucketLogUploader { } } + async get_bucket_owner_keys(bucket_name) { + const bucket = system_store.data.systems[0].buckets_by_name[bucket_name]; + if (!bucket) { + dbg.error('BUCKET NOT FOUND', bucket_name); + throw new RpcError('NO_SUCH_BUCKET', 'No such bucket: ' + bucket_name); + } + return [{ + access_key: bucket.owner_account.access_keys[0].access_key.unwrap(), + secret_key: bucket.owner_account.access_keys[0].secret_key.unwrap(), + }]; + } } exports.BucketLogUploader = BucketLogUploader; diff --git a/src/server/system_services/bucket_server.js b/src/server/system_services/bucket_server.js index f5de098980..01738d5d07 100644 --- a/src/server/system_services/bucket_server.js +++ b/src/server/system_services/bucket_server.js @@ -391,26 +391,22 @@ async function delete_bucket_tagging(req) { async function put_bucket_logging(req) { dbg.log0('put_bucket_logging:', req.rpc_params); const bucket = find_bucket(req); - const log_bucket_name = req.rpc_params.log_bucket; + const log_bucket_name = req.rpc_params.logging.log_bucket; const log_bucket = req.system.buckets_by_name && req.system.buckets_by_name[log_bucket_name.unwrap()]; if (!log_bucket) { dbg.error('TARGET BUCKET NOT EXIST', log_bucket); throw new RpcError('INVALID_TARGET_BUCKET', 'The target bucket for logging does not exist'); } - if (log_bucket.owner_account._id.toString() !== req.account._id.toString()) { + if (log_bucket.owner_account._id.toString() !== bucket.owner_account._id.toString()) { dbg.error('TARGET BUCKET NOT OWNED BY USER', log_bucket); throw new RpcError('INVALID_TARGET_BUCKET', 'The target bucket for logging is not owned by you'); } - const logging = { - log_bucket: req.rpc_params.log_bucket, - log_prefix: req.rpc_params.log_prefix - }; await system_store.make_changes({ update: { buckets: [{ _id: bucket._id, - logging + logging: req.rpc_params.logging, }] } }); @@ -421,10 +417,10 @@ async function get_bucket_logging(req) { dbg.log0('get_bucket_logging:', req.rpc_params); const bucket = find_bucket(req); - const logging = bucket.logging && { + const logging = bucket.logging ?
{ log_bucket: bucket.logging.log_bucket, log_prefix: bucket.logging.log_prefix - }; + } : null; return logging; } diff --git a/src/server/system_services/schemas/log_schema.js b/src/server/system_services/schemas/log_schema.js new file mode 100644 index 0000000000..145c54cd4b --- /dev/null +++ b/src/server/system_services/schemas/log_schema.js @@ -0,0 +1,38 @@ +/* Copyright (C) 2016 NooBaa */ +'use strict'; + +// This Schema is not a DB schema - it is used only for verifying bucket logging entries! +/* Example for bucket logging entry: +{ + noobaa_bucket_logging: 'true', + op: 'GET', + bucket_owner: 'account1', + source_bucket: 'bucket3', + object_key: '/bucket3/?encoding-type=url&max-keys=1000&prefix=&delimiter=%2F', + log_bucket: 'logs1', + log_prefix: 'bucket3._logs/', + remote_ip: '::ffff:127.0.0.1', + request_uri: '/bucket3/?encoding-type=url&max-keys=1000&prefix=&delimiter=%2F', + http_status: 102, + request_id: 'lxirvmtw-dnck9b-uz7' +} */ + +module.exports = { + $id: 'log_schema', + type: 'object', + required: ['op', 'bucket_owner', 'source_bucket', 'object_key', 'log_bucket', + 'log_prefix', 'remote_ip', 'http_status', 'request_id' + ], + properties: { + op: { type: 'string' }, + bucket_owner: { type: 'string' }, + source_bucket: { type: 'string' }, + object_key: { type: 'string' }, + log_bucket: { type: 'string' }, + log_prefix: { type: 'string' }, + remote_ip: { type: 'string' }, + request_uri: { type: 'string' }, + http_status: { type: 'integer' }, + request_id: { type: 'string' }, + }, +}; diff --git a/src/test/unit_tests/coretest.js b/src/test/unit_tests/coretest.js index 57b8666776..9d084d06bc 100644 --- a/src/test/unit_tests/coretest.js +++ b/src/test/unit_tests/coretest.js @@ -134,7 +134,7 @@ function setup(options = {}) { const object_io = new ObjectIO(); const endpoint_request_handler = endpoint.create_endpoint_handler( - endpoint.create_init_request_sdk(server_rpc.rpc, rpc_client, object_io), []); + endpoint.create_init_request_sdk(server_rpc.rpc, rpc_client, object_io), [], false); const endpoint_request_handler_sts = endpoint.create_endpoint_handler( endpoint.create_init_request_sdk(server_rpc.rpc, rpc_client, object_io), [], true); diff --git a/src/test/unit_tests/test_bucketspace_fs.js b/src/test/unit_tests/test_bucketspace_fs.js index ce90e18a60..e92d7136e7 100644 --- a/src/test/unit_tests/test_bucketspace_fs.js +++ b/src/test/unit_tests/test_bucketspace_fs.js @@ -604,14 +604,12 @@ mocha.describe('bucketspace_fs', function() { const param = {name: test_bucket, logging: { ...logging} }; await bucketspace_fs.put_bucket_logging(param); const output_log = await bucketspace_fs.get_bucket_logging(param); - console.log('JAJA1 output_log', output_log, 'logging', logging); assert.deepEqual(output_log, logging); }); mocha.it('delete_bucket_logging', async function() { const param = {name: test_bucket}; await bucketspace_fs.delete_bucket_logging(param); const output_log = await bucketspace_fs.get_bucket_logging(param); - console.log('JAJA2 output_log', output_log); assert.ok(output_log === undefined); }); }); diff --git a/src/test/unit_tests/test_np_bucket_logging.js b/src/test/unit_tests/test_np_bucket_logging.js index 8a7ce395d3..491e78dfbe 100644 --- a/src/test/unit_tests/test_np_bucket_logging.js +++ b/src/test/unit_tests/test_np_bucket_logging.js @@ -7,8 +7,9 @@ const _ = require('lodash'); const P = require('../../util/promise'); const coretest = require('./coretest'); const { rpc_client } = coretest; //, PASSWORD, SYSTEM +const { BucketLogUploader } = 
require('../../server/bg_services/bucket_logs_upload'); -coretest.setup({ pools_to_create: [coretest.POOL_LIST[0]] }); +coretest.setup({ pools_to_create: [coretest.POOL_LIST[1]] }); mocha.describe('noobaa bucket logging configuration validity tests', function() { const source1 = 'source-bucket-1'; @@ -63,11 +64,25 @@ mocha.describe('noobaa bucket logging configuration validity tests', function() mocha.it('_get bucket logging ', async function() { await _get_bucket_logging(no_source_bucket, true, "NO_SUCH_BUCKET"); }); + + mocha.it('get bucket owner keys ', async function() { + const uploader = new BucketLogUploader({ + name: 'Bucket Log Uploader', + client: rpc_client, + }); + const uploader_keys = await uploader.get_bucket_owner_keys(source1); + const bucket = await rpc_client.bucket.read_bucket({ name: source1 }); + const owner = await rpc_client.account.read_account({ email: bucket.owner_account.email }); + assert.strictEqual(uploader_keys[0].access_key, owner.access_keys[0].access_key.unwrap()); + assert.strictEqual(uploader_keys[0].secret_key, owner.access_keys[0].secret_key.unwrap()); + }); }); async function _put_bucket_logging(source_bucket_name, log_bucket_name, log_prefix, should_fail, error_message) { try { - await rpc_client.bucket.put_bucket_logging({ name: source_bucket_name, log_bucket: log_bucket_name, log_prefix: log_prefix }); + await rpc_client.bucket.put_bucket_logging({ name: source_bucket_name, logging: + { log_bucket: log_bucket_name, log_prefix: log_prefix } + }); if (should_fail) { assert.fail(`put_bucket_logging should fail but it passed`); } diff --git a/src/test/unit_tests/test_s3_ops.js b/src/test/unit_tests/test_s3_ops.js index 3ec11d8ccf..dc3165c12a 100644 --- a/src/test/unit_tests/test_s3_ops.js +++ b/src/test/unit_tests/test_s3_ops.js @@ -53,6 +53,7 @@ const azure_mock_connection_string = `DefaultEndpointsProtocol=http;AccountName= mocha.describe('s3_ops', function() { + /** @type {S3} */ let s3; let s3_client_params; // Bucket name for the source namespace resource @@ -62,6 +63,10 @@ mocha.describe('s3_ops', function() { let source_bucket; let other_platform_bucket; let is_other_platform_bucket_created = false; + const logging = { + TargetBucket: BKT2, + TargetPrefix: BKT1 + '/', + }; mocha.before(async function() { const self = this; @@ -105,6 +110,42 @@ mocha.describe('s3_ops', function() { const res = await s3.listBuckets({}); assert(res.Buckets.find(bucket => bucket.Name === BKT1)); }); + mocha.it('should enable bucket logging', async function() { + await s3.createBucket({ Bucket: BKT2 }); + await s3.putBucketLogging({ + Bucket: BKT1, + BucketLoggingStatus: { + LoggingEnabled: logging + }, + }); + const res_logging = await s3.getBucketLogging({ Bucket: BKT1 }); + assert.equal(res_logging.$metadata.httpStatusCode, 200); + assert.deepEqual(res_logging.LoggingEnabled, logging); + }); + mocha.it('should fail to enable bucket logging', async function() { + await s3.deleteBucket({ Bucket: BKT2 }); + try { + await s3.putBucketLogging({ + Bucket: BKT1, + BucketLoggingStatus: { + LoggingEnabled: logging + }, + }); + assert.fail('should not set bucket logging'); + } catch (err) { + assert.strictEqual(err.Code, 'InvalidTargetBucketForLogging'); + assert.strictEqual(err.$metadata.httpStatusCode, 400); + } + }); + mocha.it('should disable bucket logging', async function() { + await s3.putBucketLogging({ + Bucket: BKT1, + BucketLoggingStatus: {}, + }); + const res_logging = await s3.getBucketLogging({ Bucket: BKT1 }); + 
assert.equal(res_logging.$metadata.httpStatusCode, 200); + assert.equal(res_logging.LoggingEnabled, null); + }); mocha.it('should delete bucket', async function() { await s3.deleteBucket({ Bucket: BKT1 }); }); diff --git a/src/util/bucket_logs_utils.js b/src/util/bucket_logs_utils.js new file mode 100644 index 0000000000..6d9adafccb --- /dev/null +++ b/src/util/bucket_logs_utils.js @@ -0,0 +1,101 @@ +/* Copyright (C) 2024 NooBaa */ +'use strict'; + +const dbg = require('../util/debug_module')(__filename); +const config = require('../../config'); +const stream = require('stream'); +const crypto = require('crypto'); +const { PersistentLogger, LogFile } = require('../util/persistent_logger'); +const { format_aws_date } = require('../util/time_utils'); +const nsfs_schema_utils = require('../manage_nsfs/nsfs_schema_utils'); +const Semaphore = require('../util/semaphore'); +const P = require('../util/promise'); + +const sem = new Semaphore(config.BUCKET_LOG_CONCURRENCY); + +// delimiter between bucket name and log object name. +// Assuming noobaa bucket name follows the s3 bucket +// naming rules and cannot have "_" in its name. +// https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html + +const BUCKET_NAME_DEL = "_"; + +/** + * This function will process the persistent log of bucket logging + * and will upload the log files using the provided noobaa connection + * @param {nb.NativeFSContext} fs_context + * @param {AWS.S3} s3_connection + * @param {function} bucket_to_owner_keys_func + */ +async function export_logs_to_target(fs_context, s3_connection, bucket_to_owner_keys_func) { + const log = new PersistentLogger(config.PERSISTENT_BUCKET_LOG_DIR, config.PERSISTENT_BUCKET_LOG_NS, { locking: 'EXCLUSIVE' }); + try { + return log.process(async file => _upload_to_targets(fs_context, s3_connection, file, bucket_to_owner_keys_func)); + } catch (err) { + dbg.error('processing log file failed', log.file); + throw err; + } finally { + await log.close(); + } } + +/** + * This function gets a persistent log file, goes over its entries one by one, + * and uploads each entry to the target_bucket using the provided s3 connection. + * In order to know which user to use for the upload to each bucket, bucket_to_owner_keys_func must be provided. + * @param {nb.NativeFSContext} fs_context + * @param {AWS.S3} s3_connection + * @param {string} log_file + * @param {function} bucket_to_owner_keys_func + * @returns {Promise} + */ +async function _upload_to_targets(fs_context, s3_connection, log_file, bucket_to_owner_keys_func) { + const bucket_streams = {}; + const promises = []; + try { + const file = new LogFile(fs_context, log_file); + dbg.log1('uploading file to target buckets', log_file); + await file.collect_and_process(async entry => { + const log_entry = JSON.parse(entry); + nsfs_schema_utils.validate_log_schema(log_entry); + const target_bucket = log_entry.log_bucket; + const log_prefix = log_entry.log_prefix; + const source_bucket = log_entry.source_bucket; + if (!bucket_streams[source_bucket + BUCKET_NAME_DEL + target_bucket]) { /* new stream is needed for each target bucket, but also for each source bucket + - as multiple buckets can't be written to the same object */ + const date = new Date(); + const upload_stream = new stream.PassThrough(); + let access_keys; + try { + access_keys = await bucket_to_owner_keys_func(target_bucket); + } catch (err) { + dbg.warn('Error when trying to resolve bucket keys', err); + if (err.rpc_code === 'NO_SUCH_BUCKET') return; // If the
log_bucket doesn't exist any more - nowhere to upload - just skip + } + s3_connection.config.credentials.accessKeyId = access_keys[0].access_key; + s3_connection.config.credentials.secretAccessKey = access_keys[0].secret_key; + const sha = crypto.createHash('sha512').update(target_bucket + date.getTime()).digest('hex'); + promises.push(sem.surround(() => P.retry({ + attempts: 3, + delay_ms: 1000, + func: () => s3_connection.upload({ + Bucket: target_bucket, + Key: `${log_prefix}${format_aws_date(date)}-${sha.slice(0, 16).toUpperCase()}`, + Body: upload_stream, + }).promise() + }))); + bucket_streams[source_bucket + BUCKET_NAME_DEL + target_bucket] = upload_stream; + } + dbg.log2(`uploading entry: ${entry} to target bucket: ${target_bucket}`); + bucket_streams[source_bucket + BUCKET_NAME_DEL + target_bucket].write(entry + '\n'); + }); + Object.values(bucket_streams).forEach(st => st.end()); + await Promise.all(promises); + } catch (error) { + dbg.error('unexpected error in upload to bucket:', error, 'for:', log_file); + return false; + } + return true; +} + +exports.export_logs_to_target = export_logs_to_target; diff --git a/src/util/persistent_logger.js b/src/util/persistent_logger.js index 5072f73278..660cd146ce 100644 --- a/src/util/persistent_logger.js +++ b/src/util/persistent_logger.js @@ -152,13 +152,17 @@ class PersistentLogger { return; } + let result = true; for (const file of filtered_files) { dbg.log1('Processing', this.dir, file); const delete_processed = await cb(path.join(this.dir, file.name)); if (delete_processed) { await nb_native().fs.unlink(this.fs_context, path.join(this.dir, file.name)); + } else { + result = false; } } + return result; } /** @@ -168,6 +172,7 @@ class PersistentLogger { */ async process(cb) { let failure_log = null; + let result = false; try { // This logger is getting opened only so that we can process all the process the entries @@ -179,7 +184,7 @@ class PersistentLogger { try { // Process all the inactive and currently active log - await this._process(async file => cb(file, failure_log.append.bind(failure_log))); + result = await this._process(async file => cb(file, failure_log.append.bind(failure_log))); } catch (error) { dbg.error('failed to process logs, error:', error, 'log_namespace:', this.namespace); } @@ -198,6 +203,7 @@ class PersistentLogger { } catch (error) { dbg.error('failed to replace active failure log:', error, 'log_namespace:', this.namespace); } + return result; } finally { if (failure_log) await failure_log.close(); } @@ -228,7 +234,7 @@ class PersistentLogger { // process is continuously moving the active file this.init_lock.surround(async () => { // If the file has changed, re-init - if (stat.ino !== this.fh_stat.ino) { + if (this.fh_stat && stat.ino !== this.fh_stat.ino) { dbg.log1('active file changed, closing for namespace:', this.namespace); await this.close(); } diff --git a/src/util/time_utils.js b/src/util/time_utils.js index 77f11151de..58d9b61d42 100644 --- a/src/util/time_utils.js +++ b/src/util/time_utils.js @@ -82,6 +82,20 @@ function format_time_duration(millis, show_millis) { return `${hours_str}:${mins_str}:${secs_str}`; } +/** +* @param {Date} date +*/ +function format_aws_date(date) { + const year = date.getFullYear(); + const month = String(date.getMonth() + 1).padStart(2, '0'); + const day = String(date.getDate()).padStart(2, '0'); + const hours = String(date.getHours()).padStart(2, '0'); + const minutes = String(date.getMinutes()).padStart(2, '0'); + const seconds = String(date.getSeconds()).padStart(2, 
'0'); + + return `${year}-${month}-${day}-${hours}-${minutes}-${seconds}`; +} + /** * round_up_to_next_time_of_day takes a date and rounds it up based on * the given hours, mins and secs. @@ -123,4 +137,5 @@ exports.format_http_header_date = format_http_header_date; exports.parse_http_header_date = parse_http_header_date; exports.parse_amz_date = parse_amz_date; exports.format_time_duration = format_time_duration; +exports.format_aws_date = format_aws_date; exports.round_up_to_next_time_of_day = round_up_to_next_time_of_day;
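
Usage sketch (illustrative, not part of the patch): the endpoint side appends one JSON line per S3 op to the persistent log, and `manage_nsfs logging` (or the background uploader) later validates each entry against log_schema and streams it to its target bucket under a format_aws_date-stamped key. The snippet below exercises only the append/validate half; it assumes it is run from the repo root with the native FS bindings built, and the GUARANTEED_LOGS_PATH value and the sample entry are placeholders taken from the log_schema.js example, not real traffic.

'use strict';
// minimal sketch - GUARANTEED_LOGS_PATH must be set before config.js is loaded (the path below is an assumption)
process.env.GUARANTEED_LOGS_PATH = process.env.GUARANTEED_LOGS_PATH || '/tmp/noobaa_bucket_logs';
const config = require('./config');
const { PersistentLogger } = require('./src/util/persistent_logger');
const nsfs_schema_utils = require('./src/manage_nsfs/nsfs_schema_utils');

async function main() {
    // entry shape follows the example documented in src/server/system_services/schemas/log_schema.js
    const entry = {
        noobaa_bucket_logging: 'true',
        op: 'GET',
        bucket_owner: 'account1',
        source_bucket: 'bucket3',
        object_key: '/bucket3/?encoding-type=url&max-keys=1000&prefix=&delimiter=%2F',
        log_bucket: 'logs1',
        log_prefix: 'bucket3._logs/',
        remote_ip: '::ffff:127.0.0.1',
        request_uri: '/bucket3/?encoding-type=url&max-keys=1000&prefix=&delimiter=%2F',
        http_status: 102,
        request_id: 'lxirvmtw-dnck9b-uz7',
    };
    // the exporter rejects malformed entries, so validate before appending (throws on schema violation)
    nsfs_schema_utils.validate_log_schema(entry);
    // same configuration the endpoint uses: shared lock, one JSON line appended per op
    const logger = new PersistentLogger(config.PERSISTENT_BUCKET_LOG_DIR, config.PERSISTENT_BUCKET_LOG_NS, { locking: 'SHARED' });
    try {
        await logger.append(JSON.stringify(entry));
    } finally {
        await logger.close();
    }
}

main().catch(err => { console.error(err); process.exit(1); });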