From 5f69bf6c55adf94dac2be639f52eb6e0b43ab3c8 Mon Sep 17 00:00:00 2001 From: Mark Fields Date: Mon, 1 Jul 2024 15:14:02 -0700 Subject: [PATCH] Offline: Provide incoming batch's starting CSN to the PendingStateManager (#21714) On submit, the PendingStateManager stores the batch's starting CSN along with each message. We need the corresponding information for incoming messages too, for two reasons: 1. We can assert that they match as we process incoming acks for local messages 2. We need it to compute batch ID, when checking for the same batch coming from a forked container. Today, two things happening in RemoteMessageProcessor keep the correct info from flowing to the PSM: 1. Grouped Batches overwrite clientSequenceNumber with the index into the batch 2. Without Grouped Batching, batch messages are processed one-by-one, so the context of the start of the batch is not available when later messages are sent to PSM. Here's the fix - the RemoteMessageProcessor has the full context and can return the batch's starting CSN with each message, irrespective of grouped batching, compression, chunking, etc. --- .../container-runtime/src/containerRuntime.ts | 28 +++++--- .../src/opLifecycle/remoteMessageProcessor.ts | 69 +++++++++++++++++-- .../src/pendingStateManager.ts | 18 +++++ .../remoteMessageProcessor.spec.ts | 39 +++++++++-- .../src/test/pendingStateManager.spec.ts | 14 ++-- 5 files changed, 146 insertions(+), 22 deletions(-) diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index 5641f159f5f9..b72aeb2d0a19 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -677,23 +677,26 @@ export const makeLegacySendBatchFn = }; /** Helper type for type constraints passed through several functions. + * local - Did this client send the op? + * savedOp - Is this op being replayed after being serialized (having been sequenced previously) + * batchStartCsn - The clientSequenceNumber given on submit to the start of this batch * message - The unpacked message. Likely a TypedContainerRuntimeMessage, but could also be a system op * modernRuntimeMessage - Does this appear like a current TypedContainerRuntimeMessage? - * local - Did this client send the op? */ -type MessageWithContext = +type MessageWithContext = { + local: boolean; + savedOp?: boolean; + batchStartCsn: number; +} & ( | { message: InboundSequencedContainerRuntimeMessage; modernRuntimeMessage: true; - local: boolean; - savedOp?: boolean; } | { message: InboundSequencedContainerRuntimeMessageOrSystemMessage; modernRuntimeMessage: false; - local: boolean; - savedOp?: boolean; - }; + } +); const summarizerRequestUrl = "_summarizer"; @@ -2619,7 +2622,13 @@ export class ContainerRuntime // but will not modify the contents object (likely it will replace it on the message). const messageCopy = { ...messageArg }; const savedOp = (messageCopy.metadata as ISavedOpMetadata)?.savedOp; - for (const message of this.remoteMessageProcessor.process(messageCopy)) { + const processResult = this.remoteMessageProcessor.process(messageCopy); + if (processResult === undefined) { + // This means the incoming message is an incomplete part of a message or batch + // and we need to process more messages before the rest of the system can understand it. + return; + } + for (const message of processResult.messages) { const msg: MessageWithContext = modernRuntimeMessage ? { // Cast it since we expect it to be this based on modernRuntimeMessage computation above. @@ -2629,12 +2638,14 @@ export class ContainerRuntime message: message as InboundSequencedContainerRuntimeMessage, local, modernRuntimeMessage, + batchStartCsn: processResult.batchStartCsn, } : // Unrecognized message will be ignored. { message, local, modernRuntimeMessage, + batchStartCsn: processResult.batchStartCsn, }; msg.savedOp = savedOp; @@ -2682,6 +2693,7 @@ export class ContainerRuntime if (local && messageWithContext.modernRuntimeMessage) { localOpMetadata = this.pendingStateManager.processPendingLocalMessage( messageWithContext.message, + messageWithContext.batchStartCsn, ); } diff --git a/packages/runtime/container-runtime/src/opLifecycle/remoteMessageProcessor.ts b/packages/runtime/container-runtime/src/opLifecycle/remoteMessageProcessor.ts index 52b59410e4b0..fc19b73c1461 100644 --- a/packages/runtime/container-runtime/src/opLifecycle/remoteMessageProcessor.ts +++ b/packages/runtime/container-runtime/src/opLifecycle/remoteMessageProcessor.ts @@ -3,6 +3,7 @@ * Licensed under the MIT License. */ +import { assert } from "@fluidframework/core-utils/internal"; import { MessageType, ISequencedDocumentMessage, @@ -27,6 +28,14 @@ import { OpSplitter, isChunkedMessage } from "./opSplitter.js"; * @internal */ export class RemoteMessageProcessor { + /** + * Client Sequence Number of the first message in the current batch being processed. + * If undefined, we are expecting the next message to start a new batch. + * + * @remarks For chunked batches, this is the CSN of the "representative" chunk (the final chunk) + */ + private batchStartCsn: number | undefined; + constructor( private readonly opSplitter: OpSplitter, private readonly opDecompressor: OpDecompressor, @@ -61,17 +70,21 @@ export class RemoteMessageProcessor { * For ops that weren't virtualized (e.g. System ops that the ContainerRuntime will ultimately ignore), * a singleton array [remoteMessageCopy] is returned */ - public process( - remoteMessageCopy: ISequencedDocumentMessage, - ): InboundSequencedContainerRuntimeMessageOrSystemMessage[] { + public process(remoteMessageCopy: ISequencedDocumentMessage): + | { + messages: InboundSequencedContainerRuntimeMessageOrSystemMessage[]; + batchStartCsn: number; + } + | undefined { let message = remoteMessageCopy; + ensureContentsDeserialized(message); if (isChunkedMessage(message)) { const chunkProcessingResult = this.opSplitter.processChunk(message); // Only continue further if current chunk is the final chunk if (!chunkProcessingResult.isFinalChunk) { - return []; + return; } // This message will always be compressed message = chunkProcessingResult.message; @@ -90,12 +103,56 @@ export class RemoteMessageProcessor { } if (isGroupedBatch(message)) { - return this.opGroupingManager.ungroupOp(message).map(unpack); + // We should be awaiting a new batch (batchStartCsn undefined) + assert(this.batchStartCsn === undefined, "Grouped batch interrupting another batch"); + return { + messages: this.opGroupingManager.ungroupOp(message).map(unpack), + batchStartCsn: message.clientSequenceNumber, + }; } + const batchStartCsn = this.getAndUpdateBatchStartCsn(message); + // Do a final unpack of runtime messages in case the message was not grouped, compressed, or chunked unpackRuntimeMessage(message); - return [message as InboundSequencedContainerRuntimeMessageOrSystemMessage]; + return { + messages: [message as InboundSequencedContainerRuntimeMessageOrSystemMessage], + batchStartCsn, + }; + } + + /** + * Based on pre-existing batch tracking info and the current message's batch metadata, + * this will return the starting CSN for this message's batch, and will also update + * the batch tracking info (this.batchStartCsn) based on whether we're still mid-batch. + */ + private getAndUpdateBatchStartCsn(message: ISequencedDocumentMessage): number { + const batchMetadataFlag = (message.metadata as { batch: boolean | undefined })?.batch; + if (this.batchStartCsn === undefined) { + // We are waiting for a new batch + assert(batchMetadataFlag !== false, "Unexpected batch end marker"); + + // Start of a new multi-message batch + if (batchMetadataFlag === true) { + this.batchStartCsn = message.clientSequenceNumber; + return this.batchStartCsn; + } + + // Single-message batch (Since metadata flag is undefined) + // IMPORTANT: Leave this.batchStartCsn undefined, we're ready for the next batch now. + return message.clientSequenceNumber; + } + + // We are in the middle or end of an existing multi-message batch. Return the current batchStartCsn + const batchStartCsn = this.batchStartCsn; + + assert(batchMetadataFlag !== true, "Unexpected batch start marker"); + if (batchMetadataFlag === false) { + // Batch end? Then get ready for the next batch to start + this.batchStartCsn = undefined; + } + + return batchStartCsn; } } diff --git a/packages/runtime/container-runtime/src/pendingStateManager.ts b/packages/runtime/container-runtime/src/pendingStateManager.ts index 6e729dc828b4..0fe54af6c3d6 100644 --- a/packages/runtime/container-runtime/src/pendingStateManager.ts +++ b/packages/runtime/container-runtime/src/pendingStateManager.ts @@ -11,6 +11,7 @@ import { ITelemetryLoggerExt, DataProcessingError, LoggingError, + extractSafePropertiesFromMessage, } from "@fluidframework/telemetry-utils/internal"; import Deque from "double-ended-queue"; @@ -256,9 +257,12 @@ export class PendingStateManager implements IDisposable { * Processes a local message once its ack'd by the server. It verifies that there was no data corruption and that * the batch information was preserved for batch messages. * @param message - The message that got ack'd and needs to be processed. + * @param batchStartCsn - The clientSequenceNumber of the start of this message's batch (assigned during submit) + * (not to be confused with message.clientSequenceNumber - the overwritten value in case of grouped batching) */ public processPendingLocalMessage( message: InboundSequencedContainerRuntimeMessage, + batchStartCsn: number, ): unknown { // Pre-processing part - This may be the start of a batch. this.maybeProcessBatchBegin(message); @@ -273,6 +277,20 @@ export class PendingStateManager implements IDisposable { this.pendingMessages.shift(); + if (pendingMessage.batchStartCsn !== batchStartCsn) { + this.logger?.sendErrorEvent({ + eventName: "BatchClientSequenceNumberMismatch", + details: { + processingBatch: !!this.pendingBatchBeginMessage, + pendingBatchCsn: pendingMessage.batchStartCsn, + batchStartCsn, + messageBatchMetadata: (message.metadata as any)?.batch, + pendingMessageBatchMetadata: (pendingMessage.opMetadata as any)?.batch, + }, + messageDetails: extractSafePropertiesFromMessage(message), + }); + } + const messageContent = buildPendingMessageContent(message); // Stringified content should match diff --git a/packages/runtime/container-runtime/src/test/opLifecycle/remoteMessageProcessor.spec.ts b/packages/runtime/container-runtime/src/test/opLifecycle/remoteMessageProcessor.spec.ts index 47e5e952073d..5c3e618b83e8 100644 --- a/packages/runtime/container-runtime/src/test/opLifecycle/remoteMessageProcessor.spec.ts +++ b/packages/runtime/container-runtime/src/test/opLifecycle/remoteMessageProcessor.spec.ts @@ -132,6 +132,7 @@ describe("RemoteMessageProcessor", () => { batch = groupingManager.groupBatch(batch); } + let leadingChunkCount = 0; const outboundMessages: IBatchMessage[] = []; if (option.compressionAndChunking.compression) { const compressor = new OpCompressor(mockLogger); @@ -141,6 +142,7 @@ describe("RemoteMessageProcessor", () => { const splitter = new OpSplitter( [], (messages: IBatchMessage[], refSeqNum?: number) => { + ++leadingChunkCount; outboundMessages.push(...messages); return 0; }, @@ -157,6 +159,8 @@ describe("RemoteMessageProcessor", () => { const messageProcessor = getMessageProcessor(); const actual: ISequencedDocumentMessage[] = []; let seqNum = 1; + let actualBatchStartCsn: number | undefined; + let emptyProcessResultCount = 0; for (const message of outboundMessages) { // eslint-disable-next-line @typescript-eslint/consistent-type-assertions const inboundMessage = { @@ -169,8 +173,30 @@ describe("RemoteMessageProcessor", () => { referenceSequenceNumber: message.referenceSequenceNumber, } as ISequencedDocumentMessage; - actual.push(...messageProcessor.process(inboundMessage)); + const processResult = messageProcessor.process(inboundMessage); + + // It'll be undefined for the first n-1 chunks if chunking is enabled + if (processResult === undefined) { + ++emptyProcessResultCount; + continue; + } + + actual.push(...processResult.messages); + + if (actualBatchStartCsn === undefined) { + actualBatchStartCsn = processResult.batchStartCsn; + } else { + assert( + actualBatchStartCsn === processResult.batchStartCsn, + "batchStartCsn shouldn't change while processing a single batch", + ); + } } + assert.equal( + emptyProcessResultCount, + leadingChunkCount, + "expected empty result to be 1-1 with leading chunks", + ); const expected = option.grouping ? [ @@ -189,6 +215,7 @@ describe("RemoteMessageProcessor", () => { ]; assert.deepStrictEqual(actual, expected, "unexpected output"); + assert.equal(actualBatchStartCsn, leadingChunkCount + 1, "unexpected batchStartCsn"); }); }); @@ -205,7 +232,7 @@ describe("RemoteMessageProcessor", () => { metadata: { meta: "data" }, }; const documentMessage = message as ISequencedDocumentMessage; - const processResult = messageProcessor.process(documentMessage); + const processResult = messageProcessor.process(documentMessage)?.messages ?? []; assert.strictEqual(processResult.length, 1, "only expected a single processed message"); const result = processResult[0]; @@ -223,7 +250,7 @@ describe("RemoteMessageProcessor", () => { metadata: { meta: "data" }, }; const documentMessage = message as ISequencedDocumentMessage; - const processResult = messageProcessor.process(documentMessage); + const processResult = messageProcessor.process(documentMessage)?.messages ?? []; assert.strictEqual(processResult.length, 1, "only expected a single processed message"); const result = processResult[0]; @@ -284,6 +311,10 @@ describe("RemoteMessageProcessor", () => { }, }, ]; - assert.deepStrictEqual(result, expected, "unexpected processing of groupedBatch"); + assert.deepStrictEqual( + result, + { messages: expected, batchStartCsn: 12 }, + "unexpected processing of groupedBatch", + ); }); }); diff --git a/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts b/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts index 27be17665277..ad5ffadbab71 100644 --- a/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts +++ b/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts @@ -161,10 +161,11 @@ describe("Pending State Manager", () => { ); }; - const process = (messages: Partial[]) => + const process = (messages: Partial[], batchStartCsn: number) => messages.forEach((message) => { pendingStateManager.processPendingLocalMessage( message as InboundSequencedContainerRuntimeMessage, + batchStartCsn, ); }); @@ -193,7 +194,7 @@ describe("Pending State Manager", () => { ]; submitBatch(messages); - process(messages); + process(messages, 0 /* batchStartCsn */); assert(closeError === undefined); }); @@ -215,7 +216,7 @@ describe("Pending State Manager", () => { ]; submitBatch(messages); - process(messages); + process(messages, 0 /* batchStartCsn */); assert(isILoggingError(closeError)); assert.strictEqual(closeError.errorType, ContainerErrorTypes.dataProcessingError); assert.strictEqual(closeError.getTelemetryProperties().hasBatchStart, true); @@ -239,6 +240,7 @@ describe("Pending State Manager", () => { ...message, type: "otherType", })), + 0 /* batchStartCsn */, ); assert(isILoggingError(closeError)); assert.strictEqual(closeError.errorType, ContainerErrorTypes.dataProcessingError); @@ -265,6 +267,7 @@ describe("Pending State Manager", () => { ...message, contents: undefined, })), + 0 /* batchStartCsn */, ); assert.strictEqual(closeError?.errorType, ContainerErrorTypes.dataProcessingError); }); @@ -286,6 +289,7 @@ describe("Pending State Manager", () => { ...message, contents: { prop1: true }, })), + 0 /* batchStartCsn */, ); assert.strictEqual(closeError?.errorType, ContainerErrorTypes.dataProcessingError); }); @@ -308,6 +312,7 @@ describe("Pending State Manager", () => { ...message, contents: { prop1: true }, })), + 0 /* batchStartCsn */, ); assert.strictEqual(closeError, undefined, "unexpected close"); }); @@ -322,7 +327,7 @@ describe("Pending State Manager", () => { sequenceNumber: i + 1, // starting with sequence number 1 so first assert does not filter any op })); submitBatch(messages); - process(messages); + process(messages, 0 /* batchStartCsn */); let pendingState = pendingStateManager.getLocalState(0).pendingStates; assert.strictEqual(pendingState.length, 10); pendingState = pendingStateManager.getLocalState(5).pendingStates; @@ -421,6 +426,7 @@ describe("Pending State Manager", () => { ); pendingStateManager.processPendingLocalMessage( futureRuntimeMessage as ISequencedDocumentMessage & UnknownContainerRuntimeMessage, + 1 /* batchStartCsn */, ); }); });