Skip to content

Commit

Permalink
[#IC-369] UpsertServiceOrchestrator to consume pending tasks (#185)
Browse files Browse the repository at this point in the history
  • Loading branch information
BurnedMarshal authored Mar 24, 2022
1 parent edcbdc4 commit ae4dd95
Show file tree
Hide file tree
Showing 6 changed files with 272 additions and 0 deletions.
10 changes: 10 additions & 0 deletions UpdateVisibleServicesActivity/function.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"bindings": [
{
"name": "name",
"type": "activityTrigger",
"direction": "in"
}
],
"scriptFile": "../dist/UpdateVisibleServicesActivity/index.js"
}
47 changes: 47 additions & 0 deletions UpdateVisibleServicesActivity/handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { Context } from "@azure/functions";

import * as t from "io-ts";

import { VisibleService } from "@pagopa/io-functions-commons/dist/src/models/visible_service";

const AddVisibleServiceInput = t.interface({
action: t.literal("UPSERT"),
visibleService: VisibleService
});

const RemoveVisibleServiceInput = t.interface({
action: t.literal("DELETE"),
visibleService: VisibleService
});

export const Input = t.taggedUnion("action", [
AddVisibleServiceInput,
RemoveVisibleServiceInput
]);

export type Input = t.TypeOf<typeof Input>;

const ResultSuccess = t.interface({
kind: t.literal("SUCCESS")
});

const ResultFailure = t.interface({
kind: t.literal("FAILURE"),
reason: t.string
});

export const Result = t.taggedUnion("kind", [ResultSuccess, ResultFailure]);

export type Result = t.TypeOf<typeof Result>;

/**
* Temporary Activity Handler to skip all pending UpsertServiceOrchestrator
* executions.
*/
export const getUpdateVisibleServicesActivityHandler = () => async (
_: Context,
__: unknown
): Promise<unknown> =>
Result.encode({
kind: "SUCCESS"
});
11 changes: 11 additions & 0 deletions UpdateVisibleServicesActivity/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/**
* Temporary Activity to allow the orchestrator execution queue cleanup.
* This Activity will be removed again when the durable functions
* runtime complete all the pending orchestrations into the taskhub.
*/

import { getUpdateVisibleServicesActivityHandler } from "./handler";

const activityFunctionHandler = getUpdateVisibleServicesActivityHandler();

export default activityFunctionHandler;
10 changes: 10 additions & 0 deletions UpsertServiceOrchestrator/function.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"bindings": [
{
"name": "context",
"type": "orchestrationTrigger",
"direction": "in"
}
],
"scriptFile": "../dist/UpsertServiceOrchestrator/index.js"
}
181 changes: 181 additions & 0 deletions UpsertServiceOrchestrator/handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
import * as df from "durable-functions";

import { IOrchestrationFunctionContext } from "durable-functions/lib/src/classes";

import * as E from "fp-ts/lib/Either";
import { isSome, none, Option, some } from "fp-ts/lib/Option";

import { readableReport } from "@pagopa/ts-commons/lib/reporters";

import * as t from "io-ts";

import { UTCISODateFromString } from "@pagopa/ts-commons/lib/dates";

import { RetrievedService } from "@pagopa/io-functions-commons/dist/src/models/service";
import { VisibleService } from "@pagopa/io-functions-commons/dist/src/models/visible_service";
import {
Input as UpdateVisibleServicesActivityInput,
Result as UpdateVisibleServicesActivityResult
} from "../UpdateVisibleServicesActivity/handler";

// eslint-disable-next-line prefer-arrow/prefer-arrow-functions
export function retrievedServiceToVisibleService(
retrievedService: RetrievedService
): VisibleService {
const {
departmentName,
id,
organizationFiscalCode,
organizationName,
requireSecureChannels,
serviceId,
serviceMetadata,
serviceName,
version
} = retrievedService;
return {
departmentName,
id,
organizationFiscalCode,
organizationName,
requireSecureChannels,
serviceId,
serviceMetadata,
serviceName,
version
};
}

/**
* Carries information about created or updated service.
*
* When oldService is defined, the service has been updated, or it has been
* created otherwise.
*/
export const UpsertServiceEvent = t.intersection([
t.interface({
newService: RetrievedService,
updatedAt: UTCISODateFromString
}),
t.partial({
oldService: RetrievedService
})
]);

export type UpsertServiceEvent = t.TypeOf<typeof UpsertServiceEvent>;

/**
* Using the data of new and old service calculate the action to perform to the visible services
*/
// eslint-disable-next-line prefer-arrow/prefer-arrow-functions
function computeMaybeAction(
newService: RetrievedService,
oldService?: RetrievedService
): Option<UpdateVisibleServicesActivityInput["action"]> {
if (oldService === undefined) {
// A service has been created
return newService.isVisible ? some("UPSERT") : none;
}

// A service has been update

// Visibility not changed
if (oldService.isVisible === newService.isVisible) {
return newService.isVisible ? some("UPSERT") : none;
}

// Visibility changed
// If the old service was NOT visible and the new service IS visible return UPSERT, return DELETE otherwise
return !oldService.isVisible && newService.isVisible
? some("UPSERT")
: some("DELETE");
}

export const handler = function*(
context: IOrchestrationFunctionContext
): Generator<unknown> {
const input = context.df.getInput();

const retryOptions = new df.RetryOptions(5000, 10);
// eslint-disable-next-line functional/immutable-data
retryOptions.backoffCoefficient = 1.5;

// Check if input is valid
const errorOrUpsertServiceEvent = UpsertServiceEvent.decode(input);

if (E.isLeft(errorOrUpsertServiceEvent)) {
context.log.error(
`UpdateVisibleServicesActivity|Cannot parse input|ERROR=${readableReport(
errorOrUpsertServiceEvent.left
)}`
);
// We will never be able to recover from this, so don't trigger a retry
return [];
}

const upsertServiceEvent = errorOrUpsertServiceEvent.right;
const { newService, oldService } = upsertServiceEvent;

// Update visible services if needed
const maybeAction = computeMaybeAction(newService, oldService);
const visibleService = retrievedServiceToVisibleService(newService);
if (isSome(maybeAction)) {
const action = maybeAction.value;
context.log.verbose(
`UpdateVisibleServicesActivity|Visible services must be updated|SERVICE_ID=${visibleService.serviceId}|ACTION=${action}`
);
const updateVisibleServicesActivityInput = UpdateVisibleServicesActivityInput.encode(
{
action,
visibleService
}
);

try {
const updateVisibleServicesActivityResultJson = yield context.df.callActivityWithRetry(
"UpdateVisibleServicesActivity",
retryOptions,
updateVisibleServicesActivityInput
);

const errorOrUpdateVisibleServicesActivityResult = UpdateVisibleServicesActivityResult.decode(
updateVisibleServicesActivityResultJson
);

if (E.isLeft(errorOrUpdateVisibleServicesActivityResult)) {
context.log.error(
`UpdateVisibleServicesActivity|Can't decode result|SERVICE_ID=${
visibleService.serviceId
}|ERROR=${readableReport(
errorOrUpdateVisibleServicesActivityResult.left
)}`
);

return [];
}

const updateVisibleServicesActivityResult =
errorOrUpdateVisibleServicesActivityResult.right;

if (updateVisibleServicesActivityResult.kind === "SUCCESS") {
context.log.verbose(
`UpdateVisibleServicesActivity|Update success|SERVICE_ID=${visibleService.serviceId}|ACTION=${action}`
);
} else {
context.log.error(
`UpdateVisibleServicesActivity|Activity failure|SERVICE_ID=${visibleService.serviceId}|ERROR=${updateVisibleServicesActivityResult.reason}`
);
}
} catch (e) {
context.log.error(
`UpdateVisibleServicesActivity|Max retry exceeded|SERVICE_ID=${visibleService.serviceId}|ERROR=${e}`
);
}
} else {
context.log.verbose(
`UpdateVisibleServicesActivity|No need to update visible services|SERVICE_ID=${visibleService.serviceId}`
);
}

return [];
};
13 changes: 13 additions & 0 deletions UpsertServiceOrchestrator/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/**
* Temporary Orchestrator to allow the orchestrator execution queue cleanup.
* This Orchestrator will be removed again when the durable functions
* runtime complete all the pending orchestrations into the taskhub.
*/

import * as df from "durable-functions";

import { handler } from "./handler";

const orchestrator = df.orchestrator(handler);

export default orchestrator;

0 comments on commit ae4dd95

Please sign in to comment.