Add Conditional Routing in Haystack 2.x Pipelines #6109

Closed
vblagoje opened this issue Oct 18, 2023 · 5 comments · Fixed by #6147
Labels
2.x Related to Haystack v2.0

Comments

@vblagoje
Member

vblagoje commented Oct 18, 2023

Description:

In the current state of Haystack 2.x pipelines, we can create pipelines by connecting output slots to input slots:

pipe.connect("fetcher.streams", "converter.sources")
pipe.connect("converter.documents", "text_splitter.documents")

However, there is no direct support for conditional routing based on user-specified boolean conditions. For example, when LinkContentFetcher returns too few streams (e.g., because it got blocked), we want to decide what to do based on the number of streams received. If we receive too few streams, we might want to reformulate the query in a QueryRewriter component, then rerun LinkContentFetcher with the new query, and so on.
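To make the desired branching concrete, here is a minimal plain-Python sketch of the control flow, outside of any pipeline. `run_with_retries`, `MIN_STREAMS`, `max_attempts` and the `fake_*` functions are hypothetical stand-ins for LinkContentFetcher, QueryRewriter and HTMLToDocument, not Haystack APIs.

```python
from typing import Callable, List

MIN_STREAMS = 2  # hypothetical threshold: fewer streams suggests the fetch was blocked


def run_with_retries(
    query: str,
    fetch: Callable[[str], List[bytes]],
    rewrite_query: Callable[[str], str],
    convert: Callable[[List[bytes]], List[str]],
    max_attempts: int = 3,
) -> List[str]:
    """Fetch; if too few streams come back, rewrite the query and fetch again."""
    for _ in range(max_attempts):
        streams = fetch(query)
        if len(streams) >= MIN_STREAMS:
            # Enough content: take the "converter" branch.
            return convert(streams)
        # Too few streams: take the "query_rewriter" branch and loop back to the fetcher.
        query = rewrite_query(query)
    return []  # give up after max_attempts fetches


# Fake components, for illustration only.
def fake_fetch(q: str) -> List[bytes]:
    return [b"a"] if q == "q" else [b"a", b"b"]


def fake_rewrite(q: str) -> str:
    return q + " rewritten"


def fake_convert(streams: List[bytes]) -> List[str]:
    return [s.decode() for s in streams]


print(run_with_retries("q", fake_fetch, fake_rewrite, fake_convert))  # ['a', 'b']
```

A pipeline-level router would need to express exactly this check-and-loop-back decision between components.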

Describe the solution you'd like:

Introduce a standard Haystack component, possibly named ConnectionRouter (or a similarly fitting name), that supports registering boolean expressions as keys and target components (or their input slots) as values:

fetcher = LinkContentFetcher()
router = ConnectionRouter()
converter = HTMLToDocument()
query_rewriter = QueryRewriter()

router.register(lambda streams: len(streams) < 2, query_rewriter)
router.register(lambda streams: len(streams) >= 2, converter)
...
pipe.add_component("fetcher", fetcher)
pipe.add_component("router", router)
pipe.add_component("query_rewriter", query_rewriter)
pipe.add_component("converter", converter)
...
pipe.connect("fetcher.streams", "router.input")
pipe.connect("converter.documents", "text_splitter.documents")
pipe.connect("query_rewriter.urls", "fetcher.urls")
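A minimal sketch of the bookkeeping behind the proposed `register` API, assuming conditions are plain callables and targets are identified by component name (strings) rather than by component objects. This `ConnectionRouter` is hypothetical, not an existing Haystack class, and it only decides where a value should go; it does not wire anything into a `Pipeline`.

```python
from typing import Any, Callable, List, Tuple


class ConnectionRouter:
    """Hypothetical sketch of the proposed router, not a Haystack class.

    Each registered route pairs a predicate with the name of a target component.
    """

    def __init__(self) -> None:
        self._routes: List[Tuple[Callable[[Any], bool], str]] = []

    def register(self, condition: Callable[[Any], bool], target: str) -> None:
        # Routes are kept in registration order.
        self._routes.append((condition, target))

    def route(self, value: Any) -> List[str]:
        # Return every target whose condition holds for the incoming value.
        return [target for condition, target in self._routes if condition(value)]


router = ConnectionRouter()
router.register(lambda streams: len(streams) < 2, "query_rewriter")
router.register(lambda streams: len(streams) >= 2, "converter")

print(router.route([b"page"]))  # ['query_rewriter']
print(router.route([b"page1", b"page2"]))  # ['converter']
```

Returning every matching target rather than only the first is a design choice here: nothing forces registered conditions to be mutually exclusive.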

With such a component, users would have the flexibility to implement custom branching logic in their pipelines based on boolean expressions evaluated over component results, not only on which output slots are connected.

Describe alternatives you've considered

The alternative seems to be writing a custom component for each branching scenario (use case).

Additional context

Having such a generic ConnectionRouter would offer immediate benefits:

  1. Flexibility: Allows detailed, custom conditional branching in the pipeline, making Haystack more adaptable to custom user scenarios.
  2. Readability: Having a dedicated router component can make the pipeline more readable, especially when complex routing decisions are needed.
  3. Usability: Users can leverage lambda functions or any other callable to define custom conditions, making it powerful yet simple.
@vblagoje vblagoje added the 2.x Related to Haystack v2.0 label Oct 18, 2023
@vblagoje
Member Author

Another use case where I needed such a ConnectionRouter is LLM results and function calling. In step 4 of the Colab notebook, I want to inspect a ChatMessage to see whether its metadata indicates a finish_reason of function_call. In that case, I'd like to route the ChatMessage to a component A that resolves the function, invokes it, and appends the response to the messages. By routing the output slot of component A back to the LLM, we can complete the function call (step 6 of the notebook).
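The finish_reason check described above could look roughly like this. `ChatMessage` here is a minimal hypothetical dataclass standing in for the real class, and the branch names `function_resolver` and `final_answer` are made up for illustration.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class ChatMessage:
    # Hypothetical stand-in: only the fields this example needs.
    content: str
    meta: Dict[str, Any] = field(default_factory=dict)


def route_message(message: ChatMessage) -> str:
    """Return the name of the branch a message should take."""
    if message.meta.get("finish_reason") == "function_call":
        # "Component A": resolve the function, invoke it, append the response.
        return "function_resolver"
    return "final_answer"


print(route_message(ChatMessage('{"name": "get_weather"}', {"finish_reason": "function_call"})))
print(route_message(ChatMessage("It is sunny.", {"finish_reason": "stop"})))
```

The output of the `function_resolver` branch would then be routed back to the LLM, forming the loop described above.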

@masci
Contributor

masci commented Oct 19, 2023

This looks great, and the code example really helped me understand what we want to achieve here. I have two concerns with the solution; let me illustrate them.

  1. The register API doesn't play well with serialization. Imagine I want to rewrite the example in YAML format:
components:
  converter:
    type: HTMLToDocument

  fetcher:
    init_parameters:
      raise_on_failure: true
      retry_attempts: 2
      timeout: 3
      user_agents:
      - haystack/LinkContentFetcher/1.22.0rc0
    type: LinkContentFetcher

  router:
    type: ConnectionRouter
    # how do I express "register"?

connections:
- receiver: router.input
  sender: fetcher.streams

One alternative solution might be leveraging the new filters (see the proposal) so that we can express conditions in text format instead of Python code:

# Before:
# router = ConnectionRouter()
# router.register(lambda streams: len(streams) < 2, query_rewriter)
# router.register(lambda streams: len(streams) >= 2, converter)

# After:
routes = {
  query_rewriter: { "input": "streams", "operator": "<", "value": "2" },
  converter: { "input": "streams", "operator": ">=", "value": "2" },
}
router = ConnectionRouter(routes=routes)

# ... rest stays the same

which would be easy to express in YAML:

  router:
    type: ConnectionRouter
    init_parameters:
      routes: 
        query_rewriter: { "input": "streams", "operator": "<", "value": "2" }
        converter: { "input": "streams", "operator": ">=", "value": "2" }
# ...
  2. How do you connect the router to the downstream components? If you draw the pipeline from the example, query_rewriter and converter would have no inputs.
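A rough sketch of how the declarative routes from point 1 could be evaluated without any Python callables. It assumes targets are keyed by component name (as they would be in YAML), that list inputs are compared by their length, and that the string threshold `"2"` is cast to a number; `matches` and `resolve_routes` are hypothetical helpers. The JSON round trip stands in for YAML serialization, to keep the sketch free of third-party dependencies.

```python
import json
import operator
from typing import Any, Dict, List

# Comparison operators the declarative routes may use.
OPERATORS = {
    "<": operator.lt,
    "<=": operator.le,
    ">": operator.gt,
    ">=": operator.ge,
    "==": operator.eq,
    "!=": operator.ne,
}


def matches(condition: Dict[str, Any], inputs: Dict[str, Any]) -> bool:
    """Check one declarative condition against the router's inputs."""
    value = inputs[condition["input"]]
    expected = condition["value"]
    # Assumption: lists are compared by length, so "streams < 2" means len(streams) < 2.
    if isinstance(value, (list, tuple)):
        value = len(value)
    if isinstance(value, (int, float)):
        # The example stores the threshold as the string "2"; cast it to a number.
        expected = float(expected)
    return OPERATORS[condition["operator"]](value, expected)


def resolve_routes(routes: Dict[str, Dict[str, Any]], inputs: Dict[str, Any]) -> List[str]:
    """Return the names of all target components whose condition holds."""
    return [target for target, condition in routes.items() if matches(condition, inputs)]


# Routes keyed by component name: plain data, so they survive a JSON round trip.
routes = {
    "query_rewriter": {"input": "streams", "operator": "<", "value": "2"},
    "converter": {"input": "streams", "operator": ">=", "value": "2"},
}
restored = json.loads(json.dumps(routes))

print(resolve_routes(restored, {"streams": [b"page"]}))  # ['query_rewriter']
print(resolve_routes(restored, {"streams": [b"p1", b"p2"]}))  # ['converter']
```

By contrast, `json.dumps` on a route holding a `lambda` raises `TypeError`, which is the serialization problem raised in point 1.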

@ZanSara
Contributor

ZanSara commented Oct 20, 2023

@masci In general I like the idea of using filters, but I'm not sure they're expressive enough. One may want to route objects (think Answers, ChatMessages, ByteStreams, or even just Documents) by the content of some attribute. Would the filters be able to represent that?
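One way filters could reach attributes is a dotted `field` path resolved through both object attributes and dict keys. This is a sketch under that assumption: `resolve_path`, the `field` key and the `ChatMessage` stand-in are hypothetical, only `==` and `!=` are wired up, and a missing attribute or key simply raises.

```python
import operator
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class ChatMessage:
    # Hypothetical stand-in with just a content string and a metadata dict.
    content: str
    meta: Dict[str, Any] = field(default_factory=dict)


OPERATORS = {"==": operator.eq, "!=": operator.ne}


def resolve_path(obj: Any, path: str) -> Any:
    """Follow a dotted path through attributes and dict keys, e.g. "meta.finish_reason"."""
    for part in path.split("."):
        obj = obj[part] if isinstance(obj, dict) else getattr(obj, part)
    return obj


def matches(condition: Dict[str, Any], obj: Any) -> bool:
    actual = resolve_path(obj, condition["field"])
    return OPERATORS[condition["operator"]](actual, condition["value"])


condition = {"field": "meta.finish_reason", "operator": "==", "value": "function_call"}
print(matches(condition, ChatMessage("...", {"finish_reason": "function_call"})))  # True
print(matches(condition, ChatMessage("Done.", {"finish_reason": "stop"})))  # False
```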

@ZanSara
Contributor

ZanSara commented Oct 20, 2023

I'm also a bit perplexed by the fact that in the examples, router never appears as the source in any connect statement. An oversight? Or is it meant to be that way? I was thinking that the register() function would have a signature like:

def register(self, input_name: str, input_type: Type, function: Callable, output_name: str, output_type: Type): ...

while right now it seems to be:

def register(self, function: Callable, component_to_connect_to: Component): ...

which I think Canals wouldn't support right now.
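A sketch of the signature suggested above, assuming each registered route yields a named output on which the router emits the input value. Because outputs are named, the router can appear as the sender in `connect()`, which addresses the missing downstream connections. The class is hypothetical, and it only stores `input_type`/`output_type` rather than doing the type checking a real pipeline would need.

```python
from typing import Any, Callable, Dict, List, Tuple


class ConnectionRouter:
    """Hypothetical sketch, not a Haystack class: every route declares a named,
    typed output so the router itself can be the sender in connect()."""

    def __init__(self) -> None:
        self._routes: List[Tuple[str, type, Callable[[Any], bool], str, type]] = []

    def register(self, input_name: str, input_type: type, function: Callable[[Any], bool],
                 output_name: str, output_type: type) -> None:
        # Types are only stored; a pipeline could use them to validate connections.
        self._routes.append((input_name, input_type, function, output_name, output_type))

    def output_names(self) -> List[str]:
        return [route[3] for route in self._routes]

    def run(self, **inputs: Any) -> Dict[str, Any]:
        # Send the value to the first output whose condition holds.
        for input_name, _, function, output_name, _ in self._routes:
            value = inputs[input_name]
            if function(value):
                return {output_name: value}
        return {}


router = ConnectionRouter()
router.register("streams", list, lambda s: len(s) < 2, "too_few_streams", list)
router.register("streams", list, lambda s: len(s) >= 2, "enough_streams", list)

print(router.output_names())  # ['too_few_streams', 'enough_streams']
print(router.run(streams=[b"page"]))  # {'too_few_streams': [b'page']}
# In a pipeline, each named output could then appear as a sender, e.g.
# pipe.connect("router.enough_streams", "converter.sources")
```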

@masci
Contributor

masci commented Oct 23, 2023

> I like in general the idea of using filters, but I'm not sure they're expressive enough

True. Still, eval can't be the solution; it would be nice to iterate on the concept of a "declarative" representation of the boolean condition.
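One possible declarative representation: a nested, data-only condition tree with `AND`/`OR`/`NOT` nodes and leaf comparisons, evaluated by a small interpreter instead of `eval`. The key names (`operator`, `conditions`, `condition`, `length`) and the `status` input are hypothetical.

```python
import operator
from typing import Any, Dict

COMPARISONS = {
    "<": operator.lt, "<=": operator.le, ">": operator.gt,
    ">=": operator.ge, "==": operator.eq, "!=": operator.ne,
}


def evaluate(condition: Dict[str, Any], inputs: Dict[str, Any]) -> bool:
    """Recursively evaluate a data-only condition tree; no eval() is involved."""
    op = condition["operator"]
    if op == "AND":
        return all(evaluate(c, inputs) for c in condition["conditions"])
    if op == "OR":
        return any(evaluate(c, inputs) for c in condition["conditions"])
    if op == "NOT":
        return not evaluate(condition["condition"], inputs)
    value = inputs[condition["input"]]
    if condition.get("length", False):
        # Hypothetical flag: compare len(value) instead of the value itself.
        value = len(value)
    return COMPARISONS[op](value, condition["value"])


# "Route to the converter if we got at least 2 streams and were not blocked."
to_converter = {
    "operator": "AND",
    "conditions": [
        {"input": "streams", "length": True, "operator": ">=", "value": 2},
        {"operator": "NOT", "condition": {"input": "status", "operator": "==", "value": "blocked"}},
    ],
}

print(evaluate(to_converter, {"streams": [b"a", b"b"], "status": "ok"}))  # True
print(evaluate(to_converter, {"streams": [b"a", b"b"], "status": "blocked"}))  # False
```

Since the tree is plain data, it serializes to YAML or JSON as-is, while the interpreter limits what a condition can do to the listed operators.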

@Timoeller Timoeller added this to the 2.0-beta milestone Nov 16, 2023

4 participants