-
Hi @cobycloud, I'd like to propose a pipeline implementation that offers flexible, extensible capabilities for our SDK. Based on our previous discussions about creating specialized task-processing mechanisms, I've drafted a `Pipeline` class with the following key characteristics:

```python
from dataclasses import dataclass, field
from typing import Dict, Generic, List, Optional, TypeVar

# Type variables for the data flowing through the pipeline; PipelineStage and
# PipelineStatus are assumed to be defined alongside this class.
StageType = TypeVar("StageType")
ResultType = TypeVar("ResultType")


@dataclass
class Pipeline(Generic[StageType, ResultType]):
    stages: List[PipelineStage[StageType, ResultType]] = field(default_factory=list)
    status: PipelineStatus = PipelineStatus.PENDING

    def add_stage(self, stage: PipelineStage[StageType, ResultType]):
        """Add a stage to the pipeline."""
        self.stages.append(stage)

    def run(self, initial_data: StageType) -> Optional[ResultType]:
        """Execute pipeline stages sequentially."""
        try:
            self.status = PipelineStatus.RUNNING
            current_data = initial_data
            for stage in self.stages:
                current_data = stage.process(current_data)
            self.status = PipelineStatus.COMPLETED
            return current_data
        except Exception as e:
            self.status = PipelineStatus.FAILED
            print(f"Pipeline execution failed: {e}")
            return None
```

**Key Design Considerations**

1. Flexibility
2. Status Tracking
3. Stage Processing
**Proposed Enhancements**
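As a side note before the example: the snippet above uses `PipelineStage` and `PipelineStatus` without defining them. A minimal sketch of what they could look like, inferred from how they're used below, might be (the real definitions may differ):

```python
from dataclasses import dataclass
from enum import Enum, auto
from typing import Any, Callable, Generic, TypeVar

StageType = TypeVar("StageType")
ResultType = TypeVar("ResultType")


class PipelineStatus(Enum):
    PENDING = auto()
    RUNNING = auto()
    COMPLETED = auto()
    FAILED = auto()


@dataclass
class PipelineStage(Generic[StageType, ResultType]):
    name: str
    processor: Callable[[Any], Any]
    input_type: Any   # e.g. str; annotated as Any since List[str] is a typing construct, not a class
    output_type: Any  # e.g. List[str]

    def process(self, data: Any) -> Any:
        """Run this stage's processor on the incoming data."""
        return self.processor(data)
```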
**Example Use Case**

```python
# Example: Text Processing Pipeline (uses the Pipeline class defined above)
def tokenize(text: str) -> List[str]:
    return text.split()


def remove_stopwords(tokens: List[str]) -> List[str]:
    stopwords = {'the', 'a', 'an', 'in'}
    return [token for token in tokens if token not in stopwords]


def count_tokens(tokens: List[str]) -> Dict[str, int]:
    from collections import Counter
    return dict(Counter(tokens))


# Creating the pipeline
text_pipeline = Pipeline[str, Dict[str, int]]()

text_pipeline.add_stage(PipelineStage(
    name="Tokenization",
    processor=tokenize,
    input_type=str,
    output_type=List[str]
))

text_pipeline.add_stage(PipelineStage(
    name="Stop Word Removal",
    processor=remove_stopwords,
    input_type=List[str],
    output_type=List[str]
))

text_pipeline.add_stage(PipelineStage(
    name="Token Counting",
    processor=count_tokens,
    input_type=List[str],
    output_type=Dict[str, int]
))

# Running the pipeline
result = text_pipeline.run("The quick brown fox jumps over the lazy dog")
print(result)  # Outputs token counts
```

Thoughts?
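(For reference, assuming `PipelineStage.process` simply forwards to its processor as in the sketch above, running this example should print roughly the following; note the stop-word check is case-sensitive, so the capitalized "The" survives:)

```
{'The': 1, 'quick': 1, 'brown': 1, 'fox': 1, 'jumps': 1, 'over': 1, 'lazy': 1, 'dog': 1}
```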
-
Here is my proposed implementation for a `PipelineAgent`:

```python
import logging
from enum import Enum, auto
from typing import Any, Dict, List, Literal, Optional

from pydantic import Field

from swarmauri.agents.base.AgentBase import AgentBase
# SubclassUnion, LLMBase, ConversationBase, SwarmAgentBase, IMessage, and
# HumanMessage are assumed to be imported from their respective swarmauri modules.


class AgentStatus(Enum):
    IDLE = auto()
    WORKING = auto()
    COMPLETED = auto()
    FAILED = auto()


class PipelineAgent(AgentBase):
    """
    Manages a pipeline of agents, where the output of one agent serves as the
    input for the next.
    """

    llm: SubclassUnion[LLMBase]
    conversation: SubclassUnion[ConversationBase]
    pipeline: List[SubclassUnion[SwarmAgentBase]] = Field(
        default_factory=list, description="Sequence of agents forming the pipeline."
    )
    type: Literal["PipelineAgent"] = "PipelineAgent"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._initialize_pipeline()

    def _initialize_pipeline(self):
        """
        Initialize the pipeline with the specified agent classes.
        """
        if not self.pipeline:
            raise ValueError("Pipeline must contain at least one agent.")
        self.agents = [
            agent_class(llm=self.llm, conversation=self.conversation)
            for agent_class in self.pipeline
        ]

    async def _process_pipeline(
        self, initial_input: Any, llm_kwargs: Optional[Dict] = None
    ):
        """
        Process tasks sequentially through the pipeline.
        """
        llm_kwargs = llm_kwargs or {}  # avoid a mutable default argument
        current_input = initial_input
        for idx, agent in enumerate(self.agents):
            try:
                agent.status = AgentStatus.WORKING
                result = await agent.process(current_input, llm_kwargs)
                current_input = result  # Pass the result as input to the next agent
                agent.status = AgentStatus.COMPLETED
            except Exception as e:
                agent.status = AgentStatus.FAILED
                logging.error(f"Error in pipeline at agent {idx}: {e}")
                break  # Stop the pipeline on failure
        return current_input

    async def exec_pipeline(self, initial_data: Any, llm_kwargs: Optional[Dict] = None):
        """
        Execute the entire pipeline.
        """
        if not isinstance(initial_data, (str, IMessage)):
            raise TypeError("Initial data must be a string or IMessage instance.")
        if isinstance(initial_data, str):
            initial_data = HumanMessage(content=initial_data)
        self.conversation.add_message(initial_data)

        # Run the pipeline with the initial input
        final_output = await self._process_pipeline(initial_data.content, llm_kwargs)
        return final_output
```
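To make the intended call pattern concrete, here is a rough usage sketch. The agent classes and the llm/conversation objects below are hypothetical placeholders, not real SDK classes: `pipeline` holds agent classes, which the constructor instantiates with the shared llm and conversation.

```python
import asyncio

# Hypothetical agent classes and objects for illustration only -- substitute
# real swarmauri agents, LLM, and conversation instances from the SDK.
pipeline_agent = PipelineAgent(
    llm=my_llm,                        # an LLM instance (placeholder)
    conversation=my_conversation,      # a conversation instance (placeholder)
    pipeline=[SummarizerAgent, TranslatorAgent],  # hypothetical agent classes
)

# exec_pipeline accepts a plain string (wrapped in a HumanMessage) or an IMessage.
final_output = asyncio.run(
    pipeline_agent.exec_pipeline("Summarize this text, then translate it to French.")
)
print(final_output)
```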
-
Discussion regarding pipeline strategies