
[Search Pipelines] Share state between multiple processors #6722

Closed
macohen opened this issue Mar 16, 2023 · 19 comments
Assignees
Labels
Search:Relevance Search Search query, autocomplete ...etc v2.12.0 Issues and PRs related to version 2.12.0 v3.0.0 Issues and PRs related to version 3.0.0

Comments

@macohen
Contributor

macohen commented Mar 16, 2023

As a user of search pipelines I want a processor that can modify both search requests and responses so I can do things like paginating where the user requests results 21-30, but a reranker needs 1-40 as input; the reranker sorts and then returns 21-30 from the sorted list.

@macohen
Contributor Author

macohen commented Jul 26, 2023

This is described more in opensearch-project/search-processor#80

@msfroh
Collaborator

msfroh commented Jul 31, 2023

I'd like to try to formalize some behavior, especially now that a) search pipelines are out, so we know exactly how request/response processors get configured, and b) we'd like to try to get bracket processors out soon.

Balance and order

I still feel that we should enforce a rule saying bracket processors need to be balanced like a stack (like brackets or parentheses) -- that is, just as we can't say foo[bar(baz]) (where foo[] and bar() are function calls), the order of bracket processors on the request chain must be reversed on the response chain. I like that because it means that you can represent the whole flow from initial request through final response as nested function calls. (Maybe that's only important to me, though.)
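The "reverse order" rule above can be checked mechanically, like matching parentheses. Here is a minimal validation sketch; `BracketBalanceCheck` and the `"type:tag"` identifier strings are illustrative, not actual OpenSearch classes:

```java
import java.util.List;

// Minimal sketch: validate that bracket processors on the response chain
// appear in exactly the reverse order of the request chain, like matched
// parentheses. Each string identifies a bracket processor instance.
public class BracketBalanceCheck {
    static void validate(List<String> requestChain, List<String> responseChain) {
        if (requestChain.size() != responseChain.size()) {
            throw new IllegalArgumentException("Unbalanced bracket processors");
        }
        int n = requestChain.size();
        for (int i = 0; i < n; i++) {
            // The i-th request bracket must be closed by the (n-1-i)-th response bracket.
            if (!requestChain.get(i).equals(responseChain.get(n - 1 - i))) {
                throw new IllegalArgumentException(
                    "Bracket " + requestChain.get(i) + " is not closed in reverse order");
            }
        }
    }

    public static void main(String[] args) {
        // foo[ bar( ... ) ] is balanced; foo[ bar( ... ] ) is not.
        validate(List.of("foo", "bar"), List.of("bar", "foo")); // accepted
        try {
            validate(List.of("foo", "bar"), List.of("foo", "bar")); // rejected
            throw new AssertionError("expected failure");
        } catch (IllegalArgumentException expected) {
            System.out.println("rejected unbalanced chain");
        }
    }
}
```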

Explicit and implicit balancing

Because we may want to modify requests and responses with SearchRequestProcessors and SearchResponseProcessors before/after bracket processors act, we need a way of explicitly communicating when a given processor should run (as long as it doesn't violate the "reverse order" rule specified above).

An example of that explicit ordering could be something like:

"search_pipeline": {
  "request_processors" : [
    {
      "type": "oversample_bracket",
      "scale": 10,
      "tag": "oversample_10x"
    }
  ],
  "response_processors": [
    {
      "type": "load_external_fields",
      "source_url_pattern": "https://example.com/data/{id}"
    },
    {
      "type": "filter_value_regex",
      "target_field": "external_field_b",
      "value_regex": "foo.*"
    },
    {
      "type": "oversample_bracket",
      "tag": "oversample_10x"
    }
  ]
}

In this example, we're using a hypothetical request processor to increase the requested size by 10x before sending the request to the OpenSearch index. Then we augment the matched results by calling some hypothetical external data store (similar to the DynamoDBStoreResponseProcessor that I hacked together a few weeks ago). Then we exclude results if one of the externally-defined fields doesn't match some regex (which we couldn't easily do as part of the search query). Finally, we undo the oversampling, discarding any remaining results beyond the initial size parameter.

The tag property in this example can be used to identify specific instances of a given bracket processor, if multiple bracket processors of the same type are referenced. Multiple bracket processors of the same type with the same tag (or absence of tag) should throw an exception, I think.

I was also thinking that we might want to implicitly balance any bracket processors by closing any open brackets at the end of the chain of response processors. In the above example, if the bracket processor was not specified in the response processor chain, it would still be included at the end. You would need to explicitly reference it in the response processor chain if you want it to execute before some other response processor(s). Given that implicit balancing would be a convenience feature that supports a subset of explicit balancing behavior, we could leave that to a later release (i.e. the initial release would only support explicit balancing and would throw an exception if a bracket processor is not balanced).

Implementation challenge: per-request processor state

Right now, search pipelines are created as a result of a PUT pipeline request (which writes to cluster state; that state gets propagated to every data node, which listens for the cluster state update and creates the search pipeline object, held in a HashMap keyed off the pipeline ID), and processors in the pipeline are assumed to be immutable. The whole reason why one would want to create a bracket processor (instead of just creating separate request and response processors) is to be able to share some state between the request side and the response side.

So, how can we make a pipeline with stateful processors without changing everything? I think we can take advantage of the PipelinedRequest class that already combines a pipeline with a search request.

I see a couple of possible solutions for propagating state:

  1. Before we transform the search request, we ask the pipeline to create a request-scoped instance of itself, which would delegate to the individual processors to return a request-scoped instance. For existing immutable request/response processors, they could just return this, whereas a bracket processor could return an instance with mutable fields to carry state between the transformRequest and transformResponse calls. (The same instance would be included as a request processor and response processor.)
  2. We do something like a servlet's "request scope", where there's a Map<String, Object> held by the PipelinedRequest. The BracketProcessor interface would have its own version of transformRequest and transformResponse that would take this scope map as an additional parameter. We could make it a Map<String, Map<String, Object>>, where the top-level map would be keyed off the processor's type:tag unique identifier, so that processors are only given access to their own scope.

I think the second option is easier to implement in OpenSearch core, but I feel like the first option might feel a little more natural for processor implementers (since the transformRequest and transformResponse method signatures are the same as for request/response processors). On the other hand, I kind of like that the second option makes it abundantly clear (via the method signatures) that you're doing this to propagate state from request to response (while the processor instance itself is still immutable). I'm leaning more toward the second option, but I'm happy to be convinced otherwise.
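The second option could be sketched roughly as follows. The class and method names here (`RequestScopedState`, `scopeFor`) are hypothetical, not part of the actual OpenSearch API; the point is the `type:tag`-keyed nesting that isolates each processor's state:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of option 2: a per-request "scope" map held by the pipelined
// request, keyed by each processor's "type:tag" identifier so a processor
// can only see its own state.
public class RequestScopedState {
    private final Map<String, Map<String, Object>> scopes = new HashMap<>();

    // Returns the private scope for one processor instance, creating it on
    // first access. The same map would be handed to that processor on both
    // the request and response legs of the pipeline.
    public Map<String, Object> scopeFor(String type, String tag) {
        return scopes.computeIfAbsent(type + ":" + tag, k -> new HashMap<>());
    }

    public static void main(String[] args) {
        RequestScopedState state = new RequestScopedState();
        // Request leg: the bracket processor records the original size.
        state.scopeFor("oversample_bracket", "oversample_10x").put("original_size", 10);
        // Response leg: the same processor instance reads it back.
        Object size = state.scopeFor("oversample_bracket", "oversample_10x").get("original_size");
        System.out.println("original_size = " + size); // prints "original_size = 10"
        // A different processor sees only its own (empty) scope.
        System.out.println(state.scopeFor("other", "t").isEmpty()); // prints "true"
    }
}
```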

@macohen macohen added the v2.10.0 label Aug 3, 2023
@austintlee
Contributor

@msfroh
On the matter of the processor ordering, I think it will be far simpler to construct search pipelines if the Processor interface had both "before" and "after" methods.

E.g.

interface Processor {

    SearchRequest before(SearchRequest request) throws Exception;

    SearchResponse after(SearchRequest request, SearchResponse response) throws Exception;

}

Then a search pipeline can look like this instead:

"search_pipeline": {
  "processors" : [
    {
      "type": "oversample_bracket",
      "scale": 10,
      "tag": "oversample_10x"
    },
    {
      "type": "load_external_fields",
      "source_url_pattern": "https://example.com/data/{id}"
    },
    {
      "type": "filter_value_regex",
      "target_field": "external_field_b",
      "value_regex": "foo.*"
    }
  ]
}

To make that work, we'll have to add some default methods to SearchRequestProcessor and SearchResponseProcessor:

public interface SearchRequestProcessor extends Processor {
    SearchRequest processRequest(SearchRequest request) throws Exception;

    default SearchRequest before(SearchRequest request) throws Exception {
        return processRequest(request);
    }

    default SearchResponse after(SearchRequest request, SearchResponse response) {
        return response;
    }
}
public interface SearchResponseProcessor extends Processor {
    SearchResponse processResponse(SearchRequest request, SearchResponse response) throws Exception;

    default SearchRequest before(SearchRequest request) {
        return request;
    }

    default SearchResponse after(SearchRequest request, SearchResponse response) throws Exception {
        return processResponse(request, response);
    }
}

I think it's counter-intuitive to have to put a bracket processor in two places ("request_processors" and "response_processors").

On the question of state propagation, have you decided which option you want to proceed with?

Are there any tasks that I can help with?

@navneet1v
Contributor

Hi @msfroh ,
Looking at the proposal, one thing that is not clear to me is the motivation behind having a bracket processor. If there are separate request and response processors that can simply undo the work, then we can avoid this complexity.

As per my understanding, this is more of a convenience feature (so that users don't make mistakes while building the query), and that benefit weighs less than the cost of adding state to a simple, stateless API interface. (Again, this is just my thought.)

If possible, can you add more use cases where having a stateful search pipeline would be useful?

@austintlee
Contributor

@navneet1v One use case from conversational search is that we want to allow the request processor to re-write the query/question and have the re-written question be propagated to the response processor. The response processor now has both the question and the answer (from an LLM) that it can then store in (conversational) memory. Here, we want to think of what the request processor does and what the response processor does as one unit of work in RAG. This can be modeled using a Processor interface with before() and after().

@navneet1v
Contributor

navneet1v commented Aug 16, 2023

@austintlee

One use case from opensearch-project/ml-commons#1150 is that we want to allow the request processor to re-write the query/question and have the re-written question be propagated to the response processor.

If search pipeline states are there, then I don't see why this cannot be done by just creating two processors: one request processor (rewrites the query/question) and one response processor (saves info in conversation memory). Users will have the flexibility to say whether they want to store their conversations or not.

Here, we want to think of what the request processor does and what the response processor does as one unit of work in RAG. This can be modeled using a Processor interface with before() and after().

(Just for reference; if you understand how search pipelines work, please ignore this.)
A search pipeline chains the processors and runs them in the order in which they are defined. Also, once a request processor has run, it is never run again on the same search request. So these processors are not like layers that the request passes through both before and after the search. It's more like a split: you define request processors that run before the search, and then you define response processors that run once the search is done.
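That execution model can be sketched in a few lines. This is a deliberate simplification (plain strings standing in for SearchRequest/SearchResponse, `PipelineSketch` is not a real OpenSearch class), just to show that each processor runs exactly once and the two chains never interleave:

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Sketch of the execution model described above: request processors run
// once, in order, before the search; response processors run once, in
// order, after it.
public class PipelineSketch {
    public static String execute(
            List<UnaryOperator<String>> requestProcessors,
            UnaryOperator<String> search,
            List<UnaryOperator<String>> responseProcessors,
            String request) {
        for (UnaryOperator<String> p : requestProcessors) {
            request = p.apply(request);   // each request processor runs exactly once
        }
        String response = search.apply(request);
        for (UnaryOperator<String> p : responseProcessors) {
            response = p.apply(response); // each response processor runs exactly once
        }
        return response;
    }

    public static void main(String[] args) {
        String out = execute(
            List.<UnaryOperator<String>>of(r -> r + "+req"),
            r -> "hits(" + r + ")",
            List.<UnaryOperator<String>>of(h -> h + "+resp"),
            "q");
        System.out.println(out); // prints "hits(q+req)+resp"
    }
}
```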

@austintlee
Contributor

If Search Pipeline States are there

This doesn't exist today, correct? I don't think you want any arbitrary request/response processor to have access to this state. What is being proposed is that this state be scoped to a single processor. Or are you referring to something that is already available to be able to pass state/context between processors?

@navneet1v
Contributor

navneet1v commented Aug 16, 2023

@austintlee
Yes, search pipeline states are not there yet. @msfroh is working on that as part of this feature.

What is being proposed is that this state be scoped to a single processor

No, this is not true. The proposal is that state will be scoped at the search request level.

@austintlee
Contributor

Thanks for the clarification. I am OK with that. Mainly, I wanted to get clarification that "something new needs to be done for state to be shared between search processors".

@msfroh
Collaborator

msfroh commented Aug 16, 2023

So, I started to hack away at an implementation of "approach 2" where we have the state managed outside of the processors (essentially as a field of PipelinedSearchRequest), since I think that's probably easier than making processors stateful (since right now, a processor is a long-lived object owned by a long-lived pipeline object and that pipeline object gets applied to every request).

Now, the approach I'm working on is a Map<String, Map<String, Object>>, where it would map processor_id -> variable_name -> variable_value, where processor_id is essentially processor type + processor tag. The pipeline execution will pass the variable_name -> variable_value map for each processor, so that each processor gets its own isolated state under its own namespace (and can't touch any other processor's state).

On the flip side, if I just made it a Map&lt;String, Object&gt;, then I think @navneet1v would be right that we wouldn't need a new processor type: we could just pass this per-request state to each processor and define request + response processors designed to work well together. That would be much easier for me to implement (which I like), and might be less confusing than "Here is something that you're adding as a request processor, but it's also a response processor and you can reference it in your response chain."

The catch of that approach is that it relies on users doing the right thing -- e.g. only add the response processor that reads some state if you've added the corresponding request processor that writes the state.

@austintlee
Contributor

I like it. It solves my problem :-) and it does not preclude introducing a new processor type later (if that turns out to be a useful thing someone wants).

@msfroh
Collaborator

msfroh commented Aug 16, 2023

Okay -- I think I have something that essentially works in #9405.

To avoid breaking backward compatibility with the existing stateless processors, I did some interface hackery to make it easy to say that you're creating a "stateful" request/response processor (though it's ultimately the same interface).

I was previously planning to add an OversamplingBracketProcessor (that would scale up size on the request, and truncate back down to the original size on the response) as a proof of concept. Instead I created an OversamplingRequestProcessor and TruncateHitsResponseProcessor, where the original size gets handed from one to the other via the request context. Check out 60_oversample_truncate.yml for an integration test that verifies that it all works.
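The mechanism can be sketched as follows. The class, method, and context-key names here (`OversampleTruncateSketch`, `"original_size"`) are illustrative, not the exact names in the merged PR; the point is the hand-off through a shared per-request context:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the oversample/truncate pairing: the request processor scales
// up "size" and stashes the original in a shared per-request context; the
// response processor reads it back and truncates the hit list.
public class OversampleTruncateSketch {
    static int oversample(int size, int scaleFactor, Map<String, Object> ctx) {
        ctx.put("original_size", size);  // remember what the caller asked for
        return size * scaleFactor;       // fetch extra hits for reranking
    }

    static int truncateHits(int hitCount, Map<String, Object> ctx) {
        int original = (int) ctx.get("original_size");
        return Math.min(hitCount, original); // discard the oversampled tail
    }

    public static void main(String[] args) {
        Map<String, Object> requestContext = new HashMap<>();
        int fetched = oversample(10, 10, requestContext); // ask the index for 100 hits
        int returned = truncateHits(fetched, requestContext);
        System.out.println(fetched + " fetched, " + returned + " returned");
        // prints "100 fetched, 10 returned"
    }
}
```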

As written, you can't specify multiple OversampleRequestProcessors and truncate back to the original size (whereas that would have worked with the bracket processor). Of course, we could fix that by adding a config parameter to the stateful processors -- something like state_prefix, so it would become:

"search_pipeline": {

  "request_processors": [
    {
      "oversample" : {
        "scale_factor": 10,
        "state_prefix": "first_" // Writes original size to first_original_size
      }
    },
    {
      "oversample" : {
        "scale_factor": 5,
        "state_prefix": "second_" // Writes 10x size to second_original_size
      }
    }
  ],
  "response_processors" : [
    {
      "first_pass_reranker" : {
        // Rerank 50x results
      }
    },
    {
      "truncate_hits": {
        "state_prefix": "second_" // Truncates down to 10x original size
      }
    },
    {
      "second_pass_reranker" : {
        // Rerank 10x results
      }
    },
    {
      "truncate_hits": {
        "state_prefix": "first_" // Truncate to original size
      }
    }
  ]
}

@austintlee
Contributor

@msfroh what is the next step on that PR? It's still in draft status.

@macohen macohen added v2.11.0 Issues and PRs related to version 2.11.0 and removed v2.10.0 labels Sep 7, 2023
@hdhalter

@macohen - Hi Mark, Can you please enter a doc issue if this needs documentation? Also, please confirm that it is going in 2.11. Thanks!

@macohen macohen added v2.12.0 Issues and PRs related to version 2.12.0 and removed v2.11.0 Issues and PRs related to version 2.11.0 labels Oct 4, 2023
@macohen
Contributor Author

macohen commented Oct 4, 2023

@hdhalter moving this to the 2.12 release; will update the doc issue as well.

@msfroh msfroh changed the title Processors that modify both request and response ("bracket processor") [Search Pipelines] Share state between multiple processors Oct 30, 2023
@reta reta added the v3.0.0 Issues and PRs related to version 3.0.0 label Dec 5, 2023
@reta
Collaborator

reta commented Dec 5, 2023

@msfroh please feel free to close it, but we need the documentation update for this feature as well

@macohen
Contributor Author

macohen commented Dec 11, 2023

@msfroh will this be ready for the 2.12 release? If not, can you change the label, please?

@msfroh
Collaborator

msfroh commented Dec 12, 2023

@macohen: It was merged and backported.

I'll write a draft of documentation over at opensearch-project/documentation-website#5151

@macohen
Contributor Author

macohen commented Jan 5, 2024

Closing since it was merged and backported already.

@macohen macohen closed this as completed Jan 5, 2024
Projects
Status: 2.12.0 (Launched)
Status: Done