
Commit

Offline: Provide incoming batch's starting CSN to the PendingStateManager (#21714)

On submit, the PendingStateManager stores the batch's starting CSN along
with each message. We need the corresponding information for incoming
messages too, for two reasons:

1. We can assert that they match as we process incoming acks for local
messages
2. We need it to compute the batch ID when checking whether the same batch
is coming from a forked container.

Today, two behaviors in the RemoteMessageProcessor keep the correct
info from flowing to the PendingStateManager (PSM):

1. Grouped Batches overwrite clientSequenceNumber with the index into
the batch
2. Without Grouped Batching, batch messages are processed one-by-one, so
the context of the start of the batch is not available when later
messages are sent to PSM.

Here's the fix: the RemoteMessageProcessor has the full context and can
return the batch's starting CSN with each message, irrespective of
grouped batching, compression, chunking, etc.
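To make the new contract concrete, here is a minimal consumer sketch. The types and the handleIncoming/dispatch names are illustrative only, not the actual Fluid Framework APIs; the real signatures appear in the diffs below.

// Minimal sketch of the new contract (illustrative types only; see the
// remoteMessageProcessor.ts diff below for the real signatures).
interface InboundMessage {
	clientSequenceNumber: number;
	contents: unknown;
}

// process() now returns the unpacked messages together with the batch's
// starting clientSequenceNumber, or undefined for a partial (non-final) chunk.
interface ProcessResult {
	messages: InboundMessage[];
	batchStartCsn: number;
}

interface MessageProcessor {
	process(remoteMessageCopy: InboundMessage): ProcessResult | undefined;
}

function handleIncoming(
	processor: MessageProcessor,
	incoming: InboundMessage,
	dispatch: (message: InboundMessage, batchStartCsn: number) => void,
): void {
	const result = processor.process(incoming);
	if (result === undefined) {
		// Incomplete part of a message or batch; wait for more messages.
		return;
	}
	for (const message of result.messages) {
		// Every message in the batch carries the same batchStartCsn, which the
		// PendingStateManager can compare against what it recorded on submit.
		dispatch(message, result.batchStartCsn);
	}
}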
markfields authored Jul 1, 2024
1 parent 60152ed commit 5f69bf6
Showing 5 changed files with 146 additions and 22 deletions.
28 changes: 20 additions & 8 deletions packages/runtime/container-runtime/src/containerRuntime.ts
@@ -677,23 +677,26 @@ export const makeLegacySendBatchFn =
};

/** Helper type for type constraints passed through several functions.
* local - Did this client send the op?
* savedOp - Is this op being replayed after being serialized (having been sequenced previously)
* batchStartCsn - The clientSequenceNumber given on submit to the start of this batch
* message - The unpacked message. Likely a TypedContainerRuntimeMessage, but could also be a system op
* modernRuntimeMessage - Does this appear like a current TypedContainerRuntimeMessage?
* local - Did this client send the op?
*/
type MessageWithContext =
type MessageWithContext = {
local: boolean;
savedOp?: boolean;
batchStartCsn: number;
} & (
| {
message: InboundSequencedContainerRuntimeMessage;
modernRuntimeMessage: true;
local: boolean;
savedOp?: boolean;
}
| {
message: InboundSequencedContainerRuntimeMessageOrSystemMessage;
modernRuntimeMessage: false;
local: boolean;
savedOp?: boolean;
};
}
);

const summarizerRequestUrl = "_summarizer";

@@ -2619,7 +2622,13 @@ export class ContainerRuntime
// but will not modify the contents object (likely it will replace it on the message).
const messageCopy = { ...messageArg };
const savedOp = (messageCopy.metadata as ISavedOpMetadata)?.savedOp;
for (const message of this.remoteMessageProcessor.process(messageCopy)) {
const processResult = this.remoteMessageProcessor.process(messageCopy);
if (processResult === undefined) {
// This means the incoming message is an incomplete part of a message or batch
// and we need to process more messages before the rest of the system can understand it.
return;
}
for (const message of processResult.messages) {
const msg: MessageWithContext = modernRuntimeMessage
? {
// Cast it since we expect it to be this based on modernRuntimeMessage computation above.
@@ -2629,12 +2638,14 @@
message: message as InboundSequencedContainerRuntimeMessage,
local,
modernRuntimeMessage,
batchStartCsn: processResult.batchStartCsn,
}
: // Unrecognized message will be ignored.
{
message,
local,
modernRuntimeMessage,
batchStartCsn: processResult.batchStartCsn,
};
msg.savedOp = savedOp;

@@ -2682,6 +2693,7 @@
if (local && messageWithContext.modernRuntimeMessage) {
localOpMetadata = this.pendingStateManager.processPendingLocalMessage(
messageWithContext.message,
messageWithContext.batchStartCsn,
);
}

packages/runtime/container-runtime/src/opLifecycle/remoteMessageProcessor.ts
@@ -3,6 +3,7 @@
* Licensed under the MIT License.
*/

import { assert } from "@fluidframework/core-utils/internal";
import {
MessageType,
ISequencedDocumentMessage,
@@ -27,6 +28,14 @@ import { OpSplitter, isChunkedMessage } from "./opSplitter.js";
* @internal
*/
export class RemoteMessageProcessor {
/**
* Client Sequence Number of the first message in the current batch being processed.
* If undefined, we are expecting the next message to start a new batch.
*
* @remarks For chunked batches, this is the CSN of the "representative" chunk (the final chunk)
*/
private batchStartCsn: number | undefined;

constructor(
private readonly opSplitter: OpSplitter,
private readonly opDecompressor: OpDecompressor,
@@ -61,17 +70,21 @@ export class RemoteMessageProcessor {
* For ops that weren't virtualized (e.g. System ops that the ContainerRuntime will ultimately ignore),
* a singleton array [remoteMessageCopy] is returned
*/
public process(
remoteMessageCopy: ISequencedDocumentMessage,
): InboundSequencedContainerRuntimeMessageOrSystemMessage[] {
public process(remoteMessageCopy: ISequencedDocumentMessage):
| {
messages: InboundSequencedContainerRuntimeMessageOrSystemMessage[];
batchStartCsn: number;
}
| undefined {
let message = remoteMessageCopy;

ensureContentsDeserialized(message);

if (isChunkedMessage(message)) {
const chunkProcessingResult = this.opSplitter.processChunk(message);
// Only continue further if current chunk is the final chunk
if (!chunkProcessingResult.isFinalChunk) {
return [];
return;
}
// This message will always be compressed
message = chunkProcessingResult.message;
@@ -90,12 +103,56 @@
}

if (isGroupedBatch(message)) {
return this.opGroupingManager.ungroupOp(message).map(unpack);
// We should be awaiting a new batch (batchStartCsn undefined)
assert(this.batchStartCsn === undefined, "Grouped batch interrupting another batch");
return {
messages: this.opGroupingManager.ungroupOp(message).map(unpack),
batchStartCsn: message.clientSequenceNumber,
};
}

const batchStartCsn = this.getAndUpdateBatchStartCsn(message);

// Do a final unpack of runtime messages in case the message was not grouped, compressed, or chunked
unpackRuntimeMessage(message);
return [message as InboundSequencedContainerRuntimeMessageOrSystemMessage];
return {
messages: [message as InboundSequencedContainerRuntimeMessageOrSystemMessage],
batchStartCsn,
};
}

/**
* Based on pre-existing batch tracking info and the current message's batch metadata,
* this will return the starting CSN for this message's batch, and will also update
* the batch tracking info (this.batchStartCsn) based on whether we're still mid-batch.
*/
private getAndUpdateBatchStartCsn(message: ISequencedDocumentMessage): number {
const batchMetadataFlag = (message.metadata as { batch: boolean | undefined })?.batch;
if (this.batchStartCsn === undefined) {
// We are waiting for a new batch
assert(batchMetadataFlag !== false, "Unexpected batch end marker");

// Start of a new multi-message batch
if (batchMetadataFlag === true) {
this.batchStartCsn = message.clientSequenceNumber;
return this.batchStartCsn;
}

// Single-message batch (Since metadata flag is undefined)
// IMPORTANT: Leave this.batchStartCsn undefined, we're ready for the next batch now.
return message.clientSequenceNumber;
}

// We are in the middle or end of an existing multi-message batch. Return the current batchStartCsn
const batchStartCsn = this.batchStartCsn;

assert(batchMetadataFlag !== true, "Unexpected batch start marker");
if (batchMetadataFlag === false) {
// Batch end? Then get ready for the next batch to start
this.batchStartCsn = undefined;
}

return batchStartCsn;
}
}

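A simplified restatement of the batchStartCsn bookkeeping above, as a self-contained sketch. The BatchCsnTracker class and its names are illustrative only and are not part of the actual change; the logic mirrors getAndUpdateBatchStartCsn from the diff.

// Sketch: how the "batch" metadata flag drives batchStartCsn tracking.
//   batch === true      -> first message of a multi-message batch
//   batch === false     -> last message of that batch
//   batch === undefined -> middle of a batch, or a single-message batch when idle
class BatchCsnTracker {
	private batchStartCsn: number | undefined;

	public next(clientSequenceNumber: number, batchFlag: boolean | undefined): number {
		if (this.batchStartCsn === undefined) {
			// Waiting for a new batch to start.
			if (batchFlag === true) {
				// Multi-message batch: remember its starting CSN until the end marker.
				this.batchStartCsn = clientSequenceNumber;
			}
			// Single-message batch (flag undefined) or batch start: this CSN is the start.
			return clientSequenceNumber;
		}

		// Mid-batch or end-of-batch: keep returning the recorded starting CSN.
		const start = this.batchStartCsn;
		if (batchFlag === false) {
			// End of the batch: get ready for the next one.
			this.batchStartCsn = undefined;
		}
		return start;
	}
}

// Example: a three-message batch submitted with CSNs 7, 8, 9 yields
// batchStartCsn 7 for all three messages.
const tracker = new BatchCsnTracker();
console.log(tracker.next(7, true)); // 7
console.log(tracker.next(8, undefined)); // 7
console.log(tracker.next(9, false)); // 7

Grouped batches take a different path: as shown in the process() diff above, the ungrouped messages all use the grouped op's own clientSequenceNumber as batchStartCsn.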
18 changes: 18 additions & 0 deletions packages/runtime/container-runtime/src/pendingStateManager.ts
@@ -11,6 +11,7 @@ import {
ITelemetryLoggerExt,
DataProcessingError,
LoggingError,
extractSafePropertiesFromMessage,
} from "@fluidframework/telemetry-utils/internal";
import Deque from "double-ended-queue";

@@ -256,9 +257,12 @@ export class PendingStateManager implements IDisposable {
* Processes a local message once it's ack'd by the server. It verifies that there was no data corruption and that
* the batch information was preserved for batch messages.
* @param message - The message that got ack'd and needs to be processed.
* @param batchStartCsn - The clientSequenceNumber of the start of this message's batch (assigned during submit)
* (not to be confused with message.clientSequenceNumber - the overwritten value in case of grouped batching)
*/
public processPendingLocalMessage(
message: InboundSequencedContainerRuntimeMessage,
batchStartCsn: number,
): unknown {
// Pre-processing part - This may be the start of a batch.
this.maybeProcessBatchBegin(message);
@@ -273,6 +277,20 @@

this.pendingMessages.shift();

if (pendingMessage.batchStartCsn !== batchStartCsn) {
this.logger?.sendErrorEvent({
eventName: "BatchClientSequenceNumberMismatch",
details: {
processingBatch: !!this.pendingBatchBeginMessage,
pendingBatchCsn: pendingMessage.batchStartCsn,
batchStartCsn,
messageBatchMetadata: (message.metadata as any)?.batch,
pendingMessageBatchMetadata: (pendingMessage.opMetadata as any)?.batch,
},
messageDetails: extractSafePropertiesFromMessage(message),
});
}

const messageContent = buildPendingMessageContent(message);

// Stringified content should match
packages/runtime/container-runtime/src/test/opLifecycle/remoteMessageProcessor.spec.ts
@@ -132,6 +132,7 @@ describe("RemoteMessageProcessor", () => {
batch = groupingManager.groupBatch(batch);
}

let leadingChunkCount = 0;
const outboundMessages: IBatchMessage[] = [];
if (option.compressionAndChunking.compression) {
const compressor = new OpCompressor(mockLogger);
@@ -141,6 +142,7 @@
const splitter = new OpSplitter(
[],
(messages: IBatchMessage[], refSeqNum?: number) => {
++leadingChunkCount;
outboundMessages.push(...messages);
return 0;
},
@@ -157,6 +159,8 @@
const messageProcessor = getMessageProcessor();
const actual: ISequencedDocumentMessage[] = [];
let seqNum = 1;
let actualBatchStartCsn: number | undefined;
let emptyProcessResultCount = 0;
for (const message of outboundMessages) {
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
const inboundMessage = {
@@ -169,8 +173,30 @@
referenceSequenceNumber: message.referenceSequenceNumber,
} as ISequencedDocumentMessage;

actual.push(...messageProcessor.process(inboundMessage));
const processResult = messageProcessor.process(inboundMessage);

// It'll be undefined for the first n-1 chunks if chunking is enabled
if (processResult === undefined) {
++emptyProcessResultCount;
continue;
}

actual.push(...processResult.messages);

if (actualBatchStartCsn === undefined) {
actualBatchStartCsn = processResult.batchStartCsn;
} else {
assert(
actualBatchStartCsn === processResult.batchStartCsn,
"batchStartCsn shouldn't change while processing a single batch",
);
}
}
assert.equal(
emptyProcessResultCount,
leadingChunkCount,
"expected empty result to be 1-1 with leading chunks",
);

const expected = option.grouping
? [
Expand All @@ -189,6 +215,7 @@ describe("RemoteMessageProcessor", () => {
];

assert.deepStrictEqual(actual, expected, "unexpected output");
assert.equal(actualBatchStartCsn, leadingChunkCount + 1, "unexpected batchStartCsn");
});
});

@@ -205,7 +232,7 @@
metadata: { meta: "data" },
};
const documentMessage = message as ISequencedDocumentMessage;
const processResult = messageProcessor.process(documentMessage);
const processResult = messageProcessor.process(documentMessage)?.messages ?? [];

assert.strictEqual(processResult.length, 1, "only expected a single processed message");
const result = processResult[0];
@@ -223,7 +250,7 @@
metadata: { meta: "data" },
};
const documentMessage = message as ISequencedDocumentMessage;
const processResult = messageProcessor.process(documentMessage);
const processResult = messageProcessor.process(documentMessage)?.messages ?? [];

assert.strictEqual(processResult.length, 1, "only expected a single processed message");
const result = processResult[0];
@@ -284,6 +311,10 @@
},
},
];
assert.deepStrictEqual(result, expected, "unexpected processing of groupedBatch");
assert.deepStrictEqual(
result,
{ messages: expected, batchStartCsn: 12 },
"unexpected processing of groupedBatch",
);
});
});
packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts
@@ -161,10 +161,11 @@ describe("Pending State Manager", () => {
);
};

const process = (messages: Partial<ISequencedDocumentMessage>[]) =>
const process = (messages: Partial<ISequencedDocumentMessage>[], batchStartCsn: number) =>
messages.forEach((message) => {
pendingStateManager.processPendingLocalMessage(
message as InboundSequencedContainerRuntimeMessage,
batchStartCsn,
);
});

@@ -193,7 +194,7 @@
];

submitBatch(messages);
process(messages);
process(messages, 0 /* batchStartCsn */);
assert(closeError === undefined);
});

@@ -215,7 +216,7 @@
];

submitBatch(messages);
process(messages);
process(messages, 0 /* batchStartCsn */);
assert(isILoggingError(closeError));
assert.strictEqual(closeError.errorType, ContainerErrorTypes.dataProcessingError);
assert.strictEqual(closeError.getTelemetryProperties().hasBatchStart, true);
@@ -239,6 +240,7 @@
...message,
type: "otherType",
})),
0 /* batchStartCsn */,
);
assert(isILoggingError(closeError));
assert.strictEqual(closeError.errorType, ContainerErrorTypes.dataProcessingError);
@@ -265,6 +267,7 @@
...message,
contents: undefined,
})),
0 /* batchStartCsn */,
);
assert.strictEqual(closeError?.errorType, ContainerErrorTypes.dataProcessingError);
});
@@ -286,6 +289,7 @@
...message,
contents: { prop1: true },
})),
0 /* batchStartCsn */,
);
assert.strictEqual(closeError?.errorType, ContainerErrorTypes.dataProcessingError);
});
@@ -308,6 +312,7 @@
...message,
contents: { prop1: true },
})),
0 /* batchStartCsn */,
);
assert.strictEqual(closeError, undefined, "unexpected close");
});
@@ -322,7 +327,7 @@
sequenceNumber: i + 1, // starting with sequence number 1 so first assert does not filter any op
}));
submitBatch(messages);
process(messages);
process(messages, 0 /* batchStartCsn */);
let pendingState = pendingStateManager.getLocalState(0).pendingStates;
assert.strictEqual(pendingState.length, 10);
pendingState = pendingStateManager.getLocalState(5).pendingStates;
@@ -421,6 +426,7 @@
);
pendingStateManager.processPendingLocalMessage(
futureRuntimeMessage as ISequencedDocumentMessage & UnknownContainerRuntimeMessage,
1 /* batchStartCsn */,
);
});
});
