Skip to content

Commit

Permalink
Flav/improve confluence workflow (#9678)
Browse files Browse the repository at this point in the history
* Improve fetching root pages

* Improve workflow history size.
  • Loading branch information
flvndvd authored Dec 31, 2024
1 parent 1e41443 commit 4cd69c3
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 64 deletions.
45 changes: 44 additions & 1 deletion connectors/src/connectors/confluence/temporal/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ export async function confluenceGetActiveChildPageRefsActivity({
// Confluence has a single main landing page.
// However, users have the ability to create "orphaned" root pages that are not linked from the main landing page.
// It's important to ensure these pages are also imported.
export async function confluenceGetRootPageRefsActivity({
async function getRootPageRefsActivity({
connectorId,
confluenceCloudId,
spaceId,
Expand Down Expand Up @@ -647,6 +647,49 @@ export async function confluenceGetRootPageRefsActivity({
}
}

/**
 * Activity that fetches the root pages of a Confluence space, upserts each of
 * them, and returns the IDs of the pages that were successfully upserted.
 *
 * Only the resulting list of page IDs is handed back to the caller; the page
 * references themselves stay inside this activity.
 *
 * @param params - Sync context. `connectorId`, `confluenceCloudId` and
 *   `spaceId` are used to look up the root pages; the full object is forwarded
 *   to `confluenceCheckAndUpsertPageActivity` for every root page.
 * @returns The IDs of the root pages for which the check-and-upsert step
 *   succeeded. Empty when the space has no root pages.
 */
export async function fetchAndUpsertRootPagesActivity(params: {
  confluenceCloudId: string;
  connectorId: ModelId;
  forceUpsert: boolean;
  isBatchSync: boolean;
  spaceId: string;
  spaceName: string;
  visitedAtMs: number;
}): Promise<string[]> {
  const { connectorId, confluenceCloudId, spaceId } = params;

  // Get the root level pages for the space.
  const rootPageRefs = await getRootPageRefsActivity({
    connectorId,
    confluenceCloudId,
    spaceId,
  });
  if (rootPageRefs.length === 0) {
    return [];
  }

  const allowedRootPageIds: string[] = [];

  // Check and upsert pages one at a time, in the order they were returned,
  // keeping only the ones that are allowed.
  for (const rootPageRef of rootPageRefs) {
    const successfullyUpsert = await confluenceCheckAndUpsertPageActivity({
      ...params,
      pageRef: rootPageRef,
    });

    // If the page fails the upsert operation, it indicates the page is restricted.
    // Such pages should not be added to the list of allowed pages.
    if (successfullyUpsert) {
      allowedRootPageIds.push(rootPageRef.id);
    }
  }

  return allowedRootPageIds;
}

export async function confluenceGetTopLevelPageIdsActivity({
confluenceCloudId,
connectorId,
Expand Down
125 changes: 62 additions & 63 deletions connectors/src/connectors/confluence/temporal/workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ const {
confluenceUpdatePagesParentIdsActivity,
confluenceCheckAndUpsertPageActivity,
confluenceGetActiveChildPageRefsActivity,
confluenceGetRootPageRefsActivity,
fetchConfluenceSpaceIdsForConnectorActivity,
confluenceUpsertPageWithFullParentsActivity,

Expand All @@ -38,6 +37,9 @@ const {

fetchConfluenceConfigurationActivity,
confluenceUpsertSpaceFolderActivity,

fetchAndUpsertRootPagesActivity,

getSpaceIdsToSyncActivity,
} = proxyActivities<typeof activities>({
startToCloseTimeout: "30 minutes",
Expand All @@ -51,6 +53,7 @@ const {
// avoid exceeding Temporal's max workflow size limit,
// since a Confluence page can have an unbounded number of child pages.
const TEMPORAL_WORKFLOW_MAX_HISTORY_LENGTH = 10_000;
// Upper bound on the workflow history size, expressed in megabytes. It is
// converted to bytes and compared against `workflowInfo().historySize` when
// deciding whether to continue the traversal in a new workflow.
const TEMPORAL_WORKFLOW_MAX_HISTORY_SIZE_MB = 10;

export async function confluenceSyncWorkflow({
connectorId,
Expand Down Expand Up @@ -161,39 +164,16 @@ export async function confluenceSpaceSyncWorkflow(
spaceName,
});

// Get the root level pages for the space.
const rootPageRefs = await confluenceGetRootPageRefsActivity({
connectorId,
const allowedRootPageIds = await fetchAndUpsertRootPagesActivity({
...params,
confluenceCloudId,
spaceId,
spaceName,
visitedAtMs,
});
if (rootPageRefs.length === 0) {
return;
}

const allowedRootPageRefs = new Map<string, ConfluencePageRef>(
rootPageRefs.map((r) => [r.id, r])
);

// Upsert the root pages.
for (const rootPageRef of allowedRootPageRefs.values()) {
const successfullyUpsert = await confluenceCheckAndUpsertPageActivity({
...params,
spaceName,
pageRef: rootPageRef,
visitedAtMs,
});

// If the page fails the upsert operation, it indicates the page is restricted.
// Such pages should be excluded from the list of allowed pages.
if (!successfullyUpsert) {
allowedRootPageRefs.delete(rootPageRef.id);
}
}

// Fetch all top-level pages within a specified space. Top-level pages
// refer to those directly nested under the space's root pages.
for (const allowedRootPageId of allowedRootPageRefs.keys()) {
for (const allowedRootPageId of allowedRootPageIds) {
let nextPageCursor: string | null = "";
do {
const { topLevelPageRefs, nextPageCursor: nextCursor } =
Expand Down Expand Up @@ -246,64 +226,83 @@ export async function confluenceSpaceSyncWorkflow(
);
}

// An entry of the traversal stack: either a page reference to process, or a
// pagination marker (`parentId` + `cursor`) recording where to resume fetching
// the remaining children of a parent page.
type StackElement = ConfluencePageRef | { parentId: string; cursor: string };

interface confluenceSyncTopLevelChildPagesWorkflowInput {
confluenceCloudId: string;
connectorId: ModelId;
forceUpsert: boolean;
isBatchSync: boolean;
spaceId: string;
spaceName: string;
topLevelPageRefs: ConfluencePageRef[];
topLevelPageRefs: StackElement[];
visitedAtMs: number;
}

// This Workflow implements a DFS algorithm to synchronize all pages not
// subject to restrictions. It stops importing child pages
// if a parent page is restricted.
// Page restriction checks are performed by `confluenceCheckAndUpsertPageActivity`;
// where false denotes restriction. Children of unrestricted pages are
// stacked for subsequent import.
/**
* This workflow implements a DFS algorithm to synchronize all pages not subject to restrictions.
* It uses a stack to process pages and their children, with special handling for pagination:
* - Regular pages are processed and their children are added to the stack
* - Cursor elements in the stack represent continuation points for pages with many children
* This ensures we never store too many pages in the workflow history while maintaining proper
* traversal.
*
* The workflow stops importing child pages if a parent page is restricted.
* Page restriction checks are performed by `confluenceCheckAndUpsertPageActivity`.
*/
export async function confluenceSyncTopLevelChildPagesWorkflow(
params: confluenceSyncTopLevelChildPagesWorkflowInput
) {
const { spaceName, topLevelPageRefs, visitedAtMs } = params;
const stack = [...topLevelPageRefs];
const stack: StackElement[] = [...topLevelPageRefs];

while (stack.length > 0) {
const currentPageRef = stack.pop();
if (!currentPageRef) {
const current = stack.pop();
if (!current) {
throw new Error("No more pages to parse.");
}

const successfullyUpsert = await confluenceCheckAndUpsertPageActivity({
...params,
spaceName,
pageRef: currentPageRef,
visitedAtMs,
});
if (!successfullyUpsert) {
continue;
}
// Check if it's a page reference or cursor.
const isPageRef = "id" in current;

// Fetch child pages of the current top level page.
let nextPageCursor: string | null = "";
do {
const { childPageRefs, nextPageCursor: nextCursor } =
await confluenceGetActiveChildPageRefsActivity({
...params,
parentPageId: currentPageRef.id,
pageCursor: nextPageCursor,
});
// If it's a page, process it first.
if (isPageRef) {
const successfullyUpsert = await confluenceCheckAndUpsertPageActivity({
...params,
spaceName,
pageRef: current,
visitedAtMs,
});
if (!successfullyUpsert) {
continue;
}
}

nextPageCursor = nextCursor; // Prepare for the next iteration.
// Get child pages using either initial empty cursor or saved cursor.
const { childPageRefs, nextPageCursor } =
await confluenceGetActiveChildPageRefsActivity({
...params,
parentPageId: isPageRef ? current.id : current.parentId,
pageCursor: isPageRef ? "" : current.cursor,
});

stack.push(...childPageRefs);
} while (nextPageCursor !== null);
// Add children and next cursor if there are more.
stack.push(...childPageRefs);
if (nextPageCursor !== null) {
stack.push({
parentId: isPageRef ? current.id : current.parentId,
cursor: nextPageCursor,
});
}

// If additional pages are pending and workflow limits are reached, continue in a new workflow.
// Check if we would exceed limits by continuing.
const hasReachedWorkflowLimits =
workflowInfo().historyLength > TEMPORAL_WORKFLOW_MAX_HISTORY_LENGTH ||
workflowInfo().historySize >
TEMPORAL_WORKFLOW_MAX_HISTORY_SIZE_MB * 1024 * 1024;
if (
stack.length > 0 &&
workflowInfo().historyLength > TEMPORAL_WORKFLOW_MAX_HISTORY_LENGTH
hasReachedWorkflowLimits &&
(stack.length > 0 || childPageRefs.length > 0 || nextPageCursor !== null)
) {
await continueAsNew<typeof confluenceSyncTopLevelChildPagesWorkflow>({
...params,
Expand Down

0 comments on commit 4cd69c3

Please sign in to comment.