diff --git a/.github/workflows/multitenancy.yml b/.github/workflows/multitenancy.yml index 1689abb9..dbaee061 100644 --- a/.github/workflows/multitenancy.yml +++ b/.github/workflows/multitenancy.yml @@ -1,4 +1,4 @@ -name: Run Local Integration Tests Multitenant +name: Multitenant Integration Tests on: [push] diff --git a/.github/workflows/saas.yml b/.github/workflows/saas.yml index b91889a2..a624afc9 100644 --- a/.github/workflows/saas.yml +++ b/.github/workflows/saas.yml @@ -1,4 +1,4 @@ -name: Integration Tests +name: SaaS Integration Tests on: push: diff --git a/.github/workflows/singletenant.yml b/.github/workflows/singletenant.yml index 32c6138a..db4cd863 100644 --- a/.github/workflows/singletenant.yml +++ b/.github/workflows/singletenant.yml @@ -1,4 +1,4 @@ -name: Run Local Integration Tests +name: Single Tenant Integration Tests on: [push] diff --git a/.vscode/settings.json b/.vscode/settings.json index 4b1e6914..8e962fe3 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -5,7 +5,8 @@ "tasklist", "zeebe", "github", - "modeler" + "modeler", + "operate" ], "editor.formatOnSave": true, diff --git a/README.md b/README.md index ebdc7ef0..a4b9b93a 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ [![NPM](https://nodei.co/npm/@camunda8/sdk.png)](https://www.npmjs.com/package/@camunda8/sdk) -This is the official Camunda 8 JavaScript SDK. +This is the official Camunda 8 JavaScript SDK. It is written in TypeScript and runs on NodeJS ([why not in a web browser?](https://github.com/camunda-community-hub/camunda-8-js-sdk/issues/79)). ## Using the SDK in your project @@ -12,6 +12,10 @@ Install the SDK as a dependency: npm i @camunda8/sdk ``` +## A note on entity key types in the JavaScript SDK + +Entity keys in Camunda 8 are stored and represented as int64 numbers. The range of int64 extends to numbers that cannot be represented by the JavaScript `number` type. To deal with this, int64 keys are serialised by the SDK as the JavaScript `string` type. 
See [this issue](https://github.com/camunda-community-hub/camunda-8-js-sdk/issues/78) for more details. + ## Usage In this release, the functionality of the Camunda Platform 8 is exposed via dedicated clients for the component APIs. diff --git a/package-lock.json b/package-lock.json index 4a782fa8..4c8ba784 100644 --- a/package-lock.json +++ b/package-lock.json @@ -19,9 +19,11 @@ "got": "^11.8.6", "lodash.mergewith": "^4.6.2", "long": "^4.0.0", + "lossless-json": "^4.0.1", "neon-env": "^0.1.3", "node-fetch": "^2.7.0", "promise-retry": "^1.1.1", + "reflect-metadata": "^0.2.1", "stack-trace": "0.0.10", "typed-duration": "^1.0.12", "uuid": "^7.0.3" @@ -10585,6 +10587,11 @@ "node": ">=0.10.0" } }, + "node_modules/lossless-json": { + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/lossless-json/-/lossless-json-4.0.1.tgz", + "integrity": "sha512-l0L+ppmgPDnb+JGxNLndPtJZGNf6+ZmVaQzoxQm3u6TXmhdnsA+YtdVR8DjzZd/em58686CQhOFDPewfJ4l7MA==" + }, "node_modules/lowercase-keys": { "version": "2.0.0", "license": "MIT", @@ -14979,6 +14986,11 @@ "esprima": "~4.0.0" } }, + "node_modules/reflect-metadata": { + "version": "0.2.1", + "resolved": "https://registry.npmjs.org/reflect-metadata/-/reflect-metadata-0.2.1.tgz", + "integrity": "sha512-i5lLI6iw9AU3Uu4szRNPPEkomnkjRTaVt9hy/bn5g/oSzekBSMeLZblcjP74AW0vBabqERLLIrz+gR8QYR54Tw==" + }, "node_modules/regexp.prototype.flags": { "version": "1.5.1", "dev": true, diff --git a/package.json b/package.json index 4df23988..a19c27df 100644 --- a/package.json +++ b/package.json @@ -116,9 +116,11 @@ "got": "^11.8.6", "lodash.mergewith": "^4.6.2", "long": "^4.0.0", + "lossless-json": "^4.0.1", "neon-env": "^0.1.3", "node-fetch": "^2.7.0", "promise-retry": "^1.1.1", + "reflect-metadata": "^0.2.1", "stack-trace": "0.0.10", "typed-duration": "^1.0.12", "uuid": "^7.0.3" diff --git a/src/__tests__/lib/LosslessJsonParser.spec.ts b/src/__tests__/lib/LosslessJsonParser.spec.ts new file mode 100644 index 00000000..28c8a15a --- /dev/null +++ 
b/src/__tests__/lib/LosslessJsonParser.spec.ts @@ -0,0 +1,73 @@ +import { + ChildDto, + Int32, + Int64, + LosslessDto, + parseArrayWithAnnotations, + parseWithAnnotations, +} from 'lib' +import { ProcessInstanceStatistics } from 'operate/lib/OperateDto' + +describe('LosslessJSONParser methods', () => { + test('It correctly handles nested Dtos', () => { + class DecisionInstanceOutput extends LosslessDto { + @Int32 + public someInt32Field!: number + } + + class ProcessDefinitionDto extends LosslessDto { + @ChildDto(DecisionInstanceOutput) + public decisionsOutputs!: DecisionInstanceOutput[] + + @Int64 + public total!: string // Example of another field + } + + const json = `{ + "total": 3, + "decisionsOutputs": [ + { + "someInt32Field": 123 + } + ] + }` + + const parsedDto = parseWithAnnotations(json, ProcessDefinitionDto) + expect(parsedDto.decisionsOutputs[0].someInt32Field).toBe(123) // 123 (number) + expect(parsedDto.total).toBe('3') // 3 (string) + }) + + test('it can handle an array', () => { + class ProcessInstanceStatisticsWithInt32 extends ProcessInstanceStatistics { + @Int32 + smallNumber!: number + } + const json = `[ + { + "activityId": "activityId", + "active": 1, + "canceled": 2, + "incidents": 3, + "completed": 4, + "smallNumber": 100 + }, + { + "activityId": "activityId2", + "active": 11, + "canceled": 12, + "incidents": 13, + "completed": 14 + } + ]` + const parsedDto = parseArrayWithAnnotations( + json, + ProcessInstanceStatisticsWithInt32 + ) + expect(Array.isArray(parsedDto)).toBe(true) + expect(parsedDto[0].activityId).toBe('activityId') + expect(parsedDto[0].active).toBe('1') + expect(parsedDto[0].smallNumber).toBe(100) + expect(parsedDto[1].activityId).toBe('activityId2') + expect(parsedDto[1].active).toBe('11') + }) +}) diff --git a/src/__tests__/operate/multitenancy/operate-mt.spec.ts b/src/__tests__/operate/multitenancy/operate-mt.spec.ts new file mode 100644 index 00000000..25f2f5d7 --- /dev/null +++ 
b/src/__tests__/operate/multitenancy/operate-mt.spec.ts @@ -0,0 +1,65 @@ +import { delay, restoreZeebeLogging, suppressZeebeLogging } from 'lib' +import { ZeebeGrpcClient } from 'zeebe' + +import { OperateApiClient } from '../../../operate' + +jest.setTimeout(15000) +suppressZeebeLogging() + +afterAll(() => restoreZeebeLogging()) + +describe('Operate multi-tenancy', () => { + test('It can get the process instance from green tenant and not the red tenant', async () => { + const operateGreen = new OperateApiClient({ + config: { + CAMUNDA_TENANT_ID: 'green', + }, + }) + const operateRed = new OperateApiClient({ + config: { + CAMUNDA_TENANT_ID: 'red', + ZEEBE_CLIENT_ID: 'redzeebe', + ZEEBE_CLIENT_SECRET: 'redzecret', + }, + }) + const zbc = new ZeebeGrpcClient() + + // Deploy to green tenant + await zbc.deployResource({ + processFilename: 'src/__tests__/testdata/OperateMultitenancy.bpmn', + tenantId: 'green', + }) + + // Start an instance in green tenant + const p = await zbc.createProcessInstance({ + bpmnProcessId: 'operate-mt', + tenantId: 'green', + variables: {}, + }) + + await delay(8000) + // Get the process instance from Operate green tenant + + const greenprocess = await operateGreen.searchProcessInstances({ + filter: { key: p.processInstanceKey }, + }) + + // getProcessInstance( + // p.processInstanceKey + // ) + + expect(greenprocess).toBeDefined() + expect(greenprocess.items[0].key.toString()).toBe(p.processInstanceKey) + + // Can't find it in red tenant + const redprocess = await operateRed + .getProcessInstance(p.processInstanceKey) + .catch((e) => { + expect(e.message.includes('404')).toBe(true) + return false + }) + expect(redprocess).toBe(false) + // Cancel the instance in green tenant + await zbc.cancelProcessInstance(p.processInstanceKey) + }) +}) diff --git a/src/__tests__/operate/operate-integrate.spec.ts b/src/__tests__/operate/operate-integrate.spec.ts index 408350ab..1fbff5f5 100644 --- a/src/__tests__/operate/operate-integrate.spec.ts +++ 
b/src/__tests__/operate/operate-integrate.spec.ts @@ -1,5 +1,7 @@ +import { LosslessNumber } from 'lossless-json' + import { OperateApiClient } from '../../operate' -import { ProcessDefinition, Query } from '../../operate/lib/APIObjects' +import { ProcessDefinition, Query } from '../../operate/lib/OperateDto' jest.setTimeout(15000) describe('Operate Integration', () => { @@ -8,7 +10,7 @@ describe('Operate Integration', () => { const res = await c.searchIncidents({ filter: { - processInstanceKey: 2251799816400111, + processInstanceKey: new LosslessNumber('2251799816400111'), }, }) console.log(JSON.stringify(res, null, 2)) @@ -19,7 +21,7 @@ describe('Operate Integration', () => { const query: Query = { filter: {}, - size: 50, + size: 5, sort: [ { field: 'bpmnProcessId', diff --git a/src/__tests__/operate/operate.spec.ts b/src/__tests__/operate/operate.spec.ts index dd587d26..ff84fe0f 100644 --- a/src/__tests__/operate/operate.spec.ts +++ b/src/__tests__/operate/operate.spec.ts @@ -1,4 +1,5 @@ import { EnvironmentSetup } from 'lib' +import { TestableOperateApiClient } from 'operate/lib/TestableOperateApiClient' import { OperateApiClient } from '../../operate' @@ -29,3 +30,50 @@ test('Can get construct a client', () => { }) expect(client).toBeTruthy() }) + +test('Can add tenant id to filter', () => { + const client = new TestableOperateApiClient({ + config: { + CAMUNDA_OAUTH_DISABLED: true, + CAMUNDA_OPERATE_BASE_URL: 'http://localhost', + }, + }) + const query = client.addTenantIdToFilter({ filter: {} }, 'tenantId') + expect(query.filter?.tenantId).toBe('tenantId') +}) + +test('Adds tenant id if no filter', () => { + const client = new TestableOperateApiClient({ + config: { + CAMUNDA_OAUTH_DISABLED: true, + CAMUNDA_OPERATE_BASE_URL: 'http://localhost', + }, + }) + const query = client.addTenantIdToFilter({}, 'tenantId2') + expect(query.filter?.tenantId).toBe('tenantId2') +}) + +test('Does not add a tenantId if none given', async () => { + const client = new 
TestableOperateApiClient({ + config: { + CAMUNDA_OAUTH_DISABLED: true, + CAMUNDA_OPERATE_BASE_URL: 'http://localhost', + CAMUNDA_TENANT_ID: '', // best we can do to erase the tenant id from env + }, + }) + const query = client.addTenantIdToFilter({ filter: { id: 3 } }) + expect(query.filter?.tenantId).toBe('') +}) + +test('Adds tenant id from environment', () => { + const client = new TestableOperateApiClient({ + config: { + CAMUNDA_OAUTH_DISABLED: true, + CAMUNDA_OPERATE_BASE_URL: 'http://localhost', + CAMUNDA_TENANT_ID: 'red5', // best we can do to erase the tenant id from env + }, + }) + const query = client.addTenantIdToFilter({ filter: { id: 3 } }) + expect(query.filter?.tenantId).toBe('red5') + expect(query.filter?.id).toBe(3) +}) diff --git a/src/__tests__/testdata/OperateMultitenancy.bpmn b/src/__tests__/testdata/OperateMultitenancy.bpmn new file mode 100644 index 00000000..fb225926 --- /dev/null +++ b/src/__tests__/testdata/OperateMultitenancy.bpmn @@ -0,0 +1,50 @@ + + + + + Flow_0fgdzij + + + + Flow_0l1o4z0 + + + + + + + Flow_0fgdzij + Flow_0l1o4z0 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/__tests__/zeebe/integration/Client-BroadcastSignal.spec.ts b/src/__tests__/zeebe/integration/Client-BroadcastSignal.spec.ts index b4f3601d..92086bd5 100644 --- a/src/__tests__/zeebe/integration/Client-BroadcastSignal.spec.ts +++ b/src/__tests__/zeebe/integration/Client-BroadcastSignal.spec.ts @@ -14,7 +14,7 @@ beforeAll(async () => { const res = await zbc.deployResource({ processFilename: './src/__tests__/testdata/Signal.bpmn', }) - pid = res.deployments[0].process.bpmnProcessId + pid = res.deployments[0].process.processDefinitionKey await cancelProcesses(pid) }) diff --git a/src/__tests__/zeebe/integration/Client-CreateProcessInstanceWithResult.spec.ts b/src/__tests__/zeebe/integration/Client-CreateProcessInstanceWithResult.spec.ts index 3c5a33a7..38bcd139 100644 --- 
a/src/__tests__/zeebe/integration/Client-CreateProcessInstanceWithResult.spec.ts +++ b/src/__tests__/zeebe/integration/Client-CreateProcessInstanceWithResult.spec.ts @@ -26,23 +26,23 @@ beforeAll(async () => { test3 = await zbc.deployResource({ processFilename: './src/__tests__/testdata/await-outcome.bpmn', }) - await cancelProcesses(test1.deployments[0].process.bpmnProcessId) - await cancelProcesses(test2.deployments[0].process.bpmnProcessId) - await cancelProcesses(test3.deployments[0].process.bpmnProcessId) + await cancelProcesses(test1.deployments[0].process.processDefinitionKey) + await cancelProcesses(test2.deployments[0].process.processDefinitionKey) + await cancelProcesses(test3.deployments[0].process.processDefinitionKey) }) afterAll(async () => { await zbc.close() // Makes sure we don't forget to close connection restoreZeebeLogging() - await cancelProcesses(test1.deployments[0].process.bpmnProcessId) - await cancelProcesses(test2.deployments[0].process.bpmnProcessId) - await cancelProcesses(test3.deployments[0].process.bpmnProcessId) + await cancelProcesses(test1.deployments[0].process.processDefinitionKey) + await cancelProcesses(test2.deployments[0].process.processDefinitionKey) + await cancelProcesses(test3.deployments[0].process.processDefinitionKey) }) test('Awaits a process outcome', async () => { - const processId = test1.deployments[0].process.bpmnProcessId + const bpmnProcessId = test1.deployments[0].process.bpmnProcessId const result = await zbc.createProcessInstanceWithResult({ - bpmnProcessId: processId, + bpmnProcessId, variables: { sourceValue: 5, }, @@ -51,9 +51,9 @@ test('Awaits a process outcome', async () => { }) test('can override the gateway timeout', async () => { - const processId = test2.deployments[0].process.bpmnProcessId + const bpmnProcessId = test2.deployments[0].process.bpmnProcessId const result = await zbc.createProcessInstanceWithResult({ - bpmnProcessId: processId, + bpmnProcessId, requestTimeout: 25000, variables: { 
otherValue: 'rome', @@ -64,9 +64,9 @@ test('can override the gateway timeout', async () => { }) test('fetches a subset of variables', async () => { - const processId = test3.deployments[0].process.bpmnProcessId + const bpmnProcessId = test3.deployments[0].process.bpmnProcessId const result = await zbc.createProcessInstanceWithResult({ - bpmnProcessId: processId, + bpmnProcessId, fetchVariables: ['otherValue'], variables: { otherValue: 'rome', diff --git a/src/__tests__/zeebe/integration/Client-DeployResource.spec.ts b/src/__tests__/zeebe/integration/Client-DeployResource.spec.ts index f7e20c8b..88a8b61c 100644 --- a/src/__tests__/zeebe/integration/Client-DeployResource.spec.ts +++ b/src/__tests__/zeebe/integration/Client-DeployResource.spec.ts @@ -3,7 +3,6 @@ import fs from 'fs' import { restoreZeebeLogging, suppressZeebeLogging } from 'lib' import { BpmnParser, ZeebeGrpcClient } from '../../../zeebe/index' -import { cancelProcesses } from '../../../zeebe/lib/cancelProcesses' process.env.ZEEBE_NODE_LOG_LEVEL = process.env.ZEEBE_NODE_LOG_LEVEL || 'NONE' jest.setTimeout(20000) @@ -18,7 +17,7 @@ const bpmnString = fs.readFileSync( const expectedPid = BpmnParser.getProcessId(bpmnString) beforeAll(async () => { - cancelProcesses(expectedPid) + // cancelProcesses(expectedPid) }) afterAll(async () => { diff --git a/src/__tests__/zeebe/integration/Client-MessageStart.spec.ts b/src/__tests__/zeebe/integration/Client-MessageStart.spec.ts index 0a801878..c586fb51 100644 --- a/src/__tests__/zeebe/integration/Client-MessageStart.spec.ts +++ b/src/__tests__/zeebe/integration/Client-MessageStart.spec.ts @@ -18,13 +18,13 @@ beforeAll(async () => { test1 = await zbc.deployResource({ processFilename: './src/__tests__/testdata/Client-MessageStart.bpmn', }) - await cancelProcesses(test1.deployments[0].process.bpmnProcessId) + await cancelProcesses(test1.deployments[0].process.processDefinitionKey) }) afterAll(async () => { await zbc.close() restoreZeebeLogging() - await 
cancelProcesses(test1.deployments[0].process.bpmnProcessId) + await cancelProcesses(test1.deployments[0].process.processDefinitionKey) }) test('Can start a process with a message', (done) => { diff --git a/src/__tests__/zeebe/integration/Client-ModifyProcessInstance.spec.ts b/src/__tests__/zeebe/integration/Client-ModifyProcessInstance.spec.ts index 9400c54a..4c60e379 100644 --- a/src/__tests__/zeebe/integration/Client-ModifyProcessInstance.spec.ts +++ b/src/__tests__/zeebe/integration/Client-ModifyProcessInstance.spec.ts @@ -13,7 +13,7 @@ beforeAll(async () => { const res = await zbc.deployResource({ processFilename: './src/__tests__/testdata/Client-SkipFirstTask.bpmn', }) - processModelId = res.deployments[0].process.bpmnProcessId + processModelId = res.deployments[0].process.processDefinitionKey }) afterAll(async () => { zbc.cancelProcessInstance(pid).catch((_) => _) diff --git a/src/__tests__/zeebe/integration/Client-PublishMessage.spec.ts b/src/__tests__/zeebe/integration/Client-PublishMessage.spec.ts index 9f9b84f2..0c727676 100644 --- a/src/__tests__/zeebe/integration/Client-PublishMessage.spec.ts +++ b/src/__tests__/zeebe/integration/Client-PublishMessage.spec.ts @@ -1,29 +1,28 @@ import { restoreZeebeLogging, suppressZeebeLogging } from 'lib' import { v4 as uuid } from 'uuid' -import { - DeployResourceResponse, - ProcessDeployment, - ZeebeGrpcClient, -} from '../../../zeebe' +import { ZeebeGrpcClient } from '../../../zeebe' import { cancelProcesses } from '../../../zeebe/lib/cancelProcesses' jest.setTimeout(45000) suppressZeebeLogging() const zbc = new ZeebeGrpcClient() -let deploy: DeployResourceResponse +let processDefinitionKey: string -beforeAll(async () => { - deploy = await zbc.deployResource({ - processFilename: './src/__tests__/testdata/Client-MessageStart.bpmn', - }) -}) +beforeAll( + async () => + ({ processDefinitionKey } = ( + await zbc.deployResource({ + processFilename: './src/__tests__/testdata/Client-MessageStart.bpmn', + }) + 
).deployments[0].process) +) afterAll(async () => { await zbc.close() restoreZeebeLogging() - await cancelProcesses(deploy.deployments[0].process.bpmnProcessId) + await cancelProcesses(processDefinitionKey) }) test('Can publish a message', () => diff --git a/src/__tests__/zeebe/integration/Client-ThrowError.spec.ts b/src/__tests__/zeebe/integration/Client-ThrowError.spec.ts index 2d195812..6da61b98 100644 --- a/src/__tests__/zeebe/integration/Client-ThrowError.spec.ts +++ b/src/__tests__/zeebe/integration/Client-ThrowError.spec.ts @@ -7,19 +7,20 @@ import { cancelProcesses } from '../../../zeebe/lib/cancelProcesses' process.env.ZEEBE_NODE_LOG_LEVEL = process.env.ZEEBE_NODE_LOG_LEVEL || 'NONE' jest.setTimeout(25000) -let processId: string +let bpmnProcessId: string +let processDefinitionKey: string let zbc: ZeebeGrpcClient beforeAll(async () => { suppressZeebeLogging() const zb = new ZeebeGrpcClient() - processId = ( + ;({ bpmnProcessId, processDefinitionKey } = ( await zb.deployResource({ processFilename: './src/__tests__/testdata/Client-ThrowError.bpmn', }) - ).deployments[0].process.bpmnProcessId - cancelProcesses(processId) + ).deployments[0].process) + cancelProcesses(processDefinitionKey) await zb.close() }) @@ -33,7 +34,7 @@ afterEach(async () => { afterAll(async () => { restoreZeebeLogging() - cancelProcesses(processId) + cancelProcesses(processDefinitionKey) }) test('Throws a business error that is caught in the process', async () => { @@ -51,7 +52,7 @@ test('Throws a business error that is caught in the process', async () => { }), }) const result = await zbc.createProcessInstanceWithResult({ - bpmnProcessId: processId, + bpmnProcessId, requestTimeout: 20000, variables: {}, }) @@ -77,7 +78,7 @@ test('Can set variables when throwing a BPMN Error', async () => { }), }) const result = await zbc.createProcessInstanceWithResult({ - bpmnProcessId: processId, + bpmnProcessId, requestTimeout: 20000, variables: {}, }) diff --git 
a/src/__tests__/zeebe/integration/Client-integration.spec.ts b/src/__tests__/zeebe/integration/Client-integration.spec.ts index 4a78669a..b215e415 100644 --- a/src/__tests__/zeebe/integration/Client-integration.spec.ts +++ b/src/__tests__/zeebe/integration/Client-integration.spec.ts @@ -7,7 +7,8 @@ process.env.ZEEBE_NODE_LOG_LEVEL = process.env.ZEEBE_NODE_LOG_LEVEL || 'NONE' jest.setTimeout(30000) let zbc: ZeebeGrpcClient -let processId: string +let processDefinitionKey: string +let bpmnProcessId: string beforeAll(async () => { suppressZeebeLogging() @@ -15,8 +16,8 @@ beforeAll(async () => { const res = await client.deployResource({ processFilename: './src/__tests__/testdata/hello-world.bpmn', }) - processId = res.deployments[0].process.bpmnProcessId - await cancelProcesses(processId) + ;({ bpmnProcessId, processDefinitionKey } = res.deployments[0].process) + await cancelProcesses(processDefinitionKey) await client.close() }) @@ -26,12 +27,12 @@ beforeEach(() => { afterEach(async () => { await zbc.close() - await cancelProcesses(processId) + await cancelProcesses(processDefinitionKey) }) afterAll(async () => { restoreZeebeLogging() - await cancelProcesses(processId) + await cancelProcesses(processDefinitionKey) }) test('Can get the broker topology', async () => { @@ -52,7 +53,7 @@ test('Can create a worker', async () => { test('Can cancel a process', async () => { const client = new ZeebeGrpcClient() const process = await client.createProcessInstance({ - bpmnProcessId: processId, + bpmnProcessId, variables: {}, }) const key = process.processInstanceKey diff --git a/src/__tests__/zeebe/integration/Client-setVariables.spec.ts b/src/__tests__/zeebe/integration/Client-setVariables.spec.ts index bd71f174..e9a4d138 100644 --- a/src/__tests__/zeebe/integration/Client-setVariables.spec.ts +++ b/src/__tests__/zeebe/integration/Client-setVariables.spec.ts @@ -21,14 +21,16 @@ suppressZeebeLogging() const zbc = new ZeebeGrpcClient() let wf: CreateProcessInstanceResponse let 
deploy: DeployResourceResponse -let processId: string +let bpmnProcessId: string +let processDefinitionKey: string beforeAll(async () => { deploy = await zbc.deployResource({ processFilename: './src/__tests__/testdata/conditional-pathway.bpmn', }) - processId = deploy.deployments[0].process.bpmnProcessId - await cancelProcesses(processId) + bpmnProcessId = deploy.deployments[0].process.bpmnProcessId + processDefinitionKey = deploy.deployments[0].process.processDefinitionKey + await cancelProcesses(processDefinitionKey) }) afterAll(async () => { @@ -37,7 +39,7 @@ afterAll(async () => { } await zbc.close() // Make sure to close the connection restoreZeebeLogging() - await cancelProcesses(processId) + await cancelProcesses(processDefinitionKey) }) test('Can update process variables with setVariables', async () => { @@ -45,7 +47,7 @@ test('Can update process variables with setVariables', async () => { wf = await zbc .createProcessInstance({ - bpmnProcessId: processId, + bpmnProcessId, variables: { conditionVariable: true, }, diff --git a/src/__tests__/zeebe/integration/Client-startProcess.spec.ts b/src/__tests__/zeebe/integration/Client-startProcess.spec.ts index 7642b778..9667f3bb 100644 --- a/src/__tests__/zeebe/integration/Client-startProcess.spec.ts +++ b/src/__tests__/zeebe/integration/Client-startProcess.spec.ts @@ -12,21 +12,24 @@ suppressZeebeLogging() let zbc = new ZeebeGrpcClient() let wf: CreateProcessInstanceResponse let id: string | null -let processId: string -let processId2: string +let bpmnProcessId: string +let bpmnProcessId2: string +let processKey: string +let processKey2: string beforeAll(async () => { const res = await zbc.deployResource({ processFilename: './src/__tests__/testdata/hello-world.bpmn', }) - processId = res.deployments[0].process.bpmnProcessId - processId2 = ( - await zbc.deployResource({ - processFilename: './src/__tests__/testdata/Client-SkipFirstTask.bpmn', - }) - ).deployments[0].process.bpmnProcessId - await 
cancelProcesses(processId) - await cancelProcesses(processId2) + processKey = res.deployments[0].process.processDefinitionKey + bpmnProcessId = res.deployments[0].process.bpmnProcessId + const res2 = await zbc.deployResource({ + processFilename: './src/__tests__/testdata/Client-SkipFirstTask.bpmn', + }) + processKey2 = res2.deployments[0].process.processDefinitionKey + bpmnProcessId2 = res2.deployments[0].process.bpmnProcessId + await cancelProcesses(processKey) + await cancelProcesses(processKey2) await zbc.close() }) @@ -49,17 +52,17 @@ afterAll(async () => { } await zbc.close() // Makes sure we don't forget to close connection restoreZeebeLogging() - await cancelProcesses(processId) - await cancelProcesses(processId) + await cancelProcesses(processKey) + await cancelProcesses(processKey2) }) test('Can start a process', async () => { wf = await zbc.createProcessInstance({ - bpmnProcessId: processId, + bpmnProcessId, variables: {}, }) await zbc.cancelProcessInstance(wf.processInstanceKey) - expect(wf.bpmnProcessId).toBe(processId) + expect(wf.bpmnProcessId).toBe(bpmnProcessId) expect(wf.processInstanceKey).toBeTruthy() }) @@ -75,7 +78,7 @@ test('Can start a process at an arbitrary point', (done) => { const finish = () => worker.close().then(() => done()) zbc .createProcessInstance({ - bpmnProcessId: 'SkipFirstTask', + bpmnProcessId: bpmnProcessId2, variables: { id: random }, startInstructions: [{ elementId: 'second_service_task' }], }) diff --git a/src/__tests__/zeebe/integration/Worker-1.0-complete.spec.ts b/src/__tests__/zeebe/integration/Worker-1.0-complete.spec.ts index 8770101a..b2899234 100644 --- a/src/__tests__/zeebe/integration/Worker-1.0-complete.spec.ts +++ b/src/__tests__/zeebe/integration/Worker-1.0-complete.spec.ts @@ -10,26 +10,34 @@ suppressZeebeLogging() const zbc = new ZeebeGrpcClient() let wf: CreateProcessInstanceResponse | undefined -let processId1: string -let processId2: string -let processId3: string +let processDefinitionKey1: string +let 
processDefinitionKey2: string +let processDefinitionKey3: string +let bpmnProcessId1: string +let bpmnProcessId2: string +let bpmnProcessId3: string beforeAll(async () => { const res1 = await zbc.deployResource({ processFilename: './src/__tests__/testdata/hello-world.bpmn', }) - processId1 = res1.deployments[0].process.bpmnProcessId - await cancelProcesses(processId1) + processDefinitionKey1 = res1.deployments[0].process.processDefinitionKey + bpmnProcessId1 = res1.deployments[0].process.bpmnProcessId + await cancelProcesses(processDefinitionKey1) + const res2 = await zbc.deployResource({ processFilename: './src/__tests__/testdata/hello-world-complete.bpmn', }) - processId2 = res2.deployments[0].process.bpmnProcessId - await cancelProcesses(processId2) + processDefinitionKey2 = res2.deployments[0].process.processDefinitionKey + bpmnProcessId2 = res2.deployments[0].process.bpmnProcessId + await cancelProcesses(processDefinitionKey2) + const res3 = await zbc.deployResource({ processFilename: './src/__tests__/testdata/conditional-pathway.bpmn', }) - processId3 = res3.deployments[0].process.bpmnProcessId - await cancelProcesses(processId3) + processDefinitionKey3 = res3.deployments[0].process.processDefinitionKey + bpmnProcessId3 = res3.deployments[0].process.bpmnProcessId + await cancelProcesses(processDefinitionKey3) }) afterEach(async () => { @@ -41,14 +49,14 @@ afterEach(async () => { afterAll(async () => { await zbc.close() restoreZeebeLogging() - await cancelProcesses(processId1) - await cancelProcesses(processId2) - await cancelProcesses(processId3) + await cancelProcesses(processDefinitionKey1) + await cancelProcesses(processDefinitionKey2) + await cancelProcesses(processDefinitionKey3) }) test('Can service a task', async () => { wf = await zbc.createProcessInstance({ - bpmnProcessId: processId1, + bpmnProcessId: bpmnProcessId1, variables: {}, }) @@ -73,7 +81,7 @@ test('Can service a task', async () => { }) test('Can service a task with complete.success', async 
() => { wf = await zbc.createProcessInstance({ - bpmnProcessId: processId2, + bpmnProcessId: bpmnProcessId2, variables: {}, }) await new Promise((resolve) => @@ -92,7 +100,7 @@ test('Can service a task with complete.success', async () => { test('Can update process variables with complete.success()', async () => { wf = await zbc.createProcessInstance({ - bpmnProcessId: processId3, + bpmnProcessId: bpmnProcessId3, variables: { conditionVariable: true, }, diff --git a/src/__tests__/zeebe/integration/Worker-BatchWorker.spec.ts b/src/__tests__/zeebe/integration/Worker-BatchWorker.spec.ts index bc06d6c6..538be6f8 100644 --- a/src/__tests__/zeebe/integration/Worker-BatchWorker.spec.ts +++ b/src/__tests__/zeebe/integration/Worker-BatchWorker.spec.ts @@ -7,26 +7,27 @@ jest.setTimeout(30000) suppressZeebeLogging() const zbc = new ZeebeGrpcClient() -let processId: string +let processDefinitionKey: string +let bpmnProcessId: string beforeAll(async () => { const res = await zbc.deployResource({ processFilename: './src/__tests__/testdata/hello-world.bpmn', }) - processId = res.deployments[0].process.bpmnProcessId - await cancelProcesses(processId) + ;({ bpmnProcessId, processDefinitionKey } = res.deployments[0].process) + await cancelProcesses(processDefinitionKey) }) afterAll(async () => { await zbc.close() // Makes sure we don't forget to close connection restoreZeebeLogging() - await cancelProcesses(processId) + await cancelProcesses(processDefinitionKey) }) test('BatchWorker gets ten jobs', async () => { for (let i = 0; i < 10; i++) { await zbc.createProcessInstance({ - bpmnProcessId: processId, + bpmnProcessId, variables: {}, }) } diff --git a/src/__tests__/zeebe/integration/Worker-Failure-Backoff-Retry.spec.ts b/src/__tests__/zeebe/integration/Worker-Failure-Backoff-Retry.spec.ts index 3be911c5..99ba000c 100644 --- a/src/__tests__/zeebe/integration/Worker-Failure-Backoff-Retry.spec.ts +++ b/src/__tests__/zeebe/integration/Worker-Failure-Backoff-Retry.spec.ts @@ -11,14 
+11,16 @@ suppressZeebeLogging() const zbc = new ZeebeGrpcClient() let wf: CreateProcessInstanceResponse | undefined +let processDefinitionKey: string let processId: string beforeAll(async () => { const res = await zbc.deployResource({ processFilename: './src/__tests__/testdata/Worker-Failure1.bpmn', }) + processDefinitionKey = res.deployments[0].process.processDefinitionKey processId = res.deployments[0].process.bpmnProcessId - await cancelProcesses(processId) + await cancelProcesses(processDefinitionKey) }) afterEach(async () => { @@ -30,7 +32,7 @@ afterEach(async () => { afterAll(async () => { await zbc.close() restoreZeebeLogging() - await cancelProcesses(processId) + await cancelProcesses(processDefinitionKey) }) test('Can specify a retryBackoff with complete.failure()', async () => { diff --git a/src/__tests__/zeebe/integration/Worker-Failure-Retries.spec.ts b/src/__tests__/zeebe/integration/Worker-Failure-Retries.spec.ts index 94f9e85b..df61f203 100644 --- a/src/__tests__/zeebe/integration/Worker-Failure-Retries.spec.ts +++ b/src/__tests__/zeebe/integration/Worker-Failure-Retries.spec.ts @@ -33,7 +33,7 @@ test('Decrements the retries count by default', async () => { const res = await zbc.deployResource({ processFilename: './src/__tests__/testdata/Worker-Failure-Retries.bpmn', }) - await cancelProcesses(res.deployments[0].process.bpmnProcessId) + await cancelProcesses(res.deployments[0].process.processDefinitionKey) wf = await zbc.createProcessInstance({ bpmnProcessId: 'worker-failure-retries', variables: { @@ -66,7 +66,7 @@ test('Set the retries to a specific number when provided with one via simple sig const res = await zbc.deployResource({ processFilename: './src/__tests__/testdata/Worker-Failure-Retries.bpmn', }) - cancelProcesses(res.deployments[0].process.bpmnProcessId) + cancelProcesses(res.deployments[0].process.processDefinitionKey) wf = await zbc.createProcessInstance({ bpmnProcessId: 'worker-failure-retries', variables: { @@ -99,7 +99,7 @@ 
test('Set the retries to a specific number when provided with one via object sig const res = await zbc.deployResource({ processFilename: './src/__tests__/testdata/Worker-Failure-Retries.bpmn', }) - await cancelProcesses(res.deployments[0].process.bpmnProcessId) + await cancelProcesses(res.deployments[0].process.processDefinitionKey) wf = await zbc.createProcessInstance({ bpmnProcessId: 'worker-failure-retries', variables: { diff --git a/src/__tests__/zeebe/integration/Worker-Failure.spec.ts b/src/__tests__/zeebe/integration/Worker-Failure.spec.ts index ce7b2350..e1b71f19 100644 --- a/src/__tests__/zeebe/integration/Worker-Failure.spec.ts +++ b/src/__tests__/zeebe/integration/Worker-Failure.spec.ts @@ -18,6 +18,9 @@ let wf: CreateProcessInstanceResponse | undefined let wf1: DeployResourceResponse let wf2: DeployResourceResponse let wf3: DeployResourceResponse +let processDefinitionKey1: string +let processDefinitionKey2: string +let processDefinitionKey3: string let bpmnProcessId1: string let bpmnProcessId2: string let bpmnProcessId3: string @@ -26,18 +29,28 @@ beforeAll(async () => { wf1 = await zbc.deployResource({ processFilename: './src/__tests__/testdata/Worker-Failure1.bpmn', }) + ;({ + processDefinitionKey: processDefinitionKey1, + bpmnProcessId: bpmnProcessId1, + } = wf1.deployments[0].process) bpmnProcessId1 = wf1.deployments[0].process.bpmnProcessId wf2 = await zbc.deployResource({ processFilename: './src/__tests__/testdata/Worker-Failure2.bpmn', }) - bpmnProcessId2 = wf2.deployments[0].process.bpmnProcessId + ;({ + processDefinitionKey: processDefinitionKey2, + bpmnProcessId: bpmnProcessId2, + } = wf2.deployments[0].process) wf3 = await zbc.deployResource({ processFilename: './src/__tests__/testdata/Worker-Failure3.bpmn', }) - bpmnProcessId3 = wf3.deployments[0].process.bpmnProcessId - await cancelProcesses(bpmnProcessId1) - await cancelProcesses(bpmnProcessId2) - await cancelProcesses(bpmnProcessId3) + ;({ + processDefinitionKey: processDefinitionKey3, + 
bpmnProcessId: bpmnProcessId3, + } = wf3.deployments[0].process) + await cancelProcesses(processDefinitionKey1) + await cancelProcesses(processDefinitionKey2) + await cancelProcesses(processDefinitionKey3) }) afterEach(async () => { @@ -53,9 +66,9 @@ afterEach(async () => { afterAll(async () => { await zbc.close() restoreZeebeLogging() - await cancelProcesses(bpmnProcessId1) - await cancelProcesses(bpmnProcessId2) - await cancelProcesses(bpmnProcessId3) + await cancelProcesses(processDefinitionKey1) + await cancelProcesses(processDefinitionKey2) + await cancelProcesses(processDefinitionKey3) }) test('Causes a retry with complete.failure()', async () => { @@ -92,7 +105,6 @@ test('Causes a retry with complete.failure()', async () => { } return job.fail('Triggering a retry') }, - loglevel: 'NONE', }) ) }) @@ -120,8 +132,6 @@ test('Does not fail a process when the handler throws, by default', async () => 'Unhandled exception in task handler for testing purposes' ) // Will be caught in the library }, - - loglevel: 'NONE', pollInterval: 10000, }) }) @@ -149,7 +159,6 @@ test('Fails a process when the handler throws and options.failProcessOnException throw new Error('Unhandled exception in task handler for test purposes') // Will be caught in the library }, failProcessOnException: true, - loglevel: 'NONE', }) function testProcessInstanceExists() { diff --git a/src/__tests__/zeebe/integration/Worker-LongPoll.spec.ts b/src/__tests__/zeebe/integration/Worker-LongPoll.spec.ts index 2c1ef2c4..944266af 100644 --- a/src/__tests__/zeebe/integration/Worker-LongPoll.spec.ts +++ b/src/__tests__/zeebe/integration/Worker-LongPoll.spec.ts @@ -4,8 +4,6 @@ import * as uuid from 'uuid' import { ZeebeGrpcClient } from '../../../zeebe' import { cancelProcesses } from '../../../zeebe/lib/cancelProcesses' -process.env.ZEEBE_NODE_LOG_LEVEL = process.env.ZEEBE_NODE_LOG_LEVEL || 'NONE' - jest.setTimeout(40000) let processId: string @@ -26,7 +24,7 @@ beforeAll(async () => { const res = await 
zbcLongPoll.deployResource({ processFilename: './src/__tests__/testdata/Worker-LongPoll.bpmn', }) - processId = res.deployments[0].process.bpmnProcessId + processId = res.deployments[0].process.processDefinitionKey await cancelProcesses(processId) }) diff --git a/src/__tests__/zeebe/integration/Worker-RaiseIncident.spec.ts b/src/__tests__/zeebe/integration/Worker-RaiseIncident.spec.ts index e869cc9d..3566be26 100644 --- a/src/__tests__/zeebe/integration/Worker-RaiseIncident.spec.ts +++ b/src/__tests__/zeebe/integration/Worker-RaiseIncident.spec.ts @@ -3,8 +3,6 @@ import { restoreZeebeLogging, suppressZeebeLogging } from 'lib' import { ZeebeGrpcClient } from '../../../zeebe' import { cancelProcesses } from '../../../zeebe/lib/cancelProcesses' -process.env.ZEEBE_NODE_LOG_LEVEL = process.env.ZEEBE_NODE_LOG_LEVEL || 'NONE' - /** * Note: This test needs to be modified to leave its process instance active so the incident can be manually verified */ @@ -15,26 +13,27 @@ let processInstanceKey: string suppressZeebeLogging() const zbc = new ZeebeGrpcClient() -let processId: string +let bpmnProcessId: string +let processDefinitionKey: string beforeAll(async () => { const res = await zbc.deployResource({ processFilename: './src/__tests__/testdata/Worker-RaiseIncident.bpmn', }) - processId = res.deployments[0].process.bpmnProcessId - await cancelProcesses(processId) + ;({ bpmnProcessId, processDefinitionKey } = res.deployments[0].process) + await cancelProcesses(processDefinitionKey) }) afterAll(async () => { zbc.cancelProcessInstance(processInstanceKey) await zbc.close() restoreZeebeLogging() - await cancelProcesses(processId) + await cancelProcesses(processDefinitionKey) }) test('Can raise an Operate incident with complete.failure()', async () => { const wf = await zbc.createProcessInstance({ - bpmnProcessId: processId, + bpmnProcessId, variables: { conditionVariable: true, }, @@ -66,7 +65,7 @@ test('Can raise an Operate incident with complete.failure()', async () => { 
expect(job.processInstanceKey).toBe(processInstanceKey) expect(job.variables.conditionVariable).toBe(false) const res1 = await job.fail('Raise an incident in Operate', 0) - // Manually verify that an incident has been raised + /* @TODO: delay, then check for incident in Operate via the API */ await job.cancelWorkflow() // comment out the preceding line for the verification test resolve(null) diff --git a/src/__tests__/zeebe/integration/Worker-fetchVariable.spec.ts b/src/__tests__/zeebe/integration/Worker-fetchVariable.spec.ts index 4af977f3..fdf7d9fb 100644 --- a/src/__tests__/zeebe/integration/Worker-fetchVariable.spec.ts +++ b/src/__tests__/zeebe/integration/Worker-fetchVariable.spec.ts @@ -10,14 +10,15 @@ suppressZeebeLogging() const zbc = new ZeebeGrpcClient() let wf: CreateProcessInstanceResponse | undefined -let processId: string +let processDefinitionKey: string +let bpmnProcessId: string beforeAll(async () => { const res = await zbc.deployResource({ processFilename: './src/__tests__/testdata/hello-world.bpmn', }) - processId = res.deployments[0].process.bpmnProcessId - await cancelProcesses(processId) + ;({ processDefinitionKey, bpmnProcessId } = res.deployments[0].process) + await cancelProcesses(processDefinitionKey) }) afterEach(async () => { @@ -27,7 +28,7 @@ afterEach(async () => { }) afterAll(async () => { - await cancelProcesses(processId) + await cancelProcesses(processDefinitionKey) await zbc.close() restoreZeebeLogging() }) @@ -35,7 +36,7 @@ afterAll(async () => { test('Can retrieve only specified variables using fetchVariable', (done) => { zbc .createProcessInstance({ - bpmnProcessId: processId, + bpmnProcessId, variables: { var1: 'foo', var2: 'bar', diff --git a/src/__tests__/zeebe/integration/Worker-integration.spec.ts b/src/__tests__/zeebe/integration/Worker-integration.spec.ts index 6093e7de..5a76cfce 100644 --- a/src/__tests__/zeebe/integration/Worker-integration.spec.ts +++ b/src/__tests__/zeebe/integration/Worker-integration.spec.ts @@ 
-9,26 +9,38 @@ suppressZeebeLogging() const zbc = new ZeebeGrpcClient() let wf: CreateProcessInstanceResponse | undefined -let processId1: string -let processId2: string -let processId3: string +let processDefinitionKey1: string +let processDefinitionKey2: string +let processDefinitionKey3: string +let bpmnProcessId1: string +let bpmnProcessId2: string +let bpmnProcessId3: string beforeAll(async () => { const res1 = await zbc.deployResource({ processFilename: './src/__tests__/testdata/hello-world.bpmn', }) - processId1 = res1.deployments[0].process.bpmnProcessId - await cancelProcesses(processId1) + ;({ + processDefinitionKey: processDefinitionKey1, + bpmnProcessId: bpmnProcessId1, + } = res1.deployments[0].process) + await cancelProcesses(processDefinitionKey1) const res2 = await zbc.deployResource({ processFilename: './src/__tests__/testdata/hello-world-complete.bpmn', }) - processId2 = res2.deployments[0].process.bpmnProcessId - await cancelProcesses(processId2) + ;({ + processDefinitionKey: processDefinitionKey2, + bpmnProcessId: bpmnProcessId2, + } = res2.deployments[0].process) + await cancelProcesses(processDefinitionKey2) const res3 = await zbc.deployResource({ processFilename: './src/__tests__/testdata/conditional-pathway.bpmn', }) - processId3 = res3.deployments[0].process.bpmnProcessId - await cancelProcesses(processId3) + ;({ + processDefinitionKey: processDefinitionKey3, + bpmnProcessId: bpmnProcessId3, + } = res3.deployments[0].process) + await cancelProcesses(processDefinitionKey3) }) afterEach(async () => { @@ -39,16 +51,16 @@ afterEach(async () => { afterAll(async () => { await zbc.close() - await cancelProcesses(processId1) - await cancelProcesses(processId2) - await cancelProcesses(processId3) + await cancelProcesses(processDefinitionKey1) + await cancelProcesses(processDefinitionKey2) + await cancelProcesses(processDefinitionKey3) restoreZeebeLogging() }) test('Can service a task', (done) => { zbc .createProcessInstance({ - bpmnProcessId: 
processId1, + bpmnProcessId: bpmnProcessId1, variables: {}, }) .then((res) => { @@ -69,7 +81,7 @@ test('Can service a task', (done) => { test('Can service a task with complete.success', (done) => { zbc .createProcessInstance({ - bpmnProcessId: processId2, + bpmnProcessId: bpmnProcessId2, variables: {}, }) .then((res) => { @@ -89,7 +101,7 @@ test('Can service a task with complete.success', (done) => { test('Can update process variables with complete.success()', async () => { wf = await zbc.createProcessInstance({ - bpmnProcessId: processId3, + bpmnProcessId: bpmnProcessId3, variables: { conditionVariable: true, }, diff --git a/src/__tests__/zeebe/multitenancy/createProcessInstance-mt.spec.ts b/src/__tests__/zeebe/multitenancy/createProcessInstance-mt.spec.ts index 7ebc7346..20397f54 100644 --- a/src/__tests__/zeebe/multitenancy/createProcessInstance-mt.spec.ts +++ b/src/__tests__/zeebe/multitenancy/createProcessInstance-mt.spec.ts @@ -7,7 +7,7 @@ import { suppressZeebeLogging() let res: DeployResourceResponse - +let bpmnProcessId: string beforeAll(async () => { const client = new ZeebeGrpcClient({ config: { @@ -17,6 +17,7 @@ beforeAll(async () => { res = await client.deployResource({ processFilename: './src/__tests__/testdata/hello-world.bpmn', }) + bpmnProcessId = res.deployments[0].process.bpmnProcessId }) afterAll(() => restoreZeebeLogging()) @@ -32,8 +33,7 @@ test('Will not throw an error if tenantId is provided when starting a process in try { const p = await client.createProcessInstance({ - bpmnProcessId: res.deployments[0].process.bpmnProcessId, - version: res.deployments[0].process.version, + bpmnProcessId, variables: {}, }) expect(p).toBeTruthy() @@ -56,8 +56,7 @@ test('Will throw an error if no tenantId is provided when starting a process ins }) try { const p = await client.createProcessInstance({ - bpmnProcessId: res.deployments[0].process.bpmnProcessId, - version: res.deployments[0].process.version, + bpmnProcessId, variables: {}, }) 
client.cancelProcessInstance(p.bpmnProcessId) diff --git a/src/lib/BigIntSerializer.ts b/src/lib/BigIntSerializer.ts deleted file mode 100644 index ffb65685..00000000 --- a/src/lib/BigIntSerializer.ts +++ /dev/null @@ -1,2 +0,0 @@ -export const bigIntToString = (bigInt: bigint) => bigInt.toString() -export const stringToBigInt = (str: string) => BigInt(str) diff --git a/src/lib/LosslessJsonParser.ts b/src/lib/LosslessJsonParser.ts new file mode 100644 index 00000000..93d437f8 --- /dev/null +++ b/src/lib/LosslessJsonParser.ts @@ -0,0 +1,88 @@ +/** + * This is a custom JSON Parser that handles lossless parsing of int64 numbers by using the lossless-json library. + * + * It converts all JSON numbers to lossless numbers, then converts them back to the correct type based on the metadata + * of a Dto class - fields decorated with `@Int32` are converted to a `number` and fields decorated with `@Int64` are + * converted to a `string`. + * + * It also handles nested Dtos by using the `@ChildDto` decorator. 
+ */ + +/* eslint-disable @typescript-eslint/no-explicit-any */ +import { LosslessNumber, parse, stringify } from 'lossless-json' +import 'reflect-metadata' + +// Custom Decorators +export function Int32(target: any, propertyKey: string | symbol): void { + Reflect.defineMetadata('type:int32', true, target, propertyKey) +} + +export function Int64(target: any, propertyKey: string | symbol): void { + Reflect.defineMetadata('type:int64', true, target, propertyKey) +} + +// Custom decorator to specify the class type of a property +export function ChildDto(childClass: any) { + return function (target: any, propertyKey: string | symbol) { + Reflect.defineMetadata('child:class', childClass, target, propertyKey) + } +} + +export class LosslessDto { + constructor(obj: any) { + if (obj) { + for (const [key, value] of Object.entries(obj)) { + this[key] = value + } + } + } +} + +export function parseWithAnnotations( + json: string, + dto: { new (...args: any[]): T } +): T { + const obj = parse(json) as any // Assume using a parser that doesn't lose precision for int64 + + const instance = new dto() + + for (const [key, value] of Object.entries(obj)) { + const childClass = Reflect.getMetadata('child:class', dto.prototype, key) + if (childClass) { + if (Array.isArray(value)) { + // If the value is an array, parse each element with the specified child class + instance[key] = value.map((item) => + parseWithAnnotations(stringify(item) as string, childClass) + ) + } else { + // If the value is an object, parse it with the specified child class + instance[key] = parseWithAnnotations( + stringify(value) as string, + childClass + ) + } + } else { + // Existing logic for int32 and int64... 
+ if (Reflect.hasMetadata('type:int32', dto.prototype, key)) { + instance[key] = (value as LosslessNumber).valueOf() // Assuming value is already the correct type + } else if (Reflect.hasMetadata('type:int64', dto.prototype, key)) { + instance[key] = (value as LosslessNumber).toString() // Assuming value is string + } else { + instance[key] = value // Assign directly for other types + } + } + } + + return instance +} + +export function parseArrayWithAnnotations( + json: string, + dto: { new (...args: any[]): T } +): T[] { + const array = parse(json) as any[] // Assume using a parser that doesn't lose precision for int64 + + return array.map((item) => + parseWithAnnotations(stringify(item) as string, dto) + ) +} diff --git a/src/lib/index.ts b/src/lib/index.ts index 64842330..11002a8f 100644 --- a/src/lib/index.ts +++ b/src/lib/index.ts @@ -5,6 +5,7 @@ export * from './ConstructOAuthProvider' export * from './Delay' export * from './EnvironmentSetup' export { packageVersion } from './GetPackageVersion' +export * from './LosslessJsonParser' export { RequireConfiguration } from './RequireConfiguration' export * from './SuppressZeebeLogging' export * from './ValueOrDefault' diff --git a/src/modeler/README.md b/src/modeler/README.md deleted file mode 100644 index ac8a42ca..00000000 --- a/src/modeler/README.md +++ /dev/null @@ -1,31 +0,0 @@ -# Camunda 8 Web Modeler API Client - -This is a Node.js client for the Camunda 8 Web Modeler API v1. - -To install: - -```bash -npm i @camunda8/modeler --registry https://npm.pkg.github.com -``` - -## Usage - -Set the environment variables for a Console API client. 
- -```typescript -import { ModelerApiClient } from '@camunda8/modeler' - -// Hydrate credentials from the environment using the dotenv package -import 'dotenv/config' - -async function main() { - const info = await modeler.getInfo() - console.log(JSON.stringify(info, null, 2)) - const projects = await modeler.searchProjects({}) - console.log(`Found ${projects.length} projects`) - console.log(JSON.stringify(projects, null, 2)) - }) -} - -main() -``` diff --git a/src/oauth/README.md b/src/oauth/README.md deleted file mode 100644 index f6470c7d..00000000 --- a/src/oauth/README.md +++ /dev/null @@ -1,82 +0,0 @@ -# Camunda SaaS OAuth for Node.js - -[![NPM](https://nodei.co/npm/camunda-saas-oauth.png)](https://npmjs.org/package/camunda-saas-oauth) - -![Community Extension](https://img.shields.io/badge/Community%20Extension-An%20open%20source%20community%20maintained%20project-FF4700) - -![Lifecycle](https://img.shields.io/badge/Lifecycle-Stable-brightgreen) - -[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) - -A library to exchange a set of Camunda 8 SaaS API credentials for a token to make API calls to Camunda 8 SaaS. Uses [camunda-8-credentials-from-env](https://github.com/camunda-community-hub/camunda-8-credentials-from-env) to get the credentials from the environment. - -Caches the token to disk, and refreshes tokens before they expire. 
- -## Installation - -Install as a dependency: - -``` -npm i @camunda8/oauth -``` - -## Usage - -```typescript -import * as auth from '@camunda8/oauth' - -async function main() { - const useragent = 'myclient-nodejs/1.0.0' - const operateToken = await auth.getOperateToken(useragent) - const tasklistToken = await auth.getTasklistToken(useragent) - const optimizeToken = await auth.getOptimizeToken(useragent) - const zeebeToken = await auth.getZeebeToken(useragent) - return { - operateToken, - tasklistToken, - optimizeToken, - zeebeToken, - } -} -``` - -The call will throw if the client credentials are not found in the environment, or you request a token for a scope for which the credentials are not valid. - -## Configuration - -Set the API client credentials in the environment, using [the environment variables from the web console](https://docs.camunda.io/docs/components/console/manage-clusters/manage-api-clients/). - -To configure a different cache directory, set the `CAMUNDA_TOKEN_CACHE_DIR` environment variable. - -To turn off disk caching, set the environment variable `CAMUNDA_TOKEN_CACHE=memory-only`. - -## User Agent - -Example of a custom user agent string: `mycustom-client-nodejs/${pkg.version} ${CUSTOM_AGENT_STRING}` - -## Advanced Usage - -The methods that return tokens use an `OAuthProvider` to get the tokens. - -The `OAuthProvider` class is a wrapper that hydrates a `OAuthProviderImpl` with credentials from the environment. 
- -If you want to manually set the credentials (for example, to address multiple clusters in a single application), you can do so by creating an `OAuthProviderImpl` directly, like so: - -```typescript -import { OAuthProviderImpl } from 'camunda-saas-oauth' - -const oauth = new OAuthProviderImpl({ - /** OAuth Endpoint URL */ - authServerUrl, - /** OAuth Audience */ - audience, - clientId, - clientSecret, - userAgentString, -}) - -const operateToken = oauth.getToken('OPERATE') -const optimizeToken = oauth.getToken('OPTIMIZE') -const tasklistToken = oauth.getToken('TASKLIST') -const zeebeToken = oauth.getToken('ZEEBE') -``` diff --git a/src/oauth/lib/OAuthProvider.ts b/src/oauth/lib/OAuthProvider.ts index 482cfeb6..4a412f0c 100644 --- a/src/oauth/lib/OAuthProvider.ts +++ b/src/oauth/lib/OAuthProvider.ts @@ -17,7 +17,7 @@ import { IOAuthProvider, Token, TokenError } from '../index' import { TokenGrantAudienceType } from './IOAuthProvider' -const trace = debug('camunda:token') +const trace = debug('camunda:oauth') const homedir = os.homedir() const BACKOFF_TOKEN_ENDPOINT_FAILURE = 1000 @@ -163,7 +163,9 @@ export class OAuthProvider implements IOAuthProvider { // check expiry and evict in-memory and file cache if expired if (this.isExpired(token)) { this.evictFromMemoryCache(audienceType) + trace(`In-memory token ${token.token_type} is expired`) } else { + trace(`Using in-memory cached token ${token.token_type}`) return this.tokenCache[key].access_token } } @@ -176,7 +178,9 @@ export class OAuthProvider implements IOAuthProvider { // check expiry and evict in-memory and file cache if expired if (this.isExpired(cachedToken)) { this.evictFromFileCache({ audienceType, clientId: clientIdToUse }) + trace(`File cached token ${cachedToken.token_type} is expired`) } else { + trace(`Using file cached token ${cachedToken.token_type}`) return cachedToken.access_token } } @@ -279,7 +283,15 @@ export class OAuthProvider implements IOAuthProvider { const optionsWithAgent = 
this.customRootCert ? { ...options, agent: customAgent } : options + trace(`Making token request to the token endpoint: `) + trace(` ${this.authServerUrl}`) + trace(optionsWithAgent) return fetch(this.authServerUrl, optionsWithAgent) + .catch((e) => { + console.log(`Error requesting token for Client Id ${clientIdToUse}`) + console.log(e) + throw e + }) .then((res) => res.json().catch(() => { trace( @@ -314,8 +326,6 @@ export class OAuthProvider implements IOAuthProvider { }) } this.sendToMemoryCache({ audience: audienceType, token }) - trace(`Got token from endpoint: \n${token.access_token}`) - trace(`Token expires in ${token.expires_in} seconds`) return token.access_token }) } @@ -378,6 +388,7 @@ export class OAuthProvider implements IOAuthProvider { }), (e) => { if (!e) { + trace(`Wrote OAuth token to file ${file}`) return } // tslint:disable-next-line diff --git a/src/operate/README.md b/src/operate/README.md deleted file mode 100644 index 45d438de..00000000 --- a/src/operate/README.md +++ /dev/null @@ -1,75 +0,0 @@ -# Operate API Client for Node.js - -[![NPM](https://nodei.co/npm/operate-api-client.png)](https://npmjs.org/package/operate-api-client) - -![Community Extension](https://img.shields.io/badge/Community%20Extension-An%20open%20source%20community%20maintained%20project-FF4700) - -![Lifecycle](https://img.shields.io/badge/Lifecycle-Stable-brightgreen) - -[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) - -A Node.js client for interacting with the [Camunda 8 Operate REST API](https://docs.camunda.io/docs/apis-clients/operate-api/) in Camunda 8 SaaS. - -Uses [camunda-saas-oauth-nodejs](https://github.com/camunda-community-hub/camunda-saas-oauth-nodejs) to use client credentials from the environment for authentication. 
- -## Installation - -``` -npm i @camunda8/operate -``` - -## Usage - -Set the credentials for Camunda SaaS in the environment, then: - -```typescript -import { OperateApiClient } from '@camunda8/operate' - -const operate = new OperateApiClient() - -operate - .searchProcessInstances({ - filter: { - state: 'ACTIVE', - }, - size: 50, - }) - .then((instances) => { - console.log(instances) - }) -``` - -## Advanced Usage - -If you want to create multiple instances of the client in an application - for example, to address different clusters - then you can hydrate the client manually using an `OAuthProviderImpl` like so: - -```typescript -import { OperateApiClient } from '@camunda8/operate' -import { OAuthProviderImpl } from '@camunda8/oauth' - -const oauthProvider1 = new OAuthProviderImpl({ - audience: 'zeebe.camunda.io', - authServerUrl: 'https://login.cloud.camunda.io/oauth/token', - clientId: process.env.ZEEBE_CLIENT_ID_1, - clientSecret: process.env.ZEEBE_CLIENT_SECRET_1, - userAgentString: 'operate-client-nodejs', -}) - -const client_1 = new OperateApiClient({ - oauthProvider, - baseUrl: process.env.CAMUNDA_OPERATE_BASE_URL_1, -}) - -const oauthProvider2 = new OAuthProviderImpl({ - audience: 'zeebe.camunda.io', - authServerUrl: 'https://login.cloud.camunda.io/oauth/token', - clientId: process.env.ZEEBE_CLIENT_ID_2, - clientSecret: process.env.ZEEBE_CLIENT_SECRET_2, - userAgentString: 'operate-client-nodejs', -}) - -const client_2 = new OperateApiClient({ - oauthProvider, - baseUrl: process.env.CAMUNDA_OPERATE_BASE_URL_2, -}) -``` diff --git a/src/operate/index.ts b/src/operate/index.ts index 1547eca0..96c537ea 100644 --- a/src/operate/index.ts +++ b/src/operate/index.ts @@ -1,3 +1,3 @@ export { OperateApiClient } from './lib/OperateApiClient' -export * as Dto from './lib/APIObjects' +export * as Dto from './lib/OperateDto' diff --git a/src/operate/lib/APIObjects.ts b/src/operate/lib/APIObjects.ts deleted file mode 100644 index b30b2b8f..00000000 --- 
a/src/operate/lib/APIObjects.ts +++ /dev/null @@ -1,113 +0,0 @@ -export interface ProcessDefinition { - key: number - name: string - version: number - bpmnProcessId: string -} - -export interface ProcessInstance { - key: number - processVersion: number - bpmnProcessId: string - parentKey?: number - parentFlowNodeInstanceKey: number - /* yyyy-MM-dd'T'HH:mm:ss.SSSZZ */ - startDate: string - /* yyyy-MM-dd'T'HH:mm:ss.SSSZZ */ - endDate: string - state: 'ACTIVE' | 'COMPLETED' | 'CANCELED' - processDefinitionKey: number - tenantId: string | undefined - parentProcessInstanceKey: number -} - -export interface Incident { - key: number - processDefinitionKey: number - processInstanceKey: number - type: - | 'UNSPECIFIED' - | 'UNKNOWN' - | 'IO_MAPPING_ERROR' - | 'JOB_NO_RETRIES' - | 'CONDITION_ERROR' - | 'EXTRACT_VALUE_ERROR' - | 'CALLED_ELEMENT_ERROR' - | 'UNHANDLED_ERROR_EVENT' - | 'MESSAGE_SIZE_EXCEEDED' - | 'CALLED_DECISION_ERROR' - | 'DECISION_EVALUATION_ERROR' - message: string - /* yyyy-MM-dd'T'HH:mm:ss.SSSZZ */ - creationTime: string // - state: 'ACTIVE' | 'RESOLVED' -} - -export interface FlownodeInstance { - key: number - processInstanceKey: number - /* yyyy-MM-dd'T'HH:mm:ss.SSSZZ */ - startDate: string - /* yyyy-MM-dd'T'HH:mm:ss.SSSZZ */ - endDate: string - incidentKey: number - type: - | 'UNSPECIFIED' - | 'PROCESS' - | 'SUB_PROCESS' - | 'EVENT_SUB_PROCESS' - | 'START_EVENT' - | 'INTERMEDIATE_CATCH_EVENT' - | 'INTERMEDIATE_THROW_EVENT' - | 'BOUNDARY_EVENT' - | 'END_EVENT' - | 'SERVICE_TASK' - | 'RECEIVE_TASK' - | 'USER_TASK' - | 'MANUAL_TASK' - | 'TASK' - | 'EXCLUSIVE_GATEWAY' - | 'INCLUSIVE_GATEWAY' - | 'PARALLEL_GATEWAY' - | 'EVENT_BASED_GATEWAY' - | 'SEQUENCE_FLOW' - | 'MULTI_INSTANCE_BODY' - | 'CALL_ACTIVITY' - | 'BUSINESS_RULE_TASK' - | 'SCRIPT_TASK' - | 'SEND_TASK' - | 'UNKNOWN' - state: 'ACTIVE' | 'COMPLETED' | 'TERMINATED' - incident: boolean -} - -export interface Variable { - key: number - processInstanceKey: number - scopeKey: number - name: string - /* 
Always truncated if value is too big in "search" results. In "get object" result it is not truncated. */ - value: string - /* if true 'value' is truncated. */ - truncated: boolean -} - -export interface ChangeStatus { - /* What was changed */ - message: string - /* How many items were deleted */ - deleted: number -} - -export interface Query { - filter?: Partial - size?: number - sort?: [{ field: keyof T; order: 'ASC' | 'DESC' }] - searchAfter?: unknown[] -} - -export interface SearchResults { - items: T[] - sortValues: unknown[] - total: number -} diff --git a/src/operate/lib/OperateApiClient.ts b/src/operate/lib/OperateApiClient.ts index 650a879c..a7386bc3 100644 --- a/src/operate/lib/OperateApiClient.ts +++ b/src/operate/lib/OperateApiClient.ts @@ -6,23 +6,38 @@ import { RequireConfiguration, constructOAuthProvider, packageVersion, + parseArrayWithAnnotations, + parseWithAnnotations, } from 'lib' import { IOAuthProvider } from 'oauth' import { ChangeStatus, + DecisionDefinition, + DecisionInstance, + DecisionRequirements, FlownodeInstance, Incident, ProcessDefinition, ProcessInstance, + ProcessInstanceStatistics, Query, SearchResults, Variable, -} from './APIObjects' +} from './OperateDto' +import { parseSearchResults } from './parseSearchResults' const OPERATE_API_VERSION = 'v1' type JSONDoc = { [key: string]: string | boolean | number | JSONDoc } +type EnhanceWithTenantIdIfMissing = T extends { + filter: { tenantId: string | undefined } +} + ? T // If T's filter already has tenantId, V is T unchanged + : T extends { filter: infer F } + ? { filter: F & { tenantId: string | undefined } } & Omit // If T has a filter without tenantId, add tenantId to it + : { filter: { tenantId: string | undefined } } & T // If T has no filter property, add filter with tenantId + /** * @description The high-level client for Operate. 
* @example @@ -87,6 +102,28 @@ export class OperateApiClient { } } + // eslint-disable-next-line @typescript-eslint/no-explicit-any + protected addTenantIdToFilter( + query: Query, + tenantId: string | undefined = this.tenantId // Example default value + ): Query> { + const hasTenantIdInFilter = query.filter && 'tenantId' in query.filter + + // If `filter` already has `tenantId`, return the original query as is. + if (hasTenantIdInFilter) { + return query as Query> + } + + // Otherwise, add or ensure `tenantId` in `filter`. + return { + ...query, + filter: { + ...query.filter, + tenantId, + }, + } as unknown as Query> + } + /** * @description Search and retrieve process definitions. * @@ -111,10 +148,12 @@ export class OperateApiClient { query: Query = {} ): Promise> { const headers = await this.getHeaders() + const json = this.addTenantIdToFilter(query) return this.rest .post('process-definitions/search', { - json: query, + json, headers, + parseJson: (text) => parseSearchResults(text, ProcessDefinition), }) .json() } @@ -148,6 +187,49 @@ export class OperateApiClient { }).text() } + + public async searchDecisionDefinitions( + query: Query + ): Promise> { + const headers = await this.getHeaders() + const json = this.addTenantIdToFilter(query) + return this.rest.post('decision-definitions/search', { + headers, + parseJson: (text) => parseSearchResults(text, DecisionDefinition), + json, + }).json() + } + + public async getDecisionDefinition( + decisionDefinitionKey: number | string + ): Promise { + const headers = await this.getHeaders() + return this.rest(`decision-definitions/${decisionDefinitionKey}`, { + headers, + parseJson: (text) => parseWithAnnotations(text, DecisionDefinition), + }).json() + } + + public async searchDecisionInstances( + query: Query + ): Promise> { + const headers = await this.getHeaders() + const json = this.addTenantIdToFilter(query) + return this.rest.post('decision-instances/search', { + headers, + parseJson: (text) => parseSearchResults(text, 
DecisionInstance), + json, + }).json() + } + + public async getDecisionInstance( + decisionInstanceKey: number | string + ): Promise { + const headers = await this.getHeaders() + return this.rest(`decision-instances/${decisionInstanceKey}`, { + headers, + parseJson: (text) => parseWithAnnotations(text, DecisionInstance), + }).json() + } /** * @description Search and retrieve process instances. * @example @@ -172,18 +254,19 @@ export class OperateApiClient { query: Query = {} ): Promise> { const headers = await this.getHeaders() - const json = this.tenantId - ? { - ...query, - tenantId: this.tenantId, - } - : query - return this.rest - .post('process-instances/search', { - json, - headers, - }) - .json() + + const json = this.addTenantIdToFilter(query) + try { + return this.rest + .post('process-instances/search', { + json, + headers, + parseJson: (text) => parseSearchResults(text, ProcessInstance), + }) + .json() + } catch (e) { + throw new Error((e as Error).message) + } } /** @@ -216,11 +299,16 @@ export class OperateApiClient { processInstanceKey: number | string ): Promise { const headers = await this.getHeaders() - return this.rest - .delete(`process-instances/${processInstanceKey}`, { + try { + const res = this.rest.delete(`process-instances/${processInstanceKey}`, { headers, + throwHttpErrors: false, }) - .json() + res.catch((e) => console.log(e)) + return res.json() + } catch (e) { + throw new Error((e as Error).message) + } } /** @@ -228,19 +316,12 @@ export class OperateApiClient { */ public async getProcessInstanceStatistics( processInstanceKey: number | string - ): Promise< - { - // The id of the flow node for which the results are aggregated - activityId: string - active: number - canceled: number - incidents: number - completed: number - }[] - > { + ): Promise { const headers = await this.getHeaders() return this.rest(`process-instances/${processInstanceKey}/statistics`, { headers, + parseJson: (text) => + parseArrayWithAnnotations(text, 
ProcessInstanceStatistics), }).json() } @@ -281,10 +362,12 @@ export class OperateApiClient { query: Query = {} ): Promise> { const headers = await this.getHeaders() + const json = this.addTenantIdToFilter(query) return this.rest .post('incidents/search', { - json: query, + json, headers, + parseJson: (text) => parseSearchResults(text, Incident), }) .json() } @@ -303,6 +386,7 @@ export class OperateApiClient { const headers = await this.getHeaders() return this.rest(`incidents/${key}`, { headers, + parseJson: (text) => parseWithAnnotations(text, Incident), }).json() } @@ -310,10 +394,12 @@ export class OperateApiClient { query: Query ): Promise> { const headers = await this.getHeaders() + const json = this.addTenantIdToFilter(query) return this.rest - .post('flownodes/search', { + .post('flownode-instances/search', { headers, - json: query, + json, + parseJson: (text) => parseSearchResults(text, FlownodeInstance), }) .json() } @@ -322,8 +408,9 @@ export class OperateApiClient { key: number | string ): Promise { const headers = await this.getHeaders() - return this.rest(`flownodes/${key}`, { + return this.rest(`flownode-instances/${key}`, { headers, + parseJson: (text) => parseWithAnnotations(text, FlownodeInstance), }).json() } @@ -331,10 +418,12 @@ export class OperateApiClient { query: Query ): Promise> { const headers = await this.getHeaders() + const json = this.addTenantIdToFilter(query) return this.rest .post('variables/search', { headers, - json: query, + json, + parseJson: (text) => parseSearchResults(text, Variable), }) .json() } @@ -412,4 +501,33 @@ export class OperateApiClient { headers, }).json() } + + public async searchDecisionRequirements(query: Query) { + const headers = await this.getHeaders() + const json = this.addTenantIdToFilter(query) + return this.rest + .post('drd/search', { + headers, + json, + parseJson: (text) => parseSearchResults(text, DecisionRequirements), + }) + .json() + } + + public async getDecisionRequirements( + key: string | 
number + ): Promise { + const headers = await this.getHeaders() + return this.rest(`drd/${key}`, { + headers, + parseJson: (text) => parseWithAnnotations(text, DecisionRequirements), + }).json() + } + + public async getDecisionRequirementsXML(key: string | number) { + const headers = await this.getHeaders() + return this.rest(`drd/${key}/xml`, { + headers, + }).text() + } } diff --git a/src/operate/lib/OperateDto.ts b/src/operate/lib/OperateDto.ts new file mode 100644 index 00000000..c8f94b38 --- /dev/null +++ b/src/operate/lib/OperateDto.ts @@ -0,0 +1,222 @@ +import { ChildDto, Int32, Int64, LosslessDto } from 'lib' +import { LosslessNumber } from 'lossless-json' + +export class DecisionDefinition extends LosslessDto { + id!: string + @Int64 + key!: string + decisionId!: string + name!: string + version!: number + decisionRequirementsId!: string + @Int64 + decisionRequirementsKey!: string + decisionRequirementsName!: string + decisionRequirementsVersion!: number + tenantId: string | undefined +} + +class DecisionInstanceInput extends LosslessDto { + id!: string + name!: string + value!: string +} + +class DecisionInstanceOutput extends LosslessDto { + id!: string + name!: string + value!: string + ruleId!: string + @Int32 + ruleIndex!: number +} + +export class DecisionInstance extends LosslessDto { + id!: string + @Int64 + key!: string + state!: 'FAILED' | 'EVALUATED' | 'UNKNOWN' | 'UNSPECIFIED' + evaluationDate!: string + evaluationFailure!: string + @Int64 + processDefinitionKey!: string + decisionId!: string + decisionDefinitionId!: string + decisionName!: string + @Int32 + decisionVersion!: number + decisionType!: + | 'DECISION_TABLE' + | 'LITERAL_EXPRESSION' + | 'UNSPECIFIED' + | 'UNKNOWN' + result!: string + evaluatedInputs!: DecisionInstanceInput[] + @ChildDto(DecisionInstanceOutput) + evaluatedOutputs!: DecisionInstanceOutput[] + tenantId: string | undefined +} + +export class DecisionRequirements extends LosslessDto { + id!: string + @Int64 + key!: 
string + decisionRequirementsId!: string + name!: string + @Int32 + version!: number + resourceName!: string + tenantId: string | undefined +} + +export class ProcessDefinition extends LosslessDto { + /** ProcessDefinition key is a string in the SDK, but it's an int64 number in the database */ + @Int64 + key!: string + name!: string + @Int32 + version!: number + bpmnProcessId!: string +} + +export class ProcessInstance extends LosslessDto { + @Int64 + key!: string + @Int32 + processVersion!: number + bpmnProcessId!: string + @Int64 + parentKey?: string + @Int64 + parentFlowNodeInstanceKey?: string + /* yyyy-MM-dd'T'HH:mm:ss.SSSZZ */ + startDate!: string + /* yyyy-MM-dd'T'HH:mm:ss.SSSZZ */ + endDate!: string + state!: 'ACTIVE' | 'COMPLETED' | 'CANCELED' + @Int64 + processDefinitionKey!: string + tenantId: string | undefined +} + +export class Incident extends LosslessDto { + @Int64 + key!: string + @Int64 + processDefinitionKey!: LosslessNumber + @Int64 + processInstanceKey!: LosslessNumber + type!: + | 'UNSPECIFIED' + | 'UNKNOWN' + | 'IO_MAPPING_ERROR' + | 'JOB_NO_RETRIES' + | 'CONDITION_ERROR' + | 'EXTRACT_VALUE_ERROR' + | 'CALLED_ELEMENT_ERROR' + | 'UNHANDLED_ERROR_EVENT' + | 'MESSAGE_SIZE_EXCEEDED' + | 'CALLED_DECISION_ERROR' + | 'DECISION_EVALUATION_ERROR' + message!: string + /* yyyy-MM-dd'T'HH:mm:ss.SSSZZ */ + creationTime!: string // + state!: 'ACTIVE' | 'RESOLVED' + @Int64 + jobKey!: string + tenantId: string | undefined +} + +export class FlownodeInstance extends LosslessDto { + @Int64 + key!: string + @Int64 + processInstanceKey!: string + /* yyyy-MM-dd'T'HH:mm:ss.SSSZZ */ + startDate!: string + /* yyyy-MM-dd'T'HH:mm:ss.SSSZZ */ + endDate!: string + @Int64 + incidentKey?: string + type!: + | 'UNSPECIFIED' + | 'PROCESS' + | 'SUB_PROCESS' + | 'EVENT_SUB_PROCESS' + | 'START_EVENT' + | 'INTERMEDIATE_CATCH_EVENT' + | 'INTERMEDIATE_THROW_EVENT' + | 'BOUNDARY_EVENT' + | 'END_EVENT' + | 'SERVICE_TASK' + | 'RECEIVE_TASK' + | 'USER_TASK' + | 'MANUAL_TASK' + | 
'TASK' + | 'EXCLUSIVE_GATEWAY' + | 'INCLUSIVE_GATEWAY' + | 'PARALLEL_GATEWAY' + | 'EVENT_BASED_GATEWAY' + | 'SEQUENCE_FLOW' + | 'MULTI_INSTANCE_BODY' + | 'CALL_ACTIVITY' + | 'BUSINESS_RULE_TASK' + | 'SCRIPT_TASK' + | 'SEND_TASK' + | 'UNKNOWN' + state!: 'ACTIVE' | 'COMPLETED' | 'TERMINATED' + incident!: boolean + tenantId: string | undefined +} + +export class Variable extends LosslessDto { + @Int64 + key!: string + @Int64 + processInstanceKey!: string + @Int64 + scopeKey!: string + name!: string + /* Always truncated if value is too big in "search" results. In "get object" result it is not truncated. */ + value!: string + /* if true 'value' is truncated. */ + truncated!: boolean +} + +export class ChangeStatus extends LosslessDto { + /* What was changed */ + message!: string + /* How many items were deleted */ + @Int64 + deleted!: string +} + +export class ProcessInstanceStatistics extends LosslessDto { + /* The id of the flow node for which the results are aggregated */ + activityId!: string + /* The total number of active instances of the flow node */ + @Int64 + active!: string + /* The total number of canceled instances of the flow node */ + @Int64 + canceled!: string + /* The total number of incidents for the flow node */ + @Int64 + incidents!: string + /* The total number of completed instances of the flow node */ + @Int64 + completed!: string +} + +export interface Query { + filter?: Partial + size?: number + sort?: [{ field: keyof T; order: 'ASC' | 'DESC' }] + searchAfter?: unknown[] +} + +export interface SearchResults { + items: T[] + sortValues: unknown[] + total: string +} diff --git a/src/operate/lib/TestableOperateApiClient.ts b/src/operate/lib/TestableOperateApiClient.ts new file mode 100644 index 00000000..acb886f1 --- /dev/null +++ b/src/operate/lib/TestableOperateApiClient.ts @@ -0,0 +1,18 @@ +import { ClientConstructor } from 'lib' + +import { OperateApiClient } from './OperateApiClient' +import { Query } from './OperateDto' + +export class 
TestableOperateApiClient extends OperateApiClient { + constructor(options?: ClientConstructor) { + super(options) + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + public override addTenantIdToFilter>( + query: T, + tenantId?: string + ) { + return super.addTenantIdToFilter(query, tenantId) + } +} diff --git a/src/operate/lib/parseSearchResults.ts b/src/operate/lib/parseSearchResults.ts new file mode 100644 index 00000000..74791713 --- /dev/null +++ b/src/operate/lib/parseSearchResults.ts @@ -0,0 +1,33 @@ +import { parseWithAnnotations } from 'lib' +import { parse, stringify } from 'lossless-json' + +import { SearchResults } from './OperateDto' + +export function parseSearchResults( + json: string, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + dto: { new (...args: any[]): T } +): SearchResults { + const parsedResult = parse(json) as SearchResults + + // Assuming `parsedResult` matches the structure of `SearchResults` + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const items = parsedResult.items.map((item: any) => + parseWithAnnotations(stringify(item) as string, dto) + ) + + // Apply additional parsing or annotations if necessary + // For each item in the array, you could potentially apply the same or similar logic + // as in `parseWithAnnotations` if your items have properties that need special handling. 
+ + const total = parsedResult.total.toString() // Or convert based on your needs + + // Construct the final object, assuming `SearchResults` is a simple interface without methods + const result: SearchResults = { + items, + sortValues: parsedResult.sortValues, // Handle according to your needs + total, + } + + return result +} diff --git a/src/optimize/README.md b/src/optimize/README.md deleted file mode 100644 index 84e50ead..00000000 --- a/src/optimize/README.md +++ /dev/null @@ -1,38 +0,0 @@ -# Optimize API Client for Node.js - -[![NPM](https://nodei.co/npm/optimize-api-client.png)](https://npmjs.org/package/optimize-api-client) - -![Community Extension](https://img.shields.io/badge/Community%20Extension-An%20open%20source%20community%20maintained%20project-FF4700) - -![Lifecycle](https://img.shields.io/badge/Lifecycle-Stable-brightgreen) - -[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) - -A Node.js client for interacting with the [Camunda Platform 8 Optimize REST API](https://docs.camunda.io/optimize/apis-clients/optimize-api/configuration/enable-sharing/). - -Uses [camunda-saas-oauth-nodejs](https://github.com/camunda-community-hub/camunda-saas-oauth-nodejs) to use client credentials from the environment for authentication. 
- -## Installation - -``` -npm i @camunda8/optimize -``` - -## Usage - -Set the credential for Camunda SaaS in the environment, then: - -```typescript -import { OptimizeApiClient } from '@camunda8/optimize` - -const optimize = new OperateApiClient() - -async function main() { - await optimize.enableSharing() - const id = "8a7103a7-c086-48f8-b5b7-a7f83e864688" - const res = await optimize.exportDashboardDefinitions([id]) - fs.writeFileSync('exported-dashboard.json', JSON.stringify(res, null, 2)) -} - -main() -``` diff --git a/src/tasklist/README.md b/src/tasklist/README.md deleted file mode 100644 index 35bff86f..00000000 --- a/src/tasklist/README.md +++ /dev/null @@ -1,39 +0,0 @@ -# Camunda 8 Tasklist API client for Node.js - -[![NPM](https://nodei.co/npm/camunda-tasklist-client.png)](https://npmjs.org/package/camunda-tasklist-client) - -![Community Extension](https://img.shields.io/badge/Community%20Extension-An%20open%20source%20community%20maintained%20project-FF4700) - -![Lifecycle](https://img.shields.io/badge/Lifecycle-Stable-brightgreen) - -[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) - -A Camunda 8 Tasklist API client for Node.js. Uses [camunda-saas-oauth-nodejs](https://github.com/camunda-community-hub/camunda-saas-oauth-nodejs) to use client credentials from the environment for authentication. 
- -## Installation - -``` -npm i @camunda8/tasklist -``` - -## Usage - -Set the credential for Camunda SaaS in the environment, then: - -```typescript -import { TasklistApiClient } from '@camunda8/tasklist' - -const tasklist = new TasklistApiClient() - -async function main() { - const { tasks } = await tasklist.getTasks({ state: 'CREATED' }) - const task = tasks[0] - console.log('Task', JSON.stringify(task, null, 0)) - const taskid = task.id - const assignedTask = await tasklist.assignTask({ taskId: taskId, assignee: 'jwulf', allowOverrideAssignment: false }) -} - -main() -``` - -Full API documentation available [here](https://camunda-community-hub.github.io/tasklist-client-node-js/). diff --git a/src/zeebe/README.md b/src/zeebe/README.md deleted file mode 100644 index fd617c4c..00000000 --- a/src/zeebe/README.md +++ /dev/null @@ -1,1571 +0,0 @@ -# Zeebe Node.js Client - -![Compatible with: Camunda Platform 8](https://img.shields.io/badge/Compatible%20with-Camunda%20Platform%208-0072Ce) -![Community Extension](https://img.shields.io/badge/Community%20Extension-An%20open%20source%20community%20maintained%20project-FF4700) -![Lifecycle](https://img.shields.io/badge/Lifecycle-Stable-brightgreen) -[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) - -This is a Node.js gRPC client for [Zeebe](https://zeebe.io), the workflow engine in [Camunda Platform 8](https://camunda.com/platform/). It is written in TypeScript and transpiled to JavaScript in the `dist` directory. - -Comprehensive API documentation is available [online](https://camunda-community-hub.github.io/zeebe-client-node-js/). - -See [CHANGELOG.md](https://github.com/camunda-community-hub/zeebe-client-node-js/blob/master/CHANGELOG.md) to see what has changed with each release. - -Get a hosted instance of Zeebe on [Camunda Cloud](https://camunda.io). 
- -## Table of Contents - -**Quick Start** - -- [ Install ](#install) -- [ Get Broker Topology ](#get-topology) -- [ Deploy a process ](#deploy-process) -- [ Start and service a process](#start-and-service-a-process) - -- [ Versioning ](#versioning) -- [ Compatible Node Versions ](#node-versions) -- [ Breaking changes in 8.1.0 ](#breaking-8.1.0) -- [ Breaking changes in 1.0.0 ](#breaking-1.0.0) -- [ gRPC Implementation ](#grpc-implementation) -- [ Type difference from other Zeebe clients ](#type-difference) -- [ A note on representing timeout durations ](#time-duration) - -**Connection Behaviour** - -- [ Client-side gRPC retry in ZBClient ](#client-side-retry) -- [ onReady(), onConnectionError(), and connected ](#on-ready) -- [ Initial Connection Tolerance ](#initial-connection-tolerance) - -**Connecting to a Broker** - -- [ TLS ](#tls) -- [ OAuth ](#oauth) -- [ Basic Auth ](#basic-auth) -- [ Camunda Cloud ](#camunda-cloud) -- [ Zero-conf constructor ](#zero-conf) - -**Job Workers** - -- [ Job Workers](#job-workers) -- [ The `ZBWorker` Job Worker ](#create-zbworker) -- [ Unhandled Exceptions in Task Handlers ](#unhandled-exceptions) -- [ Completing tasks with success, failure, error, or forwarded ](#complete-tasks) -- [ Working with Process Variables and Custom Headers ](#working-with-variables) -- [ Constraining the Variables Fetched by the Worker ](#fetch-variable) -- [ The "Decoupled Job Completion" pattern ](#decoupled-complete) -- [ The `ZBBatchWorker` Job Worker ](#zbbatchworker) -- [ Long polling ](#long-polling) -- [ Poll Interval ](#poll-interval) - -**Client Commands** - -- [ Deploy Process Models and DMN Tables ](#deploy-resource) -- [ Start a Process Instance ](#start-process) -- [ Start a Process Instance of a specific version of a Process definition ](#start-specific-version) -- [ Start a process instance and await the process outcome ](#start-await) -- [ Publish a Message ](#publish-message) -- [ Publish a Start Message ](#publish-start-message) -- [ 
Activate Jobs ](#activate-jobs) - -**Other Concerns** - -- [ Graceful Shutdown ](#graceful-shutdown) -- [ Logging ](#logging) - -**Programming with Safety** - -- [ Generating TypeScript constants for BPMN Models ](#generate-constants) -- [ Generating code from a BPM Model file ](#generate-code) -- [ Writing Strongly-typed Job Workers ](#strongly-typed) -- [ Run-time Type Safety ](#run-time-safety) - -**Development of the Library itself** - -- [ Developing Zeebe Node ](#developing) - - [ Tests ](#tests) - - [ Writing Tests ](#writing-tests) -- [ Contributors ](#contributors) - -## Quick Start - - - -## Install - -### Add the Library to your Project - -```bash -npm i @camunda8/zeebe -``` - - - -### Get Broker Topology - -```javascript -const ZB = require('@camunda8/zeebe') - -void (async () => { - const zbc = new ZB.ZBClient() - const topology = await zbc.topology() - console.log(JSON.stringify(topology, null, 2)) -})() -``` - - - -### Deploy a process - -```javascript -const ZB = require('@camunda8/zeebe') -const fs = require('fs') - -void (async () => { - const zbc = new ZB.ZBClient() // localhost:26500 || ZEEBE_GATEWAY_ADDRESS - - const res = await zbc.deployProcess('./domain-mutation.bpmn') - console.log(res) - - // Deploy multiple with an array of filepaths - await zbc.deployProcess(['./wf1.bpmn', './wf2.bpmn']) - - const buffer = fs.readFileSync('./wf3.bpmn') - - // Deploy from an in-memory buffer - await zbc.deployProcess({ definition: buffer, name: 'wf3.bpmn' }) -})() -``` - - - -### Start and service a process - -This code demonstrates how to deploy a Zeebe process, create a process instance, and handle a service task using the Zeebe Node.js client. The 'get-customer-record' service task worker checks for the presence of a customerId variable, simulates fetching a customer record from a database, and completes the task with a customerRecordExists variable. 
- -```javascript -// Import the Zeebe Node.js client and the 'fs' module -const ZB = require('@camunda8/zeebe') -const fs = require('fs') - -// Instantiate a Zeebe client with default localhost settings or environment variables -const zbc = new ZB.ZBClient() - -// Create a Zeebe worker to handle the 'get-customer-record' service task -const worker = zbc.createWorker({ - // Define the task type that this worker will process - taskType: 'get-customer-record', - // Define the task handler to process incoming jobs - taskHandler: (job) => { - // Log the job variables for debugging purposes - console.log(job.variables) - - // Check if the customerId variable is missing and return an error if so - if (!job.variables.customerId) { - return job.error('NO_CUSTID', 'Missing customerId in process variables') - } - - // Add logic to retrieve the customer record from the database here - // ... - - // Complete the job with the 'customerRecordExists' variable set to true - return job.complete({ - customerRecordExists: true, - }) - }, -}) - -// Define an async main function to deploy a process, create a process instance, and log the outcome -async function main() { - // Deploy the 'new-customer.bpmn' process - const res = await zbc.deployProcess('./new-customer.bpmn') - // Log the deployment result - console.log('Deployed process:', JSON.stringify(res, null, 2)) - - // Create a process instance of the 'new-customer-process' process, with a customerId variable set - // 'createProcessInstanceWithResult' awaits the outcome - const outcome = await zbc.createProcessInstanceWithResult({ - bpmnProcessId: 'new-customer-process', - variables: { customerId: 457 }, - }) - // Log the process outcome - console.log('Process outcome', JSON.stringify(outcome, null, 2)) -} - -// Call the main function to execute the script -main() -``` - - - -## Versioning - -To enable that the client libraries can be easily supported to the Zeebe server we map the version numbers, so that Major, Minor match the 
server application. Patches are independent and indicate client updates. - -NPM Package version 0.26.x supports Zeebe 0.22.x to 0.26.x. - -NPM Package version 1.x supports Zeebe 1.x. It uses the C-based gRPC library by default. - -NPM Package version 2.x supports Zeebe 1.x, and requires Node >= 16.6.1, >=14.17.5, or >=12.22.5. It removes the C-based gRPC library and uses the pure JS implementation. - - - -## Compatible Node Versions - -Version 1.x of the package: Node versions <=16.x. Version 1.x uses the C-based gRPC library and does not work with Node 17. The C-based gRPC library is deprecated and no longer being maintained. - -Version 2.x and later of the package: Node versions 12.22.5+, 14.17.5+, or 16.6.1+. Version 2.x uses the pure JS implementation of the gRPC library, and requires a fix to the `nghttp2` library in Node (See [#201](https://github.com/camunda-community-hub/zeebe-client-node-js/issues/201)). - - - -## Breaking changes in Zeebe 8.1.0 - -All deprecated APIs are removed in the 8.1.0 package version. If your code relies on deprecated methods and method signatures, you need to use a package version prior to 8.1.0 or update your application code. - - - -## Breaking changes in Zeebe 1.0.0 - -For Zeebe brokers prior to 1.0.0, use the 0.26.z version of `@camunda8/zeebe`. This README documents the Zeebe 1.0.0 API. The previous API is documented [here](https://github.com/camunda-community-hub/zeebe-client-node-js/blob/v.0.25.0/README.md). - -Zeebe 1.0.0 contains a number of breaking changes, including the gRPC protocol and the API surface area. You must use a 1.x.y version of the client library with Zeebe 1.0.0 and later. - -The pre-1.0.0 API of the Node client has been deprecated, but not removed. This means that your pre-1.0.0 applications should still work, just by changing the version of `@camunda8/zeebe` in the `package.json`. - - - -## gRPC Implementation - -From version 2.x, the Zeebe Node client uses the pure JS gRPC client implementation. 
- -For version 1.x, the Zeebe Node client uses the C gRPC client implementation [grpc-node](https://github.com/grpc/grpc-node) by default. The C-based gRPC implementation is deprecated and is not being maintained. - - - -## Type difference from other Zeebe clients - -Protobuf fields of type `int64` are serialised as type string in the Node library. These fields are serialised as numbers (long) in the Go and Java client. See [grpc/#7229](https://github.com/grpc/grpc/issues/7229) for why the Node library serialises them as string. The Process instance key, and other fields that are of type long in other client libraries, are type string in this library. Fields of type `int32` are serialised as type number in the Node library. - - - -## A note on representing timeout durations - -All timeouts are ultimately communicated in _milliseconds_. They can be specified using the primitive type `number`, and this is always a _number of milliseconds_. - -All timeouts in the client library can _also_, optionally, be specified by a time value that encodes the units, using the [typed-durations](https://www.npmjs.com/package/typed-duration) package. You can specify durations for timeouts like this: - -``` -const { Duration } = require('@camunda8/zeebe') - -const timeoutS = Duration.seconds.of(30) // 30s timeout -const timeoutMs = Duration.milliseconds.of(30000) // 30s timeout in milliseconds -``` - -Using the value types makes your code more semantically specific. - -There are five timeouts to take into account. - -The first is the job `timeout`. This is the amount of time that the broker allocates exclusive responsibility for a job to a worker instance. By default, this is 60 seconds. This is the default value set by this client library. See "[Job Workers](#job-workers)". - -The second is the `requestTimeout`. Whenever the client library sends a gRPC command to the broker, it has an explicit or implied `requestTimeout`. 
This is the amount of time that the gRPC gateway will wait for a response from the broker cluster before returning a `4 DEADLINE` gRPC error response. - -If no `requestTimeout` is specified, then the configured timeout of the broker gateway is used. Out of the box, this is 15 seconds by default. - -The most significant use of the `requestTimeout` is when using the `createProcessInstanceWithResult` command. If your process will take longer than 15 seconds to complete, you should specify a `requestTimeout`. See "[Start a Process Instance and await the Process Outcome](#start-await)". - -The third is the `longpoll` duration. This is the amount of time that the job worker holds a long poll request to activate jobs open. - -The fourth is the maximum back-off delay in client-side gRPC command retries. See "[Client-side gRPC retry in ZBClient](#client-side-retry)". - -Finally, the `connectionTolerance` option for ZBClient can also take a typed duration. This value is used to buffer reporting connection errors while establishing a connection - for example with Camunda SaaS, which requires a token exchange as part of the connection process. - -## Connection Behaviour - - - -### Client-side gRPC retry in ZBClient - -If a gRPC command method fails in the ZBClient - such as `ZBClient.deployProcess` or `ZBClient.topology()`, the underlying gRPC library will throw an exception. - -If no workers have been started, this can be fatal to the process if it is not handled by the application logic. This is especially an issue when a worker container starts before the Zeebe gRPC gateway is available to service requests, and can be inconsistent as this is a race condition. - -To mitigate against this, the Node client implements some client-side gRPC operation retry logic by default. This can be configured, including disabled, via configuration in the client constructor. 
- -- Operations retry, but only for [gRPC error codes 8 and 14](https://github.com/grpc/grpc/blob/master/doc/statuscodes.md) - indicating resource exhaustion (8) or transient network failure (14). Resource exhaustion occurs when the broker starts backpressure due to latency because of load. Network failure can be caused by passing in an unresolvable gateway address (`14: DNS Resolution failed`), or by the gateway not being ready yet (`14: UNAVAILABLE: failed to connect to all addresses`). -- Operations that fail for other reasons, such as deploying an invalid bpmn file or cancelling a process that does not exist, do not retry. -- Retry is enabled by default, and can be disabled by passing { retry: false } to the client constructor. -- Values for `retry`, `maxRetries` and `maxRetryTimeout` can be configured via the environment variables `ZEEBE_CLIENT_RETRY`, `ZEEBE_CLIENT_MAX_RETRIES` and `ZEEBE_CLIENT_MAX_RETRY_TIMEOUT` respectively. -- `maxRetries` and `maxRetryTimeout` are also configurable through the constructor options, or through environment variables. By default, if not supplied, the values are: - -```TypeScript -const { ZBClient, Duration } = require('@camunda8/zeebe') - -const zbc = new ZBClient(gatewayAddress, { - retry: true, - maxRetries: -1, // infinite retries - maxRetryTimeout: Duration.seconds.of(5) -}) -``` - -The environment variables are: - -``` -ZEEBE_CLIENT_MAX_RETRIES -ZEEBE_CLIENT_RETRY -ZEEBE_CLIENT_MAX_RETRY_TIMEOUT -``` - -Retry is provided by [promise-retry](https://www.npmjs.com/package/promise-retry), and the back-off strategy is simple ^2. - -Additionally, the gRPC Client will continually reconnect when in a failed state, such as when the gateway goes away due to pod rescheduling on Kubernetes. - - - -### Eager Connection - -The ZBClient eagerly connects to the broker by issuing a topology command in the constructor. This allows you an onReady event to be emitted. 
You can disable this (for example, for testing without a broker), by either passing `eagerConnection: false` to the client constructor options, or setting the environment variable `ZEEBE_NODE_EAGER_CONNECTION` to `false`. - - - -### onReady(), onConnectionError(), and connected - -The client has a `connected` property that can be examined to determine if it has a gRPC connection to the gateway. - -The client and the worker can take an optional `onReady()` and `onConnectionError()` handler in their constructors, like this: - -```TypeScript -const { ZBClient, Duration } = require('@camunda8/zeebe') - -const zbc = new ZBClient({ - onReady: () => console.log(`Connected!`), - onConnectionError: () => console.log(`Disconnected!`) -}) - -const zbWorker = zbc.createWorker({ - taskType: 'demo-service', - taskHandler: handler, - onReady: () => console.log(`Worker connected!`), - onConnectionError: () => console.log(`Worker disconnected!`) -}) -``` - -These handlers are called whenever the gRPC channel is established or lost. As the grpc channel will often "jitter" when it is lost (rapidly emitting READY and ERROR events at the transport layer), there is a `connectionTolerance` property that determines how long the connection must be in a connected or failed state before the handler is called. By default this is 3000ms. - -You can specify another value either in the constructor or via an environment variable. - -To specify it via an environment variable, set `ZEEBE_CONNECTION_TOLERANCE` to a number of milliseconds. 
- -To set it via the constructor, specify a value for `connectionTolerance` like this: - -```TypeScript -const { ZBClient, Duration } = require('@camunda8/zeebe') - -const zbc = new ZBClient({ - onReady: () => console.log(`Connected!`), - onConnectionError: () => console.log(`Disconnected!`), - connectionTolerance: 5000 // milliseconds -}) - -const zbWorker = zbc.createWorker({ - taskType: 'demo-service', - taskHandler: handler, - onReady: () => console.log(`Worker connected!`), - onConnectionError: () => console.log(`Worker disconnected!`), - connectionTolerance: Duration.seconds.of(3.5) // 3500 milliseconds -}) -``` - -As well as the callback handlers, the client and workers extend the [`EventEmitter`](https://nodejs.org/api/events.html#events_class_eventemitter) class, and you can attach listeners to them for the 'ready' and 'connectionError' events: - -```TypeScript -const { ZBClient, Duration } = require('@camunda8/zeebe') - -const zbc = new ZBClient() - -const zbWorker = zbc.createWorker({ - taskType: 'demo-service', - taskHandler: handler, - connectionTolerance: Duration.seconds.of(3.5) -}) - -zbWorker.on('ready', () => console.log(`Worker connected!`)) -zbWorker.on('connectionError', () => console.log(`Worker disconnected!`)) -``` - - - -### Initial Connection Tolerance - -Some broker connections can initially emit error messages - for example: when connecting to Camunda SaaS, during TLS negotiation and OAuth authentication, the eager commands used to detect connection status will fail, and the library will report connection errors. - -Since this is expected behaviour - a _characteristic of that particular connection_ - the library has a configurable "_initial connection tolerance_". This is a number of milliseconds representing the expected window in which these errors will occur on initial connection. - -If the library detects that you are connecting to Camunda SaaS, it sets this window to five seconds (5000 milliseconds). 
In some environments and under some conditions this may not be sufficient. - -You can set an explicit value for this using the environment variable `ZEEBE_INITIAL_CONNECTION_TOLERANCE`, set to a number of milliseconds. - -The effect of this setting is to suppress connection errors during this window, and only report them if the connection did not succeed by the end of the window. - -## Connecting to a Broker - - - -### TLS - -The Node client does not use TLS by default. - -Enable a secure connection by setting `useTLS: true`: - -```typescript -const { ZBClient } = require('@camunda8/zeebe') - -const zbc = new ZBClient(tlsSecuredGatewayAddress, { - useTLS: true, -}) -``` - -Via environment variable: - -```bash -ZEEBE_SECURE_CONNECTION=true -``` - -### Using a Self-signed Certificate - -You can use a self-signed SSL certificate with the Zeebe client. You need to provide the root certificates, the private key and the SSL cert chain as Buffers. You can pass them into the ZBClient constructor: - -```typescript -const rootCertsPath = '/path/to/rootCerts' -const privateKeyPath = '/path/to/privateKey' -const certChainPath = '/path/to/certChain' - -const zbc = new ZBClient({ - useTLS: true, - customSSL: { - rootCerts: rootCertsPath, - privateKey: privateKeyPath, - certChain: certChainPath - } -}) - -Or you can put the file paths into the environment in the following variables: - -ZEEBE_CLIENT_SSL_ROOT_CERTS_PATH -ZEEBE_CLIENT_SSL_PRIVATE_KEY_PATH -ZEEBE_CLIENT_SSL_CERT_CHAIN_PATH -``` - -# Enable TLS - -``` -ZEEBE_SECURE_CONNECTION=true -``` - -In this case, they will be passed to the constructor automatically. - - - -### OAuth - -In case you need to connect to a secured endpoint with OAuth, you can pass in OAuth credentials. 
This will enable TLS (unless you explicitly disable it with `useTLS: false`), and handle the OAuth flow to get / renew a JWT: - -```typescript -const { ZBClient } = require('@camunda8/zeebe') - -const zbc = new ZBClient("my-secure-broker.io:443", { - oAuth: { - url: "https://your-auth-endpoint/oauth/token", - audience: "my-secure-broker.io", - clientId: "myClientId", - clientSecret: "randomClientSecret", - customRootCert: fs.readFileSync('./my_CA.pem'), - cacheOnDisk: true - } -} -``` - -The `cacheOnDisk` option will cache the token on disk in `$HOME/.camunda`, which can be useful in development if you are restarting the service frequently, or are running in a serverless environment, like AWS Lambda. - -If the cache directory is not writable, the ZBClient constructor will throw an exception. This is considered fatal, as it can lead to denial of service or hefty bills if you think caching is on when it is not. - -The `customRootCert` argument is optional. It can be used to provide a custom TLS certificate as a Buffer, which will be used while obtaining the OAuth token from the specified URL. If not provided, the CAs provided by [Mozilla](https://ccadb-public.secure.force.com/mozilla/IncludedCACertificateReport) will be used. - - - -### Camunda 8 SaaS - -[Camunda 8 SaaS](https://camunda.io) is a hosted SaaS instance of Zeebe. The easiest way to connect is to use the [Zero-conf constructor](#zero-conf) with the Client Credentials from the Camunda SaaS console as environment variables. - -You can also connect to Camunda SaaS by using the `camundaCloud` configuration option, using the `clusterId`, `clientSecret`, and `clientId` from the Camunda SaaS Console, like this: - -```typescript -const { ZBClient } = require('@camunda8/zeebe') - -const zbc = new ZBClient({ - camundaCloud: { - clientId, - clientSecret, - clusterId, - clusterRegion, // optional, defaults to bru-2 - }, -}) -``` - -That's it! 
Under the hood, the client lib will construct the OAuth configuration for Camunda SaaS and set the gateway address and port for you. - -We recommend the [Zero-conf constructor](#zero-conf) with the configuration passed in via environment variables. This allows you to run your application against different environments via configuration. - - - -## Zero-Conf constructor - -The ZBClient has a 0-parameter constructor that takes the config from the environment. This is useful for injecting secrets into your app via the environment, and switching between development and production environments with no change to code. - -To use the zero-conf constructor, you create the client like this: - -```typescript -const { ZBClient } = require('@camunda8/zeebe') - -const zbc = new ZBClient() -``` - -With no relevant environment variables set, it will default to localhost on the default port with no TLS. - -The following environment variable configurations are possible with the Zero-conf constructor: - -From 8.3.0, multi-tenancy: - -```bash -ZEEBE_TENANT_ID -``` - -Camunda SaaS: - -```bash -ZEEBE_ADDRESS -ZEEBE_CLIENT_SECRET -ZEEBE_CLIENT_ID -ZEEBE_TOKEN_AUDIENCE -ZEEBE_AUTHORIZATION_SERVER_URL -``` - -Self-hosted or local broker (no TLS or OAuth): - -```bash -ZEEBE_ADDRESS -``` - -Self-hosted with self-signed SSL certificate: - -```bash -ZEEBE_CLIENT_SSL_ROOT_CERTS_PATH -ZEEBE_CLIENT_SSL_PRIVATE_KEY_PATH -ZEEBE_CLIENT_SSL_CERT_CHAIN_PATH -ZEEBE_SECURE_CONNECTION=true -``` - -Self-hosted or local broker with OAuth + TLS: - -```bash -ZEEBE_CLIENT_ID -ZEEBE_CLIENT_SECRET -ZEEBE_TOKEN_AUDIENCE -ZEEBE_AUTHORIZATION_SERVER_URL -ZEEBE_ADDRESS -``` - -Multi-tenant self-hosted or local broker with OAuth and no TLS: - -```bash -ZEEBE_TENANT_ID='' -ZEEBE_SECURE_CONNECTION=false -ZEEBE_ADDRESS='localhost:26500' -ZEEBE_CLIENT_ID='zeebe' -ZEEBE_CLIENT_SECRET='zecret' -ZEEBE_AUTHORIZATION_SERVER_URL='http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token' 
-ZEEBE_TOKEN_AUDIENCE='zeebe.camunda.io' -CAMUNDA_CREDENTIALS_SCOPES='Zeebe' -CAMUNDA_OAUTH_URL='http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token' -``` - -Basic Auth: - -```bash -ZEEBE_BASIC_AUTH_PASSWORD -ZEEBE_BASIC_AUTH_USERNAME -``` - - - -## Job Workers - -### Types of Job Workers - -There are two different types of job worker provided by the Zeebe Node client: - -- The `ZBWorker` - this worker operates on individual jobs. -- The `ZBBatchWorker` - this worker batches jobs on the client, to allow you to batch operations that pool resources. (_This worker was introduced in 0.23.0 of the client_). - -Much of the information in the following [`ZBWorker` section](#create-zbworker) applies also to the `ZBBatchWorker`. The `ZBBatchWorker` section covers the features that differ from the `ZBWorker`. - - - -### The `ZBWorker` Job Worker - -The `ZBWorker` takes a _job handler function_ that is invoked for each job. It is invoked as soon as the worker retrieves a job from the broker. The worker can retrieve any number of jobs in a response from the broker, and the handler is invoked for each one, independently. - -The simplest signature for a worker takes a string task type, and a job handler function. - -The job handler receives the job object, which has methods that it can use to complete or fail the job, and a reference to the worker itself, which you can use to log using the worker's configured logger (See [Logging](#logging)). - -Note: _The second argument is deprecated, and remains for backward-compatibility - it is a complete function. In the 1.0 version of the API, the complete function methods are available on the `job` object_. 
- -```javascript -const ZB = require('@camunda8/zeebe') - -const zbc = new ZB.ZBClient() - -const zbWorker = zbc.createWorker({ - taskType: 'demo-service', - taskHandler: handler, -}) - -function handler(job) { - zbWorker.log('Task variables', job.variables) - - // Task worker business logic goes here - const updateToBrokerVariables = { - updatedProperty: 'newValue', - } - - return job.complete(updateToBrokerVariables) -} -``` - -Here is an example job: - -```javascript - -{ key: '578', - type: 'demo-service', - jobHeaders: - { processInstanceKey: '574', - bpmnProcessId: 'test-process', - processDefinitionVersion: 1, - processKey: '3', - elementId: 'ServiceTask_0xdwuw7', - elementInstanceKey: '577' }, - customHeaders: '{}', - worker: 'test-worker', - retries: 3, - deadline: '1546915422636', - variables: { testData: 'something' } } -``` - -The worker can be configured with options. To do this, you should use the object parameter constructor. - -Shown below are the defaults that apply if you don't supply them: - -```javascript -const { ZBClient, Duration } = require('@camunda8/zeebe') - -const zbc = new ZBClient() - -const zbWorker = zbc.createWorker({ - taskType: 'demo-service', - taskHandler: handler, - // the number of simultaneous tasks this worker can handle - maxJobsToActivate: 32, - // the amount of time the broker should allow this worker to complete a task - timeout: Duration.seconds.of(30), - // One of 'DEBUG', 'INFO', 'NONE' - loglevel: 'INFO', - // Called when the connection to the broker cannot be established, or fails - onConnectionError: () => zbWorker.log('Disconnected') - // Called when the connection to the broker is (re-)established - onReady: () => zbWorker.log('Connected.') -}) -``` - - - -#### Unhandled Exceptions in Task Handlers - -_Note: this behaviour is for the ZBWorker only. The ZBBatchWorker does not manage this._ - -When a task handler throws an unhandled exception, the library will fail the job. 
Zeebe will then retry the job according to the retry settings of the task. Sometimes you want to halt the entire process so you can investigate. To have the library cancel the process on an unhandled exception, pass in `{failProcessOnException: true}` to the `createWorker` call:

```typescript
import { ZBClient } from '@camunda8/zeebe'

const zbc = new ZBClient()

zbc.createWorker({
	taskType: 'console-log',
	taskHandler: maybeFaultyHandler,
	failProcessOnException: true,
})
```

### Completing tasks with success, failure, error, or forwarded

To complete a task, the job object that the task worker handler function receives has `complete`, `fail`, and `error` methods.

Call `job.complete()` passing in an optional plain old JavaScript object (POJO) - a key:value map. These are variable:value pairs that will be used to update the process state in the broker. They will be merged with existing values. You can set an existing key to `null` or `undefined`, but there is no way to delete a key.

Call `job.fail()` to fail the task. You must pass in a string message describing the failure. The client library decrements the retry count, and the broker handles the retry logic. If the failure is a hard failure and should cause an incident to be raised in Operate, then pass in `0` for the optional second parameter, `retries`:

```javascript
job.fail('This is a critical failure and will raise an incident', 0)
```

From version 8.0.0 of the package, used with a 8.0.0 Zeebe broker, you can specify to the broker an optional backoff for the reactivation of the job, like this:

```javascript
job.fail({
	errorMessage: 'Triggering a retry with a two second back-off',
	retryBackOff: 2000,
	retries: 1,
})
```

Call `job.error()` to trigger a BPMN error throw event. You must pass in a string error code, and you can pass an optional error message as the second parameter.
If no BPMN error catch event exists for the error code, an incident will be raised.

```javascript
job.error('RECORD_NOT_FOUND', 'Could not find the customer in the database')
```

From 8.2.5 of the client, you can update the variables in the workflow when you throw a BPMN error in a worker:

```javascript
job.error({
	errorCode: 'RECORD_NOT_FOUND',
	errorMessage: 'Could not find the customer in the database',
	variables: {
		someVariable: 'someValue',
	},
})
```

Call `job.forwarded()` to release worker capacity to handle another job, without completing the job in any way with the Zeebe broker. This method supports the _decoupled job completion_ pattern. In this pattern, the worker forwards the job to another system - a lambda or a RabbitMQ queue. Some other process is ultimately responsible for completing the job.

## Working with Process Variables and Custom Headers

Process variables are available in your worker job handler callback as `job.variables`, and any custom headers are available as `job.customHeaders`.

These are read-only JavaScript objects in the Zeebe Node client. However, they are not stored that way in the broker.

Both process variables and custom headers are stored in the broker as a dictionary of named strings. That means that the variables and custom headers are JSON.parsed in the Node client when it fetches the job, and any update passed to the `complete()` function is JSON.stringified.

If you pass in a circular JSON structure to `complete()` - like, for example the response object from an HTTP call - it will throw, as this cannot be serialised to a string.
- -To update a key deep in the object structure of a process variable, you can use the [deepmerge utility](https://www.npmjs.com/package/deepmerge): - -```TypeScript -const merge = require('deepmerge') -import { ZBClient } from '@camunda8/zeebe' - -const zbc = new ZBClient() - -zbc.createWorker({ - taskType: 'some-task', - taskHandler: job => { - const { people } = job.variables - // update bob's age, keeping all his other properties the same - job.complete(merge(people, { bob: { age: 23 } })) - } -}) -``` - -When setting custom headers in BPMN tasks, while designing your model, you can put stringified JSON as the value for a custom header, and it will show up in the client as a JavaScript object. - -Process variables and custom headers are untyped in the Zeebe broker, however the Node client in TypeScript mode provides the option to type them to provide safety. You can type your worker as `any` to turn that off: - -```TypeScript -// No type checking - totally dynamic and unchecked -zbc.createWorker({ - taskType: 'yolo-jobs', - taskHandler: (job) => { - console.log(`Look ma - ${job.variables?.anything?.goes?.toUpperCase()}`) - job.complete({what: job.variables.could.possibly.go.wrong}) - } -}) -``` - -See the section [Writing Strongly-typed Job Workers](#strongly-typed) for more details. - - - -## Constraining the Variables Fetched by the Worker - -Sometimes you only need a few specific process variables to service a job. One way you can achieve constraint on the process variables received by a worker is by using [input variable mappings](https://docs.zeebe.io/reference/variables.html#inputoutput-variable-mappings) on the task in the model. - -You can also use the `fetchVariable` parameter when creating a worker. Pass an array of strings, containing the names of the variables to fetch, to the `fetchVariable` parameter when creating a worker. 
Here is an example, in JavaScript: - -```javascript -zbc.createWorker({ - taskType: 'process-favorite-albums', - taskHandler: (job) => { - const { name, albums } = job.variables - console.log(`${name} has the following albums: ${albums.join(', ')}`) - job.complete() - }, - fetchVariable: ['name', 'albums'], -}) -``` - -If you are using TypeScript, you can supply an interface describing the process variables, and parameterize the worker: - -```TypeScript -interface Variables { - name: string - albums: string[] -} - -zbc.createWorker({ - taskType: 'process-favorite-albums', - taskHandler: (job) => { - const { name, albums = [] } = job.variables - console.log(`${name} has the following albums: ${albums?.join?.(', ')}`) - job.complete() - }, - fetchVariable: ['name', 'albums'], -}) -``` - -This parameterization does two things: - -- It informs the worker about the expected types of the variables. For example, if `albums` is a string, calling `join` on it will fail at runtime. Providing the type allows the compiler to reason about the valid methods that can be applied to the variables. -- It allows the type-checker to pick up spelling errors in the strings in `fetchVariable`, by comparing them with the Variables typing. - -Note, that this does not protect you against run-time exceptions where your typings are incorrect, or the payload simply does not match the definition that you provided. - -See the section [ Writing Strongly-typed Job Workers ](#strongly-typed) for more details on run-time safety. 
- -You can turn off the type-safety by typing the worker as `any`: - -```TypeScript -zbc.createWorker({ - taskType: 'process-favorite-albums', - taskHandler: (job) => { - const { name, albums = [] } = job.variables - // TS 3.7 safe access to .join _and_ safe call, to prevent run-time exceptions - console.log(`${name} has the following albums: ${albums?.join?.(', ')}`) - job.complete() - }, - fetchVariable: ['name', 'albums'], -}) -``` - - - -## The "Decoupled Job Completion" pattern - -The _Decoupled Job Completion_ pattern uses a Zeebe Job Worker to activate jobs from the broker, and some other asynchronous (remote) system to do the work. - -You might activate jobs and then send them to a RabbitMQ queue, or to an AWS lambda. In this case, there may be no outcome about the job that this worker can report back to the broker about success or failure. That will be the responsibility of another part of your distributed system. - -The first thing you should do is ensure that you activate the job with sufficient time for the complete execution of your system. Your worker will not be completing the job, but it informs the broker how long the expected loop will take to close. - -Next, call `job.forward()` in your job worker handler. This has no side-effect with the broker - so nothing is communicated to Zeebe. The job is still out there with your worker as far as Zeebe is concerned. What this call does is release worker capacity to request more jobs. - -If you are using the Zeebe Node library in the remote system, or if the remote system eventually reports back to you (perhaps over a different RabbitMQ queue), you can use the ZBClient methods `completeJob()`, `failJob()`, and `throwError()` to report the outcome back to the broker. - -You need at least the `job.key`, to be able to correlate the result back to Zeebe. Presumably you also want the information from the remote system about the outcome, and any updated variables. 

Here is an example:

- You have a COBOL system that runs a database.
- Somebody wrote an adapter for this COBOL database. It executes commands over SSH.
- The adapter is accessible via a RabbitMQ "request" queue, which takes a command and a correlation id, so that its response can be correlated to this request.
- The adapter sends back the COBOL database system response on a RabbitMQ "response" queue, with the correlation id.
- It typically takes 15 seconds for the round-trip through RabbitMQ to the COBOL database and back.

You want to put this system into a Zeebe-orchestrated BPMN model as a task.

Rather than injecting a RabbitMQ listener into the job handler, you can "_fire and forget_" the request using the decoupled job completion pattern.

Here is how you do it:

- Your worker gets the job from Zeebe.
- Your worker makes the command and sends it down the RabbitMQ "request" queue, with the `job.key` as the correlation id.
- Your worker calls `job.forward()`

Here is what that looks like in code:

```TypeScript
import { RabbitMQSender } from './lib/my-awesome-rabbitmq-api'
import { ZBClient, Duration } from '@camunda8/zeebe'

const zbc = new ZBClient()

const cobolWorker = zbc.createWorker({
	taskType: 'cobol-insert',
	timeout: Duration.seconds.of(20), // allow 5s over the expected 15s
	taskHandler: job => {
		const { key, variables } = job
		const request = {
			correlationId: key,
			command: `INSERT ${variables.customer} INTO CUSTOMERS`
		}
		RabbitMQSender.send({
			channel: 'COBOL_REQ',
			request
		})
		// Call forward() to release worker capacity
		return job.forward()
	}
})
```

Now for the response part:

- Another part of your system listens to the RabbitMQ response queue.
- It gets a response back from the COBOL adapter.

- It examines the response, then sends the appropriate outcome to Zeebe, using the jobKey that has been attached as the correlationId

```TypeScript
import { RabbitMQListener } from './lib/my-awesome-rabbitmq-api'
import { ZBClient } from '@camunda8/zeebe'

const zbc = new ZBClient()

RabbitMQListener.listen({
	channel: 'COBOL_RES',
	handler: message => {
		const { outcome, correlationId } = message
		if (outcome.SUCCESS) {
			zbc.completeJob({
				jobKey: correlationId,
				variables: {}
			})
		}
		if (outcome.ERROR) {
			zbc.throwError({
				jobKey: correlationId,
				errorCode: "5",
				errorMessage: "The COBOL Database reported an error. Boo!"
			})
		}
	},
})
```

See also the section "[Publish a Message](#publish-message)", for a pattern that you can use when it is not possible to attach the job key to the round trip data response.

## The `ZBBatchWorker` Job Worker

The `ZBBatchWorker` Job Worker batches jobs before calling the job handler. Its fundamental differences from the ZBWorker are:

- Its job handler receives an _array_ of one or more jobs.
- The handler is not invoked immediately, but rather when enough jobs are batched, or a job in the batch is at risk of being timed out by the Zeebe broker.

You can use the batch worker if you have tasks that _benefit from processing together_, but are _not related in the BPMN model_.

An example would be a high volume of jobs that require calls to an external system, where you have to pay per call to that system. In that case, you may want to batch up jobs, make one call to the external system, then update all the jobs and send them on their way.

The batch worker works on a _first-of_ batch size _or_ batch timeout basis.

You must configure both `jobBatchMinSize` and `jobBatchMaxTime`.
Whichever condition is met first will trigger the processing of the jobs:

- Enough jobs are available to the worker to satisfy the minimum job batch size;
- The batch has been building for the maximum amount of time - "_we're doing this now, before the earliest jobs in the batch time out on the broker_".

You should be sure to specify a `timeout` for your worker that is `jobBatchMaxTime` _plus_ the expected latency of the external call _plus_ your processing time and network latency, to avoid the broker timing your batch worker's lock and making the jobs available to another worker. That would defeat the whole purpose.

Here is an example of using the `ZBBatchWorker`:

```TypeScript
import { API } from './lib/my-awesome-external-api'
import { ZBClient, BatchedJob, Duration } from '@camunda8/zeebe'

const zbc = new ZBClient()

// Helper function to find a job by its key
const findJobByKey = jobs => key => jobs.find(job => job.jobKey === key)

const handler = async (jobs: BatchedJob[]) => {
	console.log("Let's do this!")
	// Construct some hypothetical payload with correlation ids and requests
	const req = jobs.map(job => ({ id: job.jobKey, data: job.variables.request }))
	// An uncaught exception will not be managed by the library
	try {
		// Our API wrapper turns that into a request, and returns
		// an array of results with ids
		const outcomes = await API.post(req)
		// Construct a find function for these jobs
		const getJob = findJobByKey(jobs)
		// Iterate over the results and call the succeed method on the corresponding job,
		// passing in the correlated outcome of the API call
		outcomes.forEach(res => getJob(res.id)?.complete(res.data))
	} catch (e) {
		jobs.forEach(job => job.fail(e.message))
	}
}

const batchWorker = zbc.createBatchWorker({
	taskType: 'get-data-from-external-api',
	taskHandler: handler,
	jobBatchMinSize: 10, // at least 10 at a time
	jobBatchMaxTime: 60, // or every 60
seconds, whichever comes first - timeout: Duration.seconds.of(80) // 80 second timeout means we have 20 seconds to process at least -}) -``` - -See [this blog post](http://joshwulf.com/blog/2020/03/zb-batch-worker/) for some more details on the implementation. - - - -### Long polling - -With Zeebe 0.21 onward, long polling is supported for clients, and is used by default. Rather than polling continuously for work and getting nothing back, a client can poll once and leave the request open until work appears. This reduces network traffic and CPU utilization in the server. Every JobActivation Request is appended to the event log, so continuous polling can significantly impact broker performance, especially when an exporter is loaded (see [here](https://github.com/zeebe-io/zeebe-client-node-js/issues/64#issuecomment-520233275)). - -Long polling sends the `ActivateJobs` command to the broker, and waits for up to the long poll interval for jobs to be available, rather than returning immediately with an empty response if no jobs are available at that moment. - -The default long poll duration is 30s. - -To use a different long polling duration, pass in a long poll timeout in milliseconds to the client. All workers created with that client will use it. Alternatively, set a period per-worker. - -Long polling for workers is configured in the ZBClient like this: - -```typescript -const { ZBClient, Duration } = require('@camunda8/zeebe') - -const zbc = new ZBClient('serverAddress', { - longPoll: Duration.minutes.of(10), // Ten minutes - inherited by workers -}) - -const longPollingWorker = zbc.createWorker({ - taskType: 'task-type', - taskHandler: handler, - longPoll: Duration.minutes.of(2), // override client, poll 2m -}) -``` - - - -### Poll Interval - -The poll interval is a timer that fires on the configured interval and sends an `ActivateJobs` command if no pending command is currently active. By default, this is set to 300ms. 
This guarantees that there will be a minimum of 300ms between `ActivateJobs` commands, which prevents flooding the broker.

Too many `ActivateJobs` requests per period of time can cause broker backpressure to kick in, and the gateway to return a GRPC 8 error code.

You can configure this with the `pollInterval` option in the client constructor, in which case all workers inherit it as their default. You can also override this by specifying a value in the `createWorker` call:

```typescript
const zbc = new ZBClient({
	pollInterval: Duration.milliseconds.of(500),
})

const worker = zbc.createWorker({
	taskType: 'send-email',
	taskHandler: sendEmailWorkerHandler,
	pollInterval: Duration.milliseconds.of(750),
})
```

## Client Commands

### Deploy Process Models and Decision Tables

From version 8 of Zeebe, `deployProcess` is deprecated in favor of `deployResource`, which allows you to deploy both process models and DMN tables.

You can deploy a resource as a buffer, or by passing a filename - in which case the client library will load the file into a buffer for you.
- -### Deploy Process Model - -By passing a filename, and allowing the client library to load the file into a buffer: - -```typescript -async function deploy() { - const zbc = new ZBClient() - const result = await zbc.deployResource({ - processFilename: `./src/__tests__/testdata/Client-DeployWorkflow.bpmn`, - }) -} -``` - -By passing a buffer, and a name: - -```typescript -async function deploy() { - const zbc = new ZBClient() - const process = fs.readFileSync( - `./src/__tests__/testdata/Client-DeployWorkflow.bpmn` - ) - const result = await zbc.deployResource({ - process, - name: `Client-DeployWorkflow.bpmn`, - }) -} -``` - -### Deploy DMN Table - -By passing a filename, and allowing the client library to load the file into a buffer: - -```typescript -async function deploy() { - const zbc = new ZBClient() - const result = await zbc.deployResource({ - decisionFilename: `./src/__tests__/testdata/quarantine-duration.dmn`, - }) -} -``` - -By passing a buffer, and a name: - -```typescript -async function deploy() { - const zbc = new ZBClient() - const decision = fs.readFileSync( - `./src/__tests__/testdata/quarantine-duration.dmn` - ) - const result = await zbc.deployResource({ - decision, - name: `quarantine-duration.dmn`, - }) -} -``` - -### Deploy Form - -From 8.3.1, you can deploy a form to the Zeebe broker: - -```javascript -async function deploy() { - const zbc = new ZBClient() - const form = fs.readFileSync('./src/__tests__/testdata/form_1.form') - const result = await zbc.deployResource({ - form, - name: 'form_1.form', - }) -} -``` - - - -### Start a Process Instance - -```javascript -const ZB = require('@camunda8/zeebe') - -;(async () => { - const zbc = new ZB.ZBClient('localhost:26500') - const result = await zbc.createProcessInstance({ - bpmnProcessId: 'test-process', - variables: { - testData: 'something', - }, - }) - console.log(result) -})() -``` - -Example output: - -```javascript - -{ processKey: '3', - bpmnProcessId: 'test-process', - version: 1, - 
processInstanceKey: '569' } - -``` - - - -### Start a Process Instance of a specific version of a Process definition - -From version 0.22 of the client onward: - -```javascript -const ZB = require('@camunda8/zeebe') - -;(async () => { - const zbc = new ZB.ZBClient('localhost:26500') - const result = await zbc.createProcessInstance({ - bpmnProcessId: 'test-process', - variables: { - testData: 'something', - }, - version: 5, - }) - console.log(result) -})() -``` - - - -### Start a Process Instance and await the Process Outcome - -From version 0.22 of the broker and client, you can await the outcome of a process end-to-end execution: - -```typescript -async function getOutcome() { - const result = await zbc.createProcessInstanceWithResult({ - bpmnProcessId: processId, - variables: { - sourceValue: 5, - }, - }) - return result -} -``` - -Be aware that by default, **this will throw an exception if the process takes longer than 15 seconds to complete**. - -To override the gateway's default timeout for a process that needs more time to complete: - -```typescript -const { ZBClient, Duration } = require('@camunda8/zeebe') - -const zbc = new ZBClient() - -const result = await zbc.createProcessInstanceWithResult({ - bpmnProcessId: processId, - variables: { - sourceValue: 5, - otherValue: 'rome', - }, - requestTimeout: Duration.seconds.of(25), - // also works supplying a number of milliseconds - // requestTimeout: 25000 -}) -``` - - - -### Publish a Message - -You can publish a message to the Zeebe broker that will be correlated with a running process instance: - -```javascript -const { ZBClient, Duration } = require('@camunda8/zeebe') - -const zbc = new ZBClient() - -zbc.publishMessage({ - correlationKey: 'value-to-correlate-with-process-variable', - messageId: uuid.v4(), - name: 'message-name', - variables: { valueToAddToProcessVariables: 'here', status: 'PROCESSED' }, - timeToLive: Duration.seconds.of(10), // seconds -}) -``` - -When would you do this? 
Well, the sky is not even the limit when it comes to thinking creatively about building a system with Zeebe - _and_ here's one concrete example to get you thinking:

Recall the example of the _remote COBOL database_ in the section "[The "Decoupled Job Completion" pattern](#decoupled-complete)". We're writing code to allow that system to participate in a BPMN-modelled process orchestrated by Zeebe.

But what happens if the adapter for that system has been written in such a way that there is no opportunity to attach metadata to it? In that case we have no opportunity to attach a job key. Maybe you send the fixed data for the command, and you have to correlate the response based on those fields.

Another example: think of a system that emits events, and has no knowledge of a running process. An example from one system that I orchestrate with Zeebe is Minecraft. A logged-in user in the game performs some action, and code in the game emits an event. I can catch that event in my Node-based application, but I have no knowledge of which running process to target - _and_ the event was not generated from a BPMN task providing a worker with the complete context of a process.

In these two cases, I can publish a message to Zeebe, and let the broker figure out which processes are:

- Sitting at an intermediate message catch event waiting for this message; or
- In a sub-process that has a boundary event that will be triggered by this message; or
- Would be started by a message start event, on receiving this message.

The Zeebe broker correlates a message to a running process instance _not on the job key_ - but on _the value of one of the process variables_ (for intermediate message events) and _the message name_ (for all message events, including start messages).

So the response from your COBOL database system, sans job key, is sent back to Zeebe from the RabbitMQListener not via `completeJob()`, but with `publishMessage()`, and the value of the payload is used to figure out which process it is for.

In the case of the Minecraft event, a message is published to Zeebe with the Minecraft username, and that is used by Zeebe to determine which processes are running for that user and are interested in that event.

See the article "[Zeebe Message Correlation](https://zeebe.io/blog/2019/08/zeebe-message-correlation/)" for a complete example with code.

### Publish a Start Message

You can also publish a message targeting a [Message Start Event](https://github.com/zeebe-io/zeebe/issues/1858).
In this case, the correlation key is optional, and all Message Start events that match the `name` property will receive the message.

You can use the `publishStartMessage()` method to publish a message with no correlation key (it will be set to a random uuid in the background):

```javascript
const { ZBClient, Duration } = require('@camunda8/zeebe')

const zbc = new ZBClient('localhost:26500')
zbc.publishStartMessage({
	messageId: uuid.v4(),
	name: 'message-name',
	variables: { initialProcessVariable: 'here' },
	timeToLive: Duration.seconds.of(10), // seconds
})
```

Both normal messages and start messages can be published idempotently by setting both the `messageId` and the `correlationKey`. They will only ever be correlated once. See: [A message can be published idempotent](https://github.com/zeebe-io/zeebe/issues/1012).

### Activate Jobs

If you have some use case that doesn't fit the existing workers, you can write your own custom workers using the `ZBClient.activateJobs()` method. It takes an `ActivateJobsRequest` object, and returns a stream for that call.

Attach a listener to the stream's 'data' event, and it will be called with an `ActivateJobsResponse` object if there are jobs to work on.

To complete these jobs, use the `ZBClient` methods `completeJob()`, `failJob()`, and `throwError()`.

For more details, read the source code of the library, particularly the `ZBWorkerBase` class. This is an advanced use case, and the existing code in the library is the best documentation.

## Other Concerns

### Graceful Shutdown

To drain workers, call the `close()` method of the ZBClient. This causes all workers using that client to stop polling for jobs, and returns a Promise that resolves when all active jobs have either finished or timed out.

```javascript
console.log('Closing client...')
zbc.close().then(() => console.log('All workers closed'))
```

## Logging

Control the log output for the client library by setting the ZBClient log level. Valid log levels are `NONE` (suppress all logging), `ERROR` (log only exceptions), `INFO` (general logging), or `DEBUG` (verbose logging). You can set this in the client constructor:

```typescript
const zbc = new ZBClient('localhost', { loglevel: 'DEBUG' })
```

And also via the environment:

```bash
ZEEBE_NODE_LOG_LEVEL='ERROR' node start.js
```

By default the library uses `console.info` and `console.error` for logging. You can also pass in a custom logger, such as [pino](https://github.com/pinojs/pino):

```typescript
const logger = require('pino')()
const zbc = new ZBClient({ stdout: logger })
```

From version v0.23.0-alpha.1, the library logs human-readable logs by default, using the `ZBSimpleLogger`.
If you want structured logs as stringified JSON, pass in `ZBJsonLogger` to the constructor `stdout` option, like this: - -```typescript -const { ZBJsonLogger, ZBClient } = require('@camunda8/zeebe') -const zbc = new ZBClient({ stdout: ZBJsonLogger }) -``` - -You can also control this via environment variables: - -```bash -export ZEEBE_NODE_LOG_TYPE=SIMPLE # Simple Logger (default) -export ZEEBE_NODE_LOG_TYPE=JSON # JSON Logger -``` - - - -### Generating TypeScript constants for BPMN Models - -Message names and Task Types are untyped magic strings. You can generate type information to avoid some classes of errors. - -#### 0.22.0-alpha.5 and above - -Install the package globally: - -``` -npm i -g @camunda8/zeebe -``` - -Now you have the command `@camunda8/zeebe <bpmn-file>` that parses a BPMN file and emits type definitions. - -#### All versions - -The `BpmnParser` class provides a static method `generateConstantsForBpmnFiles()`. -This method takes a filepath and returns TypeScript definitions that you can use to avoid typos in your code, and to reason about the completeness of your task worker coverage. - -```javascript -const ZB = require('@camunda8/zeebe') -;(async () => { - console.log(await ZB.BpmnParser.generateConstantsForBpmnFiles(processFile)) -})() -``` - -This will produce output similar to: - -```typescript -// Autogenerated constants for msg-start.bpmn - -export enum TaskType { - CONSOLE_LOG = "console-log" -}; - -export enum MessageName { - MSG_EMIT_FRAME = "MSG-EMIT_FRAME", - MSG_START_JOB = "MSG-START_JOB" -}; -``` - - - -## Generating code from a BPMN Model file - -You can scaffold your worker code from a BPMN file with the `zeebe-node` command. 
- To use this command, install the package globally with: - -```bash -npm i -g zeebe-node -``` - -Pass in the path to the BPMN file, and it will output a file to implement it: - -```bash -zeebe-node my-model.bpmn -``` - - - -### Writing Strongly-typed Job Workers - -You can provide interfaces to get design-time type safety and intellisense on the process variables passed in to a worker job handler, the custom headers that it will receive, and the variables that it will pass back to Zeebe in the `complete.success` call: - -```TypeScript -interface InputVariables { - name: string, - age: number, - preferences: { - beverage: 'Coffee' | 'Tea' | 'Beer' | 'Water', - color: string - } -} - -interface OutputVariables { - suggestedGift: string -} - -interface CustomHeaders { - occasion: 'Birthday' | 'Christmas' | 'Hanukkah' | 'Diwali' -} - -const giftSuggester = zbc.createWorker< - InputVariables, - CustomHeaders, - OutputVariables> - ('get-gift-suggestion', (job) => { - const suggestedGift = `${job.customHeaders.occasion} ${job.variables.preferences.beverage}` - job.complete({ suggestedGift }) -}) -``` - -If you decouple the declaration of the job handler from the `createWorker` call, you will need to explicitly specify its type, like this: - -```TypeScript -import { ZBWorkerTaskHandler } from 'zeebe-node' - -const getGiftSuggestion: ZBWorkerTaskHandler = (job) => { - const suggestedGift = `${job.customHeaders.occasion} ${job.variables.preferences.beverage}` - job.complete({ suggestedGift }) -} - -const giftSuggester = zbc.createWorker({ - taskType: 'get-gift-suggestion', - taskHandler: getGiftSuggestion -}) -``` - - - -## Run-time Type Safety - -The parameterization of the client and workers helps to catch errors in code, and if your interface definitions are good, can go a long way to making sure that your workers and client emit the correct payloads and have a strong expectation about what they will receive, but it does not give you any _run-time safety_. 
- -Your type definition may be incorrect, or the variables or custom headers may simply not be there at run-time, as there is no type checking in the broker, and other factors are involved, such as tasks with input and output mappings, and data added to the process variables by REST calls and other workers. - -You should consider: - -- Writing interface definitions for your payloads to get design-time assist for protection against spelling errors as you demarshal and update variables. -- Testing for the existence of variables and properties on payloads, and writing defensive pathways to deal with missing properties. If you mark _everything_ as optional in your interfaces, the type-checker will force you to write that code. -- Surfacing code exceptions operationally to detect and diagnose mismatched expectations. -- If you want to validate inputs and outputs to your system at runtime, you can use [io-ts](https://github.com/gcanti/io-ts). Once data goes into that, it either exits through an exception handler, or is guaranteed to have the shape of the defined codec at run-time. - -As with everything, it is a balancing act / trade-off between correctness, safety, and speed. You do not want to lock everything down while you are still exploring. - -I recommend the following scale, to match the maturity of your system: - -- Start with `any` typing for the workers; then -- Develop interfaces to describe the DTOs represented in your process variables; -- Use optional types on those interfaces to check your defensive programming structures; -- Lock down the run-time behaviour with io-ts as the boundary validator. - -You may choose to start with the DTOs. Anyway, there are options. - - - -## Developing Zeebe Node - -The source is written in TypeScript in `src`, and compiled to ES6 in the `dist` directory. 
- -To build: - -```bash -npm run build -``` - -To start a watcher to build the source and API docs while you are developing: - -```bash -npm run dev -``` - - - -### Tests - -Tests are written in Jest, and live in the `src/__tests__` directory. To run the unit tests: - -```bash -npm t -``` - -Integration tests are in the `src/__tests__/integration` directory. - -They require a Zeebe broker to run. You can start a dockerised broker: - -```bash -cd docker -docker-compose up -``` - -And then run them manually: - -```bash -npm run test:integration -``` - -For the failure test, you need to run Operate and manually verify that an incident has been raised. - - - -### Writing Tests - -Zeebe is inherently stateful, so integration tests need to be carefully isolated so that workers from one test do not service tasks in another test. Jest runs tests in a random order, so intermittent failures are the outcome of tests that mutate shared state. - -The tests use a templating function to replace the process id, task types and message names in the bpmn model to produce distinct, isolated namespaces for each test and each test run. 
- - - -## Contributors - -| Name | -| ------------------------------------------------------------ | -| **[Josh Wulf](https://github.com/jwulf)** | -| **[Colin Raddatz](https://github.com/ColRad)** | -| **[Jarred Filmer](https://github.com/BrighTide)** | -| **[Timothy Colbert](https://github.com/s3than)** | -| **[Olivier Albertini](https://github.com/OlivierAlbertini)** | -| **[Patrick Dehn](https://github.com/pedesen)** | diff --git a/src/zeebe/lib/cancelProcesses.ts b/src/zeebe/lib/cancelProcesses.ts index fc1e1e03..e35fa1e0 100644 --- a/src/zeebe/lib/cancelProcesses.ts +++ b/src/zeebe/lib/cancelProcesses.ts @@ -8,13 +8,16 @@ export async function cancelProcesses(processDefinitionKey: string) { } const processes = await operate.searchProcessInstances({ filter: { - processDefinitionKey: +processDefinitionKey, + processDefinitionKey, }, }) await Promise.all( - processes.items.map((item) => - operate.deleteProcessInstance(+item.bpmnProcessId) - ) + processes.items.map((item) => { + return operate.deleteProcessInstance(item.key).catch((e) => { + console.log(`Failed to delete process ${item.key}`) + console.log(e) + }) + }) ) } diff --git a/src/zeebe/zb/ZeebeGrpcClient.ts b/src/zeebe/zb/ZeebeGrpcClient.ts index 2df0d69c..a6769d59 100644 --- a/src/zeebe/zb/ZeebeGrpcClient.ts +++ b/src/zeebe/zb/ZeebeGrpcClient.ts @@ -707,7 +707,6 @@ export class ZeebeGrpcClient extends TypedEmitter< }: { resourceKey: string }): Promise> { - console.log('resourceKey', resourceKey) return this.executeOperation('deleteResourceSync', () => this.grpc.deleteResourceSync({ resourceKey }) )