-
Notifications
You must be signed in to change notification settings - Fork 180
/
computePropertiesWorkflow.ts
114 lines (98 loc) · 3.04 KB
/
computePropertiesWorkflow.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
/* eslint-disable no-await-in-loop */
import {
continueAsNew,
LoggerSinks,
proxyActivities,
proxySinks,
sleep,
} from "@temporalio/workflow";
import * as wf from "@temporalio/workflow";
// Only import the activity types
import type * as activities from "../temporal/activities";
import { EnrichedJourney } from "../types";
const { defaultWorkerLogger: logger } = proxySinks<LoggerSinks>();
const {
computePropertiesIncremental,
computePropertiesIncrementalArgs,
config,
} = proxyActivities<typeof activities>({
startToCloseTimeout: "5 minutes",
});
export const userJourneyInitialize = wf.defineSignal<[string]>(
"userJourneyInitialize"
);
export function generateComputePropertiesId(workspaceId: string) {
return `compute-properties-workflow-${workspaceId}`;
}
export const POLLING_JITTER_COEFFICIENT = 1000;
export interface ComputedPropertiesWorkflowParams {
workspaceId: string;
// TODO deprecated, remove
tableVersion: string;
maxPollingAttempts?: number;
shouldContinueAsNew?: boolean;
basePollingPeriod?: number;
pollingJitterCoefficient?: number;
subscribedJourneys?: EnrichedJourney[];
}
export async function computePropertiesWorkflow({
tableVersion,
workspaceId,
shouldContinueAsNew = false,
maxPollingAttempts = 1500,
// useful primarily for testing
basePollingPeriod: basePollingPeriodOverride,
pollingJitterCoefficient = POLLING_JITTER_COEFFICIENT,
subscribedJourneys = [],
}: ComputedPropertiesWorkflowParams): Promise<ComputedPropertiesWorkflowParams> {
for (let i = 0; i < maxPollingAttempts; i++) {
const currentTime = Date.now();
logger.info("segmentsNotificationWorkflow polling attempt", {
i,
currentTime,
maxPollingAttempts,
});
const { computePropertiesInterval } = await config([
"computePropertiesInterval",
]);
try {
const args = await computePropertiesIncrementalArgs({
workspaceId,
});
await computePropertiesIncremental({
...args,
now: currentTime,
});
} catch (e) {
logger.error("computePropertiesWorkflow failed to re-compute", {
err: e,
});
}
// only use override if shouldContinueAsNew is false, in order to allow value
// to be reconfigured at deploy time
const basePollingInterval =
shouldContinueAsNew || !basePollingPeriodOverride
? computePropertiesInterval
: basePollingPeriodOverride;
const period =
basePollingInterval + Math.random() * pollingJitterCoefficient;
logger.debug("segmentsNotificationWorkflow sleeping started", {
period,
});
// sleep for 10 seconds + up to 1 seconds of jitter for next polling period
await sleep(period);
}
const params: ComputedPropertiesWorkflowParams = {
basePollingPeriod: basePollingPeriodOverride,
maxPollingAttempts,
pollingJitterCoefficient,
shouldContinueAsNew,
subscribedJourneys,
tableVersion,
workspaceId,
};
if (shouldContinueAsNew) {
await continueAsNew<typeof computePropertiesWorkflow>(params);
}
return params;
}