Skip to content

Commit

Permalink
PR Feedback and added unit test coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
markfields committed Jul 1, 2024
1 parent 4a326d5 commit 697ac97
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 6 deletions.
3 changes: 1 addition & 2 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -679,8 +679,7 @@ export const makeLegacySendBatchFn =
/** Helper type for type constraints passed through several functions.
* local - Did this client send the op?
* savedOp - Is this op being replayed after being serialized (having been sequenced previously)
* clientSequenceNumber - The clientSequenceNumber given on submit and used during transport
* (clientSequenceNumber may be overwritten to store index-within-batch by RemoteMessageProcessor)
 * batchStartCsn - The clientSequenceNumber assigned on submit to the first message of this batch
* message - The unpacked message. Likely a TypedContainerRuntimeMessage, but could also be a system op
* modernRuntimeMessage - Does this appear like a current TypedContainerRuntimeMessage?
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ import { OpSplitter, isChunkedMessage } from "./opSplitter.js";
*/
export class RemoteMessageProcessor {
/**
* Client Sequence Number of the first message in this batch.
* Note: For chunked batches, this is the CSN of the "representative" chunk (the final chunk)
* Client Sequence Number of the first message in the current batch being processed.
* If undefined, we are expecting the next message to start a new batch.
*
* @remarks For chunked batches, this is the CSN of the "representative" chunk (the final chunk)
*/
private batchStartCsn: number | undefined;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ describe("RemoteMessageProcessor", () => {
batch = groupingManager.groupBatch(batch);
}

let leadingChunkCount = 0;
const outboundMessages: IBatchMessage[] = [];
if (option.compressionAndChunking.compression) {
const compressor = new OpCompressor(mockLogger);
Expand All @@ -141,6 +142,7 @@ describe("RemoteMessageProcessor", () => {
const splitter = new OpSplitter(
[],
(messages: IBatchMessage[], refSeqNum?: number) => {
++leadingChunkCount;
outboundMessages.push(...messages);
return 0;
},
Expand All @@ -157,6 +159,8 @@ describe("RemoteMessageProcessor", () => {
const messageProcessor = getMessageProcessor();
const actual: ISequencedDocumentMessage[] = [];
let seqNum = 1;
let actualBatchStartCsn: number | undefined;
let emptyProcessResultCount = 0;
for (const message of outboundMessages) {
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
const inboundMessage = {
Expand All @@ -169,9 +173,30 @@ describe("RemoteMessageProcessor", () => {
referenceSequenceNumber: message.referenceSequenceNumber,
} as ISequencedDocumentMessage;

const messages = messageProcessor.process(inboundMessage)?.messages ?? [];
actual.push(...messages);
const processResult = messageProcessor.process(inboundMessage);

// It'll be undefined for the first n-1 chunks if chunking is enabled
if (processResult === undefined) {
++emptyProcessResultCount;
continue;
}

actual.push(...processResult.messages);

if (actualBatchStartCsn === undefined) {
actualBatchStartCsn = processResult.batchStartCsn;
} else {
assert(
actualBatchStartCsn === processResult.batchStartCsn,
"batchStartCsn shouldn't change while processing a single batch",
);
}
}
assert.equal(
emptyProcessResultCount,
leadingChunkCount,
"expected empty result to be 1-1 with leading chunks",
);

const expected = option.grouping
? [
Expand All @@ -190,6 +215,7 @@ describe("RemoteMessageProcessor", () => {
];

assert.deepStrictEqual(actual, expected, "unexpected output");
assert.equal(actualBatchStartCsn, leadingChunkCount + 1, "unexpected batchStartCsn");
});
});

Expand Down

0 comments on commit 697ac97

Please sign in to comment.