-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathworkflow.ts
50 lines (41 loc) · 1.26 KB
/
workflow.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import type { WorkflowArgs } from '@datashaper/schema'
import type { TableContainer } from '@datashaper/tables'
import type { Subscription } from 'rxjs'
import { BaseNode } from '../dataflow/index.js'
import { Workflow } from '../resources/Workflow/Workflow.js'
/**
 * Dataflow node that delegates its work to a nested Workflow.
 *
 * Every value emitted on `config$` replaces the current Workflow: the previous
 * output subscription is closed and the previous Workflow disposed, then (for a
 * non-null configuration) a new Workflow is built from `config.workflow`, wired
 * to this node's input value stream, and its outputs are re-emitted by this node.
 */
class WorkflowNode extends BaseNode<TableContainer, WorkflowArgs> {
  /** Workflow built from the most recent non-null configuration, if any. */
  private _workflow: Workflow | null = null
  /** Subscription to the current Workflow's `output$`, if any. */
  private _outputSub: Subscription | null = null

  /**
   * @param id - identifier assigned to this node
   */
  constructor(id: string) {
    super()
    this.id = id

    // NOTE(review): the subscription returned by config$.subscribe is not
    // retained, so this class never unsubscribes from it — confirm that
    // BaseNode completes config$ when the node is torn down.
    this.config$.subscribe((config) => {
      // Tear down whatever the previous configuration set up. Both fields are
      // cleared unconditionally so they never point at a closed subscription
      // or a disposed Workflow, even if construction below throws.
      this._outputSub?.unsubscribe()
      this._outputSub = null
      this._workflow?.dispose()
      this._workflow = null

      // A null/undefined configuration leaves the node without a Workflow.
      if (config == null) {
        return
      }

      // Build the replacement Workflow from the new configuration.
      const next = new Workflow(config.workflow)
      this._workflow = next

      // Feed this node's input value stream in as the Workflow's input.
      next.input$ = this.inputValue$()

      // Forward every Workflow output as this node's own output.
      this._outputSub = next.output$.subscribe((output) => {
        this.emit(output)
      })
    })
  }

  /**
   * Intentionally a no-op: outputs are produced by the nested Workflow and
   * forwarded through the `output$` subscription set up in the constructor.
   */
  protected doRecalculate(): void | Promise<void> {
    // no need, the workflow will handle this
  }
}
/**
 * Factory for a WorkflowNode.
 *
 * @param id - identifier passed to the WorkflowNode constructor
 * @returns a newly constructed WorkflowNode
 */
export function workflow(id: string): WorkflowNode {
  const node = new WorkflowNode(id)
  return node
}