Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

slogSender using positions in file instead of mmap #8889

Merged
merged 11 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .eslintrc.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ module.exports = {
plugins: ['@typescript-eslint', 'prettier'],
extends: ['@agoric', 'plugin:ava/recommended'],
rules: {
// UNTIL on Endo with https://github.com/endojs/endo/pull/2032
'@endo/no-nullish-coalescing': 'off',

'@typescript-eslint/prefer-ts-expect-error': 'warn',
'@typescript-eslint/no-floating-promises': 'error',
// so that floating-promises can be explicitly permitted with void operator
Expand Down
3 changes: 1 addition & 2 deletions packages/telemetry/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
"@opentelemetry/semantic-conventions": "~1.9.0",
"anylogger": "^0.21.0",
"better-sqlite3": "^9.1.1",
"bufferfromfile": "agoric-labs/BufferFromFile#Agoric-built",
"tmp": "^0.2.1"
},
"devDependencies": {
Expand All @@ -64,6 +63,6 @@
"workerThreads": false
},
"typeCoverage": {
"atLeast": 87.55
"atLeast": 87.14
}
}
237 changes: 151 additions & 86 deletions packages/telemetry/src/flight-recorder.js
turadg marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,20 +1,10 @@
// @ts-check
/* global Buffer */
/// <reference types="ses" />

// https://github.com/Agoric/agoric-sdk/issues/3742#issuecomment-1028451575
// I'd mmap() a 100MB file, reserve a few bytes for offsets, then use the rest
// as a circular buffer to hold length-prefixed records. The agd process would
// keep writing new events into the RAM window and updating the start/end
// pointers, with some sequencing to make sure the record gets written before
// the pointer is updated. Then, no mattter how abruptly the process is
// terminated, as long as the host computer itself is still running, the on-disk
// file would contain the most recent state, and anybody who reads the file will
// get the most recent state. The host kernel (linux) is under no obligation to
// flush it to disk any particular time, but knows when reads happen, so there's
// no coherency problem, and the speed is unaffected by disk write speeds.

import BufferFromFile from 'bufferfromfile';
mhofman marked this conversation as resolved.
Show resolved Hide resolved
import { promises as fsPromises } from 'fs';
import path from 'path';
import fs from 'node:fs';
import fsp from 'node:fs/promises';
import path from 'node:path';
import { serializeSlogObj } from './serialize-slog-obj.js';

const { Fail } = assert;
Expand All @@ -31,12 +21,16 @@ const I_ARENA_START = 4 * BigUint64Array.BYTES_PER_ELEMENT;

const RECORD_HEADER_SIZE = BigUint64Array.BYTES_PER_ELEMENT;

/**
* Initializes a circular buffer with the given size, creating the buffer file if it doesn't exist or is not large enough.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI I've observed that with the current mmap implementation, resizing does not actually work (crashes if I remember, I forgot to write down my findings in #8425)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, more reason for this PR I suppose. Worth writing it there now?

*
* @param {string} bufferFile - the file path for the circular buffer
* @param {number} circularBufferSize - the size of the circular buffer
* @returns {Promise<bigint>} the size of the initialized circular buffer
*/
const initializeCircularBuffer = async (bufferFile, circularBufferSize) => {
if (!circularBufferSize) {
return undefined;
}
// If the file doesn't exist, or is not large enough, create it.
const stbuf = await fsPromises.stat(bufferFile).catch(e => {
const stbuf = await fsp.stat(bufferFile).catch(e => {
if (e.code === 'ENOENT') {
return undefined;
}
Expand All @@ -57,59 +51,29 @@ const initializeCircularBuffer = async (bufferFile, circularBufferSize) => {
header.setBigUint64(I_CIRC_START, 0n);
header.setBigUint64(I_CIRC_END, 0n);

await fsPromises.mkdir(path.dirname(bufferFile), { recursive: true });
await fsPromises.writeFile(bufferFile, headerBuf);
await fsp.mkdir(path.dirname(bufferFile), { recursive: true });
await fsp.writeFile(bufferFile, headerBuf);

if (stbuf && stbuf.size >= circularBufferSize) {
// File is big enough.
return arenaSize;
}

// Increase the file size.
await fsPromises.truncate(bufferFile, circularBufferSize);
await fsp.truncate(bufferFile, circularBufferSize);
return arenaSize;
};

export const makeMemoryMappedCircularBuffer = async ({
circularBufferSize = DEFAULT_CBUF_SIZE,
stateDir = '/tmp',
circularBufferFilename,
}) => {
const filename = circularBufferFilename || `${stateDir}/${DEFAULT_CBUF_FILE}`;
// console.log({ circularBufferFilename, filename });

const newArenaSize = await initializeCircularBuffer(
filename,
circularBufferSize,
);
/** @typedef {Awaited<ReturnType<typeof makeSimpleCircularBuffer>>} CircularBuffer */

/**
* @type {Uint8Array}
* BufferFromFile mmap()s the file into the process address space.
*/
const fileBuf = BufferFromFile(filename).Uint8Array();
const header = new DataView(fileBuf.buffer, 0, I_ARENA_START);

// Detect the arena size from the header, if not initialized.
const hdrArenaSize = header.getBigUint64(I_ARENA_SIZE);
const arenaSize = newArenaSize || hdrArenaSize;

const hdrMagic = header.getBigUint64(I_MAGIC);
SLOG_MAGIC === hdrMagic ||
Fail`${filename} is not a slog buffer; wanted magic ${SLOG_MAGIC}, got ${hdrMagic}`;
arenaSize === hdrArenaSize ||
Fail`${filename} arena size mismatch; wanted ${arenaSize}, got ${hdrArenaSize}`;
const arena = new Uint8Array(
fileBuf.buffer,
header.byteLength,
Number(arenaSize),
);

/**
* @param {Uint8Array} outbuf
* @param {number} [offset] offset relative to the current trailing edge (circStart) of the data
* @returns {IteratorResult<Uint8Array, void>}
*/
/**
*
* @param {bigint} arenaSize
* @param {DataView} header
* @param {(outbuf: Uint8Array, readStart: number, firstReadLength: number) => void} readRecord
* @param {(record: Uint8Array, firstWriteLength: number, circEnd: bigint) => Promise<void>} writeRecord
*/
function finishCircularBuffer(arenaSize, header, readRecord, writeRecord) {
const readCircBuf = (outbuf, offset = 0) => {
offset + outbuf.byteLength <= arenaSize ||
Fail`Reading past end of circular buffer`;
Expand All @@ -132,19 +96,13 @@ export const makeMemoryMappedCircularBuffer = async ({
// The data is contiguous, like ---AAABBB---
return { done: true, value: undefined };
}
outbuf.set(arena.subarray(readStart, readStart + firstReadLength));
if (firstReadLength < outbuf.byteLength) {
outbuf.set(
arena.subarray(0, outbuf.byteLength - firstReadLength),
firstReadLength,
);
}
readRecord(outbuf, readStart, firstReadLength);
return { done: false, value: outbuf };
};

/** @param {Uint8Array} data */
const writeCircBuf = data => {
if (RECORD_HEADER_SIZE + data.byteLength > arena.byteLength) {
/** @type {(data: Uint8Array) => Promise<void>} */
const writeCircBuf = async data => {
if (RECORD_HEADER_SIZE + data.byteLength > arenaSize) {
// The data is too big to fit in the arena, so skip it.
const tooBigRecord = JSON.stringify({
type: 'slog-record-too-big',
Expand All @@ -153,14 +111,17 @@ export const makeMemoryMappedCircularBuffer = async ({
data = new TextEncoder().encode(tooBigRecord);
}

if (RECORD_HEADER_SIZE + data.byteLength > arena.byteLength) {
if (RECORD_HEADER_SIZE + data.byteLength > arenaSize) {
// Silently drop, it just doesn't fit.
return;
}

// Allocate for the data and a header
const record = new Uint8Array(RECORD_HEADER_SIZE + data.byteLength);
// Set the data, after the header
record.set(data, RECORD_HEADER_SIZE);
Comment on lines +119 to 122
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I never realized we were copying the data so many times, but that under the future improvement category.


// Set the size in the header
const lengthPrefix = new DataView(record.buffer);
lengthPrefix.setBigUint64(0, BigInt(data.byteLength));

Expand Down Expand Up @@ -206,31 +167,135 @@ export const makeMemoryMappedCircularBuffer = async ({
);
}

arena.set(record.subarray(0, firstWriteLength), Number(circEnd));
if (firstWriteLength < record.byteLength) {
// Write to the beginning of the arena.
arena.set(record.subarray(firstWriteLength, record.byteLength), 0);
}
header.setBigUint64(
turadg marked this conversation as resolved.
Show resolved Hide resolved
I_CIRC_END,
(circEnd + BigInt(record.byteLength)) % arenaSize,
);

return writeRecord(record, firstWriteLength, circEnd);
Comment on lines -209 to +175
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I note that technically the arena header is now updated before the record is written, but that is not consequential

};

const writeJSON = (obj, jsonObj = serializeSlogObj(obj)) => {
// Prepend a newline so that the file can be more easily manipulated.
const data = new TextEncoder().encode(`\n${jsonObj}`);
// console.log('have obj', obj, data);
writeCircBuf(data);
return { readCircBuf, writeCircBuf };
}

/**
* @param {{
* circularBufferSize?: number,
* stateDir?: string,
* circularBufferFilename?: string
* }} opts
*/
export const makeSimpleCircularBuffer = async ({
circularBufferSize = DEFAULT_CBUF_SIZE,
stateDir = '/tmp',
circularBufferFilename,
}) => {
const filename = circularBufferFilename || `${stateDir}/${DEFAULT_CBUF_FILE}`;

const newArenaSize = await initializeCircularBuffer(
filename,
circularBufferSize,
);

const file = await fsp.open(filename, 'r+');

const headerBuffer = Buffer.alloc(I_ARENA_START);

await file.read({
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this check bytesRead ?

buffer: headerBuffer,
length: I_ARENA_START,
position: 0,
});
const header = new DataView(headerBuffer.buffer);

// Detect the arena size from the header, if not initialized.
const hdrArenaSize = header.getBigUint64(I_ARENA_SIZE);
const arenaSize = newArenaSize || hdrArenaSize;

const hdrMagic = header.getBigUint64(I_MAGIC);
SLOG_MAGIC === hdrMagic ||
Fail`${filename} is not a slog buffer; wanted magic ${SLOG_MAGIC}, got ${hdrMagic}`;
arenaSize === hdrArenaSize ||
Fail`${filename} arena size mismatch; wanted ${arenaSize}, got ${hdrArenaSize}`;

/** @type {(outbuf: Uint8Array, readStart: number, firstReadLength: number) => void} */
const readRecord = (outbuf, readStart, firstReadLength) => {
const bytesRead = fs.readSync(file.fd, outbuf, {
length: firstReadLength,
position: Number(readStart) + I_ARENA_START,
});
assert.equal(bytesRead, firstReadLength, 'Too few bytes read');

if (bytesRead < outbuf.byteLength) {
fs.readSync(file.fd, outbuf, {
offset: firstReadLength,
length: outbuf.byteLength - firstReadLength,
position: I_ARENA_START,
});
}
};

/**
* Writes to the file, offset by the header size. Also updates the file header.
*
* @param {Uint8Array} record
* @param {number} firstWriteLength
* @param {bigint} circEnd
*/
const writeRecord = async (record, firstWriteLength, circEnd) => {
await file.write(
record,
// TS saying options bag not available
0,
firstWriteLength,
I_ARENA_START + Number(circEnd),
);
if (firstWriteLength < record.byteLength) {
// Write to the beginning of the arena.
await file.write(
record,
firstWriteLength,
record.byteLength - firstWriteLength,
I_ARENA_START,
);
}

// Write out the updated file header.
// This is somewhat independent of writing the record itself, but it needs
// updating each time a record is written.
await file.write(headerBuffer, undefined, undefined, 0);
};

return { readCircBuf, writeCircBuf, writeJSON };
return finishCircularBuffer(arenaSize, header, readRecord, writeRecord);
};

export const makeSlogSender = async opts => {
const { writeJSON } = await makeMemoryMappedCircularBuffer(opts);
/**
*
* @param {Pick<Awaited<ReturnType<typeof makeSimpleCircularBuffer>>, 'writeCircBuf'>} circBuf
*/
export const makeSlogSenderFromBuffer = ({ writeCircBuf }) => {
/** @type {Promise<void>} */
let toWrite = Promise.resolve();
const writeJSON = (obj, serialized = serializeSlogObj(obj)) => {
// Prepend a newline so that the file can be more easily manipulated.
const data = new TextEncoder().encode(`\n${serialized}`);
// console.log('have obj', obj, data);
toWrite = toWrite.then(() => writeCircBuf(data));
turadg marked this conversation as resolved.
Show resolved Hide resolved
};
return Object.assign(writeJSON, {
forceFlush: async () => {},
forceFlush: async () => {
await toWrite;
},
usesJsonObject: true,
});
};

/**
* Loaded dynamically by makeSlogSender()
*
* @type {import('./index.js').MakeSlogSender}
*/
export const makeSlogSender = async opts => {
const { writeCircBuf } = await makeSimpleCircularBuffer(opts);
return makeSlogSenderFromBuffer({ writeCircBuf });
};
4 changes: 2 additions & 2 deletions packages/telemetry/src/frcat-entrypoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import '@endo/init';

import { makeMemoryMappedCircularBuffer } from './flight-recorder.js';
import { makeSimpleCircularBuffer } from './flight-recorder.js';

const main = async () => {
const files = process.argv.slice(2);
Expand All @@ -14,7 +14,7 @@ const main = async () => {
}

for await (const file of files) {
const { readCircBuf } = await makeMemoryMappedCircularBuffer({
const { readCircBuf } = await makeSimpleCircularBuffer({
circularBufferFilename: file,
circularBufferSize: 0,
});
Expand Down
11 changes: 7 additions & 4 deletions packages/telemetry/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ export * from './make-slog-sender.js';
* shutdown?: () => Promise<void>;
* }} SlogSender
*/
/**
* @typedef {(opts: import('./index.js').MakeSlogSenderOptions) => SlogSender | undefined} MakeSlogSender
*/
/**
* @typedef {MakeSlogSenderCommonOptions & Record<string, unknown>} MakeSlogSenderOptions
* @typedef {object} MakeSlogSenderCommonOptions
Expand All @@ -34,9 +37,9 @@ export const tryFlushSlogSender = async (
await Promise.resolve(slogSender?.forceFlush?.()).catch(err => {
log?.('Failed to flush slog sender', err);
if (err.errors) {
err.errors.forEach(error => {
for (const error of err.errors) {
log?.('nested error:', error);
});
}
}
if (env.SLOGSENDER_FAIL_ON_ERROR) {
throw err;
Expand Down Expand Up @@ -67,12 +70,12 @@ export const getResourceAttributes = ({
}
if (OTEL_RESOURCE_ATTRIBUTES) {
// Allow overriding resource attributes.
OTEL_RESOURCE_ATTRIBUTES.split(',').forEach(kv => {
for (const kv of OTEL_RESOURCE_ATTRIBUTES.split(',')) {
const match = kv.match(/^([^=]*)=(.*)$/);
if (match) {
resourceAttributes[match[1]] = match[2];
}
});
}
}
return resourceAttributes;
};
Expand Down
Loading
Loading