Skip to content

Commit

Permalink
* feat(zeebe): deployResources to deploy multiple resources atomically
Browse files Browse the repository at this point in the history
* docs(zeebe): document multi-tenant workers

* feat(zeebe): deployResources to deploy multiple resources atomically

add deployResources method

fixes #173
  • Loading branch information
jwulf authored Jun 12, 2024
1 parent f43d956 commit 695a26a
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 114 deletions.
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -281,3 +281,31 @@ Please note that only jobs that become available _after_ the stream is opened ar
In this release, this is not handled for you. You must both poll and stream jobs to make sure that you get jobs that were available before your application started as well as jobs that become available after your application starts.

In a subsequent release, the ZeebeWorker will transparently handle this for you.

## Multi-tenant workers

Workers, both polling and streaming, can be multi-tenanted, requesting jobs from more than one tenant.

Example:

```typescript
client.createWorker({
taskHandler: (job) => {
console.log(job.tenantId) // '<default>' | 'green'
return job.complete()
},
taskType: 'multi-tenant-work',
tenantIds: ['<default>', 'green'],
})

client.streamJobs({
taskHandler: async (job) => {
console.log(job.tenantId) // '<default>' | 'green'
return job.complete()
},
type: 'multi-tenant-stream-work',
tenantIds: ['<default>', 'green'],
worker: 'stream-worker',
timeout: 2000,
})
```
15 changes: 15 additions & 0 deletions src/__tests__/zeebe/integration/Client-DeployResource.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,18 @@ test('deploys a Form', async () => {
})
expect(result.deployments[0].form).not.toBeNull()
})
test.only('deploys multiple resources', async () => {
const result = await zbc.deployResources([
{
processFilename: './src/__tests__/testdata/Client-DeployWorkflow.bpmn',
},
{
decisionFilename: './src/__tests__/testdata/quarantine-duration.dmn',
},
{
form: fs.readFileSync('./src/__tests__/testdata/form_1.form'),
name: 'form_1.form',
},
])
expect(result.deployments.length).toBe(4)
})
56 changes: 56 additions & 0 deletions src/zeebe/lib/deployResource.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { readFileSync } from 'fs'

export type Resource =
| { name: string; process: Buffer; tenantId?: string }
| { processFilename: string; tenantId?: string }
| { name: string; decision: Buffer; tenantId?: string }
| { decisionFilename: string; tenantId?: string }
| { name: string; form: Buffer; tenantId?: string }
| { formFilename: string; tenantId?: string }

const isProcess = (
maybeProcess: any // eslint-disable-line @typescript-eslint/no-explicit-any
): maybeProcess is { process: Buffer; name: string } => !!maybeProcess.process
const isProcessFilename = (
maybeProcessFilename: any // eslint-disable-line @typescript-eslint/no-explicit-any
): maybeProcessFilename is { processFilename: string } =>
!!maybeProcessFilename.processFilename
const isDecision = (
maybeDecision: any // eslint-disable-line @typescript-eslint/no-explicit-any
): maybeDecision is { decision: Buffer; name: string } =>
!!maybeDecision.decision
const isDecisionFilename = (
maybeDecisionFilename: any // eslint-disable-line @typescript-eslint/no-explicit-any
): maybeDecisionFilename is { decisionFilename: string } =>
!!maybeDecisionFilename.decisionFilename
// default fall-through
/* const isForm = ( maybeForm: any ): maybeForm is { form: Buffer; name: string } =>
!!maybeForm.form
*/
const isFormFilename = (
maybeFormFilename: any // eslint-disable-line @typescript-eslint/no-explicit-any
): maybeFormFilename is { formFilename: string } =>
!!maybeFormFilename.formFilename

export function getResourceContentAndName(resource: Resource) {
if (isProcessFilename(resource)) {
const filename = resource.processFilename
const process = readFileSync(filename)
return { content: process, name: filename }
} else if (isProcess(resource)) {
return { content: resource.process, name: resource.name }
} else if (isDecisionFilename(resource)) {
const filename = resource.decisionFilename
const decision = readFileSync(filename)
return { content: decision, name: filename }
} else if (isDecision(resource)) {
return { content: resource.decision, name: resource.name }
} else if (isFormFilename(resource)) {
const filename = resource.formFilename
const form = readFileSync(filename)
return { content: form, name: filename }
} /* if (isForm(resource)) */ else {
// default fall-through
return { content: resource.form, name: resource.name }
}
}
178 changes: 64 additions & 114 deletions src/zeebe/zb/ZeebeGrpcClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import { StatefulLogInterceptor } from '../lib/StatefulLogInterceptor'
import { TypedEmitter } from '../lib/TypedEmitter'
import { ZBJsonLogger } from '../lib/ZBJsonLogger'
import { ZBStreamWorker } from '../lib/ZBStreamWorker'
import { Resource, getResourceContentAndName } from '../lib/deployResource'
import * as ZB from '../lib/interfaces-1.0'
import { ZBWorkerTaskHandler } from '../lib/interfaces-1.0'
import * as Grpc from '../lib/interfaces-grpc-1.0'
Expand Down Expand Up @@ -656,8 +657,7 @@ export class ZeebeGrpcClient extends TypedEmitter<

/**
*
* @description Deploys one or more resources (e.g. processes or decision models) to Zeebe.
* Note that this is an atomic call, i.e. either all resources are deployed, or none of them are.
* @description Deploys a single resources (e.g. process or decision model) to Zeebe.
*
* Errors:
* PERMISSION_DENIED:
Expand Down Expand Up @@ -696,13 +696,7 @@ export class ZeebeGrpcClient extends TypedEmitter<
| { name: string; form: Buffer; tenantId?: string }
): Promise<Grpc.DeployResourceResponse<Grpc.FormDeployment>>
async deployResource(
resource:
| { name: string; process: Buffer; tenantId?: string }
| { processFilename: string; tenantId?: string }
| { name: string; decision: Buffer; tenantId?: string }
| { decisionFilename: string; tenantId?: string }
| { name: string; form: Buffer; tenantId?: string }
| { formFilename: string; tenantId?: string }
resource: Resource
): Promise<
Grpc.DeployResourceResponse<
| Grpc.ProcessDeployment
Expand All @@ -711,111 +705,67 @@ export class ZeebeGrpcClient extends TypedEmitter<
| Grpc.FormDeployment
>
> {
const isProcess = (
maybeProcess: any // eslint-disable-line @typescript-eslint/no-explicit-any
): maybeProcess is { process: Buffer; name: string } =>
!!maybeProcess.process
const isProcessFilename = (
maybeProcessFilename: any // eslint-disable-line @typescript-eslint/no-explicit-any
): maybeProcessFilename is { processFilename: string } =>
!!maybeProcessFilename.processFilename
const isDecision = (
maybeDecision: any // eslint-disable-line @typescript-eslint/no-explicit-any
): maybeDecision is { decision: Buffer; name: string } =>
!!maybeDecision.decision
const isDecisionFilename = (
maybeDecisionFilename: any // eslint-disable-line @typescript-eslint/no-explicit-any
): maybeDecisionFilename is { decisionFilename: string } =>
!!maybeDecisionFilename.decisionFilename
// default fall-through
/* const isForm = ( maybeForm: any ): maybeForm is { form: Buffer; name: string } =>
!!maybeForm.form
*/
const isFormFilename = (
maybeFormFilename: any // eslint-disable-line @typescript-eslint/no-explicit-any
): maybeFormFilename is { formFilename: string } =>
!!maybeFormFilename.formFilename

if (isProcessFilename(resource)) {
const filename = resource.processFilename
const process = readFileSync(filename)
return this.executeOperation('deployResource', async () =>
(await this.grpc).deployResourceSync({
resources: [
{
name: filename,
content: process,
},
],
tenantId: resource.tenantId ?? this.tenantId,
})
)
} else if (isProcess(resource)) {
return this.executeOperation('deployResource', async () =>
(await this.grpc).deployResourceSync({
resources: [
{
name: resource.name,
content: resource.process,
},
],
tenantId: resource.tenantId ?? this.tenantId,
})
)
} else if (isDecisionFilename(resource)) {
const filename = resource.decisionFilename
const decision = readFileSync(filename)
return this.executeOperation('deployResource', async () =>
(await this.grpc).deployResourceSync({
resources: [
{
name: filename,
content: decision,
},
],
tenantId: resource.tenantId ?? this.tenantId,
})
)
} else if (isDecision(resource)) {
return this.executeOperation('deployResource', async () =>
(await this.grpc).deployResourceSync({
resources: [
{
name: resource.name,
content: resource.decision,
},
],
tenantId: resource.tenantId ?? this.tenantId,
})
)
} else if (isFormFilename(resource)) {
const filename = resource.formFilename
const form = readFileSync(filename)
return this.executeOperation('deployResource', async () =>
(await this.grpc).deployResourceSync({
resources: [
{
name: filename,
content: form,
},
],
tenantId: resource.tenantId ?? this.tenantId,
})
)
} /* if (isForm(resource)) */ else {
// default fall-through
return this.executeOperation('deployResource', async () =>
(await this.grpc).deployResourceSync({
resources: [
{
name: resource.name,
content: resource.form,
},
],
tenantId: resource.tenantId ?? this.tenantId,
})
)
}
const { content, name } = getResourceContentAndName(resource)

return this.executeOperation('deployResource', async () =>
(await this.grpc).deployResourceSync({
resources: [
{
name,
content,
},
],
tenantId: resource.tenantId ?? this.tenantId,
})
)
}

/**
*
* @description Deploys one or more resources (e.g. processes or decision models) to Zeebe.
* Note that this is an atomic call, i.e. either all resources are deployed, or none of them are.
*
* Errors:
* PERMISSION_DENIED:
* - if a deployment to an unauthorized tenant is performed
* INVALID_ARGUMENT:
* - no resources given.
* - if at least one resource is invalid. A resource is considered invalid if:
* - the content is not deserializable (e.g. detected as BPMN, but it's broken XML)
* - the content is invalid (e.g. an event-based gateway has an outgoing sequence flow to a task)
* - if multi-tenancy is enabled, and:
* - a tenant id is not provided
* - a tenant id with an invalid format is provided
* - if multi-tenancy is disabled and a tenant id is provided
* @example
* ```
* const zbc = new ZeebeGrpcClient()
*
* const result = await zbc.deployResources([
* {
* processFilename: './src/__tests__/testdata/Client-DeployWorkflow.bpmn',
* },
* {
* decisionFilename: './src/__tests__/testdata/quarantine-duration.dmn',
* },
* {
* form: fs.readFileSync('./src/__tests__/testdata/form_1.form'),
* name: 'form_1.form',
* },
* ])
* ```
*/
public async deployResources(resources: Resource[], tenantId?: string) {
const resourcesToDeploy = resources.map((r) => {
const { content, name } = getResourceContentAndName(r)
return { name, content }
})
return this.executeOperation('deployResources', async () =>
(await this.grpc).deployResourceSync({
resources: resourcesToDeploy,
tenantId: tenantId ?? this.tenantId,
})
)
}

/**
Expand Down

0 comments on commit 695a26a

Please sign in to comment.