From 8fdeb6f3cf1903fa9bf6831137972e4aa7d8224a Mon Sep 17 00:00:00 2001
From: jackyalbo
Date: Thu, 20 Jun 2024 17:40:25 +0300
Subject: [PATCH] attempt with one bucket log

Use one persistent log file for all buckets instead of a log file per
bucket: the endpoint opens a single shared PersistentLogger at startup
and attaches it to every request, and the log exporter groups entries
by source bucket and uploads each group to its target log bucket
concurrently. Persistent logging is enabled by setting
GUARANTEED_LOGS_PATH, which also provides the log directory.

Signed-off-by: jackyalbo
---
 config.js                              |  4 +-
 src/endpoint/endpoint.js               | 16 ++++-
 src/endpoint/s3/s3_bucket_logging.js   |  6 +-
 src/manage_nsfs/manage_nsfs_logging.js | 94 ++++++++++++++------------
 src/test/unit_tests/coretest.js        |  2 +-
 5 files changed, 67 insertions(+), 55 deletions(-)

diff --git a/config.js b/config.js
index e318f4139c..087e3d4640 100644
--- a/config.js
+++ b/config.js
@@ -659,8 +659,8 @@ config.BUCKET_DIFF_FOR_REPLICATION = true;
 
 config.BUCKET_LOG_UPLOAD_ENABLED = true;
 config.BUCKET_LOG_UPLOADER_DELAY = 5 * 60 * 1000;
-config.BUCKET_LOG_TYPE = process.env.BUCKET_LOG_TYPE || 'BEST_EFFORT';
-config.PERSISTENT_BUCKET_LOG_DIR = '/var/run/noobaa-nsfs/bucket_log';
+config.BUCKET_LOG_TYPE = process.env.GUARANTEED_LOGS_PATH ? 'PERSISTENT' : 'BEST_EFFORT';
+config.PERSISTENT_BUCKET_LOG_DIR = process.env.GUARANTEED_LOGS_PATH;
 config.PERSISTENT_BUCKET_LOG_NS = 'bucket_logging';
 
 ///////////////////////////
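
Note: the BUCKET_LOG_TYPE env override is gone; persistent logging is now
switched on purely by the presence of GUARANTEED_LOGS_PATH, whose value
doubles as the persistent log directory. A minimal sketch of the resulting
behavior (the path shown is the old hard-coded default, reused here only as
an example):

    // sketch: how the patched config.js resolves its values
    process.env.GUARANTEED_LOGS_PATH = '/var/run/noobaa-nsfs/bucket_log'; // example only
    const BUCKET_LOG_TYPE = process.env.GUARANTEED_LOGS_PATH ? 'PERSISTENT' : 'BEST_EFFORT';
    const PERSISTENT_BUCKET_LOG_DIR = process.env.GUARANTEED_LOGS_PATH;
    console.log(BUCKET_LOG_TYPE, PERSISTENT_BUCKET_LOG_DIR);
    // -> PERSISTENT /var/run/noobaa-nsfs/bucket_log
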
diff --git a/src/endpoint/endpoint.js b/src/endpoint/endpoint.js
index 37214d3a37..78a7aaba9d 100755
--- a/src/endpoint/endpoint.js
+++ b/src/endpoint/endpoint.js
@@ -42,6 +42,7 @@ const endpoint_stats_collector = require('../sdk/endpoint_stats_collector');
 const { NamespaceMonitor } = require('../server/bg_services/namespace_monitor');
 const { SemaphoreMonitor } = require('../server/bg_services/semaphore_monitor');
 const prom_reporting = require('../server/analytic_services/prometheus_reporting');
+const { PersistentLogger } = require('../util/persistent_logger');
 const NoobaaEvent = require('../manage_nsfs/manage_nsfs_events_utils').NoobaaEvent;
 const cluster = /** @type {import('node:cluster').Cluster} */ (
     /** @type {unknown} */ (require('node:cluster'))
 );
@@ -63,6 +64,7 @@ dbg.log0('endpoint: replacing old umask: ', old_umask.toString(8), 'with new uma
  * func_sdk?: FuncSDK;
  * sts_sdk?: StsSDK;
  * virtual_hosts?: readonly string[];
+ * bucket_logger?: PersistentLogger;
  * }} EndpointRequest
  */
 
@@ -97,6 +99,7 @@ async function create_https_server(ssl_cert_info, honorCipherOrder, endpoint_han
  */
 /* eslint-disable max-statements */
 async function main(options = {}) {
+    let bucket_logger;
     try {
         // setting process title needed for letting GPFS to identify the noobaa endpoint processes see issue #8039.
         if (config.ENDPOINT_PROCESS_TITLE) {
@@ -127,6 +130,12 @@ async function main(options = {}) {
         dbg.log0('Configured Virtual Hosts:', virtual_hosts);
         dbg.log0('Configured Location Info:', location_info);
 
+        bucket_logger = config.BUCKET_LOG_TYPE === 'PERSISTENT' &&
+            new PersistentLogger(config.PERSISTENT_BUCKET_LOG_DIR, config.PERSISTENT_BUCKET_LOG_NS, {
+                locking: 'SHARED',
+                poll_interval: config.NSFS_GLACIER_LOGS_POLL_INTERVAL,
+            });
+
         process.on('warning', e => dbg.warn(e.stack));
 
         let internal_rpc_client;
@@ -164,7 +173,7 @@ async function main(options = {}) {
             init_request_sdk = create_init_request_sdk(rpc, internal_rpc_client, object_io);
         }
 
-        const endpoint_request_handler = create_endpoint_handler(init_request_sdk, virtual_hosts);
+        const endpoint_request_handler = create_endpoint_handler(init_request_sdk, virtual_hosts, false, bucket_logger);
         const endpoint_request_handler_sts = create_endpoint_handler(init_request_sdk, virtual_hosts, true);
 
         const ssl_cert_info = await ssl_utils.get_ssl_cert_info('S3', options.nsfs_config_root);
@@ -243,6 +252,8 @@ async function main(options = {}) {
         //noobaa crashed event
         new NoobaaEvent(NoobaaEvent.ENDPOINT_CRASHED).create_event(undefined, undefined, err);
         handle_server_error(err);
+    } finally {
+        if (bucket_logger) bucket_logger.close();
     }
 }
 
@@ -251,7 +262,7 @@ async function main(options = {}) {
  * @param {readonly string[]} virtual_hosts
  * @returns {EndpointHandler}
  */
-function create_endpoint_handler(init_request_sdk, virtual_hosts, sts) {
+function create_endpoint_handler(init_request_sdk, virtual_hosts, sts, logger) {
     const blob_rest_handler = process.env.ENDPOINT_BLOB_ENABLED === 'true' ? blob_rest : unavailable_handler;
     const lambda_rest_handler = config.DB_TYPE === 'mongodb' ? lambda_rest : unavailable_handler;
 
@@ -259,6 +270,7 @@ function create_endpoint_handler(init_request_sdk, virtual_hosts, sts) {
     const endpoint_request_handler = (req, res) => {
         endpoint_utils.prepare_rest_request(req);
         req.virtual_hosts = virtual_hosts;
+        if (logger) req.bucket_logger = logger;
        init_request_sdk(req, res);
         if (req.url.startsWith('/2015-03-31/functions')) {
             return lambda_rest_handler(req, res);
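
Note: the endpoint now owns a single PersistentLogger for the whole process:
main() creates it once with SHARED locking, every request receives it as
req.bucket_logger, and the finally block closes it on shutdown. A minimal
lifecycle sketch, assuming only the PersistentLogger API as the patch uses it
(constructor(dir, namespace, { locking }) and close()); handle_requests is a
hypothetical stand-in for the server wiring:

    const { PersistentLogger } = require('../util/persistent_logger');
    const config = require('../../config');

    async function main_sketch(handle_requests) {
        let bucket_logger;
        try {
            bucket_logger = config.BUCKET_LOG_TYPE === 'PERSISTENT' &&
                new PersistentLogger(config.PERSISTENT_BUCKET_LOG_DIR, config.PERSISTENT_BUCKET_LOG_NS, { locking: 'SHARED' });
            // every S3 request sees this same instance as req.bucket_logger
            await handle_requests(bucket_logger);
        } finally {
            // closed once, on endpoint shutdown, not per request
            if (bucket_logger) bucket_logger.close();
        }
    }
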
diff --git a/src/endpoint/s3/s3_bucket_logging.js b/src/endpoint/s3/s3_bucket_logging.js
index 4b07102e12..ca5b1a122d 100644
--- a/src/endpoint/s3/s3_bucket_logging.js
+++ b/src/endpoint/s3/s3_bucket_logging.js
@@ -5,10 +5,8 @@ const dbg = require('../../util/debug_module')(__filename);
 const http_utils = require('../../util/http_utils');
 const dgram = require('node:dgram');
 const { Buffer } = require('node:buffer');
-const { PersistentLogger } = require('../../util/persistent_logger');
 const config = require('../../../config');
 
-
 async function send_bucket_op_logs(req, res) {
     if (req.params && req.params.bucket) {
         const bucket_info = await req.object_sdk.read_bucket_sdk_config_info(req.params.bucket);
@@ -65,9 +63,7 @@ async function endpoint_bucket_op_logs(op_name, req, res, source_bucket) {
 
     switch (config.BUCKET_LOG_TYPE) {
         case 'PERSISTENT': {
-            const log = new PersistentLogger(config.PERSISTENT_BUCKET_LOG_DIR, req.params.bucket, { locking: 'SHARED' });
-            await log.append(JSON.stringify(s3_log));
-            await log.close();
+            await req.bucket_logger.append(JSON.stringify(s3_log));
             break;
         }
         default: {
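
Note: the request hot path is now one append to the shared logger instead of
an open/append/close of a per-bucket file on every operation. Each appended
line must therefore be self-describing so the exporter can route it later. A
sketch of one such line, where the field names (log_bucket, log_prefix,
source_bucket) are the ones upload_to_targets() below actually reads, and the
values plus any extra fields are purely illustrative:

    const example_entry = {
        source_bucket: 'photos',     // bucket that produced the op
        log_bucket: 'photos-logs',   // target bucket from the logging policy
        log_prefix: 'access/',       // key prefix for the uploaded log object
        op: 'GET',                   // illustrative payload field
    };
    // endpoint side (per request): await req.bucket_logger.append(JSON.stringify(example_entry));
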
diff --git a/src/manage_nsfs/manage_nsfs_logging.js b/src/manage_nsfs/manage_nsfs_logging.js
index 5b15634d68..438015434f 100644
--- a/src/manage_nsfs/manage_nsfs_logging.js
+++ b/src/manage_nsfs/manage_nsfs_logging.js
@@ -8,8 +8,6 @@ const AWS = require('aws-sdk');
 const crypto = require('crypto');
 const { PersistentLogger, LogFile } = require('../util/persistent_logger');
 const config = require('../../config');
-const P = require('../util/promise');
-const nb_native = require('../util/nb_native');
 const native_fs_utils = require('../util/native_fs_utils');
 const http_utils = require('../util/http_utils');
 const { throw_cli_error, get_config_file_path, get_config_data, write_stdout_response} = require('../manage_nsfs/manage_nsfs_cli_utils');
@@ -18,6 +16,10 @@ const ManageCLIError = require('../manage_nsfs/manage_nsfs_cli_errors').ManageCL
 const ManageCLIResponse = require('../manage_nsfs/manage_nsfs_cli_responses').ManageCLIResponse;
 const { format_aws_date } = require('../util/time_utils');
 const nsfs_schema_utils = require('../manage_nsfs/nsfs_schema_utils');
+const Semaphore = require('../util/semaphore');
+const P = require('../util/promise');
+
+const sem = new Semaphore(10);
 
 const buckets_dir_name = '/buckets';
 const accounts_dir_name = '/accounts';
@@ -31,81 +33,83 @@ async function export_bucket_logging(user_input) {
     config_root = user_input.config_root || config.NSFS_NC_CONF_DIR;
     buckets_dir_path = path.join(config_root, buckets_dir_name);
     accounts_dir_path = path.join(config_root, accounts_dir_name);
-    const fs_context = native_fs_utils.get_process_fs_context(config_root_backend);
-    const log_entries = await nb_native().fs.readdir(fs_context, config.PERSISTENT_BUCKET_LOG_DIR);
     try {
-        await P.map_with_concurrency(10, log_entries, async entry =>
-            export_single_bucket(entry.name.split('.')[0]) // log files are .log
-        );
-        write_stdout_response(ManageCLIResponse.LoggingExported, log_entries.length);
+        await export_multi_bucket(); // log files are .log
+        write_stdout_response(ManageCLIResponse.LoggingExported);
     } catch (err) {
         throw_cli_error(ManageCLIError.LoggingExportFailed, err);
     }
 }
 
-async function export_single_bucket(bucket_name) {
-    const log = new PersistentLogger(config.PERSISTENT_BUCKET_LOG_DIR, bucket_name, { locking: 'EXCLUSIVE' });
+async function export_multi_bucket() {
+    const log = new PersistentLogger(config.PERSISTENT_BUCKET_LOG_DIR, config.PERSISTENT_BUCKET_LOG_NS, { locking: 'EXCLUSIVE' });
     try {
-        const bucket_path = get_config_file_path(buckets_dir_path, bucket_name);
-        const bucket_config_data = await get_config_data(config_root_backend, bucket_path);
-        const log_bucket_path = get_config_file_path(buckets_dir_path, bucket_config_data.logging.log_bucket);
-        const log_bucket_config_data = await get_config_data(config_root_backend, log_bucket_path);
-        const log_bucket_owner = log_bucket_config_data.bucket_owner;
-        const owner_path = get_config_file_path(accounts_dir_path, log_bucket_owner);
-        const owner_config_data = await get_config_data(config_root_backend, owner_path, true);
-        const access_keys = await nc_mkm.decrypt_access_keys(owner_config_data);
-
-        const fs_context = native_fs_utils.get_process_fs_context();
+        const fs_context = native_fs_utils.get_process_fs_context(config_root_backend);
         const endpoint = `https://127.0.0.1:${config.ENDPOINT_SSL_PORT}`;
         const noobaa_con = new AWS.S3({
             endpoint,
-            credentials: {
-                accessKeyId: access_keys[0].access_key,
-                secretAccessKey: access_keys[0].secret_key,
-            },
             s3ForcePathStyle: true,
             sslEnabled: false,
             httpOptions: { agent: http_utils.get_unsecured_agent(endpoint) }
         });
-        await log.process(async file => upload_to_bucket(fs_context, noobaa_con, bucket_config_data.logging, file));
+        await log.process(async file => upload_to_targets(fs_context, noobaa_con, file));
     } catch (err) {
        dbg.error('processing log file failed', log.file);
-        if (err.code === 'ENOENT') throw_cli_error(ManageCLIError.NoSuchBucket, bucket_name);
         throw err;
     } finally {
         await log.close();
     }
 }
 
-async function upload_to_bucket(fs_context, s3_connection, logging, log_file) {
-    const upload_stream = new stream.PassThrough();
-    const date = new Date();
-    const target_bucket = logging.log_bucket;
-    const sha = crypto.createHash('sha512').update(target_bucket + date.getTime()).digest('hex');
-    const log_prefix = logging.log_prefix;
-    const params = {
-        Bucket: target_bucket,
-        Key: `${log_prefix}${format_aws_date(date)}-${sha.slice(0, 16).toUpperCase()}`,
-        Body: upload_stream,
-    };
+/**
+ * @param {nb.NativeFSContext} fs_context
+ * @param {AWS.S3} s3_connection
+ * @param {string} log_file
+ * @returns {Promise}
+ */
+async function upload_to_targets(fs_context, s3_connection, log_file) {
+    const bucket_streams = {};
+    const promises = [];
     try {
-        const upload_to_target_bucket = s3_connection.upload(params).promise();
         const file = new LogFile(fs_context, log_file);
-        dbg.log1('uploading file to target bucket', log_file, target_bucket);
+        dbg.log1('uploading file to target buckets', log_file);
         await file.collect_and_process(async entry => {
             const log_entry = JSON.parse(entry);
             nsfs_schema_utils.validate_log_schema(log_entry);
-            dbg.log2('moving log_entry to target bucket', log_entry, target_bucket);
-            if (log_entry.log_bucket === target_bucket) {
-                upload_stream.write(entry + '\n');
+            const target_bucket = log_entry.log_bucket;
+            const log_prefix = log_entry.log_prefix;
+            const source_bucket = log_entry.source_bucket;
+            if (!bucket_streams[source_bucket]) {
+                const date = new Date();
+                const upload_stream = new stream.PassThrough();
+                const sha = crypto.createHash('sha512').update(target_bucket + date.getTime()).digest('hex');
+                const log_bucket_path = get_config_file_path(buckets_dir_path, target_bucket);
+                const log_bucket_config_data = await get_config_data(config_root_backend, log_bucket_path);
+                const log_bucket_owner = log_bucket_config_data.bucket_owner;
+                const owner_path = get_config_file_path(accounts_dir_path, log_bucket_owner);
+                const owner_config_data = await get_config_data(config_root_backend, owner_path, true);
+                const access_keys = await nc_mkm.decrypt_access_keys(owner_config_data);
+                s3_connection.config.credentials.accessKeyId = access_keys[0].access_key;
+                s3_connection.config.credentials.secretAccessKey = access_keys[0].secret_key;
+                promises.push(sem.surround(() => P.retry({
+                    attempts: 3,
+                    delay_ms: 1000,
+                    func: () => s3_connection.upload({
+                        Bucket: target_bucket,
+                        Key: `${log_prefix}${format_aws_date(date)}-${sha.slice(0, 16).toUpperCase()}`,
+                        Body: upload_stream,
+                    }).promise()
+                })));
+                bucket_streams[source_bucket] = upload_stream;
             }
+            dbg.log1('uploading entry to target bucket', entry);
+            bucket_streams[source_bucket].write(entry + '\n');
         });
-        upload_stream.end();
-        await upload_to_target_bucket;
+        Object.values(bucket_streams).forEach(st => st.end());
+        await Promise.all(promises);
     } catch (error) {
         dbg.error('unexpected error in upload to bucket:', error, 'for:', log_file);
         return false;
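
Note: upload_to_targets() replaces the per-bucket uploader. It makes a single
pass over the one log file, lazily opens a PassThrough upload stream the first
time it sees a source bucket, resolves credentials from the target bucket
owner's config, and bounds the concurrent uploads with Semaphore(10) plus 3
retries each. A stripped-down sketch of just the grouping pattern, where
start_upload is a hypothetical stand-in for the semaphore/retry/S3 plumbing
above:

    const stream = require('stream');

    async function fan_out_sketch(entries, start_upload) {
        const streams = {};  // one PassThrough per source bucket, created lazily
        const uploads = [];
        for (const entry of entries) {
            const key = entry.source_bucket;
            if (!streams[key]) {
                streams[key] = new stream.PassThrough();
                // start the upload immediately; it drains the stream while we keep writing
                uploads.push(start_upload(entry.log_bucket, entry.log_prefix, streams[key]));
            }
            streams[key].write(JSON.stringify(entry) + '\n');
        }
        Object.values(streams).forEach(s => s.end());  // signal EOF to every upload
        await Promise.all(uploads);                    // then wait for all of them
    }
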
diff --git a/src/test/unit_tests/coretest.js b/src/test/unit_tests/coretest.js
index 57b8666776..9d084d06bc 100644
--- a/src/test/unit_tests/coretest.js
+++ b/src/test/unit_tests/coretest.js
@@ -134,7 +134,7 @@ function setup(options = {}) {
 
     const object_io = new ObjectIO();
     const endpoint_request_handler = endpoint.create_endpoint_handler(
-        endpoint.create_init_request_sdk(server_rpc.rpc, rpc_client, object_io), []);
+        endpoint.create_init_request_sdk(server_rpc.rpc, rpc_client, object_io), [], false);
     const endpoint_request_handler_sts = endpoint.create_endpoint_handler(
         endpoint.create_init_request_sdk(server_rpc.rpc, rpc_client, object_io), [], true);
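
Note: with the widened signature create_endpoint_handler(init_request_sdk,
virtual_hosts, sts, logger), call sites pass sts explicitly: false for the S3
data path (optionally with the shared bucket_logger) and true for the STS
path. That is why the test setup above gains the trailing false and passes no
logger, leaving req.bucket_logger unset in tests.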