Skip to content

Commit

Permalink
feat: remove reduce node and process implement context
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Jan 16, 2025
1 parent 2ed522a commit b3218b5
Show file tree
Hide file tree
Showing 45 changed files with 190 additions and 557 deletions.
4 changes: 2 additions & 2 deletions cmd/pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,8 +537,8 @@ func (v *processDebugView) Interface() map[string]any {
value["pid"] = p.ID()
}
for _, key := range v.process.Keys() {
val := v.process.Load(key)
value[key] = fmt.Sprint(val)
val := v.process.Value(key)
value[fmt.Sprint(key)] = fmt.Sprint(val)
}
value["status"] = v.process.Status()
return value
Expand Down
4 changes: 2 additions & 2 deletions deployments/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ WORKDIR /app
COPY ./ .

RUN make init
RUN make build CGO_ENABLED=0
RUN make build

FROM alpine:latest
FROM ubuntu:latest

WORKDIR /root/

Expand Down
20 changes: 10 additions & 10 deletions examples/system.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@

- kind: if
name: specs_read_or_watch
when: '!has(self.header.Connection) || !has(self.header.Upgrade)'
when: '!has(header.Connection) || !has(header.Upgrade)'
ports:
out[0]:
- name: specs_read_with_query
Expand All @@ -124,7 +124,7 @@
specs:
- kind: snippet
language: cel
code: 'has(self.body) ? self.body : null'
code: body
- kind: syscall
opcode: specs.create
- kind: snippet
Expand Down Expand Up @@ -165,7 +165,7 @@
specs:
- kind: snippet
language: cel
code: 'has(self.body) ? self.body : null'
code: body
- kind: syscall
opcode: specs.update
- kind: snippet
Expand Down Expand Up @@ -205,7 +205,7 @@
specs:
- kind: snippet
language: cel
code: 'has(self.params) ? self.params : null'
code: params
- kind: syscall
opcode: specs.read
- kind: snippet
Expand Down Expand Up @@ -256,7 +256,7 @@
specs:
- kind: snippet
language: cel
code: 'has(self.params) ? self.params : null'
code: params
- kind: syscall
opcode: specs.delete
- kind: snippet
Expand Down Expand Up @@ -313,7 +313,7 @@

- kind: if
name: values_read_or_watch
when: '!has(self.header.Connection) || !has(self.header.Upgrade)'
when: '!has(header.Connection) || !has(header.Upgrade)'
ports:
out[0]:
- name: values_read_with_query
Expand All @@ -327,7 +327,7 @@
specs:
- kind: snippet
language: cel
code: 'has(self.body) ? self.body : null'
code: body
- kind: syscall
opcode: values.create
- kind: snippet
Expand Down Expand Up @@ -368,7 +368,7 @@
specs:
- kind: snippet
language: cel
code: 'has(self.body) ? self.body : null'
code: body
- kind: syscall
opcode: values.update
- kind: snippet
Expand Down Expand Up @@ -408,7 +408,7 @@
specs:
- kind: snippet
language: cel
code: 'has(self.params) ? self.params : null'
code: params
- kind: syscall
opcode: values.read
- kind: snippet
Expand Down Expand Up @@ -514,7 +514,7 @@
- kind: switch
name: catch
matches:
- when: self == "unsupported type" || self == "unsupported value"
- when: 'self == "unsupported type" || self == "unsupported value"'
port: out[0]
- when: 'true'
port: out[1]
Expand Down
1 change: 0 additions & 1 deletion ext/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ Precisely manage data flow.
- **[Merge Node](./docs/merge_node.md)**: Combines multiple input packets into one.
- **[NOP Node](./docs/nop_node.md)**: Responds to input packets with an empty packet, without any processing.
- **[Pipe Node](./docs/pipe_node.md)**: Processes input packets and distributes results to multiple output ports, allowing for reusable data flows.
- **[Reduce Node](./docs/reduce_node.md)**: Repeatedly processes input data to produce a single output value, useful for data aggregation.
- **[Retry Node](./docs/retry_node.md)**: Retries packet processing a specified number of times upon encountering errors.
- **[Session Node](./docs/session_node.md)**: Stores and manages process information, maintaining session continuity.
- **[Sleep Node](./docs/sleep_node.md)**: Introduces a specified delay in processing to pace workflows or await external
Expand Down
1 change: 0 additions & 1 deletion ext/README_kr.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
- **[Merge 노드](./docs/merge_node_kr.md)**: 여러 입력 패킷을 하나로 통합합니다.
- **[NOP 노드](./docs/nop_node_kr.md)**: 입력 패킷을 처리하지 않고 빈 패킷으로 응답합니다.
- **[Pipe 노드](./docs/pipe_node_kr.md)**: 입력 패킷을 처리하고 결과를 여러 출력 포트로 전달하여 데이터 흐름을 재사용합니다.
- **[Reduce 노드](./docs/reduce_node_kr.md)**: 입력 데이터를 반복적으로 연산하여 하나의 출력 값을 생성합니다. 데이터 집계에 유용합니다.
- **[Retry 노드](./docs/retry_node_kr.md)**: 오류가 발생하면 지정된 횟수만큼 패킷 처리를 재시도합니다.
- **[Session 노드](./docs/session_node_kr.md)**: 프로세스 정보를 저장하고 관리하여 세션을 유지합니다.
- **[Sleep 노드](./docs/sleep_node_kr.md)**: 지정된 지연 시간을 추가하여 워크플로우를 조정하거나 외부 조건을 기다립니다.
Expand Down
2 changes: 1 addition & 1 deletion ext/docs/if_node.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

```yaml
- kind: if
when: "self.count > 10"
when: "count > 10"
ports:
out[0]:
- name: true_path
Expand Down
2 changes: 1 addition & 1 deletion ext/docs/if_node_kr.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

```yaml
- kind: if
when: "self.count > 10"
when: "count > 10"
ports:
out[0]:
- name: true_path
Expand Down
26 changes: 0 additions & 26 deletions ext/docs/reduce_node.md

This file was deleted.

26 changes: 0 additions & 26 deletions ext/docs/reduce_node_kr.md

This file was deleted.

4 changes: 2 additions & 2 deletions ext/docs/signal_node.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ processing real-time events or system-level signals within the workflow.

## Specification

- **opcode**: A string identifying the system operation to be listened. It is associated with the specified function and
- **topic**: A string identifying the system operation to be listened. It is associated with the specified function and
determines the node's behavior.

## Ports
Expand All @@ -16,7 +16,7 @@ processing real-time events or system-level signals within the workflow.

```yaml
- kind: signal
opcode: specs
topic: specs
ports:
out:
- name: next
Expand Down
4 changes: 2 additions & 2 deletions ext/docs/signal_node_kr.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

## 명세

- **opcode**: 청취할 시스템 작업을 식별하는 문자열입니다. 지정된 함수와 연관되며 노드의 동작을 결정합니다.
- **topic**: 청취할 시스템 작업을 식별하는 문자열입니다. 지정된 함수와 연관되며 노드의 동작을 결정합니다.
-

## 포트
Expand All @@ -15,7 +15,7 @@

```yaml
- kind: signal
opcode: specs
topic: specs
ports:
out:
- name: next
Expand Down
2 changes: 1 addition & 1 deletion ext/docs/syscall_node.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
```yaml
- kind: snippet
language: cel
code: 'has(self.body) ? self.body : null'
code: 'has(body) ? body : null'
ports:
out:
- name: specs_create
Expand Down
2 changes: 1 addition & 1 deletion ext/docs/syscall_node_kr.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
```yaml
- kind: snippet
language: cel
code: 'has(self.body) ? self.body : null'
code: 'has(body) ? body : null'
ports:
out:
- name: specs_create
Expand Down
1 change: 0 additions & 1 deletion ext/pkg/control/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ func AddToScheme(module *language.Module, lang string) scheme.Register {
{KindMerge, NewMergeNodeCodec(), &MergeNodeSpec{}},
{KindNOP, NewNOPNodeCodec(), &NOPNodeSpec{}},
{KindPipe, NewPipeNodeCodec(), &PipeNodeSpec{}},
{KindReduce, NewReduceNodeCodec(expr), &ReduceNodeSpec{}},
{KindRetry, NewRetryNodeCodec(), &RetryNodeSpec{}},
{KindSession, NewSessionNodeCodec(), &SessionNodeSpec{}},
{KindSleep, NewSleepNodeCodec(), &SleepNodeSpec{}},
Expand Down
2 changes: 1 addition & 1 deletion ext/pkg/control/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestAddToScheme(t *testing.T) {
err := AddToScheme(m, text.Language).AddToScheme(s)
assert.NoError(t, err)

tests := []string{KindBlock, KindCache, KindFor, KindFork, KindIf, KindFor, KindMerge, KindNOP, KindPipe, KindReduce, KindRetry, KindStep, KindSession, KindSleep, KindSnippet, KindSplit, KindSwitch, KindTry}
tests := []string{KindBlock, KindCache, KindFor, KindFork, KindIf, KindFor, KindMerge, KindNOP, KindPipe, KindRetry, KindStep, KindSession, KindSleep, KindSnippet, KindSplit, KindSwitch, KindTry}

for _, tt := range tests {
t.Run(tt, func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion ext/pkg/control/if.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (n *IfNode) action(proc *process.Process, inPck *packet.Packet) ([]*packet.
inPayload := inPck.Payload()
input := types.InterfaceOf(inPayload)

ok, err := n.condition(proc.Context(), input)
ok, err := n.condition(proc, input)
if err != nil {
return nil, packet.New(types.NewError(err))
}
Expand Down
Loading

0 comments on commit b3218b5

Please sign in to comment.