[RFC] Parallel & Batch Ingestion #12457
Comments
@peternied Yes, it looks like the proposed feature 3 in this RFC has a very similar idea to the streaming API, especially the coordinator part that load-balances the ingest load; feature 3 essentially tries to reuse the … Features 1 and 2 are different from the streaming API, as they focus on parallel and batch ingestion on a single node, which would happen after the streaming API or feature 3.
@dbwiddis, @joshpalis -- you may be interested in this, as you've been thinking about parallel execution for search pipelines. For ingest pipelines, the use case is a little more "natural", because we already do parallel execution of … @chishui, can you confirm where exactly the parallel/batch execution would run? A bulk request is received on one node (which serves as coordinator for the request), and then the underlying …
@msfroh, the "parallel/batch execution" would be run on the ingest pipeline side. The DocWriteRequests are first processed by ingest pipeline and its processors on a single ingest node, then the processed documents are fanned out to shards to be indexed. To answer your question, the logic would be run on the coordinator. |
Additional information about parallel ingestion:

Performance

Light-weight processors - no improvement: We benchmarked some light-weight processors (lowercase + append) with the current solution and the parallelized batch solution, and we saw no improvement in either latency or throughput. This is aligned with our expectation: these processors are already very fast, so parallelization wouldn't help and could add some overhead.

ML processors - already async: ML processors are the processors doing the heavy lifting, but they already run their predict logic in a separate thread (code), which makes the ingestion of those documents asynchronous.

Reasons to have parallel ingestion
…

Reasons not to have parallel ingestion
…
@reta, could you also help take a look at this RFC? Thanks!
Thanks @gaobinlong
@msfroh @model-collapse @chishui I think the idea of enhancing ingest processors with batch processing is sound in general, but it may have unintended consequences due to the complexity of …
Making the ingestion API streaming-based (apologies again for bribing for #3000) is a fundamentally different approach to ingestion - we would be able to vary the ingestion based on how fast the documents can be ingested at a given moment, without introducing the complexity of batch / parallelism management. @nknize I think you might be eager to chime in here :)
Thanks for the comment. For machine learning inference, making use of a batched inference API will significantly increase GPU utilization and reduce ingestion time, so batching is very important. You pointed out that "picking the right batch for bulk is very difficult, but adding yet more parallelization / internal batching would make it much harder". Can you elaborate more on that and give your suggestions on how to make ingestion faster?
@reta thanks for the feedback
The proposal only targets the ingest pipeline and its processors; it won't touch the indexing part. Even if documents are processed in a batched manner, the following is still ensured:
Whether the action is index, update, upsert, or script, it is processed by the ingest pipeline in the same way. I don't see how the proposal would cause "changing the ingestion sequence"; please let me know if I'm missing a piece of the puzzle.
Due to the aforementioned reasons about "parallel ingestion", we won't get an immediate gain from delivering that feature, so we have decided to deprioritize the "parallel ingestion" part of this RFC and mainly focus on "batch ingestion".
@chishui The parallelization mentioned in this proposal naturally changes the order in which documents are ingested, does that make sense? I think your last comment reflects that, thank you.
@model-collapse the problem with batching (at least how it is implemented currently in OpenSearch and what we've seen so far with …
@reta in the ingest flow, when documents are processed by the ingest pipeline, could one document depend on another? Even today, the text_embedding and sparse_encoding processors run their inference logic in a separate thread, which makes document ingestion run in parallel, right? https://github.com/opensearch-project/ml-commons/blob/020207ecd6322fed424d5d54c897be74623db103/plugin/src/main/java/org/opensearch/ml/task/MLPredictTaskRunner.java#L194
@chishui yes, in general documents could depend on each other (just think of documents that are ingested out of a CDC stream or message broker, where the documents are constructed as a sequence of changes).
This is purely plugin-specific logic.
In my understanding, in terms of pipeline execution, each document in a bulk request runs independently; no ingest processor can access other in-flight documents in the same bulk request, so during pipeline execution a document probably cannot depend on another. And subsequently, for indexing (calling the Lucene API to write), we have the write thread pool and each document is processed in parallel, so the indexing order within a bulk cannot be guaranteed; the client side needs to ensure the indexing order. @reta, correct me if something is wrong, thank you!
I think pipeline execution runs before the indexing process: first, we use a single transport thread to execute the pipelines for all the documents in a bulk request, and then we use the write thread pool to process the newly generated documents in parallel, so it seems that the execution order doesn't matter when executing pipelines for the documents.
Thanks @gaobinlong
The documents could logically depend on each other (I am not referring to any sharing that may happen in an ingest processor). Since we are talking about bulk ingestion, where a document could be indexed / updated / deleted, we certainly don't want the deletes to be "visible" before the documents are indexed.
This part is not clear to me: AFAIK we offload processing of bulk requests (batches) to a thread pool, not individual documents. Could you please point out where we parallelize the ingestion of individual documents in the batch? Thank you.
Yeah, you're correct, but this RFC only focuses on the execution of the ingest pipeline, which only runs on the coordinating node - just the pre-processing part, not the indexing part. The indexing operations will not happen before the ingest pipeline has executed for all the documents in a bulk request.
After the ingest pipeline has executed for all documents in a bulk request, the coordinating node groups these documents by shard and sends them to the different shards, and each shard processes its documents in parallel, so at least at the shard level we process the documents in a bulk request in parallel. But I think this RFC will not touch the processing logic in each shard, which processes create/update/delete operations for the same document in order, so it's not harmful.
@reta What is your estimation of where the circuit breaking will happen? If you mean it will happen inside the batch processor's own processing, that could be, because it is impossible to estimate how much memory will be consumed by its code. Therefore, we need to let users configure the batch_size in the bulk API.
@model-collapse there are no estimates one could make upfront; this is purely an operational issue (it basically depends on what is going on in the cluster at the moment).
Per the previous comment, users have difficulties with that: the same batch_size may work now and may not work 10 minutes from now (if the cluster is under duress). The issue referenced there has all the details.
Benchmark Results on Batch Ingestion with Neural Search Processors

We implemented the PoC of batch ingestion locally and enabled the capability of sending batches of documents to remote ML servers. We used opensearch-benchmark to benchmark both the batch-enabled and batch-disabled situations against different ML servers (SageMaker, Cohere, OpenAI). Here are the benchmark results.

Benchmark Results

Environment Setup
…

SageMaker - Environment Setup
…

Cohere - Environment Setup
…

OpenAI - Environment Setup
…

Results
…
[1]: The errors come from SageMaker 4xx responses, which were also reported in ml-commons issue opensearch-project/ml-commons#2249.
Linked pull requests: Support batch ingestion in bulk API (#13306) and its backport (#13462), authored by Liyun Xiu, co-authored by Andriy Redko.
I know I'm super late here as this has already been implemented and released in 2.14, but I'm questioning the new batch_size parameter in the bulk request. Why did we push this all the way up for the end user to supply in every single request? Is this something that is expected to vary from one request to the next, and do we think the end user is in the best position to configure this value?

I don't know all the details, but to me it would seem better for the optimal batch size to be chosen by each ingest processor implementation. It could be a setting that a system admin provides to each processor, or each processor could make an automatic decision based on a variety of factors. Regardless, it seems to me that either the system itself or a system administrator is in a better position to choose this batch size than the end user.

Also, the "batch_size" name is quite confusing. The user is already responsible for choosing a batch size, i.e. the number of documents they provide in each bulk request. Now there is a new parameter named "batch_size" that only applies to ingest pipelines, and only actually results in different behavior if the ingest pipeline they are using happens to support batching.
Most users don't need to use this parameter at all. Only users who use remote ML servers to generate embeddings, who are sensitive to ingestion latency, and who want to optimize ingestion performance will use it. They may want to tune it to achieve better performance.
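For illustration, the released parameter is passed as a query parameter on the `_bulk` request (the index name and documents below are made up):

```
POST _bulk?batch_size=4
{ "index": { "_index": "my-nlp-index" } }
{ "text": "first passage to embed" }
{ "index": { "_index": "my-nlp-index" } }
{ "text": "second passage to embed" }
```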
Agreed, that's how we implemented it in the neural-search processors. I think that as a request parameter it provides per-request flexibility and gives users fine-grained control. I'm not against a system setting as a default value used when no request parameter is given. I think they can live together, like the pipeline setting, which is both a bulk request parameter and a default setting on the index. If users want a system setting, we can support that then.
So there are potentially three levels of batching? 1) the user determines how many documents to put into the _bulk request, 2) the user decides how those batches are sub-batched and fed into the ingest processors, and 3) the ingest processors decide how to sub-batch the sub-batches that were fed to them. On the surface it seems this second level of batching might be overly complex and unnecessary.
Do you have a concrete use case where this level of flexibility is needed?
My previous statement might be inaccurate. The ingest-processor-level batch is not always required; only if batch size matters to their logic do they need it. For example, for …

So from the user's perspective, they don't need to consider all three. They could:
If the goal is to find a single optimal value, then a server-side setting is better because you can set it once and you don't have to modify all your ingestion tools to specify a new parameter. You can still benchmark with different values all you want with a server-side setting.
If the optimal size depends on the model, then it seems like it should be configured on the server side when you provide whatever configuration is needed to wire up that model. Again, this avoids the need to modify all your ingestion tooling to provide the optimal parameter.
A request parameter and a system setting don't conflict. If users want a system setting, I don't see a reason why we shouldn't add one.
What I don't understand is the need for the sub-batching that happens prior to documents being fed into the ingest processors (this is what the request parameter controls). Why is this needed or even desirable? It adds complexity and makes for a bad user experience. Why not just pass the whole batch that came in the _bulk request to each ingest processor, and then let each ingest processor decide how to use those batches? If an ingest processor makes a call to OpenAI and needs to break the batch into sub-batches, it can do so (and it must do so even with the current design, because there is nothing preventing a user from setting a … To be clear, I'm advocating for deprecating and removing the …
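To make the alternative concrete, here is a rough sketch (illustrative only; the class and method names are made up and the remote call is stubbed) of a processor that receives the whole bulk batch and sub-batches internally based on its own configuration:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a remote-inference style processor that accepts the whole batch
// from the _bulk request and decides its own sub-batch size (e.g. from a processor
// setting or a sensible default), instead of relying on a per-request batch_size.
final class RemoteInferenceProcessorSketch {
    private final int maxSubBatchSize; // chosen by an admin setting or a built-in default

    RemoteInferenceProcessorSketch(int maxSubBatchSize) {
        this.maxSubBatchSize = maxSubBatchSize;
    }

    List<float[]> batchExecute(List<String> texts) {
        List<float[]> embeddings = new ArrayList<>(texts.size());
        for (int from = 0; from < texts.size(); from += maxSubBatchSize) {
            int to = Math.min(from + maxSubBatchSize, texts.size());
            // One remote call per sub-batch, regardless of how large the _bulk request was.
            embeddings.addAll(callRemoteModel(texts.subList(from, to)));
        }
        return embeddings;
    }

    private List<float[]> callRemoteModel(List<String> subBatch) {
        // Placeholder for the actual call to a remote ML service (OpenAI, Cohere, SageMaker, ...).
        List<float[]> result = new ArrayList<>(subBatch.size());
        for (String ignored : subBatch) {
            result.add(new float[0]);
        }
        return result;
    }
}
```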
We could have a cluster setting, a pipeline setting, or a processor setting, but that also means that the more fine-grained control we provide to users, the more settings they need to manage. And if a user wants to modify the settings, we don't even have a pipeline or processor update API.
I don't think it's a bad user experience; different choices have their pros and cons. Most users won't even need to be aware of this parameter - it's optional. Ingest-latency-sensitive users who utilize remote ML models for inferencing may want to seek out ways to improve their latency, and they would also want to experiment with different values, maybe different values for different types of documents. These are my assumptions, but I know the parameter gives them the flexibility to experiment. A system setting may also work, but whenever they want to try a different batch_size they'd need to make an additional request to update the setting, which is even more work compared to adding a parameter.
Still, users have all kinds of requirements; the parameter and the setting don't conflict, and one can be a good supplement to the other. It's not a one-way door.
The experience I would like to deliver to these users is that they upgrade to a newer version of OpenSearch and their performance improves because of the increased efficiency offered by allowing the inferencing processor to make batched calls to a remote service. In my proposal, this is possible because the batch would be given to the processor, which could have reasonable defaults that often result in near-optimal performance. The solution as implemented, with a …
@andrross I am sorry for being late to respond to your concerns.
I think there are two sides to it: it is difficult to come up with an optimal batch size because it depends on the cluster state (#12457 (comment)); more specifically, in a busy cluster it could trigger the circuit breaker due to heap usage. In this regard, having the option of a per-request batch size could help.
The other side of it is the safe default (which is not necessarily optimal), and I agree that having this option (which works most if not all of the time) looks quite beneficial.
@chishui so we finally looked into the code with @andrross and there is certainly an issue with this batch_size: it has no relation to …
I'm going to close this issue, since the initial feature has been implemented and released. I created a follow-up issue for the improvements I'm advocating for, and we can continue the discussion there: #14283. If anyone thinks this needs to stay open, please let me know.
Is your feature request related to a problem? Please describe
Problem Statements
Today, users can utilize the `bulk` API to ingest multiple documents in a single request. All documents from this request are handled by one ingest node, and on this node, if there's any ingest pipeline configured, documents are processed by the pipeline one at a time in sequential order (ref). An ingest pipeline is constituted by a collection of processors, and the processor is the computing unit of a pipeline. Most processors are pretty lightweight, such as append, uppercase, and lowercase, and processing multiple documents one after another versus in parallel would make no observable difference. But for time-consuming processors such as the neural-search processors, which by their nature require more time to compute, being able to run them in parallel could save users some valuable ingest time. Apart from ingestion time, processors like neural search can benefit from processing documents in batches, since batch APIs reduce the number of requests to remote ML services and help avoid hitting rate-limit restrictions. (Feature request: opensearch-project/ml-commons#1840, rate limit example from OpenAI: https://platform.openai.com/docs/guides/rate-limits)

Due to the lack of parallel ingestion and batch ingestion capabilities in the ingest flow, we propose the solution below to address them.
Describe the solution you'd like
Proposed Features
1. Batch Ingestion
An ingest pipeline is constructed from a list of processors, and a single document flows through each processor one by one before it can be stored into the index. Currently, both pipelines and processors can only handle one document at a time, and even with the bulk API, documents are iterated and handled in sequential order. As shown in figure 1, to ingest doc1, it would first flow through ingest pipeline 1, then through pipeline 2. Then the next document would go through both pipelines.
To support batch processing of documents, we'll add a `batchExecute` API to ingest pipelines and processors which takes multiple documents as input. We will provide a default implementation in the `Processor` interface that iteratively calls the existing `execute` API to process documents one by one, so most processors don't need to change; only when there's a need to batch-process documents (e.g. the text embedding processor) do they provide their own implementation. Otherwise, even when receiving documents together, they default to processing them one by one.

To batch-process documents, users need to use the `bulk` API. We'll add two optional parameters to the `bulk` API for users to enable the batch feature and set the batch size. Based on the `maximum_batch_size` value, documents are split into batches.

Since with the bulk API different documents could be ingested into different indexes, and indexes could use the same pipelines but in a different order (e.g. index "movies" uses pipeline P1 as its default pipeline and P2 as its final pipeline, while index "musics" uses P2 as its default pipeline and P1 as its final pipeline), we would batch documents at the index level to avoid the over-complexity of cross-index batching (topological sorting).
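A minimal sketch of what such a default could look like, assuming simplified synchronous signatures (the real OpenSearch `Processor` interface likely uses callbacks/handlers for async processors, and `IngestDocument` is only stubbed here):

```java
import java.util.ArrayList;
import java.util.List;

// Stub standing in for the real OpenSearch IngestDocument class.
final class IngestDocument {}

interface Processor {
    // Existing single-document entry point.
    IngestDocument execute(IngestDocument document) throws Exception;

    // Proposed batch entry point. The default simply iterates and reuses execute(),
    // so processors that gain nothing from batching need no code changes; processors
    // like text_embedding can override it to call a remote batch inference API instead.
    default List<IngestDocument> batchExecute(List<IngestDocument> documents) throws Exception {
        List<IngestDocument> results = new ArrayList<>(documents.size());
        for (IngestDocument document : documents) {
            results.add(execute(document));
        }
        return results;
    }
}
```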
2. Parallel Ingestion
Apart from batch ingestion, we also propose parallel ingestion to accompany batch ingestion and boost ingestion performance. When the user enables parallel ingestion, documents from the bulk API are split into batches based on the batch size, and the batches are then processed in parallel by threads managed by a thread pool. By limiting the maximum concurrency of parallel ingestion, the thread pool helps protect host resources from being exhausted by batch-ingestion threads.
Ingest flow logic change
The current ingestion flow for documents can be illustrated with the pseudocode below.
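(A simplified, synchronous sketch in Java-flavored pseudocode, for illustration only; `executeBulk`, `Pipeline`, and `IngestDocument` are placeholder names, not the actual OpenSearch source.)

```java
// Current flow: documents are iterated one at a time, and each document is run
// through its pipelines (and their processors) before the next document starts.
void executeBulk(List<IngestDocument> documents, List<Pipeline> pipelines) {
    for (IngestDocument document : documents) {
        for (Pipeline pipeline : pipelines) {
            pipeline.execute(document); // one document per processor call
        }
    }
}
```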
We'll change the flow to the logic shown below if the batch option has been enabled for the request.
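(Again Java-flavored pseudocode under the same placeholder names; `groupByIndex` and `partition` are hypothetical helpers.)

```java
// Proposed flow: group documents by target index, split each group into batches of at
// most maximum_batch_size, and hand whole batches to the pipeline's batchExecute(),
// which defaults to per-document execute() unless a processor overrides it.
void executeBulkBatched(List<IngestDocument> documents, List<Pipeline> pipelines, int maximumBatchSize) {
    Map<String, List<IngestDocument>> documentsByIndex = groupByIndex(documents);
    for (List<IngestDocument> indexDocuments : documentsByIndex.values()) {
        for (List<IngestDocument> batch : partition(indexDocuments, maximumBatchSize)) {
            for (Pipeline pipeline : pipelines) {
                pipeline.batchExecute(batch);
            }
        }
    }
}
```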
Update to Bulk API
We propose new parameters for the `bulk` API, all of them optional:

* A batch option accepting `none`, `enable`, and `parallel`. By default it's `none`. When set to `enable`, batch ingestion is enabled and batches are processed in sequential order. When set to `parallel`, batch ingestion is enabled and batches are processed in parallel.
* The batch size (`maximum_batch_size`), used when the batch option is `enable` or `parallel`. It's 1 by default.
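For example, a request using these proposed options might look like the following (the name of the option parameter is illustrative only; as discussed in the comments above, the implementation that was eventually merged kept just a single `batch_size` parameter):

```
POST _bulk?batch_ingestion_option=parallel&maximum_batch_size=16
{ "index": { "_index": "movies" } }
{ "title": "..." }
{ "index": { "_index": "musics" } }
{ "title": "..." }
```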
3. Split and Redistribute Bulk API

Users tend to use the `bulk` API to ingest many documents, which can sometimes be very time consuming. To achieve lower ingestion time, they have to use multiple clients to make multiple `bulk` requests with smaller document counts so that the requests can be distributed to different ingest nodes. To offload this burden from the user side, we can support the split-and-redistribute work on the server side and help distribute the ingest load more evenly.

Note: although brought up here, we think it's better to discuss this topic in a separate RFC doc, which will be published later.
Related component
Indexing:Performance
Describe alternatives you've considered
No response
Additional context
No response