Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alpha #178

Merged
merged 10 commits into from
Jun 10, 2024
4 changes: 3 additions & 1 deletion .github/workflows/pr-test.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
name: Pull Request tests

on: [pull_request]
on:
pull_request:
workflow_dispatch:

jobs:
unit-tests:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
name: Publish a new version

on:
workflow_dispatch:
push:
branches:
- main
Expand Down
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
## [8.6.1-alpha.1](https://github.com/camunda/camunda-8-js-sdk/compare/v8.6.0...v8.6.1-alpha.1) (2024-06-07)


### Features

* **zeebe:** add multi-tenant support to workers ([#175](https://github.com/camunda/camunda-8-js-sdk/issues/175)) ([28450a5](https://github.com/camunda/camunda-8-js-sdk/commit/28450a50a2cbb70b5f8958e1d94c144f817a8758)), closes [#171](https://github.com/camunda/camunda-8-js-sdk/issues/171)
* **zeebe:** add updateJobTimeout method ([#172](https://github.com/camunda/camunda-8-js-sdk/issues/172)) ([5eff624](https://github.com/camunda/camunda-8-js-sdk/commit/5eff6243dbce5fd296daeedcf6191ef4c4d4b609)), closes [#171](https://github.com/camunda/camunda-8-js-sdk/issues/171)
* **zeebe:** support StreamActivatedJobs RPC ([#160](https://github.com/camunda/camunda-8-js-sdk/issues/160)) ([258296a](https://github.com/camunda/camunda-8-js-sdk/commit/258296aef6558f976dd299ea977514d58d822141)), closes [#17](https://github.com/camunda/camunda-8-js-sdk/issues/17)

# [8.6.0](https://github.com/camunda/camunda-8-js-sdk/compare/v8.5.4...v8.6.0) (2024-06-05)

## [8.5.5-alpha.1](https://github.com/camunda/camunda-8-js-sdk/compare/v8.5.4...v8.5.5-alpha.1) (2024-06-05)
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@camunda8/sdk",
"version": "8.6.0",
"version": "8.6.1-alpha.1",
"description": "",
"main": "dist/index.js",
"scripts": {
Expand Down
44 changes: 28 additions & 16 deletions src/__tests__/config/jest.cleanup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,32 +35,44 @@ export const cleanUp = async () => {
const processIds = (bpmn as any[]).map(
(b) => b?.['bpmn:definitions']?.['bpmn:process']?.['@_id']
)
const operate = new OperateApiClient()
const zeebe = new ZeebeGrpcClient({
config: {
zeebeGrpcSettings: { ZEEBE_CLIENT_LOG_LEVEL: 'NONE' },
},
})
for (const id of processIds) {
if (id) {
const res = await operate.searchProcessInstances({
filter: { bpmnProcessId: id, state: 'ACTIVE' },
})
const instancesKeys = res.items.map((instance) => instance.key)
if (instancesKeys.length > 0) {
console.log(`Cancelling ${instancesKeys.length} instances for ${id}`)
}
for (const key of instancesKeys) {
try {
await zeebe.cancelProcessInstance(key)
console.log(`Cancelled process instance ${key}`)
} catch (e) {
console.log('Failed to cancel process instance', key)
console.log((e as Error).message)
// Are we running in a multi-tenant environment?
const multiTenant = !!process.env.CAMUNDA_TENANT_ID
const tenantIds = multiTenant
? ['<default>', 'red', 'green']
: [undefined]
for (const tenantId of tenantIds) {
const operate = new OperateApiClient({
config: {
CAMUNDA_TENANT_ID: tenantId,
},
})
const res = await operate.searchProcessInstances({
filter: { bpmnProcessId: id, state: 'ACTIVE' },
})
const instancesKeys = res.items.map((instance) => instance.key)
if (instancesKeys.length > 0) {
console.log(
`Don't worry about it - Operate is eventually consistent.`
`Cancelling ${instancesKeys.length} instances for ${id} in tenant '${tenantId}'...`
)
}
for (const key of instancesKeys) {
try {
await zeebe.cancelProcessInstance(key)
console.log(`Cancelled process instance ${key}`)
} catch (e) {
if (!(e as Error).message.startsWith('5 NOT_FOUND')) {
console.log('Failed to cancel process instance', key)
console.log((e as Error).message)
}
}
}
}
}
}
Expand Down
48 changes: 48 additions & 0 deletions src/__tests__/testdata/multi-tenant-stream-worker-test.bpmn
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:zeebe="http://camunda.org/schema/zeebe/1.0" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:modeler="http://camunda.org/schema/modeler/1.0" id="Definitions_1o5c8zw" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="5.23.0" modeler:executionPlatform="Camunda Cloud" modeler:executionPlatformVersion="8.5.0">
<bpmn:process id="multi-tenant-stream-worker-test" name="Multi-tenant Stream Worker Test" isExecutable="true">
<bpmn:startEvent id="StartEvent_1" name="Start multi-tenancy worker test">
<bpmn:outgoing>Flow_0r8p543</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_0r8p543" sourceRef="StartEvent_1" targetRef="Activity_1an5aay" />
<bpmn:endEvent id="Event_1hylnf3" name="Multi-tenancy worker test complete">
<bpmn:incoming>Flow_08wm3o9</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_08wm3o9" sourceRef="Activity_1an5aay" targetRef="Event_1hylnf3" />
<bpmn:serviceTask id="Activity_1an5aay" name="multi-tenant-stream-work">
<bpmn:extensionElements>
<zeebe:taskDefinition type="multi-tenant-stream-work" />
</bpmn:extensionElements>
<bpmn:incoming>Flow_0r8p543</bpmn:incoming>
<bpmn:outgoing>Flow_08wm3o9</bpmn:outgoing>
</bpmn:serviceTask>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="multi-tenant-stream-worker-test">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="160" y="142" width="75" height="40" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_1hylnf3_di" bpmnElement="Event_1hylnf3">
<dc:Bounds x="432" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="417" y="142" width="66" height="40" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0cx6d07_di" bpmnElement="Activity_1an5aay">
<dc:Bounds x="270" y="77" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_0r8p543_di" bpmnElement="Flow_0r8p543">
<di:waypoint x="215" y="117" />
<di:waypoint x="270" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_08wm3o9_di" bpmnElement="Flow_08wm3o9">
<di:waypoint x="370" y="117" />
<di:waypoint x="432" y="117" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>
48 changes: 48 additions & 0 deletions src/__tests__/testdata/multi-tenant-worker-test.bpmn
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:zeebe="http://camunda.org/schema/zeebe/1.0" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:modeler="http://camunda.org/schema/modeler/1.0" id="Definitions_1o5c8zw" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="5.23.0" modeler:executionPlatform="Camunda Cloud" modeler:executionPlatformVersion="8.5.0">
<bpmn:process id="multi-tenant-worker-test" name="Multi-tenant Worker Test" isExecutable="true">
<bpmn:startEvent id="StartEvent_1" name="Start multi-tenancy worker test">
<bpmn:outgoing>Flow_0r8p543</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_0r8p543" sourceRef="StartEvent_1" targetRef="Activity_1an5aay" />
<bpmn:endEvent id="Event_1hylnf3" name="Multi-tenancy worker test complete">
<bpmn:incoming>Flow_08wm3o9</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_08wm3o9" sourceRef="Activity_1an5aay" targetRef="Event_1hylnf3" />
<bpmn:serviceTask id="Activity_1an5aay" name="multi-tenant-work">
<bpmn:extensionElements>
<zeebe:taskDefinition type="multi-tenant-work" />
</bpmn:extensionElements>
<bpmn:incoming>Flow_0r8p543</bpmn:incoming>
<bpmn:outgoing>Flow_08wm3o9</bpmn:outgoing>
</bpmn:serviceTask>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="multi-tenant-worker-test">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="160" y="142" width="75" height="40" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_1hylnf3_di" bpmnElement="Event_1hylnf3">
<dc:Bounds x="432" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="417" y="142" width="66" height="40" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0cx6d07_di" bpmnElement="Activity_1an5aay">
<dc:Bounds x="270" y="77" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_0r8p543_di" bpmnElement="Flow_0r8p543">
<di:waypoint x="215" y="117" />
<di:waypoint x="270" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_08wm3o9_di" bpmnElement="Flow_08wm3o9">
<di:waypoint x="370" y="117" />
<di:waypoint x="432" y="117" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>
110 changes: 110 additions & 0 deletions src/__tests__/zeebe/multitenancy/multitenant-worker-mt.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import { restoreZeebeLogging, suppressZeebeLogging } from '../../../lib'
import { ZeebeGrpcClient } from '../../../zeebe/index'

jest.setTimeout(10000)

beforeAll(() => {
suppressZeebeLogging()
})

afterAll(() => {
restoreZeebeLogging()
})

test('A worker can be multi-tenant', async () => {
const client = new ZeebeGrpcClient()

await client.deployResource({
processFilename: './src/__tests__/testdata/multi-tenant-worker-test.bpmn',
tenantId: '<default>',
})

await client.deployResource({
processFilename: './src/__tests__/testdata/multi-tenant-worker-test.bpmn',
tenantId: 'green',
})

await client.createProcessInstance({
bpmnProcessId: 'multi-tenant-worker-test',
variables: { foo: 'bar' },
tenantId: '<default>',
})

await client.createProcessInstance({
bpmnProcessId: 'multi-tenant-worker-test',
variables: { foo: 'bar' },
tenantId: 'green',
})

let greenTenant = false,
defaultTenant = false
await new Promise((resolve) =>
client.createWorker({
taskHandler: (job) => {
greenTenant = greenTenant || job.tenantId === 'green'
defaultTenant = defaultTenant || job.tenantId === '<default>'
if (greenTenant && defaultTenant) {
resolve(null)
}
return job.complete()
},
taskType: 'multi-tenant-work',
tenantIds: ['<default>', 'green'],
})
)

await client.close()
})

test('A stream worker can be multi-tenant', async () => {
const client = new ZeebeGrpcClient()

await client.deployResource({
processFilename:
'./src/__tests__/testdata/multi-tenant-stream-worker-test.bpmn',
tenantId: '<default>',
})

await client.deployResource({
processFilename:
'./src/__tests__/testdata/multi-tenant-stream-worker-test.bpmn',
tenantId: 'green',
})

let greenTenant = false,
defaultTenant = false
// eslint-disable-next-line no-async-promise-executor
await new Promise(async (resolve) => {
client.streamJobs({
taskHandler: async (job) => {
greenTenant = greenTenant || job.tenantId === 'green'
defaultTenant = defaultTenant || job.tenantId === '<default>'
const res = await job.complete()
if (greenTenant && defaultTenant) {
resolve(null)
}
return res
},
type: 'multi-tenant-stream-work',
tenantIds: ['<default>', 'green'],
worker: 'stream-worker',
timeout: 2000,
})

await new Promise((resolve) => setTimeout(resolve, 2000))

await client.createProcessInstance({
bpmnProcessId: 'multi-tenant-stream-worker-test',
variables: { foo: 'bar' },
tenantId: '<default>',
})

await client.createProcessInstance({
bpmnProcessId: 'multi-tenant-stream-worker-test',
variables: { foo: 'bar' },
tenantId: 'green',
})
})

await client.close()
})
7 changes: 0 additions & 7 deletions src/zeebe/lib/ZBStreamWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,6 @@ export class ZBStreamWorker implements IZBJobWorker {
stream.on('error', (e) => {
console.error(e)
})
// stream.on('pause', () => console.log('paused'))
// stream.on('metadata', (m) => console.log(m))
// stream.on('readable', () => console.log('readable'))
// stream.on('status', () => console.log('status'))
// stream.on('close', () => console.log('close'))
// stream.on('end', () => console.log('end'))
// stream.on('resume', (n) => console.log('resume', n))
stream.on('data', (res: ActivatedJob) => {
// Make handlers
const job: Job<WorkerInputVariables, CustomHeaderShape> =
Expand Down
8 changes: 5 additions & 3 deletions src/zeebe/lib/ZBWorkerBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ export interface ZBWorkerConstructorConfig<
inputVariableDto?: { new (...args: any[]): Readonly<WorkerInputVariables> }
// eslint-disable-next-line @typescript-eslint/no-explicit-any
customHeadersDto?: { new (...args: any[]): Readonly<CustomHeaderShape> }
tenantIds: string[] | [string] | undefined
}

export class ZBWorkerBase<
Expand Down Expand Up @@ -101,7 +102,6 @@ export class ZBWorkerBase<
private pollMutex: boolean = false
private backPressureRetryCount: number = 0
private fetchVariable: (keyof WorkerInputVariables)[] | undefined
private tenantId?: string
private inputVariableDto: {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
new (obj: any): WorkerInputVariables
Expand All @@ -110,6 +110,7 @@ export class ZBWorkerBase<
// eslint-disable-next-line @typescript-eslint/no-explicit-any
new (...args: any[]): CustomHeaderShape
}
private tenantIds: string[] | [string] | undefined

constructor({
grpcClient,
Expand All @@ -121,13 +122,15 @@ export class ZBWorkerBase<
zbClient,
inputVariableDto,
customHeadersDto,
tenantIds,
}: ZBWorkerConstructorConfig<
WorkerInputVariables,
CustomHeaderShape,
WorkerOutputVariables
>) {
super()
options = options || {}
this.tenantIds = tenantIds
if (!taskType) {
throw new Error('Missing taskType')
}
Expand All @@ -146,7 +149,6 @@ export class ZBWorkerBase<
// eslint-disable-next-line @typescript-eslint/no-explicit-any
new (obj: any): CustomHeaderShape
})
this.tenantId = options.tenantId
this.taskHandler = taskHandler
this.taskType = taskType
this.maxJobsToActivate =
Expand Down Expand Up @@ -560,7 +562,7 @@ You should call only one job action method in the worker handler. This is a bug
type: this.taskType,
worker: this.id,
fetchVariable: this.fetchVariable as string[],
tenantIds: this.tenantId ? [this.tenantId] : undefined,
tenantIds: this.tenantIds,
}

this.logger.logDebug(
Expand Down
Loading
Loading