attempt with one bucket log
Signed-off-by: jackyalbo <jacky.albo@gmail.com>
jackyalbo committed Jun 26, 2024
1 parent 494cdf3 commit 8fdeb6f
Showing 5 changed files with 67 additions and 55 deletions.
config.js (4 changes: 2 additions & 2 deletions)
@@ -659,8 +659,8 @@ config.BUCKET_DIFF_FOR_REPLICATION = true;

config.BUCKET_LOG_UPLOAD_ENABLED = true;
config.BUCKET_LOG_UPLOADER_DELAY = 5 * 60 * 1000;
config.BUCKET_LOG_TYPE = process.env.BUCKET_LOG_TYPE || 'BEST_EFFORT';
config.PERSISTENT_BUCKET_LOG_DIR = '/var/run/noobaa-nsfs/bucket_log';
config.BUCKET_LOG_TYPE = process.env.GUARANTEED_LOGS_PATH ? 'PERSISTENT' : 'BEST_EFFORT';
config.PERSISTENT_BUCKET_LOG_DIR = process.env.GUARANTEED_LOGS_PATH;
config.PERSISTENT_BUCKET_LOG_NS = 'bucket_logging';

///////////////////////////
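With this change, persistent bucket logging is switched on by the GUARANTEED_LOGS_PATH environment variable rather than the previously hard-coded '/var/run/noobaa-nsfs/bucket_log' directory, and all buckets share the single 'bucket_logging' namespace instead of one log file per bucket. A minimal sketch of how the switch resolves (the path is hypothetical):

    // With GUARANTEED_LOGS_PATH set, logging is persistent and writes under that path
    process.env.GUARANTEED_LOGS_PATH = '/var/log/noobaa/bucket_logs'; // hypothetical mount point
    const BUCKET_LOG_TYPE = process.env.GUARANTEED_LOGS_PATH ? 'PERSISTENT' : 'BEST_EFFORT';
    console.log(BUCKET_LOG_TYPE); // 'PERSISTENT'; when the variable is unset it falls back to 'BEST_EFFORT'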
src/endpoint/endpoint.js (16 changes: 14 additions & 2 deletions)
@@ -42,6 +42,7 @@ const endpoint_stats_collector = require('../sdk/endpoint_stats_collector');
const { NamespaceMonitor } = require('../server/bg_services/namespace_monitor');
const { SemaphoreMonitor } = require('../server/bg_services/semaphore_monitor');
const prom_reporting = require('../server/analytic_services/prometheus_reporting');
const { PersistentLogger } = require('../util/persistent_logger');
const NoobaaEvent = require('../manage_nsfs/manage_nsfs_events_utils').NoobaaEvent;
const cluster = /** @type {import('node:cluster').Cluster} */ (
/** @type {unknown} */ (require('node:cluster'))
@@ -63,6 +64,7 @@ dbg.log0('endpoint: replacing old umask: ', old_umask.toString(8), 'with new uma
* func_sdk?: FuncSDK;
* sts_sdk?: StsSDK;
* virtual_hosts?: readonly string[];
* bucket_logger?: PersistentLogger;
* }} EndpointRequest
*/

@@ -97,6 +99,7 @@ async function create_https_server(ssl_cert_info, honorCipherOrder, endpoint_han
*/
/* eslint-disable max-statements */
async function main(options = {}) {
let bucket_logger;
try {
// setting process title needed for letting GPFS to identify the noobaa endpoint processes see issue #8039.
if (config.ENDPOINT_PROCESS_TITLE) {
@@ -127,6 +130,12 @@ async function main(options = {}) {
dbg.log0('Configured Virtual Hosts:', virtual_hosts);
dbg.log0('Configured Location Info:', location_info);

bucket_logger = config.BUCKET_LOG_TYPE === 'PERSISTENT' &&
new PersistentLogger(config.PERSISTENT_BUCKET_LOG_DIR, config.PERSISTENT_BUCKET_LOG_NS, {
locking: 'SHARED',
poll_interval: config.NSFS_GLACIER_LOGS_POLL_INTERVAL,
});

process.on('warning', e => dbg.warn(e.stack));

let internal_rpc_client;
@@ -164,7 +173,7 @@ async function main(options = {}) {
init_request_sdk = create_init_request_sdk(rpc, internal_rpc_client, object_io);
}

const endpoint_request_handler = create_endpoint_handler(init_request_sdk, virtual_hosts);
const endpoint_request_handler = create_endpoint_handler(init_request_sdk, virtual_hosts, false, bucket_logger);
const endpoint_request_handler_sts = create_endpoint_handler(init_request_sdk, virtual_hosts, true);

const ssl_cert_info = await ssl_utils.get_ssl_cert_info('S3', options.nsfs_config_root);
@@ -243,6 +252,8 @@ async function main(options = {}) {
//noobaa crashed event
new NoobaaEvent(NoobaaEvent.ENDPOINT_CRASHED).create_event(undefined, undefined, err);
handle_server_error(err);
} finally {
if (bucket_logger) bucket_logger.close();
}
}

@@ -251,14 +262,15 @@ async function main(options = {})
* @param {readonly string[]} virtual_hosts
* @returns {EndpointHandler}
*/
function create_endpoint_handler(init_request_sdk, virtual_hosts, sts) {
function create_endpoint_handler(init_request_sdk, virtual_hosts, sts, logger) {
const blob_rest_handler = process.env.ENDPOINT_BLOB_ENABLED === 'true' ? blob_rest : unavailable_handler;
const lambda_rest_handler = config.DB_TYPE === 'mongodb' ? lambda_rest : unavailable_handler;

/** @type {EndpointHandler} */
const endpoint_request_handler = (req, res) => {
endpoint_utils.prepare_rest_request(req);
req.virtual_hosts = virtual_hosts;
if (logger) req.bucket_logger = logger;
init_request_sdk(req, res);
if (req.url.startsWith('/2015-03-31/functions')) {
return lambda_rest_handler(req, res);
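main() now creates a single shared PersistentLogger at startup (with a SHARED lock, so concurrent endpoint processes can append to the same namespace), threads it into every request as req.bucket_logger, and closes it in the finally block when main() exits. Note also that bucket_logger is assigned with &&, so it is false rather than undefined when BUCKET_LOG_TYPE is not 'PERSISTENT'; both values are falsy, which is what the if (logger) and if (bucket_logger) guards rely on. A minimal lifecycle sketch, assuming the PersistentLogger API as it appears in this diff (constructor(dir, namespace, { locking }), append(), close()); the directory and entry are hypothetical:

    const { PersistentLogger } = require('../util/persistent_logger');

    async function demo() {
        const logger = new PersistentLogger('/tmp/bucket_log', 'bucket_logging', { locking: 'SHARED' });
        try {
            // one persisted line per append(); entries are JSON strings in this commit
            await logger.append(JSON.stringify({ op: 'GET', source_bucket: 'photos' }));
        } finally {
            await logger.close();
        }
    }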
src/endpoint/s3/s3_bucket_logging.js (6 changes: 1 addition & 5 deletions)
@@ -5,10 +5,8 @@ const dbg = require('../../util/debug_module')(__filename);
const http_utils = require('../../util/http_utils');
const dgram = require('node:dgram');
const { Buffer } = require('node:buffer');
const { PersistentLogger } = require('../../util/persistent_logger');
const config = require('../../../config');


async function send_bucket_op_logs(req, res) {
if (req.params && req.params.bucket) {
const bucket_info = await req.object_sdk.read_bucket_sdk_config_info(req.params.bucket);
@@ -65,9 +63,7 @@ async function endpoint_bucket_op_logs(op_name, req, res, source_bucket) {

switch (config.BUCKET_LOG_TYPE) {
case 'PERSISTENT': {
const log = new PersistentLogger(config.PERSISTENT_BUCKET_LOG_DIR, req.params.bucket, { locking: 'SHARED' });
await log.append(JSON.stringify(s3_log));
await log.close();
await req.bucket_logger.append(JSON.stringify(s3_log));
break;
}
default: {
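The per-operation path no longer constructs and closes a PersistentLogger on every request; it appends to the shared logger carried on the request. Each entry therefore has to carry its own routing information so the exporter can demultiplex the single log file later. A sketch of the entry shape, inferred from the fields upload_to_targets() reads in the next file; the real schema is enforced by nsfs_schema_utils.validate_log_schema and the values here are hypothetical:

    const s3_log = {
        source_bucket: 'photos',   // bucket that served the request
        log_bucket: 'access-logs', // target bucket from the logging configuration
        log_prefix: 'photos/',     // key prefix for uploaded log objects
        // ...the remaining fields of the real log schema
    };
    await req.bucket_logger.append(JSON.stringify(s3_log));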
src/manage_nsfs/manage_nsfs_logging.js (94 changes: 49 additions & 45 deletions)
@@ -8,8 +8,6 @@ const AWS = require('aws-sdk');
const crypto = require('crypto');
const { PersistentLogger, LogFile } = require('../util/persistent_logger');
const config = require('../../config');
const P = require('../util/promise');
const nb_native = require('../util/nb_native');
const native_fs_utils = require('../util/native_fs_utils');
const http_utils = require('../util/http_utils');
const { throw_cli_error, get_config_file_path, get_config_data, write_stdout_response} = require('../manage_nsfs/manage_nsfs_cli_utils');
@@ -18,6 +16,10 @@ const ManageCLIError = require('../manage_nsfs/manage_nsfs_cli_errors').ManageCL
const ManageCLIResponse = require('../manage_nsfs/manage_nsfs_cli_responses').ManageCLIResponse;
const { format_aws_date } = require('../util/time_utils');
const nsfs_schema_utils = require('../manage_nsfs/nsfs_schema_utils');
const Semaphore = require('../util/semaphore');
const P = require('../util/promise');

const sem = new Semaphore(10);

const buckets_dir_name = '/buckets';
const accounts_dir_name = '/accounts';
@@ -31,81 +33,83 @@ async function export_bucket_logging(user_input) {
config_root = user_input.config_root || config.NSFS_NC_CONF_DIR;
buckets_dir_path = path.join(config_root, buckets_dir_name);
accounts_dir_path = path.join(config_root, accounts_dir_name);
const fs_context = native_fs_utils.get_process_fs_context(config_root_backend);
const log_entries = await nb_native().fs.readdir(fs_context, config.PERSISTENT_BUCKET_LOG_DIR);

try {
await P.map_with_concurrency(10, log_entries, async entry =>
export_single_bucket(entry.name.split('.')[0]) // log files are <bucket_name>.log
);
write_stdout_response(ManageCLIResponse.LoggingExported, log_entries.length);
await export_multi_bucket(); // log files are <bucket_name>.log
write_stdout_response(ManageCLIResponse.LoggingExported);
} catch (err) {
throw_cli_error(ManageCLIError.LoggingExportFailed, err);
}

}

async function export_single_bucket(bucket_name) {
const log = new PersistentLogger(config.PERSISTENT_BUCKET_LOG_DIR, bucket_name, { locking: 'EXCLUSIVE' });
async function export_multi_bucket() {
const log = new PersistentLogger(config.PERSISTENT_BUCKET_LOG_DIR, config.PERSISTENT_BUCKET_LOG_NS, { locking: 'EXCLUSIVE' });
try {
const bucket_path = get_config_file_path(buckets_dir_path, bucket_name);
const bucket_config_data = await get_config_data(config_root_backend, bucket_path);
const log_bucket_path = get_config_file_path(buckets_dir_path, bucket_config_data.logging.log_bucket);
const log_bucket_config_data = await get_config_data(config_root_backend, log_bucket_path);
const log_bucket_owner = log_bucket_config_data.bucket_owner;
const owner_path = get_config_file_path(accounts_dir_path, log_bucket_owner);
const owner_config_data = await get_config_data(config_root_backend, owner_path, true);
const access_keys = await nc_mkm.decrypt_access_keys(owner_config_data);

const fs_context = native_fs_utils.get_process_fs_context();
const fs_context = native_fs_utils.get_process_fs_context(config_root_backend);
const endpoint = `https://127.0.0.1:${config.ENDPOINT_SSL_PORT}`;
const noobaa_con = new AWS.S3({
endpoint,
credentials: {
accessKeyId: access_keys[0].access_key,
secretAccessKey: access_keys[0].secret_key,
},
s3ForcePathStyle: true,
sslEnabled: false,
httpOptions: {
agent: http_utils.get_unsecured_agent(endpoint)
}
});
await log.process(async file => upload_to_bucket(fs_context, noobaa_con, bucket_config_data.logging, file));
await log.process(async file => upload_to_targets(fs_context, noobaa_con, file));
} catch (err) {
dbg.error('processing log file failed', log.file);
if (err.code === 'ENOENT') throw_cli_error(ManageCLIError.NoSuchBucket, bucket_name);
throw err;
} finally {
await log.close();
}
}

async function upload_to_bucket(fs_context, s3_connection, logging, log_file) {
const upload_stream = new stream.PassThrough();
const date = new Date();
const target_bucket = logging.log_bucket;
const sha = crypto.createHash('sha512').update(target_bucket + date.getTime()).digest('hex');
const log_prefix = logging.log_prefix;
const params = {
Bucket: target_bucket,
Key: `${log_prefix}${format_aws_date(date)}-${sha.slice(0, 16).toUpperCase()}`,
Body: upload_stream,
};
/**
* @param {nb.NativeFSContext} fs_context
* @param {AWS.S3} s3_connection
* @param {string} log_file
* @returns {Promise<Boolean>}
*/
async function upload_to_targets(fs_context, s3_connection, log_file) {
const bucket_streams = {};
const promises = [];
try {
const upload_to_target_bucket = s3_connection.upload(params).promise();
const file = new LogFile(fs_context, log_file);
dbg.log1('uploading file to target bucket', log_file, target_bucket);
dbg.log1('uploading file to target buckets', log_file);
await file.collect_and_process(async entry => {
const log_entry = JSON.parse(entry);
nsfs_schema_utils.validate_log_schema(log_entry);
dbg.log2('moving log_entry to target bucket', log_entry, target_bucket);
if (log_entry.log_bucket === target_bucket) {
upload_stream.write(entry + '\n');
const target_bucket = log_entry.log_bucket;
const log_prefix = log_entry.log_prefix;
const source_bucket = log_entry.source_bucket;
if (!bucket_streams[source_bucket]) {
const date = new Date();
const upload_stream = new stream.PassThrough();
const sha = crypto.createHash('sha512').update(target_bucket + date.getTime()).digest('hex');
const log_bucket_path = get_config_file_path(buckets_dir_path, target_bucket);
const log_bucket_config_data = await get_config_data(config_root_backend, log_bucket_path);
const log_bucket_owner = log_bucket_config_data.bucket_owner;
const owner_path = get_config_file_path(accounts_dir_path, log_bucket_owner);
const owner_config_data = await get_config_data(config_root_backend, owner_path, true);
const access_keys = await nc_mkm.decrypt_access_keys(owner_config_data);
s3_connection.config.credentials.accessKeyId = access_keys[0].access_key;
s3_connection.config.credentials.secretAccessKey = access_keys[0].secret_key;
promises.push(sem.surround(() => P.retry({
attempts: 3,
delay_ms: 1000,
func: () => s3_connection.upload({
Bucket: target_bucket,
Key: `${log_prefix}${format_aws_date(date)}-${sha.slice(0, 16).toUpperCase()}`,
Body: upload_stream,
}).promise()
})));
bucket_streams[source_bucket] = upload_stream;
}
dbg.log1('uploading entry to target bucket', entry);
bucket_streams[source_bucket].write(entry + '\n');
});
upload_stream.end();
await upload_to_target_bucket;
Object.values(bucket_streams).forEach(st => st.end());
await Promise.all(promises);
} catch (error) {
dbg.error('unexpected error in upload to bucket:', error, 'for:', log_file);
return false;
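export_multi_bucket() drains the one shared log file and fans entries out to their targets: the first entry seen for a source bucket lazily opens a PassThrough stream and starts an S3 upload fed by it (retried up to 3 times, with at most 10 uploads in flight via the semaphore); subsequent entries for that bucket are written into the already-open stream, and all streams are ended together so the uploads can complete. A standalone sketch of that fan-out pattern, with a hypothetical entries array and an injected upload function:

    const stream = require('stream');

    // upload: (target_bucket, body_stream) => Promise, e.g. wrapping s3.upload().promise()
    async function fan_out(entries, upload) {
        const streams = {};
        const uploads = [];
        for (const entry of entries) {
            if (!streams[entry.source_bucket]) {
                const pass = new stream.PassThrough();
                uploads.push(upload(entry.log_bucket, pass)); // the upload starts consuming immediately
                streams[entry.source_bucket] = pass;
            }
            streams[entry.source_bucket].write(JSON.stringify(entry) + '\n');
        }
        Object.values(streams).forEach(s => s.end()); // end every stream so each upload can finish
        await Promise.all(uploads);
    }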
src/test/unit_tests/coretest.js (2 changes: 1 addition & 1 deletion)
@@ -134,7 +134,7 @@ function setup(options = {}) {

const object_io = new ObjectIO();
const endpoint_request_handler = endpoint.create_endpoint_handler(
endpoint.create_init_request_sdk(server_rpc.rpc, rpc_client, object_io), []);
endpoint.create_init_request_sdk(server_rpc.rpc, rpc_client, object_io), [], false);
const endpoint_request_handler_sts = endpoint.create_endpoint_handler(
endpoint.create_init_request_sdk(server_rpc.rpc, rpc_client, object_io), [], true);

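The test only adapts to the widened create_endpoint_handler signature, passing sts = false explicitly and leaving the logger undefined, so req.bucket_logger is never set in coretest runs. With persistent logging enabled it would be invoked as endpoint.js does above:

    // (init_request_sdk, virtual_hosts, sts, logger)
    const handler = endpoint.create_endpoint_handler(init_request_sdk, virtual_hosts, false, bucket_logger);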
