From 7d761013d10fec81df9239942b8810ba9a9a46f5 Mon Sep 17 00:00:00 2001 From: Nov1c444 <857526207@qq.com> Date: Fri, 18 Oct 2024 16:57:24 +0800 Subject: [PATCH 01/13] feat: Iteration node support parallel mode --- .../advanced_chat/generate_task_pipeline.py | 3 +- .../apps/workflow/generate_task_pipeline.py | 3 +- api/core/app/apps/workflow_app_runner.py | 33 ++ api/core/app/entities/queue_entities.py | 32 ++ .../task_pipeline/workflow_cycle_manage.py | 29 +- api/core/workflow/entities/node_entities.py | 1 + .../workflow/graph_engine/entities/event.py | 5 + .../workflow/graph_engine/graph_engine.py | 11 + api/core/workflow/nodes/iteration/entities.py | 17 + .../nodes/iteration/iteration_node.py | 337 +++++++++++---- .../unit_tests/configs/test_dify_config.py | 4 +- .../nodes/iteration/test_iteration.py | 393 ++++++++++++++++++ .../workflow/hooks/use-nodes-interactions.ts | 5 + .../workflow/hooks/use-workflow-run.ts | 9 +- .../workflow/nodes/_base/components/field.tsx | 6 +- .../components/workflow/nodes/_base/node.tsx | 18 + .../workflow/nodes/iteration/default.ts | 7 +- .../workflow/nodes/iteration/node.tsx | 15 +- .../workflow/nodes/iteration/panel.tsx | 53 ++- .../workflow/nodes/iteration/types.ts | 6 + .../workflow/nodes/iteration/use-config.ts | 25 +- web/app/components/workflow/run/index.tsx | 77 +++- .../workflow/run/iteration-result-panel.tsx | 20 +- web/app/components/workflow/run/node.tsx | 12 +- web/app/components/workflow/store.ts | 4 + web/app/components/workflow/types.ts | 6 +- web/i18n/en-US/workflow.ts | 17 + web/i18n/zh-Hans/workflow.ts | 17 + web/types/workflow.ts | 1 + 29 files changed, 1023 insertions(+), 143 deletions(-) diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index fd63c7787fa631..3c0d8fc78d3f02 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -19,6 +19,7 @@ QueueIterationStartEvent, QueueMessageReplaceEvent, QueueNodeFailedEvent, + QueueNodeInIterationFailedEvent, QueueNodeStartedEvent, QueueNodeSucceededEvent, QueueParallelBranchRunFailedEvent, @@ -306,7 +307,7 @@ def _process_stream_response( if response: yield response - elif isinstance(event, QueueNodeFailedEvent): + elif isinstance(event, QueueNodeFailedEvent | QueueNodeInIterationFailedEvent): workflow_node_execution = self._handle_workflow_node_execution_failed(event) response = self._workflow_node_finish_to_stream_response( diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py index 7c53556e43bc48..8dbcb1d74c0f83 100644 --- a/api/core/app/apps/workflow/generate_task_pipeline.py +++ b/api/core/app/apps/workflow/generate_task_pipeline.py @@ -17,6 +17,7 @@ QueueIterationNextEvent, QueueIterationStartEvent, QueueNodeFailedEvent, + QueueNodeInIterationFailedEvent, QueueNodeStartedEvent, QueueNodeSucceededEvent, QueueParallelBranchRunFailedEvent, @@ -276,7 +277,7 @@ def _process_stream_response( if response: yield response - elif isinstance(event, QueueNodeFailedEvent): + elif isinstance(event, QueueNodeFailedEvent | QueueNodeInIterationFailedEvent): workflow_node_execution = self._handle_workflow_node_execution_failed(event) response = self._workflow_node_finish_to_stream_response( diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py index ce266116a7dfaa..7135e284d4a4c4 100644 --- a/api/core/app/apps/workflow_app_runner.py +++ b/api/core/app/apps/workflow_app_runner.py @@ -9,6 +9,7 @@ QueueIterationNextEvent, QueueIterationStartEvent, QueueNodeFailedEvent, + QueueNodeInIterationFailedEvent, QueueNodeStartedEvent, QueueNodeSucceededEvent, QueueParallelBranchRunFailedEvent, @@ -31,6 +32,7 @@ IterationRunNextEvent, IterationRunStartedEvent, IterationRunSucceededEvent, + NodeInIterationFailedEvent, NodeRunFailedEvent, NodeRunRetrieverResourceEvent, NodeRunStartedEvent, @@ -248,9 +250,40 @@ def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent) error=event.route_node_state.node_run_result.error if event.route_node_state.node_run_result and event.route_node_state.node_run_result.error else "Unknown error", + execution_metadata=event.route_node_state.node_run_result.metadata + if event.route_node_state.node_run_result + else {}, in_iteration_id=event.in_iteration_id, ) ) + elif isinstance(event, NodeInIterationFailedEvent): + self._publish_event( + QueueNodeInIterationFailedEvent( + node_execution_id=event.id, + node_id=event.node_id, + node_type=event.node_type, + node_data=event.node_data, + parallel_id=event.parallel_id, + parallel_start_node_id=event.parallel_start_node_id, + parent_parallel_id=event.parent_parallel_id, + parent_parallel_start_node_id=event.parent_parallel_start_node_id, + start_at=event.route_node_state.start_at, + inputs=event.route_node_state.node_run_result.inputs + if event.route_node_state.node_run_result + else {}, + process_data=event.route_node_state.node_run_result.process_data + if event.route_node_state.node_run_result + else {}, + outputs=event.route_node_state.node_run_result.outputs + if event.route_node_state.node_run_result + else {}, + execution_metadata=event.route_node_state.node_run_result.metadata + if event.route_node_state.node_run_result + else {}, + in_iteration_id=event.in_iteration_id, + error=event.error, + ) + ) elif isinstance(event, NodeRunStreamChunkEvent): self._publish_event( QueueTextChunkEvent( diff --git a/api/core/app/entities/queue_entities.py b/api/core/app/entities/queue_entities.py index 4577e28535f023..b3453848d985b6 100644 --- a/api/core/app/entities/queue_entities.py +++ b/api/core/app/entities/queue_entities.py @@ -305,6 +305,37 @@ class QueueNodeSucceededEvent(AppQueueEvent): error: Optional[str] = None +class QueueNodeInIterationFailedEvent(AppQueueEvent): + """ + QueueNodeInIterationFailedEvent entity + """ + + event: QueueEvent = QueueEvent.NODE_FAILED + + node_execution_id: str + node_id: str + node_type: NodeType + node_data: BaseNodeData + parallel_id: Optional[str] = None + """parallel id if node is in parallel""" + parallel_start_node_id: Optional[str] = None + """parallel start node id if node is in parallel""" + parent_parallel_id: Optional[str] = None + """parent parallel id if node is in parallel""" + parent_parallel_start_node_id: Optional[str] = None + """parent parallel start node id if node is in parallel""" + in_iteration_id: Optional[str] = None + """iteration id if node is in iteration""" + start_at: datetime + + inputs: Optional[dict[str, Any]] = None + process_data: Optional[dict[str, Any]] = None + outputs: Optional[dict[str, Any]] = None + execution_metadata: Optional[dict[NodeRunMetadataKey, Any]] = None + + error: str + + class QueueNodeFailedEvent(AppQueueEvent): """ QueueNodeFailedEvent entity @@ -331,6 +362,7 @@ class QueueNodeFailedEvent(AppQueueEvent): inputs: Optional[dict[str, Any]] = None process_data: Optional[dict[str, Any]] = None outputs: Optional[dict[str, Any]] = None + execution_metadata: Optional[dict[NodeRunMetadataKey, Any]] = None error: str diff --git a/api/core/app/task_pipeline/workflow_cycle_manage.py b/api/core/app/task_pipeline/workflow_cycle_manage.py index b8f5ac260340e5..b08d323369bca5 100644 --- a/api/core/app/task_pipeline/workflow_cycle_manage.py +++ b/api/core/app/task_pipeline/workflow_cycle_manage.py @@ -9,6 +9,7 @@ QueueIterationNextEvent, QueueIterationStartEvent, QueueNodeFailedEvent, + QueueNodeInIterationFailedEvent, QueueNodeStartedEvent, QueueNodeSucceededEvent, QueueParallelBranchRunFailedEvent, @@ -299,7 +300,9 @@ def _handle_workflow_node_execution_success(self, event: QueueNodeSucceededEvent return workflow_node_execution - def _handle_workflow_node_execution_failed(self, event: QueueNodeFailedEvent) -> WorkflowNodeExecution: + def _handle_workflow_node_execution_failed( + self, event: QueueNodeFailedEvent | QueueNodeInIterationFailedEvent + ) -> WorkflowNodeExecution: """ Workflow node execution failed :param event: queue node failed event @@ -311,17 +314,21 @@ def _handle_workflow_node_execution_failed(self, event: QueueNodeFailedEvent) -> outputs = WorkflowEntry.handle_special_values(event.outputs) finished_at = datetime.now(timezone.utc).replace(tzinfo=None) elapsed_time = (finished_at - event.start_at).total_seconds() - + execution_metadata = ( + json.dumps(jsonable_encoder(event.execution_metadata)) if event.execution_metadata else None + ) + update_data = { + WorkflowNodeExecution.status: WorkflowNodeExecutionStatus.FAILED.value, + WorkflowNodeExecution.error: event.error, + WorkflowNodeExecution.inputs: json.dumps(inputs) if inputs else None, + WorkflowNodeExecution.process_data: json.dumps(event.process_data) if event.process_data else None, + WorkflowNodeExecution.outputs: json.dumps(outputs) if outputs else None, + WorkflowNodeExecution.finished_at: finished_at, + WorkflowNodeExecution.elapsed_time: elapsed_time, + } + update_data[WorkflowNodeExecution.execution_metadata] = execution_metadata db.session.query(WorkflowNodeExecution).filter(WorkflowNodeExecution.id == workflow_node_execution.id).update( - { - WorkflowNodeExecution.status: WorkflowNodeExecutionStatus.FAILED.value, - WorkflowNodeExecution.error: event.error, - WorkflowNodeExecution.inputs: json.dumps(inputs) if inputs else None, - WorkflowNodeExecution.process_data: json.dumps(event.process_data) if event.process_data else None, - WorkflowNodeExecution.outputs: json.dumps(outputs) if outputs else None, - WorkflowNodeExecution.finished_at: finished_at, - WorkflowNodeExecution.elapsed_time: elapsed_time, - } + update_data ) db.session.commit() diff --git a/api/core/workflow/entities/node_entities.py b/api/core/workflow/entities/node_entities.py index 5353b99ed38b14..cc6dbeaa426789 100644 --- a/api/core/workflow/entities/node_entities.py +++ b/api/core/workflow/entities/node_entities.py @@ -61,6 +61,7 @@ class NodeRunMetadataKey(Enum): PARALLEL_START_NODE_ID = "parallel_start_node_id" PARENT_PARALLEL_ID = "parent_parallel_id" PARENT_PARALLEL_START_NODE_ID = "parent_parallel_start_node_id" + PARALLEL_MODE_RUN_ID = "parallel_mode_run_id" class NodeRunResult(BaseModel): diff --git a/api/core/workflow/graph_engine/entities/event.py b/api/core/workflow/graph_engine/entities/event.py index 06dc4cb8f4393c..0ed15ea9b26074 100644 --- a/api/core/workflow/graph_engine/entities/event.py +++ b/api/core/workflow/graph_engine/entities/event.py @@ -81,6 +81,10 @@ class NodeRunFailedEvent(BaseNodeEvent): error: str = Field(..., description="error") +class NodeInIterationFailedEvent(BaseNodeEvent): + error: str = Field(..., description="error") + + ########################################### # Parallel Branch Events ########################################### @@ -129,6 +133,7 @@ class BaseIterationEvent(GraphEngineEvent): """parent parallel id if node is in parallel""" parent_parallel_start_node_id: Optional[str] = None """parent parallel start node id if node is in parallel""" + parallel_mode_run_id: Optional[str] = None class IterationRunStartedEvent(BaseIterationEvent): diff --git a/api/core/workflow/graph_engine/graph_engine.py b/api/core/workflow/graph_engine/graph_engine.py index 8342dbd13d2a54..32ff1620bb69fd 100644 --- a/api/core/workflow/graph_engine/graph_engine.py +++ b/api/core/workflow/graph_engine/graph_engine.py @@ -4,6 +4,7 @@ import uuid from collections.abc import Generator, Mapping from concurrent.futures import ThreadPoolExecutor, wait +from copy import copy, deepcopy from typing import Any, Optional from flask import Flask, current_app @@ -729,6 +730,16 @@ def _is_timed_out(self, start_at: float, max_execution_time: int) -> bool: """ return time.perf_counter() - start_at > max_execution_time + def create_copy(self): + """ + create a graph engine copy + :return: with a new variable pool instance of graph engine + """ + new_instance = copy(self) + new_instance.graph_runtime_state = copy(self.graph_runtime_state) + new_instance.graph_runtime_state.variable_pool = deepcopy(self.graph_runtime_state.variable_pool) + return new_instance + class GraphRunFailedError(Exception): def __init__(self, error: str): diff --git a/api/core/workflow/nodes/iteration/entities.py b/api/core/workflow/nodes/iteration/entities.py index 3c2c189159cbef..f50c9e7f0e0940 100644 --- a/api/core/workflow/nodes/iteration/entities.py +++ b/api/core/workflow/nodes/iteration/entities.py @@ -1,8 +1,22 @@ +from enum import Enum from typing import Any, Optional from core.workflow.entities.base_node_data_entities import BaseIterationNodeData, BaseIterationState, BaseNodeData +class ErrorHandleMode(Enum): + TERMINATED = "Terminated" + CONTINUE_ON_ERROR = "Continue on error" + REMOVE_ABNORMAL_OUTPUT = "Remove abnormal output" + + def to_json(self): + return self.value + + @classmethod + def from_json(cls, value): + return cls(value) + + class IterationNodeData(BaseIterationNodeData): """ Iteration Node Data. @@ -11,6 +25,9 @@ class IterationNodeData(BaseIterationNodeData): parent_loop_id: Optional[str] = None # redundant field, not used currently iterator_selector: list[str] # variable selector output_selector: list[str] # output selector + is_parallel: bool = False # open the parallel mode or not + parallel_nums: int = 10 # the numbers of parallel + error_handle_mode: ErrorHandleMode = ErrorHandleMode.TERMINATED # how to handle the error class IterationStartNodeData(BaseNodeData): diff --git a/api/core/workflow/nodes/iteration/iteration_node.py b/api/core/workflow/nodes/iteration/iteration_node.py index 01bb4e9076e5a7..5216d382021a9d 100644 --- a/api/core/workflow/nodes/iteration/iteration_node.py +++ b/api/core/workflow/nodes/iteration/iteration_node.py @@ -1,11 +1,21 @@ import logging +import uuid from collections.abc import Generator, Mapping, Sequence +from concurrent.futures import wait from datetime import datetime, timezone -from typing import Any, cast +from queue import Empty, Queue +from typing import Any, Optional, cast + +from flask import Flask, current_app from configs import dify_config from core.model_runtime.utils.encoders import jsonable_encoder -from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult, NodeType +from core.workflow.entities.node_entities import ( + NodeRunMetadataKey, + NodeRunResult, + NodeType, +) +from core.workflow.entities.variable_pool import VariablePool from core.workflow.graph_engine.entities.event import ( BaseGraphEvent, BaseNodeEvent, @@ -16,13 +26,15 @@ IterationRunNextEvent, IterationRunStartedEvent, IterationRunSucceededEvent, + NodeInIterationFailedEvent, + NodeRunFailedEvent, NodeRunStreamChunkEvent, NodeRunSucceededEvent, ) from core.workflow.graph_engine.entities.graph import Graph from core.workflow.nodes.base_node import BaseNode from core.workflow.nodes.event import RunCompletedEvent, RunEvent -from core.workflow.nodes.iteration.entities import IterationNodeData +from core.workflow.nodes.iteration.entities import ErrorHandleMode, IterationNodeData from models.workflow import WorkflowNodeExecutionStatus logger = logging.getLogger(__name__) @@ -36,6 +48,17 @@ class IterationNode(BaseNode): _node_data_cls = IterationNodeData _node_type = NodeType.ITERATION + @classmethod + def get_default_config(cls, filters: Optional[dict] = None) -> dict: + return { + "type": "iteration", + "config": { + "is_parallel": False, + "parallel_nums": 10, + "error_handle_mode": ErrorHandleMode.TERMINATED.value, + }, + } + def _run(self) -> Generator[RunEvent | InNodeEvent, None, None]: """ Run the node. @@ -73,7 +96,7 @@ def _run(self) -> Generator[RunEvent | InNodeEvent, None, None]: variable_pool.add([self.node_id, "item"], iterator_list_value[0]) # init graph engine - from core.workflow.graph_engine.graph_engine import GraphEngine + from core.workflow.graph_engine.graph_engine import GraphEngine, GraphEngineThreadPool graph_engine = GraphEngine( tenant_id=self.tenant_id, @@ -113,93 +136,56 @@ def _run(self) -> Generator[RunEvent | InNodeEvent, None, None]: index=0, pre_iteration_output=None, ) - outputs: list[Any] = [] try: - for _ in range(len(iterator_list_value)): - # run workflow - rst = graph_engine.run() - for event in rst: - if isinstance(event, (BaseNodeEvent | BaseParallelBranchEvent)) and not event.in_iteration_id: - event.in_iteration_id = self.node_id - - if ( - isinstance(event, BaseNodeEvent) - and event.node_type == NodeType.ITERATION_START - and not isinstance(event, NodeRunStreamChunkEvent) - ): - continue - - if isinstance(event, NodeRunSucceededEvent): - if event.route_node_state.node_run_result: - metadata = event.route_node_state.node_run_result.metadata - if not metadata: - metadata = {} - - if NodeRunMetadataKey.ITERATION_ID not in metadata: - metadata[NodeRunMetadataKey.ITERATION_ID] = self.node_id - metadata[NodeRunMetadataKey.ITERATION_INDEX] = variable_pool.get_any( - [self.node_id, "index"] - ) - event.route_node_state.node_run_result.metadata = metadata - - yield event - elif isinstance(event, BaseGraphEvent): - if isinstance(event, GraphRunFailedEvent): - # iteration run failed - yield IterationRunFailedEvent( - iteration_id=self.id, - iteration_node_id=self.node_id, - iteration_node_type=self.node_type, - iteration_node_data=self.node_data, - start_at=start_at, - inputs=inputs, - outputs={"output": jsonable_encoder(outputs)}, - steps=len(iterator_list_value), - metadata={"total_tokens": graph_engine.graph_runtime_state.total_tokens}, - error=event.error, - ) - - yield RunCompletedEvent( - run_result=NodeRunResult( - status=WorkflowNodeExecutionStatus.FAILED, - error=event.error, - ) - ) - return - else: - event = cast(InNodeEvent, event) + if self.node_data.is_parallel: + futures = [] + q = Queue() + thread_pool = GraphEngineThreadPool(max_workers=self.node_data.parallel_nums, max_submit_count=100) + for index, item in enumerate(iterator_list_value): + future = thread_pool.submit( + self._run_single_iter_parallel, + current_app._get_current_object(), + q, + iterator_list_value, + inputs, + outputs, + start_at, + graph_engine, + iteration_graph, + index, + item, + ) + future.add_done_callback(thread_pool.task_done_callback) + futures.append(future) + succeeded_count = 0 + while True: + try: + event = q.get(timeout=1) + if event is None: + break + if isinstance(event, IterationRunNextEvent): + succeeded_count += 1 + if succeeded_count == len(futures): + q.put(None) yield event + except Empty: + logger.warning(msg="Parallel Iteration event queue empty") + continue - # append to iteration output variable list - current_iteration_output = variable_pool.get_any(self.node_data.output_selector) - outputs.append(current_iteration_output) - - # remove all nodes outputs from variable pool - for node_id in iteration_graph.node_ids: - variable_pool.remove_node(node_id) - - # move to next iteration - current_index = variable_pool.get([self.node_id, "index"]) - if current_index is None: - raise ValueError(f"iteration {self.node_id} current index not found") - - next_index = int(current_index.to_object()) + 1 - variable_pool.add([self.node_id, "index"], next_index) - - if next_index < len(iterator_list_value): - variable_pool.add([self.node_id, "item"], iterator_list_value[next_index]) - - yield IterationRunNextEvent( - iteration_id=self.id, - iteration_node_id=self.node_id, - iteration_node_type=self.node_type, - iteration_node_data=self.node_data, - index=next_index, - pre_iteration_output=jsonable_encoder(current_iteration_output) - if current_iteration_output - else None, - ) + # wait all threads + wait(futures) + else: + for _ in range(len(iterator_list_value)): + yield from self._run_single_iter( + iterator_list_value, + variable_pool, + inputs, + outputs, + start_at, + graph_engine, + iteration_graph, + ) yield IterationRunSucceededEvent( iteration_id=self.id, @@ -304,3 +290,180 @@ def _extract_variable_selector_to_variable_mapping( } return variable_mapping + + def _run_single_iter( + self, + iterator_list_value: list[str], + variable_pool: VariablePool, + inputs: dict[str, list], + outputs: list, + start_at: datetime, + graph_engine, + iteration_graph, + parallel_mode_run_id: Optional[str] = None, + ): + rst = graph_engine.run() + for event in rst: + if isinstance(event, (BaseNodeEvent | BaseParallelBranchEvent)) and not event.in_iteration_id: + event.in_iteration_id = self.node_id + + if ( + isinstance(event, BaseNodeEvent) + and event.node_type == NodeType.ITERATION_START + and not isinstance(event, NodeRunStreamChunkEvent) + ): + continue + + if isinstance(event, NodeRunSucceededEvent): + self._handle_event_metadata(event, variable_pool, parallel_mode_run_id) + yield event + elif isinstance(event, BaseGraphEvent): + if isinstance(event, GraphRunFailedEvent): + # iteration run failed + if self.node_data.is_parallel: + yield IterationRunFailedEvent( + iteration_id=self.id, + iteration_node_id=self.node_id, + iteration_node_type=self.node_type, + iteration_node_data=self.node_data, + parallel_mode_run_id=parallel_mode_run_id, + start_at=start_at, + inputs=inputs, + outputs={"output": jsonable_encoder(outputs)}, + steps=len(iterator_list_value), + metadata={"total_tokens": graph_engine.graph_runtime_state.total_tokens}, + error=event.error, + ) + else: + yield IterationRunFailedEvent( + iteration_id=self.id, + iteration_node_id=self.node_id, + iteration_node_type=self.node_type, + iteration_node_data=self.node_data, + start_at=start_at, + inputs=inputs, + outputs={"output": jsonable_encoder(outputs)}, + steps=len(iterator_list_value), + metadata={"total_tokens": graph_engine.graph_runtime_state.total_tokens}, + error=event.error, + ) + yield RunCompletedEvent( + run_result=NodeRunResult( + status=WorkflowNodeExecutionStatus.FAILED, + error=event.error, + ) + ) + return + else: + event = cast(InNodeEvent, event) + if isinstance(event, NodeRunFailedEvent): + if self.node_data.error_handle_mode == ErrorHandleMode.CONTINUE_ON_ERROR: + metadata_event = self._handle_event_metadata(event, variable_pool, parallel_mode_run_id) + yield NodeInIterationFailedEvent( + **metadata_event.model_dump(), + ) + break + elif self.node_data.error_handle_mode == ErrorHandleMode.REMOVE_ABNORMAL_OUTPUT: + current_index = variable_pool.get([self.node_id, "index"]) + if current_index is None: + raise ValueError(f"iteration {self.node_id} current index not found") + next_index = int(current_index.to_object()) + 1 + metadata_event = self._handle_event_metadata(event, variable_pool, parallel_mode_run_id) + yield NodeInIterationFailedEvent( + **metadata_event.model_dump(), + ) + variable_pool.add([self.node_id, "index"], next_index) + + if next_index < len(iterator_list_value): + variable_pool.add([self.node_id, "item"], iterator_list_value[next_index]) + yield IterationRunNextEvent( + iteration_id=self.id, + iteration_node_id=self.node_id, + iteration_node_type=self.node_type, + iteration_node_data=self.node_data, + index=next_index, + parallel_mode_run_id=parallel_mode_run_id, + pre_iteration_output=None, + ) + + return + yield self._handle_event_metadata(event, variable_pool, parallel_mode_run_id) + + current_index = variable_pool.get([self.node_id, "index"]) + # append to iteration output variable list + current_iteration_output = variable_pool.get_any(self.node_data.output_selector) + outputs.insert(current_index.value, current_iteration_output) + # remove all nodes outputs from variable pool + for node_id in iteration_graph.node_ids: + variable_pool.remove_node(node_id) + + # move to next iteration + if current_index is None: + raise ValueError(f"iteration {self.node_id} current index not found") + + next_index = int(current_index.to_object()) + 1 + variable_pool.add([self.node_id, "index"], next_index) + + if next_index < len(iterator_list_value): + variable_pool.add([self.node_id, "item"], iterator_list_value[next_index]) + + yield IterationRunNextEvent( + iteration_id=self.id, + iteration_node_id=self.node_id, + iteration_node_type=self.node_type, + iteration_node_data=self.node_data, + index=next_index, + parallel_mode_run_id=parallel_mode_run_id, + pre_iteration_output=jsonable_encoder(current_iteration_output) if current_iteration_output else None, + ) + + def _run_single_iter_parallel( + self, + flask_app: Flask, + q: Queue, + iterator_list_value: list[str], + inputs: dict[str, list], + outputs: list, + start_at: datetime, + graph_engine, + iteration_graph, + index, + item, + ): + with flask_app.app_context(): + parallel_mode_run_id = uuid.uuid4().hex + graph_engine_copy = graph_engine.create_copy() + variable_pool_copy = graph_engine_copy.graph_runtime_state.variable_pool + variable_pool_copy.add([self.node_id, "index"], index) + variable_pool_copy.add([self.node_id, "item"], item) + for event in self._run_single_iter( + iterator_list_value=iterator_list_value, + variable_pool=variable_pool_copy, + inputs=inputs, + outputs=outputs, + start_at=start_at, + graph_engine=graph_engine_copy, + iteration_graph=iteration_graph, + parallel_mode_run_id=parallel_mode_run_id, + ): + q.put(event) + + def _handle_event_metadata(self, event: BaseNodeEvent, variable_pool: VariablePool, parallel_mode_run_id: str): + """ + Handle success event. + """ + if not isinstance(event, BaseNodeEvent): + return event + if event.route_node_state.node_run_result: + metadata = event.route_node_state.node_run_result.metadata + if not metadata: + metadata = {} + + if NodeRunMetadataKey.ITERATION_ID not in metadata: + metadata[NodeRunMetadataKey.ITERATION_ID] = self.node_id + if self.node_data.is_parallel: + metadata[NodeRunMetadataKey.PARALLEL_MODE_RUN_ID] = parallel_mode_run_id + else: + metadata[NodeRunMetadataKey.ITERATION_INDEX] = variable_pool.get_any([self.node_id, "index"]) + event.route_node_state.node_run_result.metadata = metadata + return event diff --git a/api/tests/unit_tests/configs/test_dify_config.py b/api/tests/unit_tests/configs/test_dify_config.py index 3f639ccacc48f5..fdce22d31559cc 100644 --- a/api/tests/unit_tests/configs/test_dify_config.py +++ b/api/tests/unit_tests/configs/test_dify_config.py @@ -50,10 +50,10 @@ def test_dify_config(example_env_file): assert config.SENTRY_TRACES_SAMPLE_RATE == 1.0 # annotated field with default value - assert config.HTTP_REQUEST_MAX_READ_TIMEOUT == 60 + assert config.HTTP_REQUEST_MAX_READ_TIMEOUT == 600 # annotated field with configured value - assert config.HTTP_REQUEST_MAX_WRITE_TIMEOUT == 30 + assert config.HTTP_REQUEST_MAX_WRITE_TIMEOUT == 600 # NOTE: If there is a `.env` file in your Workspace, this test might not succeed as expected. diff --git a/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py b/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py index b3a89061b29e21..bb0cf6b771fa9b 100644 --- a/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py +++ b/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py @@ -10,6 +10,7 @@ from core.workflow.graph_engine.entities.graph_init_params import GraphInitParams from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState from core.workflow.nodes.event import RunCompletedEvent +from core.workflow.nodes.iteration.entities import ErrorHandleMode from core.workflow.nodes.iteration.iteration_node import IterationNode from core.workflow.nodes.template_transform.template_transform_node import TemplateTransformNode from models.workflow import WorkflowNodeExecutionStatus, WorkflowType @@ -418,3 +419,395 @@ def tt_generator(self): assert item.run_result.outputs == {"output": ["dify 123", "dify 123"]} assert count == 32 + + +def test_iteration_run_in_parallel_mode(): + graph_config = { + "edges": [ + { + "id": "start-source-pe-target", + "source": "start", + "target": "pe", + }, + { + "id": "iteration-1-source-answer-3-target", + "source": "iteration-1", + "target": "answer-3", + }, + { + "id": "iteration-start-source-tt-target", + "source": "iteration-start", + "target": "tt", + }, + { + "id": "iteration-start-source-tt-2-target", + "source": "iteration-start", + "target": "tt-2", + }, + { + "id": "tt-source-if-else-target", + "source": "tt", + "target": "if-else", + }, + { + "id": "tt-2-source-if-else-target", + "source": "tt-2", + "target": "if-else", + }, + { + "id": "if-else-true-answer-2-target", + "source": "if-else", + "sourceHandle": "true", + "target": "answer-2", + }, + { + "id": "if-else-false-answer-4-target", + "source": "if-else", + "sourceHandle": "false", + "target": "answer-4", + }, + { + "id": "pe-source-iteration-1-target", + "source": "pe", + "target": "iteration-1", + }, + ], + "nodes": [ + {"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"}, + { + "data": { + "iterator_selector": ["pe", "list_output"], + "output_selector": ["tt", "output"], + "output_type": "array[string]", + "startNodeType": "template-transform", + "start_node_id": "iteration-start", + "title": "iteration", + "type": "iteration", + }, + "id": "iteration-1", + }, + { + "data": { + "answer": "{{#tt.output#}}", + "iteration_id": "iteration-1", + "title": "answer 2", + "type": "answer", + }, + "id": "answer-2", + }, + { + "data": { + "iteration_id": "iteration-1", + "title": "iteration-start", + "type": "iteration-start", + }, + "id": "iteration-start", + }, + { + "data": { + "iteration_id": "iteration-1", + "template": "{{ arg1 }} 123", + "title": "template transform", + "type": "template-transform", + "variables": [{"value_selector": ["sys", "query"], "variable": "arg1"}], + }, + "id": "tt", + }, + { + "data": { + "iteration_id": "iteration-1", + "template": "{{ arg1 }} 321", + "title": "template transform", + "type": "template-transform", + "variables": [{"value_selector": ["sys", "query"], "variable": "arg1"}], + }, + "id": "tt-2", + }, + { + "data": {"answer": "{{#iteration-1.output#}}88888", "title": "answer 3", "type": "answer"}, + "id": "answer-3", + }, + { + "data": { + "conditions": [ + { + "comparison_operator": "is", + "id": "1721916275284", + "value": "hi", + "variable_selector": ["sys", "query"], + } + ], + "iteration_id": "iteration-1", + "logical_operator": "and", + "title": "if", + "type": "if-else", + }, + "id": "if-else", + }, + { + "data": {"answer": "no hi", "iteration_id": "iteration-1", "title": "answer 4", "type": "answer"}, + "id": "answer-4", + }, + { + "data": { + "instruction": "test1", + "model": { + "completion_params": {"temperature": 0.7}, + "mode": "chat", + "name": "gpt-4o", + "provider": "openai", + }, + "parameters": [ + {"description": "test", "name": "list_output", "required": False, "type": "array[string]"} + ], + "query": ["sys", "query"], + "reasoning_mode": "prompt", + "title": "pe", + "type": "parameter-extractor", + }, + "id": "pe", + }, + ], + } + + graph = Graph.init(graph_config=graph_config) + + init_params = GraphInitParams( + tenant_id="1", + app_id="1", + workflow_type=WorkflowType.CHAT, + workflow_id="1", + graph_config=graph_config, + user_id="1", + user_from=UserFrom.ACCOUNT, + invoke_from=InvokeFrom.DEBUGGER, + call_depth=0, + ) + + # construct variable pool + pool = VariablePool( + system_variables={ + SystemVariableKey.QUERY: "dify", + SystemVariableKey.FILES: [], + SystemVariableKey.CONVERSATION_ID: "abababa", + SystemVariableKey.USER_ID: "1", + }, + user_inputs={}, + environment_variables=[], + ) + pool.add(["pe", "list_output"], ["dify-1", "dify-2"]) + + parallel_iteration_node = IterationNode( + id=str(uuid.uuid4()), + graph_init_params=init_params, + graph=graph, + graph_runtime_state=GraphRuntimeState(variable_pool=pool, start_at=time.perf_counter()), + config={ + "data": { + "iterator_selector": ["pe", "list_output"], + "output_selector": ["tt", "output"], + "output_type": "array[string]", + "startNodeType": "template-transform", + "start_node_id": "iteration-start", + "title": "迭代", + "type": "iteration", + "is_parallel": True, + }, + "id": "iteration-1", + }, + ) + sequential_iteration_node = IterationNode( + id=str(uuid.uuid4()), + graph_init_params=init_params, + graph=graph, + graph_runtime_state=GraphRuntimeState(variable_pool=pool, start_at=time.perf_counter()), + config={ + "data": { + "iterator_selector": ["pe", "list_output"], + "output_selector": ["tt", "output"], + "output_type": "array[string]", + "startNodeType": "template-transform", + "start_node_id": "iteration-start", + "title": "迭代", + "type": "iteration", + "is_parallel": True, + }, + "id": "iteration-1", + }, + ) + + def tt_generator(self): + return NodeRunResult( + status=WorkflowNodeExecutionStatus.SUCCEEDED, + inputs={"iterator_selector": "dify"}, + outputs={"output": "dify 123"}, + ) + + with patch.object(TemplateTransformNode, "_run", new=tt_generator): + # execute node + parallel_result = parallel_iteration_node._run() + sequential_result = sequential_iteration_node._run() + assert parallel_iteration_node.node_data.parallel_nums == 10 + assert parallel_iteration_node.node_data.error_handle_mode == ErrorHandleMode.TERMINATED + count = 0 + + for item in parallel_result: + count += 1 + if isinstance(item, RunCompletedEvent): + assert item.run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED + assert item.run_result.outputs == {"output": ["dify 123", "dify 123"]} + assert count == 32 + + for item in sequential_result: + count += 1 + if isinstance(item, RunCompletedEvent): + assert item.run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED + assert item.run_result.outputs == {"output": ["dify 123", "dify 123"]} + assert count == 64 + + +def test_iteration_run_error_handle(): + graph_config = { + "nodes": [ + { + "data": {"desc": "", "selected": False, "title": "开始", "type": "start", "variables": []}, + "id": "1727165736057", + }, + { + "data": { + "answer": "{{#1728357129226.output#}}\n", + "desc": "", + "selected": False, + "title": "直接回复", + "type": "answer", + "variables": [], + }, + "id": "1727166508076", + }, + { + "data": { + "code": '\ndef main(text:str) -> dict:\n return {\n "result": text.split(","),\n }\n', + "code_language": "python3", + "desc": "", + "outputs": {"result": {"children": None, "type": "array[string]"}}, + "selected": False, + "title": "Code", + "type": "code", + "variables": [{"value_selector": ["sys", "query"], "variable": "text"}], + }, + "id": "1727168677820", + }, + { + "data": { + "desc": "", + "error_handle_mode": "Continue on error", + "height": 226, + "is_parallel": True, + "iterator_selector": ["1727168677820", "result"], + "output_selector": ["1728702070951", "result"], + "output_type": "array[string]", + "parallel_nums": 6, + "selected": True, + "start_node_id": "1728357129226start", + "title": "迭代 2", + "type": "iteration", + "width": 589, + }, + "id": "1728357129226", + }, + { + "data": {"desc": "", "isInIteration": True, "selected": False, "title": "", "type": "iteration-start"}, + "id": "1728357129226start", + }, + { + "data": { + "type": "code", + "title": "Code 3", + "desc": "", + "variables": [{"variable": "arg1", "value_selector": ["1728357129226", "item"]}], + "code_language": "python3", + "code": '\ndef main(arg1: str) -> dict:\n return {\n "result": arg1.split(":")[1]\n }\n', # noqa: E501 + "outputs": {"result": {"type": "string", "children": None}}, + "selected": False, + "isInIteration": True, + "iteration_id": "1728357129226", + }, + "id": "1728702070951", + }, + ], + "edges": [ + {"id": "1727165736057-source-1727168677820-target", "target": "1727168677820", "source": "1727165736057"}, + {"id": "1727168677820-source-1728357129226-target", "target": "1728357129226", "source": "1727168677820"}, + {"id": "1728357129226-source-1727166508076-target", "target": "1727166508076", "source": "1728357129226"}, + { + "id": "1728357129226start-source-1728702070951-target", + "target": "1728702070951", + "source": "1728357129226start", + }, + ], + } + + graph = Graph.init(graph_config=graph_config) + + init_params = GraphInitParams( + tenant_id="1", + app_id="1", + workflow_type=WorkflowType.CHAT, + workflow_id="1", + graph_config=graph_config, + user_id="1", + user_from=UserFrom.ACCOUNT, + invoke_from=InvokeFrom.DEBUGGER, + call_depth=0, + ) + + # construct variable pool + pool = VariablePool( + system_variables={}, + user_inputs={}, + environment_variables=[], + ) + pool.add(["1727168677820", "result"], ["test", "test:hello"]) + + iteration_node = IterationNode( + id=str(uuid.uuid4()), + graph_init_params=init_params, + graph=graph, + graph_runtime_state=GraphRuntimeState(variable_pool=pool, start_at=time.perf_counter()), + config={ + "data": { + "error_handle_mode": ErrorHandleMode.CONTINUE_ON_ERROR, + "is_parallel": True, + "iterator_selector": ["1727168677820", "result"], + "output_selector": ["1728702070951", "result"], + "output_type": "array[string]", + "parallel_nums": 6, + "start_node_id": "1728357129226start", + "title": "迭代 2", + "type": "iteration", + }, + "id": "1728357129226", + }, + ) + # print("") + + # execute continue on error node + result = iteration_node._run() + count = 0 + for item in result: + count += 1 + if isinstance(item, RunCompletedEvent): + assert item.run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED + assert item.run_result.outputs == {"output": [None, "hello"]} + + assert count == 10 + # execute remove abnormal output + iteration_node.node_data.error_handle_mode = ErrorHandleMode.REMOVE_ABNORMAL_OUTPUT + result = iteration_node._run() + count = 0 + for item in result: + count += 1 + if isinstance(item, RunCompletedEvent): + assert item.run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED + assert item.run_result.outputs == {"output": ["hello"]} + assert count == 10 diff --git a/web/app/components/workflow/hooks/use-nodes-interactions.ts b/web/app/components/workflow/hooks/use-nodes-interactions.ts index af2a1500baac0b..7e53592f2c7c16 100644 --- a/web/app/components/workflow/hooks/use-nodes-interactions.ts +++ b/web/app/components/workflow/hooks/use-nodes-interactions.ts @@ -644,6 +644,11 @@ export const useNodesInteractions = () => { newNode.data.isInIteration = true newNode.data.iteration_id = prevNode.parentId newNode.zIndex = ITERATION_CHILDREN_Z_INDEX + if (newNode.data.type === BlockEnum.Answer) { + const parentIterNodeIndex = nodes.findIndex(node => node.id === prevNode.parentId) + if (nodes[parentIterNodeIndex].data._isFirstTime) + nodes[parentIterNodeIndex].data._isShowTips = true + } } const newEdge: Edge = { diff --git a/web/app/components/workflow/hooks/use-workflow-run.ts b/web/app/components/workflow/hooks/use-workflow-run.ts index 68c3ff0a4b458d..71e795f1001d01 100644 --- a/web/app/components/workflow/hooks/use-workflow-run.ts +++ b/web/app/components/workflow/hooks/use-workflow-run.ts @@ -427,6 +427,8 @@ export const useWorkflowRun = () => { const { workflowRunningData, setWorkflowRunningData, + runTimes, + setRunTimes, } = workflowStore.getState() const { data } = params @@ -446,8 +448,8 @@ export const useWorkflowRun = () => { const nodes = getNodes() const newNodes = produce(nodes, (draft) => { const currentNode = draft.find(node => node.id === data.node_id)! - - currentNode.data._iterationIndex = data.index > 0 ? data.index : 1 + currentNode.data._iterationIndex = runTimes + setRunTimes(runTimes + 1) }) setNodes(newNodes) @@ -460,6 +462,7 @@ export const useWorkflowRun = () => { const { workflowRunningData, setWorkflowRunningData, + setRunTimes, } = workflowStore.getState() const { getNodes, @@ -476,7 +479,7 @@ export const useWorkflowRun = () => { }) } })) - + setRunTimes(1) const newNodes = produce(nodes, (draft) => { const currentNode = draft.find(node => node.id === data.node_id)! diff --git a/web/app/components/workflow/nodes/_base/components/field.tsx b/web/app/components/workflow/nodes/_base/components/field.tsx index 334bce2fb8ec18..5c43f2218e11ef 100644 --- a/web/app/components/workflow/nodes/_base/components/field.tsx +++ b/web/app/components/workflow/nodes/_base/components/field.tsx @@ -12,14 +12,14 @@ import Tooltip from '@/app/components/base/tooltip' type Props = { className?: string title: JSX.Element | string | DefaultTFuncReturn - tooltip?: string + tooltip?: React.ReactNode supportFold?: boolean children?: JSX.Element | string | null operations?: JSX.Element inline?: boolean } -const Filed: FC = ({ +const Field: FC = ({ className, title, tooltip, @@ -58,4 +58,4 @@ const Filed: FC = ({ ) } -export default React.memo(Filed) +export default React.memo(Field) diff --git a/web/app/components/workflow/nodes/_base/node.tsx b/web/app/components/workflow/nodes/_base/node.tsx index bd5921c7356442..c24b69732fe3e8 100644 --- a/web/app/components/workflow/nodes/_base/node.tsx +++ b/web/app/components/workflow/nodes/_base/node.tsx @@ -25,6 +25,7 @@ import { useToolIcon, } from '../../hooks' import { useNodeIterationInteractions } from '../iteration/use-interactions' +import type { IterationNodeType } from '../iteration/types' import { NodeSourceHandle, NodeTargetHandle, @@ -34,6 +35,7 @@ import NodeControl from './components/node-control' import AddVariablePopupWithPosition from './components/add-variable-popup-with-position' import cn from '@/utils/classnames' import BlockIcon from '@/app/components/workflow/block-icon' +import Tooltip from '@/app/components/base/tooltip' type BaseNodeProps = { children: ReactElement @@ -169,6 +171,22 @@ const BaseNode: FC = ({ className='grow mr-1 system-sm-semibold-uppercase text-text-primary truncate' > {data.title} + { + data.type === BlockEnum.Iteration && (data as IterationNodeType).is_parallel && ( + +
+ {t('workflow.nodes.iteration.parallelModeEnableTitle')} +
+ {t('workflow.nodes.iteration.parallelModeEnableDesc')} + } + > +
+ {t('workflow.nodes.iteration.parallelModeUpper')} +
+
+ ) + } { data._iterationLength && data._iterationIndex && data._runningStatus === NodeRunningStatus.Running && ( diff --git a/web/app/components/workflow/nodes/iteration/default.ts b/web/app/components/workflow/nodes/iteration/default.ts index 3afa52d06ec62a..82845e204903a8 100644 --- a/web/app/components/workflow/nodes/iteration/default.ts +++ b/web/app/components/workflow/nodes/iteration/default.ts @@ -1,4 +1,4 @@ -import { BlockEnum } from '../../types' +import { BlockEnum, ErrorHandleMode } from '../../types' import type { NodeDefault } from '../../types' import type { IterationNodeType } from './types' import { ALL_CHAT_AVAILABLE_BLOCKS, ALL_COMPLETION_AVAILABLE_BLOCKS } from '@/app/components/workflow/constants' @@ -10,6 +10,11 @@ const nodeDefault: NodeDefault = { iterator_selector: [], output_selector: [], _children: [], + _isFirstTime: true, + _isShowTips: false, + is_parallel: false, + parallel_nums: 10, + error_handle_mode: ErrorHandleMode.Terminated, }, getAvailablePrevNodes(isChatMode: boolean) { const nodes = isChatMode diff --git a/web/app/components/workflow/nodes/iteration/node.tsx b/web/app/components/workflow/nodes/iteration/node.tsx index 48a005a261df61..05b2fe15b30369 100644 --- a/web/app/components/workflow/nodes/iteration/node.tsx +++ b/web/app/components/workflow/nodes/iteration/node.tsx @@ -8,12 +8,16 @@ import { useNodesInitialized, useViewport, } from 'reactflow' +import { useTranslation } from 'react-i18next' import { IterationStartNodeDumb } from '../iteration-start' import { useNodeIterationInteractions } from './use-interactions' import type { IterationNodeType } from './types' import AddBlock from './add-block' import cn from '@/utils/classnames' import type { NodeProps } from '@/app/components/workflow/types' +import Toast from '@/app/components/base/toast' + +const i18nPrefix = 'workflow.nodes.iteration' const Node: FC> = ({ id, @@ -22,11 +26,20 @@ const Node: FC> = ({ const { zoom } = useViewport() const nodesInitialized = useNodesInitialized() const { handleNodeIterationRerender } = useNodeIterationInteractions() + const { t } = useTranslation() useEffect(() => { if (nodesInitialized) handleNodeIterationRerender(id) - }, [nodesInitialized, id, handleNodeIterationRerender]) + if (data.is_parallel && data._isShowTips && data._isFirstTime) { + Toast.notify({ + type: 'warning', + message: t(`${i18nPrefix}.answerNodeWarningDesc`), + duration: 5000, + }) + data._isFirstTime = false + } + }, [nodesInitialized, id, handleNodeIterationRerender, data, t]) return (
> = ({ data, }) => { const { t } = useTranslation() - + const responseMethod = [ + { + value: ErrorHandleMode.Terminated, + name: t(`${i18nPrefix}.ErrorMethod.operationTerminated`), + }, + { + value: ErrorHandleMode.ContinueOnError, + name: t(`${i18nPrefix}.ErrorMethod.continueOnError`), + }, + { + value: ErrorHandleMode.RemoveAbnormalOutput, + name: t(`${i18nPrefix}.ErrorMethod.removeAbnormalOutput`), + }, + ] const { readOnly, inputs, @@ -47,6 +65,9 @@ const Panel: FC> = ({ setIterator, iteratorInputKey, iterationRunResult, + changeParallel, + changeErrorResponseMode, + changeParallelNums, } = useConfig(id, data) return ( @@ -87,6 +108,34 @@ const Panel: FC> = ({ />
+
+ {t(`${i18nPrefix}.parallelPanelDesc`)}
} inline> + + + +
+ {t(`${i18nPrefix}.MaxParallelismDesc`)}
}> +
+ { changeParallelNums(Number(e.target.value)) }} /> + +
+ + + + +
+ + + +
+ {isShowSingleRun && ( { @@ -183,6 +184,25 @@ const useConfig = (id: string, payload: IterationNodeType) => { }) }, [iteratorInputKey, runInputData, setRunInputData]) + const changeParallel = useCallback((value: boolean) => { + const newInputs = produce(inputs, (draft) => { + draft.is_parallel = value + }) + setInputs(newInputs) + }, [inputs, setInputs]) + + const changeErrorResponseMode = useCallback((item: Item) => { + const newInputs = produce(inputs, (draft) => { + draft.error_handle_mode = item.value as ErrorHandleMode + }) + setInputs(newInputs) + }, [inputs, setInputs]) + const changeParallelNums = useCallback((num: number) => { + const newInputs = produce(inputs, (draft) => { + draft.parallel_nums = num + }) + setInputs(newInputs) + }, [inputs, setInputs]) return { readOnly, inputs, @@ -209,6 +229,9 @@ const useConfig = (id: string, payload: IterationNodeType) => { setIterator, iteratorInputKey, iterationRunResult, + changeParallel, + changeErrorResponseMode, + changeParallelNums, } } diff --git a/web/app/components/workflow/run/index.tsx b/web/app/components/workflow/run/index.tsx index 331ef1c2f50dc6..9ec09ec33d7b5a 100644 --- a/web/app/components/workflow/run/index.tsx +++ b/web/app/components/workflow/run/index.tsx @@ -61,36 +61,67 @@ const RunPanel: FC = ({ hideResult, activeTab = 'RESULT', runID, getRe }, [notify, getResultCallback]) const formatNodeList = useCallback((list: NodeTracing[]) => { - const allItems = list.reverse() + const allItems = [...list].reverse() const result: NodeTracing[] = [] - allItems.forEach((item) => { - const { node_type, execution_metadata } = item - if (node_type !== BlockEnum.Iteration) { - const isInIteration = !!execution_metadata?.iteration_id - - if (isInIteration) { - const iterationNode = result.find(node => node.node_id === execution_metadata?.iteration_id) - const iterationDetails = iterationNode?.details - const currentIterationIndex = execution_metadata?.iteration_index ?? 0 - - if (Array.isArray(iterationDetails)) { - if (iterationDetails.length === 0 || !iterationDetails[currentIterationIndex]) - iterationDetails[currentIterationIndex] = [item] - else - iterationDetails[currentIterationIndex].push(item) - } - return - } - // not in iteration - result.push(item) + const groupMap = new Map() - return - } + const processIterationNode = (item: NodeTracing) => { result.push({ ...item, details: [], }) + } + const updateParallelModeGroup = (runId: string, item: NodeTracing, iterationNode: NodeTracing) => { + if (!groupMap.has(runId)) + groupMap.set(runId, [item]) + else + groupMap.get(runId)!.push(item) + if (item.status === 'failed') { + iterationNode.status = 'failed' + iterationNode.error = item.error + } + + iterationNode.details = Array.from(groupMap.values()) + } + const updateSequentialModeGroup = (index: number, item: NodeTracing, iterationNode: NodeTracing) => { + const { details } = iterationNode + if (details) { + if (!details[index]) + details[index] = [item] + else + details[index].push(item) + } + + if (item.status === 'failed') { + iterationNode.status = 'failed' + iterationNode.error = item.error + } + } + const processNonIterationNode = (item: NodeTracing) => { + const { execution_metadata } = item + if (!execution_metadata?.iteration_id) { + result.push(item) + return + } + + const iterationNode = result.find(node => node.node_id === execution_metadata.iteration_id) + if (!iterationNode || !Array.isArray(iterationNode.details)) + return + + const { parallel_mode_run_id, iteration_index = 0 } = execution_metadata + + if (parallel_mode_run_id) + updateParallelModeGroup(parallel_mode_run_id, item, iterationNode) + else + updateSequentialModeGroup(iteration_index, item, iterationNode) + } + + allItems.forEach((item) => { + item.node_type === BlockEnum.Iteration + ? processIterationNode(item) + : processNonIterationNode(item) }) + return result }, []) diff --git a/web/app/components/workflow/run/iteration-result-panel.tsx b/web/app/components/workflow/run/iteration-result-panel.tsx index 4fc30f03df4ce6..7d2939c7d9bcfe 100644 --- a/web/app/components/workflow/run/iteration-result-panel.tsx +++ b/web/app/components/workflow/run/iteration-result-panel.tsx @@ -5,6 +5,7 @@ import { useTranslation } from 'react-i18next' import { RiArrowRightSLine, RiCloseLine, + RiErrorWarningLine, } from '@remixicon/react' import { ArrowNarrowLeft } from '../../base/icons/src/vender/line/arrows' import TracingPanel from './tracing-panel' @@ -27,7 +28,7 @@ const IterationResultPanel: FC = ({ noWrap, }) => { const { t } = useTranslation() - const [expandedIterations, setExpandedIterations] = useState>([]) + const [expandedIterations, setExpandedIterations] = useState>({}) const toggleIteration = useCallback((index: number) => { setExpandedIterations(prev => ({ @@ -71,10 +72,19 @@ const IterationResultPanel: FC = ({ {t(`${i18nPrefix}.iteration`)} {index + 1} - + { + iteration.some(item => item.status === 'failed') + ? ( + + ) + : (< RiArrowRightSLine className={ + cn( + 'w-4 h-4 text-text-tertiary transition-transform duration-200 flex-shrink-0', + expandedIterations[index] && 'transform rotate-90', + )} /> + ) + } + {expandedIterations[index] &&
= ({ return iteration_length } + const getErrorCount = (details: NodeTracing[][] | undefined) => { + if (!details || details.length === 0) + return 0 + return details.flat().filter(item => item.status === 'failed').length + } useEffect(() => { setCollapseState(!nodeInfo.expand) }, [nodeInfo.expand, setCollapseState]) @@ -134,7 +139,12 @@ const NodePanel: FC = ({ onClick={handleOnShowIterationDetail} > -
{t('workflow.nodes.iteration.iteration', { count: getCount(nodeInfo.details?.length, nodeInfo.metadata?.iterator_length) })}
+
{t('workflow.nodes.iteration.iteration', { count: getCount(nodeInfo.details?.length, nodeInfo.metadata?.iterator_length) })}{getErrorCount(nodeInfo.details) > 0 && ( + <> + {t('workflow.nodes.iteration.comma')} + {t('workflow.nodes.iteration.error', { count: getErrorCount(nodeInfo.details) })} + + )}
{justShowIterationNavArrow ? ( diff --git a/web/app/components/workflow/store.ts b/web/app/components/workflow/store.ts index 853d0c5934759d..5b779d6e3298d5 100644 --- a/web/app/components/workflow/store.ts +++ b/web/app/components/workflow/store.ts @@ -164,6 +164,8 @@ type Shape = { setShowImportDSLModal: (showImportDSLModal: boolean) => void showTips: string setShowTips: (showTips: string) => void + runTimes: number + setRunTimes: (runTimes: number) => void } export const createWorkflowStore = () => { @@ -266,6 +268,8 @@ export const createWorkflowStore = () => { setShowImportDSLModal: showImportDSLModal => set(() => ({ showImportDSLModal })), showTips: '', setShowTips: showTips => set(() => ({ showTips })), + runTimes: 1, + setRunTimes: runTimes => set(() => ({ runTimes })), })) } diff --git a/web/app/components/workflow/types.ts b/web/app/components/workflow/types.ts index 797c2dbd855847..20e85fb8d53fe9 100644 --- a/web/app/components/workflow/types.ts +++ b/web/app/components/workflow/types.ts @@ -34,7 +34,11 @@ export enum ControlMode { Pointer = 'pointer', Hand = 'hand', } - +export enum ErrorHandleMode { + Terminated = 'Terminated', + ContinueOnError = 'Continue on error', + RemoveAbnormalOutput = 'Remove abnormal output', +} export type Branch = { id: string name: string diff --git a/web/i18n/en-US/workflow.ts b/web/i18n/en-US/workflow.ts index d5ab6eb72894f3..420e3caa05ee9f 100644 --- a/web/i18n/en-US/workflow.ts +++ b/web/i18n/en-US/workflow.ts @@ -530,6 +530,23 @@ const translation = { iteration_one: '{{count}} Iteration', iteration_other: '{{count}} Iterations', currentIteration: 'Current Iteration', + comma: ', ', + error_one: '{{count}} Error', + error_other: '{{count}} Errors', + parallelMode: 'Parallel Mode', + parallelModeUpper: 'PARALLEL MODE', + parallelModeEnableTitle: 'Parallel Mode Enabled', + parallelModeEnableDesc: 'In parallel mode, tasks within iterations support parallel execution. You can configure this in the properties panel on the right.', + parallelPanelDesc: 'In parallel mode, tasks in the iteration support parallel execution.', + MaxParallelismTitle: 'Maximum parallelism', + MaxParallelismDesc: 'The maximum parallelism is used to control the number of tasks executed simultaneously in a single iteration.', + errorResponseMethod: 'Error response method', + ErrorMethod: { + operationTerminated: 'Terminated', + continueOnError: 'Continue on error', + removeAbnormalOutput: 'Remove abnormal output', + }, + answerNodeWarningDesc: 'Parallel mode warning: Answer nodes, conversation variable assignments, and persistent read/write operations within iterations may cause exceptions.', }, note: { addNote: 'Add Note', diff --git a/web/i18n/zh-Hans/workflow.ts b/web/i18n/zh-Hans/workflow.ts index 4959a87be7db77..d586d3fc9afad2 100644 --- a/web/i18n/zh-Hans/workflow.ts +++ b/web/i18n/zh-Hans/workflow.ts @@ -530,6 +530,23 @@ const translation = { iteration_one: '{{count}}个迭代', iteration_other: '{{count}}个迭代', currentIteration: '当前迭代', + comma: ',', + error_one: '{{count}}个失败', + error_other: '{{count}}个失败', + parallelMode: '并行模式', + parallelModeUpper: '并行模式', + parallelModeEnableTitle: '并行模式启用', + parallelModeEnableDesc: '启用并行模式时迭代内的任务支持并行执行。你可以在右侧的属性面板中进行配置。', + parallelPanelDesc: '在并行模式下,迭代中的任务支持并行执行。', + MaxParallelismTitle: '最大并行度', + MaxParallelismDesc: '最大并行度用于控制单次迭代中同时执行的任务数量。', + errorResponseMethod: '错误响应方法', + ErrorMethod: { + operationTerminated: '错误时终止', + continueOnError: '忽略错误并继续', + removeAbnormalOutput: '移除错误输出', + }, + answerNodeWarningDesc: '并行模式警告:在迭代中,回答节点、会话变量赋值和工具持久读/写操作可能会导致异常。', }, note: { addNote: '添加注释', diff --git a/web/types/workflow.ts b/web/types/workflow.ts index dbf2b3e587b992..0794b8c7472add 100644 --- a/web/types/workflow.ts +++ b/web/types/workflow.ts @@ -30,6 +30,7 @@ export type NodeTracing = { parallel_start_node_id?: string parent_parallel_id?: string parent_parallel_start_node_id?: string + parallel_mode_run_id?: string } metadata: { iterator_length: number From d33907d7446953160a4f5458b7c2244dc1d66ba2 Mon Sep 17 00:00:00 2001 From: Nov1c444 <857526207@qq.com> Date: Fri, 18 Oct 2024 18:01:18 +0800 Subject: [PATCH 02/13] fix: fix config unit test error --- api/tests/unit_tests/configs/test_dify_config.py | 4 ++-- .../core/workflow/nodes/iteration/test_iteration.py | 7 +------ 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/api/tests/unit_tests/configs/test_dify_config.py b/api/tests/unit_tests/configs/test_dify_config.py index fdce22d31559cc..3f639ccacc48f5 100644 --- a/api/tests/unit_tests/configs/test_dify_config.py +++ b/api/tests/unit_tests/configs/test_dify_config.py @@ -50,10 +50,10 @@ def test_dify_config(example_env_file): assert config.SENTRY_TRACES_SAMPLE_RATE == 1.0 # annotated field with default value - assert config.HTTP_REQUEST_MAX_READ_TIMEOUT == 600 + assert config.HTTP_REQUEST_MAX_READ_TIMEOUT == 60 # annotated field with configured value - assert config.HTTP_REQUEST_MAX_WRITE_TIMEOUT == 600 + assert config.HTTP_REQUEST_MAX_WRITE_TIMEOUT == 30 # NOTE: If there is a `.env` file in your Workspace, this test might not succeed as expected. diff --git a/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py b/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py index bb0cf6b771fa9b..9c397f1e6ea89d 100644 --- a/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py +++ b/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py @@ -185,8 +185,6 @@ def tt_generator(self): outputs={"output": "dify 123"}, ) - # print("") - with patch.object(TemplateTransformNode, "_run", new=tt_generator): # execute node result = iteration_node._run() @@ -404,15 +402,12 @@ def tt_generator(self): outputs={"output": "dify 123"}, ) - # print("") - with patch.object(TemplateTransformNode, "_run", new=tt_generator): # execute node result = iteration_node._run() count = 0 for item in result: - # print(type(item), item) count += 1 if isinstance(item, RunCompletedEvent): assert item.run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED @@ -789,12 +784,12 @@ def test_iteration_run_error_handle(): "id": "1728357129226", }, ) - # print("") # execute continue on error node result = iteration_node._run() count = 0 for item in result: + print(type(item)) count += 1 if isinstance(item, RunCompletedEvent): assert item.run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED From 377aa0b92c79b10482248f65eda79ec2ac6829e7 Mon Sep 17 00:00:00 2001 From: Nov1c444 <857526207@qq.com> Date: Tue, 22 Oct 2024 09:44:15 +0800 Subject: [PATCH 03/13] fix: graph error can't be raised --- .../nodes/iteration/iteration_node.py | 290 ++++++++++-------- .../nodes/iteration/test_iteration.py | 190 +++++++----- 2 files changed, 278 insertions(+), 202 deletions(-) diff --git a/api/core/workflow/nodes/iteration/iteration_node.py b/api/core/workflow/nodes/iteration/iteration_node.py index 5216d382021a9d..065aed01c38a54 100644 --- a/api/core/workflow/nodes/iteration/iteration_node.py +++ b/api/core/workflow/nodes/iteration/iteration_node.py @@ -169,8 +169,10 @@ def _run(self) -> Generator[RunEvent | InNodeEvent, None, None]: if succeeded_count == len(futures): q.put(None) yield event + if isinstance(event, RunCompletedEvent): + q.put(None) + yield event except Empty: - logger.warning(msg="Parallel Iteration event queue empty") continue # wait all threads @@ -291,6 +293,26 @@ def _extract_variable_selector_to_variable_mapping( return variable_mapping + def _handle_event_metadata(self, event: BaseNodeEvent, iter_run_index: str, parallel_mode_run_id: str): + """ + add iteration metadata to event. + """ + if not isinstance(event, BaseNodeEvent): + return event + if event.route_node_state.node_run_result: + metadata = event.route_node_state.node_run_result.metadata + if not metadata: + metadata = {} + + if NodeRunMetadataKey.ITERATION_ID not in metadata: + metadata[NodeRunMetadataKey.ITERATION_ID] = self.node_id + if self.node_data.is_parallel: + metadata[NodeRunMetadataKey.PARALLEL_MODE_RUN_ID] = parallel_mode_run_id + else: + metadata[NodeRunMetadataKey.ITERATION_INDEX] = iter_run_index + event.route_node_state.node_run_result.metadata = metadata + return event + def _run_single_iter( self, iterator_list_value: list[str], @@ -302,120 +324,143 @@ def _run_single_iter( iteration_graph, parallel_mode_run_id: Optional[str] = None, ): - rst = graph_engine.run() - for event in rst: - if isinstance(event, (BaseNodeEvent | BaseParallelBranchEvent)) and not event.in_iteration_id: - event.in_iteration_id = self.node_id - - if ( - isinstance(event, BaseNodeEvent) - and event.node_type == NodeType.ITERATION_START - and not isinstance(event, NodeRunStreamChunkEvent) - ): - continue + """ + run single iteration + """ + try: + rst = graph_engine.run() + # get current iteration index + current_index = variable_pool.get([self.node_id, "index"]).value + next_index = int(current_index) + 1 + + if current_index is None: + raise ValueError(f"iteration {self.node_id} current index not found") + for event in rst: + if isinstance(event, (BaseNodeEvent | BaseParallelBranchEvent)) and not event.in_iteration_id: + event.in_iteration_id = self.node_id + + if ( + isinstance(event, BaseNodeEvent) + and event.node_type == NodeType.ITERATION_START + and not isinstance(event, NodeRunStreamChunkEvent) + ): + continue - if isinstance(event, NodeRunSucceededEvent): - self._handle_event_metadata(event, variable_pool, parallel_mode_run_id) - yield event - elif isinstance(event, BaseGraphEvent): - if isinstance(event, GraphRunFailedEvent): - # iteration run failed - if self.node_data.is_parallel: - yield IterationRunFailedEvent( - iteration_id=self.id, - iteration_node_id=self.node_id, - iteration_node_type=self.node_type, - iteration_node_data=self.node_data, - parallel_mode_run_id=parallel_mode_run_id, - start_at=start_at, - inputs=inputs, - outputs={"output": jsonable_encoder(outputs)}, - steps=len(iterator_list_value), - metadata={"total_tokens": graph_engine.graph_runtime_state.total_tokens}, - error=event.error, - ) - else: - yield IterationRunFailedEvent( - iteration_id=self.id, - iteration_node_id=self.node_id, - iteration_node_type=self.node_type, - iteration_node_data=self.node_data, - start_at=start_at, - inputs=inputs, - outputs={"output": jsonable_encoder(outputs)}, - steps=len(iterator_list_value), - metadata={"total_tokens": graph_engine.graph_runtime_state.total_tokens}, - error=event.error, - ) - yield RunCompletedEvent( - run_result=NodeRunResult( - status=WorkflowNodeExecutionStatus.FAILED, - error=event.error, - ) - ) - return - else: - event = cast(InNodeEvent, event) - if isinstance(event, NodeRunFailedEvent): - if self.node_data.error_handle_mode == ErrorHandleMode.CONTINUE_ON_ERROR: - metadata_event = self._handle_event_metadata(event, variable_pool, parallel_mode_run_id) - yield NodeInIterationFailedEvent( - **metadata_event.model_dump(), - ) - break - elif self.node_data.error_handle_mode == ErrorHandleMode.REMOVE_ABNORMAL_OUTPUT: - current_index = variable_pool.get([self.node_id, "index"]) - if current_index is None: - raise ValueError(f"iteration {self.node_id} current index not found") - next_index = int(current_index.to_object()) + 1 - metadata_event = self._handle_event_metadata(event, variable_pool, parallel_mode_run_id) - yield NodeInIterationFailedEvent( - **metadata_event.model_dump(), + if isinstance(event, NodeRunSucceededEvent): + yield self._handle_event_metadata(event, current_index, parallel_mode_run_id) + elif isinstance(event, BaseGraphEvent): + if isinstance(event, GraphRunFailedEvent): + # iteration run failed + if self.node_data.is_parallel: + yield IterationRunFailedEvent( + iteration_id=self.id, + iteration_node_id=self.node_id, + iteration_node_type=self.node_type, + iteration_node_data=self.node_data, + parallel_mode_run_id=parallel_mode_run_id, + start_at=start_at, + inputs=inputs, + outputs={"output": jsonable_encoder(outputs)}, + steps=len(iterator_list_value), + metadata={"total_tokens": graph_engine.graph_runtime_state.total_tokens}, + error=event.error, + ) + else: + yield IterationRunFailedEvent( + iteration_id=self.id, + iteration_node_id=self.node_id, + iteration_node_type=self.node_type, + iteration_node_data=self.node_data, + start_at=start_at, + inputs=inputs, + outputs={"output": jsonable_encoder(outputs)}, + steps=len(iterator_list_value), + metadata={"total_tokens": graph_engine.graph_runtime_state.total_tokens}, + error=event.error, + ) + yield RunCompletedEvent( + run_result=NodeRunResult( + status=WorkflowNodeExecutionStatus.FAILED, + error=event.error, + ) ) - variable_pool.add([self.node_id, "index"], next_index) - - if next_index < len(iterator_list_value): - variable_pool.add([self.node_id, "item"], iterator_list_value[next_index]) - yield IterationRunNextEvent( - iteration_id=self.id, - iteration_node_id=self.node_id, - iteration_node_type=self.node_type, - iteration_node_data=self.node_data, - index=next_index, - parallel_mode_run_id=parallel_mode_run_id, - pre_iteration_output=None, - ) - return - yield self._handle_event_metadata(event, variable_pool, parallel_mode_run_id) - - current_index = variable_pool.get([self.node_id, "index"]) - # append to iteration output variable list - current_iteration_output = variable_pool.get_any(self.node_data.output_selector) - outputs.insert(current_index.value, current_iteration_output) - # remove all nodes outputs from variable pool - for node_id in iteration_graph.node_ids: - variable_pool.remove_node(node_id) - - # move to next iteration - if current_index is None: - raise ValueError(f"iteration {self.node_id} current index not found") - - next_index = int(current_index.to_object()) + 1 - variable_pool.add([self.node_id, "index"], next_index) - - if next_index < len(iterator_list_value): - variable_pool.add([self.node_id, "item"], iterator_list_value[next_index]) + else: + event = cast(InNodeEvent, event) + metadata_event = self._handle_event_metadata(event, current_index, parallel_mode_run_id) + if ( + isinstance(event, NodeRunFailedEvent) + and self.node_data.error_handle_mode != ErrorHandleMode.TERMINATED + ): + if self.node_data.error_handle_mode == ErrorHandleMode.CONTINUE_ON_ERROR: + yield NodeInIterationFailedEvent( + **metadata_event.model_dump(), + ) + outputs.insert(current_index, None) + variable_pool.add([self.node_id, "index"], next_index) + if next_index < len(iterator_list_value): + variable_pool.add([self.node_id, "item"], iterator_list_value[next_index]) + yield IterationRunNextEvent( + iteration_id=self.id, + iteration_node_id=self.node_id, + iteration_node_type=self.node_type, + iteration_node_data=self.node_data, + index=next_index, + parallel_mode_run_id=parallel_mode_run_id, + pre_iteration_output=None, + ) + return + elif self.node_data.error_handle_mode == ErrorHandleMode.REMOVE_ABNORMAL_OUTPUT: + yield NodeInIterationFailedEvent( + **metadata_event.model_dump(), + ) + variable_pool.add([self.node_id, "index"], next_index) + + if next_index < len(iterator_list_value): + variable_pool.add([self.node_id, "item"], iterator_list_value[next_index]) + yield IterationRunNextEvent( + iteration_id=self.id, + iteration_node_id=self.node_id, + iteration_node_type=self.node_type, + iteration_node_data=self.node_data, + index=next_index, + parallel_mode_run_id=parallel_mode_run_id, + pre_iteration_output=None, + ) + + return + yield metadata_event + + current_iteration_output = variable_pool.get_any(self.node_data.output_selector) + outputs.insert(current_index, current_iteration_output) + # remove all nodes outputs from variable pool + for node_id in iteration_graph.node_ids: + variable_pool.remove_node(node_id) + + # move to next iteration + variable_pool.add([self.node_id, "index"], next_index) + + if next_index < len(iterator_list_value): + variable_pool.add([self.node_id, "item"], iterator_list_value[next_index]) + + yield IterationRunNextEvent( + iteration_id=self.id, + iteration_node_id=self.node_id, + iteration_node_type=self.node_type, + iteration_node_data=self.node_data, + index=next_index, + parallel_mode_run_id=parallel_mode_run_id, + pre_iteration_output=jsonable_encoder(current_iteration_output) if current_iteration_output else None, + ) - yield IterationRunNextEvent( - iteration_id=self.id, - iteration_node_id=self.node_id, - iteration_node_type=self.node_type, - iteration_node_data=self.node_data, - index=next_index, - parallel_mode_run_id=parallel_mode_run_id, - pre_iteration_output=jsonable_encoder(current_iteration_output) if current_iteration_output else None, - ) + except Exception as e: + logger.exception("Iteration run failed") + yield RunCompletedEvent( + run_result=NodeRunResult( + status=WorkflowNodeExecutionStatus.FAILED, + error=str(e), + ) + ) def _run_single_iter_parallel( self, @@ -430,6 +475,9 @@ def _run_single_iter_parallel( index, item, ): + """ + run single iteration in parallel mode + """ with flask_app.app_context(): parallel_mode_run_id = uuid.uuid4().hex graph_engine_copy = graph_engine.create_copy() @@ -447,23 +495,3 @@ def _run_single_iter_parallel( parallel_mode_run_id=parallel_mode_run_id, ): q.put(event) - - def _handle_event_metadata(self, event: BaseNodeEvent, variable_pool: VariablePool, parallel_mode_run_id: str): - """ - Handle success event. - """ - if not isinstance(event, BaseNodeEvent): - return event - if event.route_node_state.node_run_result: - metadata = event.route_node_state.node_run_result.metadata - if not metadata: - metadata = {} - - if NodeRunMetadataKey.ITERATION_ID not in metadata: - metadata[NodeRunMetadataKey.ITERATION_ID] = self.node_id - if self.node_data.is_parallel: - metadata[NodeRunMetadataKey.PARALLEL_MODE_RUN_ID] = parallel_mode_run_id - else: - metadata[NodeRunMetadataKey.ITERATION_INDEX] = variable_pool.get_any([self.node_id, "index"]) - event.route_node_state.node_run_result.metadata = metadata - return event diff --git a/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py b/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py index 9c397f1e6ea89d..eead7e79267e3d 100644 --- a/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py +++ b/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py @@ -663,81 +663,125 @@ def tt_generator(self): def test_iteration_run_error_handle(): graph_config = { - "nodes": [ + "edges": [ + { + "id": "start-source-pe-target", + "source": "start", + "target": "pe", + }, + { + "id": "iteration-1-source-answer-3-target", + "source": "iteration-1", + "target": "answer-3", + }, + { + "id": "tt-source-if-else-target", + "source": "iteration-start", + "target": "if-else", + }, + { + "id": "if-else-true-answer-2-target", + "source": "if-else", + "sourceHandle": "true", + "target": "tt", + }, + { + "id": "if-else-false-answer-4-target", + "source": "if-else", + "sourceHandle": "false", + "target": "tt2", + }, { - "data": {"desc": "", "selected": False, "title": "开始", "type": "start", "variables": []}, - "id": "1727165736057", + "id": "pe-source-iteration-1-target", + "source": "pe", + "target": "iteration-1", }, + ], + "nodes": [ + {"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"}, { "data": { - "answer": "{{#1728357129226.output#}}\n", - "desc": "", - "selected": False, - "title": "直接回复", - "type": "answer", - "variables": [], + "iterator_selector": ["pe", "list_output"], + "output_selector": ["tt2", "output"], + "output_type": "array[string]", + "start_node_id": "if-else", + "title": "iteration", + "type": "iteration", }, - "id": "1727166508076", + "id": "iteration-1", }, { "data": { - "code": '\ndef main(text:str) -> dict:\n return {\n "result": text.split(","),\n }\n', - "code_language": "python3", - "desc": "", - "outputs": {"result": {"children": None, "type": "array[string]"}}, - "selected": False, - "title": "Code", - "type": "code", - "variables": [{"value_selector": ["sys", "query"], "variable": "text"}], + "iteration_id": "iteration-1", + "template": "{{ arg1.split(arg2) }}", + "title": "template transform", + "type": "template-transform", + "variables": [ + {"value_selector": ["iteration-1", "item"], "variable": "arg1"}, + {"value_selector": ["iteration-1", "index"], "variable": "arg2"}, + ], }, - "id": "1727168677820", + "id": "tt", }, { "data": { - "desc": "", - "error_handle_mode": "Continue on error", - "height": 226, - "is_parallel": True, - "iterator_selector": ["1727168677820", "result"], - "output_selector": ["1728702070951", "result"], - "output_type": "array[string]", - "parallel_nums": 6, - "selected": True, - "start_node_id": "1728357129226start", - "title": "迭代 2", - "type": "iteration", - "width": 589, + "iteration_id": "iteration-1", + "template": "{{ arg1 }}", + "title": "template transform", + "type": "template-transform", + "variables": [ + {"value_selector": ["iteration-1", "item"], "variable": "arg1"}, + ], }, - "id": "1728357129226", + "id": "tt2", + }, + { + "data": {"answer": "{{#iteration-1.output#}}88888", "title": "answer 3", "type": "answer"}, + "id": "answer-3", }, { - "data": {"desc": "", "isInIteration": True, "selected": False, "title": "", "type": "iteration-start"}, - "id": "1728357129226start", + "data": { + "iteration_id": "iteration-1", + "title": "iteration-start", + "type": "iteration-start", + }, + "id": "iteration-start", }, { "data": { - "type": "code", - "title": "Code 3", - "desc": "", - "variables": [{"variable": "arg1", "value_selector": ["1728357129226", "item"]}], - "code_language": "python3", - "code": '\ndef main(arg1: str) -> dict:\n return {\n "result": arg1.split(":")[1]\n }\n', # noqa: E501 - "outputs": {"result": {"type": "string", "children": None}}, - "selected": False, - "isInIteration": True, - "iteration_id": "1728357129226", + "conditions": [ + { + "comparison_operator": "is", + "id": "1721916275284", + "value": "1", + "variable_selector": ["iteration-1", "item"], + } + ], + "iteration_id": "iteration-1", + "logical_operator": "and", + "title": "if", + "type": "if-else", }, - "id": "1728702070951", + "id": "if-else", }, - ], - "edges": [ - {"id": "1727165736057-source-1727168677820-target", "target": "1727168677820", "source": "1727165736057"}, - {"id": "1727168677820-source-1728357129226-target", "target": "1728357129226", "source": "1727168677820"}, - {"id": "1728357129226-source-1727166508076-target", "target": "1727166508076", "source": "1728357129226"}, { - "id": "1728357129226start-source-1728702070951-target", - "target": "1728702070951", - "source": "1728357129226start", + "data": { + "instruction": "test1", + "model": { + "completion_params": {"temperature": 0.7}, + "mode": "chat", + "name": "gpt-4o", + "provider": "openai", + }, + "parameters": [ + {"description": "test", "name": "list_output", "required": False, "type": "array[string]"} + ], + "query": ["sys", "query"], + "reasoning_mode": "prompt", + "title": "pe", + "type": "parameter-extractor", + }, + "id": "pe", }, ], } @@ -758,12 +802,16 @@ def test_iteration_run_error_handle(): # construct variable pool pool = VariablePool( - system_variables={}, + system_variables={ + SystemVariableKey.QUERY: "dify", + SystemVariableKey.FILES: [], + SystemVariableKey.CONVERSATION_ID: "abababa", + SystemVariableKey.USER_ID: "1", + }, user_inputs={}, environment_variables=[], ) - pool.add(["1727168677820", "result"], ["test", "test:hello"]) - + pool.add(["pe", "list_output"], ["1", "1"]) iteration_node = IterationNode( id=str(uuid.uuid4()), graph_init_params=init_params, @@ -771,31 +819,31 @@ def test_iteration_run_error_handle(): graph_runtime_state=GraphRuntimeState(variable_pool=pool, start_at=time.perf_counter()), config={ "data": { - "error_handle_mode": ErrorHandleMode.CONTINUE_ON_ERROR, - "is_parallel": True, - "iterator_selector": ["1727168677820", "result"], - "output_selector": ["1728702070951", "result"], + "iterator_selector": ["pe", "list_output"], + "output_selector": ["tt", "output"], "output_type": "array[string]", - "parallel_nums": 6, - "start_node_id": "1728357129226start", - "title": "迭代 2", + "startNodeType": "template-transform", + "start_node_id": "iteration-start", + "title": "iteration", "type": "iteration", + "is_parallel": True, + "error_handle_mode": ErrorHandleMode.CONTINUE_ON_ERROR, }, - "id": "1728357129226", + "id": "iteration-1", }, ) - # execute continue on error node result = iteration_node._run() + result_arr = [] count = 0 for item in result: - print(type(item)) + result_arr.append(item) count += 1 if isinstance(item, RunCompletedEvent): assert item.run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED - assert item.run_result.outputs == {"output": [None, "hello"]} + assert item.run_result.outputs == {"output": [None, None]} - assert count == 10 + assert count == 14 # execute remove abnormal output iteration_node.node_data.error_handle_mode = ErrorHandleMode.REMOVE_ABNORMAL_OUTPUT result = iteration_node._run() @@ -804,5 +852,5 @@ def test_iteration_run_error_handle(): count += 1 if isinstance(item, RunCompletedEvent): assert item.run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED - assert item.run_result.outputs == {"output": ["hello"]} - assert count == 10 + assert item.run_result.outputs == {"output": []} + assert count == 14 From f13b9d9c94f3aabb5b2ac08ab1d75589b10b35ec Mon Sep 17 00:00:00 2001 From: Nov1c444 <857526207@qq.com> Date: Tue, 22 Oct 2024 15:11:44 +0800 Subject: [PATCH 04/13] fix: variable pool get any method --- api/core/workflow/nodes/iteration/iteration_node.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/core/workflow/nodes/iteration/iteration_node.py b/api/core/workflow/nodes/iteration/iteration_node.py index 232718ca6bee86..5d5617099944b0 100644 --- a/api/core/workflow/nodes/iteration/iteration_node.py +++ b/api/core/workflow/nodes/iteration/iteration_node.py @@ -441,7 +441,7 @@ def _run_single_iter( return yield metadata_event - current_iteration_output = variable_pool.get_any(self.node_data.output_selector) + current_iteration_output = variable_pool.get(self.node_data.output_selector).value outputs.insert(current_index, current_iteration_output) # remove all nodes outputs from variable pool for node_id in iteration_graph.node_ids: From 5005fb845f3085f3367199c8b8e12d9a53a055bc Mon Sep 17 00:00:00 2001 From: Nov1c444 <857526207@qq.com> Date: Thu, 24 Oct 2024 15:04:53 +0800 Subject: [PATCH 05/13] fix: frontend display of iteration log panel --- .../task_pipeline/workflow_cycle_manage.py | 6 ++- .../nodes/iteration/iteration_node.py | 48 +++++++++++++++---- .../nodes/iteration/test_iteration.py | 5 +- .../workflow/hooks/use-workflow-run.ts | 12 ++--- .../components/workflow/nodes/_base/node.tsx | 4 +- .../workflow/nodes/iteration/panel.tsx | 2 +- .../workflow/panel/debug-and-preview/hooks.ts | 11 ++++- web/app/components/workflow/store.ts | 8 ++-- web/app/components/workflow/utils.ts | 11 +++-- web/types/workflow.ts | 1 + 10 files changed, 77 insertions(+), 31 deletions(-) diff --git a/api/core/app/task_pipeline/workflow_cycle_manage.py b/api/core/app/task_pipeline/workflow_cycle_manage.py index 1c066efd64830f..9d09cf558c1196 100644 --- a/api/core/app/task_pipeline/workflow_cycle_manage.py +++ b/api/core/app/task_pipeline/workflow_cycle_manage.py @@ -332,8 +332,8 @@ def _handle_workflow_node_execution_failed( WorkflowNodeExecution.outputs: json.dumps(outputs) if outputs else None, WorkflowNodeExecution.finished_at: finished_at, WorkflowNodeExecution.elapsed_time: elapsed_time, + WorkflowNodeExecution.execution_metadata: execution_metadata, } - update_data[WorkflowNodeExecution.execution_metadata] = execution_metadata db.session.query(WorkflowNodeExecution).filter(WorkflowNodeExecution.id == workflow_node_execution.id).update( update_data ) @@ -640,7 +640,9 @@ def _workflow_iteration_completed_to_stream_response( created_at=int(time.time()), extras={}, inputs=event.inputs or {}, - status=WorkflowNodeExecutionStatus.SUCCEEDED, + status=WorkflowNodeExecutionStatus.SUCCEEDED + if event.error is None + else WorkflowNodeExecutionStatus.FAILED, error=None, elapsed_time=(datetime.now(timezone.utc).replace(tzinfo=None) - event.start_at).total_seconds(), total_tokens=event.metadata.get("total_tokens", 0) if event.metadata else 0, diff --git a/api/core/workflow/nodes/iteration/iteration_node.py b/api/core/workflow/nodes/iteration/iteration_node.py index 5d5617099944b0..cf6e14b565d6de 100644 --- a/api/core/workflow/nodes/iteration/iteration_node.py +++ b/api/core/workflow/nodes/iteration/iteration_node.py @@ -1,7 +1,7 @@ import logging import uuid from collections.abc import Generator, Mapping, Sequence -from concurrent.futures import wait +from concurrent.futures import Future, wait from datetime import datetime, timezone from queue import Empty, Queue from typing import Any, Optional, cast @@ -147,11 +147,11 @@ def _run(self) -> Generator[NodeEvent | InNodeEvent, None, None]: outputs: list[Any] = [] try: if self.node_data.is_parallel: - futures = [] + futures: list[Future] = [] q = Queue() thread_pool = GraphEngineThreadPool(max_workers=self.node_data.parallel_nums, max_submit_count=100) for index, item in enumerate(iterator_list_value): - future = thread_pool.submit( + future: Future = thread_pool.submit( self._run_single_iter_parallel, current_app._get_current_object(), q, @@ -179,8 +179,16 @@ def _run(self) -> Generator[NodeEvent | InNodeEvent, None, None]: yield event if isinstance(event, RunCompletedEvent): q.put(None) + for f in futures: + if not f.done(): + f.cancel() + yield event + if isinstance(event, IterationRunFailedEvent): + q.put(None) + yield event except Empty: + logger.warning("iteration parallel queue is empty.") continue # wait all threads @@ -196,7 +204,6 @@ def _run(self) -> Generator[NodeEvent | InNodeEvent, None, None]: graph_engine, iteration_graph, ) - yield IterationRunSucceededEvent( iteration_id=self.id, iteration_node_id=self.node_id, @@ -398,10 +405,7 @@ def _run_single_iter( else: event = cast(InNodeEvent, event) metadata_event = self._handle_event_metadata(event, current_index, parallel_mode_run_id) - if ( - isinstance(event, NodeRunFailedEvent) - and self.node_data.error_handle_mode != ErrorHandleMode.TERMINATED - ): + if isinstance(event, NodeRunFailedEvent): if self.node_data.error_handle_mode == ErrorHandleMode.CONTINUE_ON_ERROR: yield NodeInIterationFailedEvent( **metadata_event.model_dump(), @@ -437,8 +441,20 @@ def _run_single_iter( parallel_mode_run_id=parallel_mode_run_id, pre_iteration_output=None, ) - return + elif self.node_data.error_handle_mode == ErrorHandleMode.TERMINATED: + yield IterationRunFailedEvent( + iteration_id=self.id, + iteration_node_id=self.node_id, + iteration_node_type=self.node_type, + iteration_node_data=self.node_data, + start_at=start_at, + inputs=inputs, + outputs={"output": None}, + steps=len(iterator_list_value), + metadata={"total_tokens": graph_engine.graph_runtime_state.total_tokens}, + error=event.error, + ) yield metadata_event current_iteration_output = variable_pool.get(self.node_data.output_selector).value @@ -464,7 +480,19 @@ def _run_single_iter( ) except Exception as e: - logger.exception("Iteration run failed") + logger.exception(f"Iteration run failed:{str(e)}") + yield IterationRunFailedEvent( + iteration_id=self.id, + iteration_node_id=self.node_id, + iteration_node_type=self.node_type, + iteration_node_data=self.node_data, + start_at=start_at, + inputs=inputs, + outputs={"output": None}, + steps=len(iterator_list_value), + metadata={"total_tokens": graph_engine.graph_runtime_state.total_tokens}, + error=str(e), + ) yield RunCompletedEvent( run_result=NodeRunResult( status=WorkflowNodeExecutionStatus.FAILED, diff --git a/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py b/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py index 31471ac3202f38..29bd4d6c6ccab1 100644 --- a/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py +++ b/api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py @@ -646,15 +646,18 @@ def tt_generator(self): assert parallel_iteration_node.node_data.parallel_nums == 10 assert parallel_iteration_node.node_data.error_handle_mode == ErrorHandleMode.TERMINATED count = 0 - + parallel_arr = [] + sequential_arr = [] for item in parallel_result: count += 1 + parallel_arr.append(item) if isinstance(item, RunCompletedEvent): assert item.run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED assert item.run_result.outputs == {"output": ["dify 123", "dify 123"]} assert count == 32 for item in sequential_result: + sequential_arr.append(item) count += 1 if isinstance(item, RunCompletedEvent): assert item.run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED diff --git a/web/app/components/workflow/hooks/use-workflow-run.ts b/web/app/components/workflow/hooks/use-workflow-run.ts index 1fa2fb6836243f..c59378660fb144 100644 --- a/web/app/components/workflow/hooks/use-workflow-run.ts +++ b/web/app/components/workflow/hooks/use-workflow-run.ts @@ -431,8 +431,8 @@ export const useWorkflowRun = () => { const { workflowRunningData, setWorkflowRunningData, - runTimes, - setRunTimes, + iterTimes, + setIterTimes, } = workflowStore.getState() const { data } = params @@ -452,8 +452,8 @@ export const useWorkflowRun = () => { const nodes = getNodes() const newNodes = produce(nodes, (draft) => { const currentNode = draft.find(node => node.id === data.node_id)! - currentNode.data._iterationIndex = runTimes - setRunTimes(runTimes + 1) + currentNode.data._iterationIndex = iterTimes + setIterTimes(iterTimes + 1) }) setNodes(newNodes) @@ -466,7 +466,7 @@ export const useWorkflowRun = () => { const { workflowRunningData, setWorkflowRunningData, - setRunTimes, + setIterTimes, } = workflowStore.getState() const { getNodes, @@ -483,7 +483,7 @@ export const useWorkflowRun = () => { }) } })) - setRunTimes(1) + setIterTimes(1) const newNodes = produce(nodes, (draft) => { const currentNode = draft.find(node => node.id === data.node_id)! diff --git a/web/app/components/workflow/nodes/_base/node.tsx b/web/app/components/workflow/nodes/_base/node.tsx index c24b69732fe3e8..1dbf79da553907 100644 --- a/web/app/components/workflow/nodes/_base/node.tsx +++ b/web/app/components/workflow/nodes/_base/node.tsx @@ -168,7 +168,7 @@ const BaseNode: FC = ({ />
{data.title} { @@ -181,7 +181,7 @@ const BaseNode: FC = ({ {t('workflow.nodes.iteration.parallelModeEnableDesc')}
} > -
+
{t('workflow.nodes.iteration.parallelModeUpper')}
diff --git a/web/app/components/workflow/nodes/iteration/panel.tsx b/web/app/components/workflow/nodes/iteration/panel.tsx index a50fddbc4b3933..5aa2949fd0f167 100644 --- a/web/app/components/workflow/nodes/iteration/panel.tsx +++ b/web/app/components/workflow/nodes/iteration/panel.tsx @@ -131,7 +131,7 @@ const Panel: FC> = ({
-
diff --git a/web/app/components/workflow/panel/debug-and-preview/hooks.ts b/web/app/components/workflow/panel/debug-and-preview/hooks.ts index 58a4561e2c6017..aa5eebc7783ab3 100644 --- a/web/app/components/workflow/panel/debug-and-preview/hooks.ts +++ b/web/app/components/workflow/panel/debug-and-preview/hooks.ts @@ -9,6 +9,7 @@ import { produce, setAutoFreeze } from 'immer' import { uniqBy } from 'lodash-es' import { useWorkflowRun } from '../../hooks' import { NodeRunningStatus, WorkflowRunningStatus } from '../../types' +import { useWorkflowStore } from '../../store' import type { ChatItem, Inputs, @@ -43,6 +44,7 @@ export const useChat = ( const { notify } = useToastContext() const { handleRun } = useWorkflowRun() const hasStopResponded = useRef(false) + const workflowStore = useWorkflowStore() const conversationId = useRef('') const taskIdRef = useRef('') const [chatList, setChatList] = useState(prevChatList || []) @@ -52,6 +54,9 @@ export const useChat = ( const [suggestedQuestions, setSuggestQuestions] = useState([]) const suggestedQuestionsAbortControllerRef = useRef(null) + const { + setIterTimes, + } = workflowStore.getState() useEffect(() => { setAutoFreeze(false) return () => { @@ -102,15 +107,16 @@ export const useChat = ( handleResponding(false) if (stopChat && taskIdRef.current) stopChat(taskIdRef.current) - + setIterTimes(1) if (suggestedQuestionsAbortControllerRef.current) suggestedQuestionsAbortControllerRef.current.abort() - }, [handleResponding, stopChat]) + }, [handleResponding, setIterTimes, stopChat]) const handleRestart = useCallback(() => { conversationId.current = '' taskIdRef.current = '' handleStop() + setIterTimes(1) const newChatList = config?.opening_statement ? [{ id: `${Date.now()}`, @@ -126,6 +132,7 @@ export const useChat = ( config, handleStop, handleUpdateChatList, + setIterTimes, ]) const updateCurrentQA = useCallback(({ diff --git a/web/app/components/workflow/store.ts b/web/app/components/workflow/store.ts index d4216e1e8de471..a709d7acf8458a 100644 --- a/web/app/components/workflow/store.ts +++ b/web/app/components/workflow/store.ts @@ -166,8 +166,8 @@ type Shape = { setShowImportDSLModal: (showImportDSLModal: boolean) => void showTips: string setShowTips: (showTips: string) => void - runTimes: number - setRunTimes: (runTimes: number) => void + iterTimes: number + setIterTimes: (iterTimes: number) => void } export const createWorkflowStore = () => { @@ -283,8 +283,8 @@ export const createWorkflowStore = () => { setShowImportDSLModal: showImportDSLModal => set(() => ({ showImportDSLModal })), showTips: '', setShowTips: showTips => set(() => ({ showTips })), - runTimes: 1, - setRunTimes: runTimes => set(() => ({ runTimes })), + iterTimes: 1, + setIterTimes: iterTimes => set(() => ({ iterTimes })), })) } diff --git a/web/app/components/workflow/utils.ts b/web/app/components/workflow/utils.ts index 91656e3bbcb17c..aaf333f4d71766 100644 --- a/web/app/components/workflow/utils.ts +++ b/web/app/components/workflow/utils.ts @@ -19,7 +19,7 @@ import type { ToolWithProvider, ValueSelector, } from './types' -import { BlockEnum } from './types' +import { BlockEnum, ErrorHandleMode } from './types' import { CUSTOM_NODE, ITERATION_CHILDREN_Z_INDEX, @@ -267,8 +267,13 @@ export const initialNodes = (originNodes: Node[], originEdges: Edge[]) => { }) } - if (node.data.type === BlockEnum.Iteration) - node.data._children = iterationNodeMap[node.id] || [] + if (node.data.type === BlockEnum.Iteration) { + const iterationNodeData = node.data as IterationNodeType + iterationNodeData._children = iterationNodeMap[node.id] || [] + iterationNodeData.is_parallel = iterationNodeData.is_parallel || false + iterationNodeData.parallel_nums = iterationNodeData.parallel_nums || 10 + iterationNodeData.error_handle_mode = iterationNodeData.error_handle_mode || ErrorHandleMode.Terminated + } return node }) diff --git a/web/types/workflow.ts b/web/types/workflow.ts index 6efa09c9c4774f..f74e076058ef6d 100644 --- a/web/types/workflow.ts +++ b/web/types/workflow.ts @@ -167,6 +167,7 @@ export type NodeFinishedResponse = { parallel_start_node_id?: string iteration_index?: number iteration_id?: string + parallel_mode_run_id: string } created_at: number files?: FileResponse[] From c58e6442ca73d00b7aa67ca890a0a59f13b02a5b Mon Sep 17 00:00:00 2001 From: Nov1c444 <857526207@qq.com> Date: Thu, 24 Oct 2024 15:42:42 +0800 Subject: [PATCH 06/13] fix: chatflow teminated log show error --- api/core/app/apps/workflow_app_runner.py | 1 + api/core/app/entities/queue_entities.py | 1 + api/core/app/task_pipeline/workflow_cycle_manage.py | 7 +++++++ api/core/workflow/graph_engine/entities/event.py | 1 + api/core/workflow/nodes/iteration/iteration_node.py | 5 ++++- 5 files changed, 14 insertions(+), 1 deletion(-) diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py index d0f2d66cae4393..4b43805de6dc69 100644 --- a/api/core/app/apps/workflow_app_runner.py +++ b/api/core/app/apps/workflow_app_runner.py @@ -195,6 +195,7 @@ def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent) node_run_index=event.route_node_state.index, predecessor_node_id=event.predecessor_node_id, in_iteration_id=event.in_iteration_id, + parallel_mode_run_id=event.parallel_mode_run_id, ) ) elif isinstance(event, NodeRunSucceededEvent): diff --git a/api/core/app/entities/queue_entities.py b/api/core/app/entities/queue_entities.py index 47983de7103d72..d5304e2590c60d 100644 --- a/api/core/app/entities/queue_entities.py +++ b/api/core/app/entities/queue_entities.py @@ -273,6 +273,7 @@ class QueueNodeStartedEvent(AppQueueEvent): in_iteration_id: Optional[str] = None """iteration id if node is in iteration""" start_at: datetime + parallel_mode_run_id: Optional[str] = None class QueueNodeSucceededEvent(AppQueueEvent): diff --git a/api/core/app/task_pipeline/workflow_cycle_manage.py b/api/core/app/task_pipeline/workflow_cycle_manage.py index 9d09cf558c1196..3cdb7eb09d28f1 100644 --- a/api/core/app/task_pipeline/workflow_cycle_manage.py +++ b/api/core/app/task_pipeline/workflow_cycle_manage.py @@ -36,6 +36,7 @@ from core.ops.entities.trace_entity import TraceTaskName from core.ops.ops_trace_manager import TraceQueueManager, TraceTask from core.tools.tool_manager import ToolManager +from core.workflow.entities.node_entities import NodeRunMetadataKey from core.workflow.enums import SystemVariableKey from core.workflow.nodes import NodeType from core.workflow.nodes.tool.entities import ToolNodeData @@ -252,6 +253,12 @@ def _handle_node_execution_start( workflow_node_execution.status = WorkflowNodeExecutionStatus.RUNNING.value workflow_node_execution.created_by_role = workflow_run.created_by_role workflow_node_execution.created_by = workflow_run.created_by + workflow_node_execution.execution_metadata = json.dumps( + { + NodeRunMetadataKey.PARALLEL_MODE_RUN_ID: event.parallel_mode_run_id, + NodeRunMetadataKey.ITERATION_ID: event.in_iteration_id, + } + ) workflow_node_execution.created_at = datetime.now(timezone.utc).replace(tzinfo=None) session.add(workflow_node_execution) diff --git a/api/core/workflow/graph_engine/entities/event.py b/api/core/workflow/graph_engine/entities/event.py index 5064c18d28744b..bce81caed89b4c 100644 --- a/api/core/workflow/graph_engine/entities/event.py +++ b/api/core/workflow/graph_engine/entities/event.py @@ -59,6 +59,7 @@ class BaseNodeEvent(GraphEngineEvent): class NodeRunStartedEvent(BaseNodeEvent): predecessor_node_id: Optional[str] = None + parallel_mode_run_id: Optional[str] = None """predecessor node id""" diff --git a/api/core/workflow/nodes/iteration/iteration_node.py b/api/core/workflow/nodes/iteration/iteration_node.py index cf6e14b565d6de..a2c589f7498d9a 100644 --- a/api/core/workflow/nodes/iteration/iteration_node.py +++ b/api/core/workflow/nodes/iteration/iteration_node.py @@ -27,6 +27,7 @@ IterationRunSucceededEvent, NodeInIterationFailedEvent, NodeRunFailedEvent, + NodeRunStartedEvent, NodeRunStreamChunkEvent, NodeRunSucceededEvent, ) @@ -185,7 +186,6 @@ def _run(self) -> Generator[NodeEvent | InNodeEvent, None, None]: yield event if isinstance(event, IterationRunFailedEvent): q.put(None) - yield event except Empty: logger.warning("iteration parallel queue is empty.") @@ -316,6 +316,9 @@ def _handle_event_metadata(self, event: BaseNodeEvent, iter_run_index: str, para """ if not isinstance(event, BaseNodeEvent): return event + if self.node_data.is_parallel and isinstance(event, NodeRunStartedEvent): + event.parallel_mode_run_id = parallel_mode_run_id + return event if event.route_node_state.node_run_result: metadata = event.route_node_state.node_run_result.metadata if not metadata: From 96771f5d8ec26ce08279c8148e099da9ddfadbba Mon Sep 17 00:00:00 2001 From: Nov1c444 <857526207@qq.com> Date: Fri, 25 Oct 2024 14:43:36 +0800 Subject: [PATCH 07/13] fix: workflow log show error --- api/core/app/apps/workflow_app_runner.py | 1 + api/core/app/entities/queue_entities.py | 4 +- api/core/app/entities/task_entities.py | 2 + .../task_pipeline/workflow_cycle_manage.py | 24 ++--- .../workflow/graph_engine/entities/event.py | 1 + .../workflow/hooks/use-workflow-run.ts | 94 ++++++++++++++----- web/app/components/workflow/store.ts | 6 ++ web/types/workflow.ts | 3 + 8 files changed, 101 insertions(+), 34 deletions(-) diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py index 4b43805de6dc69..9a01e8a253f97b 100644 --- a/api/core/app/apps/workflow_app_runner.py +++ b/api/core/app/apps/workflow_app_runner.py @@ -360,6 +360,7 @@ def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent) index=event.index, node_run_index=workflow_entry.graph_engine.graph_runtime_state.node_run_steps, output=event.pre_iteration_output, + parallel_mode_run_id=event.parallel_mode_run_id, ) ) elif isinstance(event, (IterationRunSucceededEvent | IterationRunFailedEvent)): diff --git a/api/core/app/entities/queue_entities.py b/api/core/app/entities/queue_entities.py index d5304e2590c60d..f1542ec5d8c578 100644 --- a/api/core/app/entities/queue_entities.py +++ b/api/core/app/entities/queue_entities.py @@ -107,7 +107,8 @@ class QueueIterationNextEvent(AppQueueEvent): """parent parallel id if node is in parallel""" parent_parallel_start_node_id: Optional[str] = None """parent parallel start node id if node is in parallel""" - + parallel_mode_run_id: Optional[str] = None + """iteratoin run in parallel mode run id""" node_run_index: int output: Optional[Any] = None # output for the current iteration @@ -274,6 +275,7 @@ class QueueNodeStartedEvent(AppQueueEvent): """iteration id if node is in iteration""" start_at: datetime parallel_mode_run_id: Optional[str] = None + """iteratoin run in parallel mode run id""" class QueueNodeSucceededEvent(AppQueueEvent): diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index 4b5f4716ed08bb..7e9aad54be57e4 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -244,6 +244,7 @@ class Data(BaseModel): parent_parallel_id: Optional[str] = None parent_parallel_start_node_id: Optional[str] = None iteration_id: Optional[str] = None + parallel_run_id: Optional[str] = None event: StreamEvent = StreamEvent.NODE_STARTED workflow_run_id: str @@ -432,6 +433,7 @@ class Data(BaseModel): extras: dict = {} parallel_id: Optional[str] = None parallel_start_node_id: Optional[str] = None + parallel_mode_run_id: Optional[str] = None event: StreamEvent = StreamEvent.ITERATION_NEXT workflow_run_id: str diff --git a/api/core/app/task_pipeline/workflow_cycle_manage.py b/api/core/app/task_pipeline/workflow_cycle_manage.py index 3cdb7eb09d28f1..50928df49eb18f 100644 --- a/api/core/app/task_pipeline/workflow_cycle_manage.py +++ b/api/core/app/task_pipeline/workflow_cycle_manage.py @@ -331,18 +331,17 @@ def _handle_workflow_node_execution_failed( execution_metadata = ( json.dumps(jsonable_encoder(event.execution_metadata)) if event.execution_metadata else None ) - update_data = { - WorkflowNodeExecution.status: WorkflowNodeExecutionStatus.FAILED.value, - WorkflowNodeExecution.error: event.error, - WorkflowNodeExecution.inputs: json.dumps(inputs) if inputs else None, - WorkflowNodeExecution.process_data: json.dumps(event.process_data) if event.process_data else None, - WorkflowNodeExecution.outputs: json.dumps(outputs) if outputs else None, - WorkflowNodeExecution.finished_at: finished_at, - WorkflowNodeExecution.elapsed_time: elapsed_time, - WorkflowNodeExecution.execution_metadata: execution_metadata, - } db.session.query(WorkflowNodeExecution).filter(WorkflowNodeExecution.id == workflow_node_execution.id).update( - update_data + { + WorkflowNodeExecution.status: WorkflowNodeExecutionStatus.FAILED.value, + WorkflowNodeExecution.error: event.error, + WorkflowNodeExecution.inputs: json.dumps(inputs) if inputs else None, + WorkflowNodeExecution.process_data: json.dumps(event.process_data) if event.process_data else None, + WorkflowNodeExecution.outputs: json.dumps(outputs) if outputs else None, + WorkflowNodeExecution.finished_at: finished_at, + WorkflowNodeExecution.elapsed_time: elapsed_time, + WorkflowNodeExecution.execution_metadata: execution_metadata, + } ) db.session.commit() @@ -356,6 +355,7 @@ def _handle_workflow_node_execution_failed( workflow_node_execution.outputs = json.dumps(outputs) if outputs else None workflow_node_execution.finished_at = finished_at workflow_node_execution.elapsed_time = elapsed_time + workflow_node_execution.execution_metadata = execution_metadata self._wip_workflow_node_executions.pop(workflow_node_execution.node_execution_id) @@ -462,6 +462,7 @@ def _workflow_node_start_to_stream_response( parent_parallel_id=event.parent_parallel_id, parent_parallel_start_node_id=event.parent_parallel_start_node_id, iteration_id=event.in_iteration_id, + parallel_run_id=event.parallel_mode_run_id, ), ) @@ -622,6 +623,7 @@ def _workflow_iteration_next_to_stream_response( extras={}, parallel_id=event.parallel_id, parallel_start_node_id=event.parallel_start_node_id, + parallel_mode_run_id=event.parallel_mode_run_id, ), ) diff --git a/api/core/workflow/graph_engine/entities/event.py b/api/core/workflow/graph_engine/entities/event.py index bce81caed89b4c..bacea191dd866c 100644 --- a/api/core/workflow/graph_engine/entities/event.py +++ b/api/core/workflow/graph_engine/entities/event.py @@ -135,6 +135,7 @@ class BaseIterationEvent(GraphEngineEvent): parent_parallel_start_node_id: Optional[str] = None """parent parallel start node id if node is in parallel""" parallel_mode_run_id: Optional[str] = None + """iteratoin run in parallel mode run id""" class IterationRunStartedEvent(BaseIterationEvent): diff --git a/web/app/components/workflow/hooks/use-workflow-run.ts b/web/app/components/workflow/hooks/use-workflow-run.ts index c59378660fb144..31b611973fa3ce 100644 --- a/web/app/components/workflow/hooks/use-workflow-run.ts +++ b/web/app/components/workflow/hooks/use-workflow-run.ts @@ -170,11 +170,13 @@ export const useWorkflowRun = () => { const { workflowRunningData, setWorkflowRunningData, + setIterParallelLogMap, } = workflowStore.getState() const { edges, setEdges, } = store.getState() + setIterParallelLogMap(new Map()) setWorkflowRunningData(produce(workflowRunningData!, (draft) => { draft.task_id = task_id draft.result = { @@ -244,6 +246,8 @@ export const useWorkflowRun = () => { const { workflowRunningData, setWorkflowRunningData, + iterParallelLogMap, + setIterParallelLogMap, } = workflowStore.getState() const { getNodes, @@ -259,10 +263,21 @@ export const useWorkflowRun = () => { const tracing = draft.tracing! const iterations = tracing.find(trace => trace.node_id === node?.parentId) const currIteration = iterations?.details![node.data.iteration_index] || iterations?.details![iterations.details!.length - 1] - currIteration?.push({ - ...data, - status: NodeRunningStatus.Running, - } as any) + if (!data.parallel_run_id) { + currIteration?.push({ + ...data, + status: NodeRunningStatus.Running, + } as any) + } + else { + if (!iterParallelLogMap.has(data.parallel_run_id)) + iterParallelLogMap.set(data.parallel_run_id, [{ ...data, status: NodeRunningStatus.Running } as any]) + else + iterParallelLogMap.get(data.parallel_run_id)!.push({ ...data, status: NodeRunningStatus.Running } as any) + setIterParallelLogMap(iterParallelLogMap) + if (iterations) + iterations.details = Array.from(iterParallelLogMap.values()) + } })) } else { @@ -309,6 +324,8 @@ export const useWorkflowRun = () => { const { workflowRunningData, setWorkflowRunningData, + iterParallelLogMap, + setIterParallelLogMap, } = workflowStore.getState() const { getNodes, @@ -317,21 +334,21 @@ export const useWorkflowRun = () => { const nodes = getNodes() const nodeParentId = nodes.find(node => node.id === data.node_id)!.parentId if (nodeParentId) { - setWorkflowRunningData(produce(workflowRunningData!, (draft) => { - const tracing = draft.tracing! - const iterations = tracing.find(trace => trace.node_id === nodeParentId) // the iteration node - - if (iterations && iterations.details) { - const iterationIndex = data.execution_metadata?.iteration_index || 0 - if (!iterations.details[iterationIndex]) - iterations.details[iterationIndex] = [] - - const currIteration = iterations.details[iterationIndex] - const nodeIndex = currIteration.findIndex(node => - node.node_id === data.node_id && ( - node.execution_metadata?.parallel_id === data.execution_metadata?.parallel_id || node.parallel_id === data.execution_metadata?.parallel_id), - ) - if (data.status === NodeRunningStatus.Succeeded) { + if (!data.execution_metadata.parallel_mode_run_id) { + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + const tracing = draft.tracing! + const iterations = tracing.find(trace => trace.node_id === nodeParentId) // the iteration node + + if (iterations && iterations.details) { + const iterationIndex = data.execution_metadata?.iteration_index || 0 + if (!iterations.details[iterationIndex]) + iterations.details[iterationIndex] = [] + + const currIteration = iterations.details[iterationIndex] + const nodeIndex = currIteration.findIndex(node => + node.node_id === data.node_id && ( + node.execution_metadata?.parallel_id === data.execution_metadata?.parallel_id || node.parallel_id === data.execution_metadata?.parallel_id), + ) if (nodeIndex !== -1) { currIteration[nodeIndex] = { ...currIteration[nodeIndex], @@ -344,8 +361,40 @@ export const useWorkflowRun = () => { } as any) } } - } - })) + })) + } + else { + // open parallel mode + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + const tracing = draft.tracing! + const iterations = tracing.find(trace => trace.node_id === nodeParentId) // the iteration node + + if (iterations && iterations.details) { + const iterRunID = data.execution_metadata?.parallel_mode_run_id + + const currIteration = iterParallelLogMap.get(iterRunID) + const nodeIndex = currIteration?.findIndex(node => + node.node_id === data.node_id && ( + node?.parallel_run_id === data.execution_metadata?.parallel_mode_run_id), + ) + if (currIteration) { + if (nodeIndex !== undefined && nodeIndex !== -1) { + currIteration[nodeIndex] = { + ...currIteration[nodeIndex], + ...data, + } as any + } + else { + currIteration.push({ + ...data, + } as any) + } + } + setIterParallelLogMap(iterParallelLogMap) + iterations.details = Array.from(iterParallelLogMap.values()) + } + })) + } } else { setWorkflowRunningData(produce(workflowRunningData!, (draft) => { @@ -447,7 +496,8 @@ export const useWorkflowRun = () => { if (iteration.details!.length >= iteration.metadata.iterator_length!) return } - iteration?.details!.push([]) + if (!data.parallel_mode_run_id) + iteration?.details!.push([]) })) const nodes = getNodes() const newNodes = produce(nodes, (draft) => { diff --git a/web/app/components/workflow/store.ts b/web/app/components/workflow/store.ts index a709d7acf8458a..c4a625c777e9ab 100644 --- a/web/app/components/workflow/store.ts +++ b/web/app/components/workflow/store.ts @@ -21,6 +21,7 @@ import type { WorkflowRunningData, } from './types' import { WorkflowContext } from './context' +import type { NodeTracing } from '@/types/workflow' // #TODO chatVar# // const MOCK_DATA = [ @@ -168,6 +169,8 @@ type Shape = { setShowTips: (showTips: string) => void iterTimes: number setIterTimes: (iterTimes: number) => void + iterParallelLogMap: Map + setIterParallelLogMap: (iterParallelLogMap: Map) => void } export const createWorkflowStore = () => { @@ -285,6 +288,9 @@ export const createWorkflowStore = () => { setShowTips: showTips => set(() => ({ showTips })), iterTimes: 1, setIterTimes: iterTimes => set(() => ({ iterTimes })), + iterParallelLogMap: new Map(), + setIterParallelLogMap: iterParallelLogMap => set(() => ({ iterParallelLogMap })), + })) } diff --git a/web/types/workflow.ts b/web/types/workflow.ts index f74e076058ef6d..3c0675b60572a3 100644 --- a/web/types/workflow.ts +++ b/web/types/workflow.ts @@ -19,6 +19,7 @@ export type NodeTracing = { process_data: any outputs?: any status: string + parallel_run_id?: string error?: string elapsed_time: number execution_metadata: { @@ -122,6 +123,7 @@ export type NodeStartedResponse = { id: string node_id: string iteration_id?: string + parallel_run_id?: string node_type: string index: number predecessor_node_id?: string @@ -202,6 +204,7 @@ export type IterationNextResponse = { output: any extras?: any created_at: number + parallel_mode_run_id: string execution_metadata: { parallel_id?: string } From f519f9d2c8464dab155623ae20bde3c59f8f07af Mon Sep 17 00:00:00 2001 From: Nov1c444 <857526207@qq.com> Date: Tue, 29 Oct 2024 09:54:42 +0800 Subject: [PATCH 08/13] fix: parallel nums show error --- .../nodes/iteration/iteration_node.py | 1 - web/app/components/workflow/constants.ts | 3 +- .../workflow/nodes/iteration/panel.tsx | 32 +++++++++++-------- 3 files changed, 20 insertions(+), 16 deletions(-) diff --git a/api/core/workflow/nodes/iteration/iteration_node.py b/api/core/workflow/nodes/iteration/iteration_node.py index a2c589f7498d9a..cb353215f9d3c1 100644 --- a/api/core/workflow/nodes/iteration/iteration_node.py +++ b/api/core/workflow/nodes/iteration/iteration_node.py @@ -188,7 +188,6 @@ def _run(self) -> Generator[NodeEvent | InNodeEvent, None, None]: q.put(None) yield event except Empty: - logger.warning("iteration parallel queue is empty.") continue # wait all threads diff --git a/web/app/components/workflow/constants.ts b/web/app/components/workflow/constants.ts index 794996d1bd8481..1d215f3fe4dabe 100644 --- a/web/app/components/workflow/constants.ts +++ b/web/app/components/workflow/constants.ts @@ -340,7 +340,8 @@ export const NODES_INITIAL_DATA = { ...ListFilterDefault.defaultValue, }, } - +export const MAX_ITERATION_PARALLEL_NUM = 10 +export const MIN_ITERATION_PARALLEL_NUM = 1 export const NODE_WIDTH = 240 export const X_OFFSET = 60 export const NODE_WIDTH_X_OFFSET = NODE_WIDTH + X_OFFSET diff --git a/web/app/components/workflow/nodes/iteration/panel.tsx b/web/app/components/workflow/nodes/iteration/panel.tsx index 5aa2949fd0f167..56e407ca7adb01 100644 --- a/web/app/components/workflow/nodes/iteration/panel.tsx +++ b/web/app/components/workflow/nodes/iteration/panel.tsx @@ -8,6 +8,7 @@ import VarReferencePicker from '../_base/components/variable/var-reference-picke import Split from '../_base/components/split' import ResultPanel from '../../run/result-panel' import IterationResultPanel from '../../run/iteration-result-panel' +import { MAX_ITERATION_PARALLEL_NUM, MIN_ITERATION_PARALLEL_NUM } from '../../constants' import type { IterationNodeType } from './types' import useConfig from './use-config' import { ErrorHandleMode, InputVarType, type NodePanelProps } from '@/app/components/workflow/types' @@ -113,21 +114,24 @@ const Panel: FC> = ({
-
- {t(`${i18nPrefix}.MaxParallelismDesc`)}
}> -
- { changeParallelNums(Number(e.target.value)) }} /> - -
+ { + inputs.is_parallel && (
+ {t(`${i18nPrefix}.MaxParallelismDesc`)}
}> +
+ { changeParallelNums(Number(e.target.value)) }} /> + +
+ + +
) + } - -
From ac0708250e5a3fd86a5d96d9671b07642acdbfb5 Mon Sep 17 00:00:00 2001 From: Nov1c444 <857526207@qq.com> Date: Tue, 29 Oct 2024 16:26:02 +0800 Subject: [PATCH 09/13] fix: unify frontend styles --- web/app/components/base/select/index.tsx | 2 +- web/app/components/workflow/constants.ts | 1 + .../components/workflow/hooks/use-workflow-run.ts | 5 ++++- web/app/components/workflow/nodes/_base/node.tsx | 8 +++++--- .../components/workflow/nodes/iteration/panel.tsx | 12 +++++++----- .../workflow/panel/debug-and-preview/hooks.ts | 5 +++-- .../workflow/run/iteration-result-panel.tsx | 2 +- 7 files changed, 22 insertions(+), 13 deletions(-) diff --git a/web/app/components/base/select/index.tsx b/web/app/components/base/select/index.tsx index ee5cee977b87b7..c70cf246619c9c 100644 --- a/web/app/components/base/select/index.tsx +++ b/web/app/components/base/select/index.tsx @@ -125,7 +125,7 @@ const Select: FC = ({
- {filteredItems.length > 0 && ( + {(filteredItems.length > 0 && open) && ( {filteredItems.map((item: Item) => ( { const { workflowRunningData, setWorkflowRunningData, + setIterTimes, } = workflowStore.getState() const { getNodes, @@ -437,6 +439,7 @@ export const useWorkflowRun = () => { transform, } = store.getState() const nodes = getNodes() + setIterTimes(DEFAULT_ITER_TIMES) setWorkflowRunningData(produce(workflowRunningData!, (draft) => { draft.tracing!.push({ ...data, @@ -533,7 +536,7 @@ export const useWorkflowRun = () => { }) } })) - setIterTimes(1) + setIterTimes(DEFAULT_ITER_TIMES) const newNodes = produce(nodes, (draft) => { const currentNode = draft.find(node => node.id === data.node_id)! diff --git a/web/app/components/workflow/nodes/_base/node.tsx b/web/app/components/workflow/nodes/_base/node.tsx index 1dbf79da553907..e864c419e22042 100644 --- a/web/app/components/workflow/nodes/_base/node.tsx +++ b/web/app/components/workflow/nodes/_base/node.tsx @@ -168,9 +168,11 @@ const BaseNode: FC = ({ />
- {data.title} +
+ {data.title} +
{ data.type === BlockEnum.Iteration && (data as IterationNodeType).is_parallel && ( = ({ {t('workflow.nodes.iteration.parallelModeEnableDesc')}
} > -
+
{t('workflow.nodes.iteration.parallelModeUpper')}
diff --git a/web/app/components/workflow/nodes/iteration/panel.tsx b/web/app/components/workflow/nodes/iteration/panel.tsx index 56e407ca7adb01..4ba42d488e81b1 100644 --- a/web/app/components/workflow/nodes/iteration/panel.tsx +++ b/web/app/components/workflow/nodes/iteration/panel.tsx @@ -109,14 +109,14 @@ const Panel: FC> = ({ />
-
+
{t(`${i18nPrefix}.parallelPanelDesc`)}
} inline>
{ - inputs.is_parallel && (
- {t(`${i18nPrefix}.MaxParallelismDesc`)}
}> + inputs.is_parallel && (
+ {t(`${i18nPrefix}.MaxParallelismDesc`)}
}>
{ changeParallelNums(Number(e.target.value)) }} /> > = ({
) } +
+ +
- -
+
diff --git a/web/app/components/workflow/panel/debug-and-preview/hooks.ts b/web/app/components/workflow/panel/debug-and-preview/hooks.ts index aa5eebc7783ab3..5d932a1ba21608 100644 --- a/web/app/components/workflow/panel/debug-and-preview/hooks.ts +++ b/web/app/components/workflow/panel/debug-and-preview/hooks.ts @@ -10,6 +10,7 @@ import { uniqBy } from 'lodash-es' import { useWorkflowRun } from '../../hooks' import { NodeRunningStatus, WorkflowRunningStatus } from '../../types' import { useWorkflowStore } from '../../store' +import { DEFAULT_ITER_TIMES } from '../../constants' import type { ChatItem, Inputs, @@ -107,7 +108,7 @@ export const useChat = ( handleResponding(false) if (stopChat && taskIdRef.current) stopChat(taskIdRef.current) - setIterTimes(1) + setIterTimes(DEFAULT_ITER_TIMES) if (suggestedQuestionsAbortControllerRef.current) suggestedQuestionsAbortControllerRef.current.abort() }, [handleResponding, setIterTimes, stopChat]) @@ -116,7 +117,7 @@ export const useChat = ( conversationId.current = '' taskIdRef.current = '' handleStop() - setIterTimes(1) + setIterTimes(DEFAULT_ITER_TIMES) const newChatList = config?.opening_statement ? [{ id: `${Date.now()}`, diff --git a/web/app/components/workflow/run/iteration-result-panel.tsx b/web/app/components/workflow/run/iteration-result-panel.tsx index efe039a8cb972c..c4cd909f2ed9ae 100644 --- a/web/app/components/workflow/run/iteration-result-panel.tsx +++ b/web/app/components/workflow/run/iteration-result-panel.tsx @@ -75,7 +75,7 @@ const IterationResultPanel: FC = ({ { iteration.some(item => item.status === 'failed') ? ( - + ) : (< RiArrowRightSLine className={ cn( From d15c9d3141af7717587d82fe7123aed1f045e18e Mon Sep 17 00:00:00 2001 From: Nov1c444 <857526207@qq.com> Date: Thu, 31 Oct 2024 11:28:27 +0800 Subject: [PATCH 10/13] fix: add missing type annotations --- api/core/workflow/nodes/iteration/entities.py | 9 +-------- .../workflow/nodes/iteration/iteration_node.py | 16 +++++++++------- 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/api/core/workflow/nodes/iteration/entities.py b/api/core/workflow/nodes/iteration/entities.py index 1784b2d0f57795..81ec5a7623efb5 100644 --- a/api/core/workflow/nodes/iteration/entities.py +++ b/api/core/workflow/nodes/iteration/entities.py @@ -6,18 +6,11 @@ from core.workflow.nodes.base import BaseIterationNodeData, BaseIterationState, BaseNodeData -class ErrorHandleMode(Enum): +class ErrorHandleMode(str, Enum): TERMINATED = "Terminated" CONTINUE_ON_ERROR = "Continue on error" REMOVE_ABNORMAL_OUTPUT = "Remove abnormal output" - def to_json(self): - return self.value - - @classmethod - def from_json(cls, value): - return cls(value) - class IterationNodeData(BaseIterationNodeData): """ diff --git a/api/core/workflow/nodes/iteration/iteration_node.py b/api/core/workflow/nodes/iteration/iteration_node.py index cb353215f9d3c1..4a859f24675a9e 100644 --- a/api/core/workflow/nodes/iteration/iteration_node.py +++ b/api/core/workflow/nodes/iteration/iteration_node.py @@ -4,7 +4,7 @@ from concurrent.futures import Future, wait from datetime import datetime, timezone from queue import Empty, Queue -from typing import Any, Optional, cast +from typing import TYPE_CHECKING, Any, Optional, cast from flask import Flask, current_app @@ -38,6 +38,8 @@ from core.workflow.nodes.iteration.entities import ErrorHandleMode, IterationNodeData from models.workflow import WorkflowNodeExecutionStatus +if TYPE_CHECKING: + from core.workflow.graph_engine.graph_engine import GraphEngine logger = logging.getLogger(__name__) @@ -339,8 +341,8 @@ def _run_single_iter( inputs: dict[str, list], outputs: list, start_at: datetime, - graph_engine, - iteration_graph, + graph_engine: "GraphEngine", + iteration_graph: Graph, parallel_mode_run_id: Optional[str] = None, ): """ @@ -510,10 +512,10 @@ def _run_single_iter_parallel( inputs: dict[str, list], outputs: list, start_at: datetime, - graph_engine, - iteration_graph, - index, - item, + graph_engine: "GraphEngine", + iteration_graph: Graph, + index: int, + item: Any, ): """ run single iteration in parallel mode From b5413f41d3c358f1d42c4b8d348728d62e65c401 Mon Sep 17 00:00:00 2001 From: -LAN- Date: Mon, 4 Nov 2024 13:13:36 +0800 Subject: [PATCH 11/13] feat(workflow): add handling for QueueNodeInIterationFailedEvent - Extended event parameter to include QueueNodeInIterationFailedEvent. --- api/core/app/task_pipeline/workflow_cycle_manage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/core/app/task_pipeline/workflow_cycle_manage.py b/api/core/app/task_pipeline/workflow_cycle_manage.py index 50928df49eb18f..b89edf9079f043 100644 --- a/api/core/app/task_pipeline/workflow_cycle_manage.py +++ b/api/core/app/task_pipeline/workflow_cycle_manage.py @@ -479,7 +479,7 @@ def _workflow_node_start_to_stream_response( def _workflow_node_finish_to_stream_response( self, - event: QueueNodeSucceededEvent | QueueNodeFailedEvent, + event: QueueNodeSucceededEvent | QueueNodeFailedEvent | QueueNodeInIterationFailedEvent, task_id: str, workflow_node_execution: WorkflowNodeExecution, ) -> Optional[NodeFinishStreamResponse]: From 92ec7ae10df7c3112d0b1e804ccc679139e5fcab Mon Sep 17 00:00:00 2001 From: Nov1c444 <857526207@qq.com> Date: Fri, 1 Nov 2024 15:43:10 +0800 Subject: [PATCH 12/13] chore: change the parallel warning show logic --- .../workflow/hooks/use-nodes-interactions.ts | 6 ++-- .../workflow/nodes/iteration/default.ts | 34 ++++++++++++++----- .../workflow/nodes/iteration/node.tsx | 4 +-- .../workflow/nodes/iteration/types.ts | 3 +- web/app/components/workflow/run/node.tsx | 6 +++- 5 files changed, 37 insertions(+), 16 deletions(-) diff --git a/web/app/components/workflow/hooks/use-nodes-interactions.ts b/web/app/components/workflow/hooks/use-nodes-interactions.ts index 7e53592f2c7c16..375a269377166a 100644 --- a/web/app/components/workflow/hooks/use-nodes-interactions.ts +++ b/web/app/components/workflow/hooks/use-nodes-interactions.ts @@ -644,10 +644,10 @@ export const useNodesInteractions = () => { newNode.data.isInIteration = true newNode.data.iteration_id = prevNode.parentId newNode.zIndex = ITERATION_CHILDREN_Z_INDEX - if (newNode.data.type === BlockEnum.Answer) { + if (newNode.data.type === BlockEnum.Answer || newNode.data.type === BlockEnum.Tool || newNode.data.type === BlockEnum.Assigner) { const parentIterNodeIndex = nodes.findIndex(node => node.id === prevNode.parentId) - if (nodes[parentIterNodeIndex].data._isFirstTime) - nodes[parentIterNodeIndex].data._isShowTips = true + const iterNodeData: IterationNodeType = nodes[parentIterNodeIndex].data + iterNodeData._isShowTips = true } } diff --git a/web/app/components/workflow/nodes/iteration/default.ts b/web/app/components/workflow/nodes/iteration/default.ts index 82845e204903a8..cdef268adbdae3 100644 --- a/web/app/components/workflow/nodes/iteration/default.ts +++ b/web/app/components/workflow/nodes/iteration/default.ts @@ -1,7 +1,10 @@ import { BlockEnum, ErrorHandleMode } from '../../types' import type { NodeDefault } from '../../types' import type { IterationNodeType } from './types' -import { ALL_CHAT_AVAILABLE_BLOCKS, ALL_COMPLETION_AVAILABLE_BLOCKS } from '@/app/components/workflow/constants' +import { + ALL_CHAT_AVAILABLE_BLOCKS, + ALL_COMPLETION_AVAILABLE_BLOCKS, +} from '@/app/components/workflow/constants' const i18nPrefix = 'workflow' const nodeDefault: NodeDefault = { @@ -10,7 +13,6 @@ const nodeDefault: NodeDefault = { iterator_selector: [], output_selector: [], _children: [], - _isFirstTime: true, _isShowTips: false, is_parallel: false, parallel_nums: 10, @@ -19,21 +21,37 @@ const nodeDefault: NodeDefault = { getAvailablePrevNodes(isChatMode: boolean) { const nodes = isChatMode ? ALL_CHAT_AVAILABLE_BLOCKS - : ALL_COMPLETION_AVAILABLE_BLOCKS.filter(type => type !== BlockEnum.End) + : ALL_COMPLETION_AVAILABLE_BLOCKS.filter( + type => type !== BlockEnum.End, + ) return nodes }, getAvailableNextNodes(isChatMode: boolean) { - const nodes = isChatMode ? ALL_CHAT_AVAILABLE_BLOCKS : ALL_COMPLETION_AVAILABLE_BLOCKS + const nodes = isChatMode + ? ALL_CHAT_AVAILABLE_BLOCKS + : ALL_COMPLETION_AVAILABLE_BLOCKS return nodes }, checkValid(payload: IterationNodeType, t: any) { let errorMessages = '' - if (!errorMessages && (!payload.iterator_selector || payload.iterator_selector.length === 0)) - errorMessages = t(`${i18nPrefix}.errorMsg.fieldRequired`, { field: t(`${i18nPrefix}.nodes.iteration.input`) }) + if ( + !errorMessages + && (!payload.iterator_selector || payload.iterator_selector.length === 0) + ) { + errorMessages = t(`${i18nPrefix}.errorMsg.fieldRequired`, { + field: t(`${i18nPrefix}.nodes.iteration.input`), + }) + } - if (!errorMessages && (!payload.output_selector || payload.output_selector.length === 0)) - errorMessages = t(`${i18nPrefix}.errorMsg.fieldRequired`, { field: t(`${i18nPrefix}.nodes.iteration.output`) }) + if ( + !errorMessages + && (!payload.output_selector || payload.output_selector.length === 0) + ) { + errorMessages = t(`${i18nPrefix}.errorMsg.fieldRequired`, { + field: t(`${i18nPrefix}.nodes.iteration.output`), + }) + } return { isValid: !errorMessages, diff --git a/web/app/components/workflow/nodes/iteration/node.tsx b/web/app/components/workflow/nodes/iteration/node.tsx index 05b2fe15b30369..fda033b87a22c7 100644 --- a/web/app/components/workflow/nodes/iteration/node.tsx +++ b/web/app/components/workflow/nodes/iteration/node.tsx @@ -31,13 +31,13 @@ const Node: FC> = ({ useEffect(() => { if (nodesInitialized) handleNodeIterationRerender(id) - if (data.is_parallel && data._isShowTips && data._isFirstTime) { + if (data.is_parallel && data._isShowTips) { Toast.notify({ type: 'warning', message: t(`${i18nPrefix}.answerNodeWarningDesc`), duration: 5000, }) - data._isFirstTime = false + data._isShowTips = false } }, [nodesInitialized, id, handleNodeIterationRerender, data, t]) diff --git a/web/app/components/workflow/nodes/iteration/types.ts b/web/app/components/workflow/nodes/iteration/types.ts index a35d56c2f07697..4a20dbd456c910 100644 --- a/web/app/components/workflow/nodes/iteration/types.ts +++ b/web/app/components/workflow/nodes/iteration/types.ts @@ -16,6 +16,5 @@ export type IterationNodeType = CommonNodeType & { is_parallel: boolean // open the parallel mode or not parallel_nums: number // the numbers of parallel error_handle_mode: ErrorHandleMode // how to handle error in the iteration - _isShowTips?: boolean // when answer node in parallel mode iteration show tips - _isFirstTime?: boolean // is the first time to add parallel iteration node + _isShowTips: boolean // when answer node in parallel mode iteration show tips } diff --git a/web/app/components/workflow/run/node.tsx b/web/app/components/workflow/run/node.tsx index 7d1a73154518fa..f77b1aca3a80cc 100644 --- a/web/app/components/workflow/run/node.tsx +++ b/web/app/components/workflow/run/node.tsx @@ -76,7 +76,11 @@ const NodePanel: FC = ({ if (!details || details.length === 0) return 0 - return details.flat().filter(item => item.status === 'failed').length + return details.reduce((acc, iteration) => { + if (iteration.some(item => item.status === 'failed')) + acc++ + return acc + }, 0) } useEffect(() => { setCollapseState(!nodeInfo.expand) From cbf6c77d36eaf64d15f6e4b705826d10069adf78 Mon Sep 17 00:00:00 2001 From: Nov1c444 <857526207@qq.com> Date: Tue, 5 Nov 2024 10:01:15 +0800 Subject: [PATCH 13/13] fix: correct enum value naming --- api/core/workflow/nodes/iteration/entities.py | 6 +++--- api/core/workflow/nodes/iteration/iteration_node.py | 9 +++++---- web/app/components/workflow/types.ts | 6 +++--- web/i18n/en-US/workflow.ts | 6 +++--- 4 files changed, 14 insertions(+), 13 deletions(-) diff --git a/api/core/workflow/nodes/iteration/entities.py b/api/core/workflow/nodes/iteration/entities.py index 81ec5a7623efb5..ebcb6f82fbc397 100644 --- a/api/core/workflow/nodes/iteration/entities.py +++ b/api/core/workflow/nodes/iteration/entities.py @@ -7,9 +7,9 @@ class ErrorHandleMode(str, Enum): - TERMINATED = "Terminated" - CONTINUE_ON_ERROR = "Continue on error" - REMOVE_ABNORMAL_OUTPUT = "Remove abnormal output" + TERMINATED = "terminated" + CONTINUE_ON_ERROR = "continue-on-error" + REMOVE_ABNORMAL_OUTPUT = "remove-abnormal-output" class IterationNodeData(BaseIterationNodeData): diff --git a/api/core/workflow/nodes/iteration/iteration_node.py b/api/core/workflow/nodes/iteration/iteration_node.py index 4a859f24675a9e..d121b0530a6b32 100644 --- a/api/core/workflow/nodes/iteration/iteration_node.py +++ b/api/core/workflow/nodes/iteration/iteration_node.py @@ -311,7 +311,9 @@ def _extract_variable_selector_to_variable_mapping( return variable_mapping - def _handle_event_metadata(self, event: BaseNodeEvent, iter_run_index: str, parallel_mode_run_id: str): + def _handle_event_metadata( + self, event: BaseNodeEvent, iter_run_index: str, parallel_mode_run_id: str + ) -> NodeRunStartedEvent | BaseNodeEvent: """ add iteration metadata to event. """ @@ -344,7 +346,7 @@ def _run_single_iter( graph_engine: "GraphEngine", iteration_graph: Graph, parallel_mode_run_id: Optional[str] = None, - ): + ) -> Generator[NodeEvent | InNodeEvent, None, None]: """ run single iteration """ @@ -472,7 +474,6 @@ def _run_single_iter( if next_index < len(iterator_list_value): variable_pool.add([self.node_id, "item"], iterator_list_value[next_index]) - yield IterationRunNextEvent( iteration_id=self.id, iteration_node_id=self.node_id, @@ -516,7 +517,7 @@ def _run_single_iter_parallel( iteration_graph: Graph, index: int, item: Any, - ): + ) -> Generator[NodeEvent | InNodeEvent, None, None]: """ run single iteration in parallel mode """ diff --git a/web/app/components/workflow/types.ts b/web/app/components/workflow/types.ts index 2fb7f6758f6a3a..9b6ad033bfc17d 100644 --- a/web/app/components/workflow/types.ts +++ b/web/app/components/workflow/types.ts @@ -37,9 +37,9 @@ export enum ControlMode { Hand = 'hand', } export enum ErrorHandleMode { - Terminated = 'Terminated', - ContinueOnError = 'Continue on error', - RemoveAbnormalOutput = 'Remove abnormal output', + Terminated = 'terminated', + ContinueOnError = 'continue-on-error', + RemoveAbnormalOutput = 'remove-abnormal-output', } export type Branch = { id: string diff --git a/web/i18n/en-US/workflow.ts b/web/i18n/en-US/workflow.ts index 7fc7bf9f699e45..1c6639aba01ed2 100644 --- a/web/i18n/en-US/workflow.ts +++ b/web/i18n/en-US/workflow.ts @@ -568,9 +568,9 @@ const translation = { MaxParallelismDesc: 'The maximum parallelism is used to control the number of tasks executed simultaneously in a single iteration.', errorResponseMethod: 'Error response method', ErrorMethod: { - operationTerminated: 'Terminated', - continueOnError: 'Continue on error', - removeAbnormalOutput: 'Remove abnormal output', + operationTerminated: 'terminated', + continueOnError: 'continue-on-error', + removeAbnormalOutput: 'remove-abnormal-output', }, answerNodeWarningDesc: 'Parallel mode warning: Answer nodes, conversation variable assignments, and persistent read/write operations within iterations may cause exceptions.', },