-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Core] Make Placement Group Wildcard and Indexed Resource Assignments Consistent #48088
Changes from 14 commits
e62696c
b266809
02d7e68
20fc4e9
1bc549a
6b319ae
e9ced9f
7b7837d
795ea6c
8bd2969
bc16850
92092d1
a7813ab
e0c0b15
4377f8b
af50264
444423a
b6571cc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -97,8 +97,22 @@ struct PgFormattedResourceData { | |
std::string original_resource; | ||
/// -1 if it is a wildcard resource. | ||
int64_t bundle_index; | ||
std::string group_id; | ||
}; | ||
|
||
/// Format a placement group resource with provided parameters. | ||
/// | ||
/// \param original_resource_name The original resource name of the pg resource. | ||
/// \param group_id_str The group id in string format. | ||
/// \param bundle_index The bundle index. If -1, generate the wildcard pg resource. | ||
/// E.g., [original_resource_name]_group_[group_id_str]. | ||
/// If >=0, generate the indexed pg resource. E.g., | ||
/// [original_resource_name]_group_[bundle_index]_[group_id_str] | ||
/// \return The corresponding formatted placement group resource string. | ||
std::string FormatPlacementGroupResource(const std::string &original_resource_name, | ||
const std::string &group_id_str, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. group_id_hex? |
||
int64_t bundle_index = -1); | ||
|
||
/// Format a placement group resource, e.g., CPU -> CPU_group_i | ||
std::string FormatPlacementGroupResource(const std::string &original_resource_name, | ||
const PlacementGroupID &group_id, | ||
|
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -16,7 +16,9 @@ | |||
|
||||
#include <cmath> | ||||
#include <sstream> | ||||
#include <utility> | ||||
|
||||
#include "ray/common/bundle_spec.h" | ||||
#include "ray/util/logging.h" | ||||
|
||||
namespace ray { | ||||
|
@@ -43,6 +45,29 @@ bool NodeResourceInstanceSet::Has(ResourceID resource_id) const { | |||
|
||||
void NodeResourceInstanceSet::Remove(ResourceID resource_id) { | ||||
resources_.erase(resource_id); | ||||
|
||||
// Remove from the pg_indexed_resources_ as well | ||||
auto data = ParsePgFormattedResource(resource_id.Binary(), | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suspect this will add significant overhead to the scheduler (because parsing string is so much more expensive). is there any way to do this only when we know resource id is pg? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Discussed offline. And I checked the code. The At the same time, the concerns on the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the overhead will be smaller if PG resources starts with a prefix so we can check using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make sense! At same time, I think the downside is: we need to make sure any custom resources won't start with the same prefix and we still need to do the parsing for all the placement group allocation. Also, as the release didn't show noticeable performance regression, the code can be checked in as it is. I'll add the idea to the TODO for the future improvement. |
||||
/*for_wildcard_resource=*/false, | ||||
/*for_indexed_resource=*/true); | ||||
if (data) { | ||||
ResourceID original_resource_id(data->original_resource); | ||||
|
||||
auto pg_resource_map_it = pg_indexed_resources_.find(original_resource_id); | ||||
if (pg_resource_map_it != pg_indexed_resources_.end()) { | ||||
auto resource_set_it = pg_resource_map_it->second.find(data->group_id); | ||||
|
||||
if (resource_set_it != pg_resource_map_it->second.end()) { | ||||
resource_set_it->second.erase(resource_id); | ||||
if (resource_set_it->second.empty()) { | ||||
pg_resource_map_it->second.erase(data->group_id); | ||||
} | ||||
if (pg_resource_map_it->second.empty()) { | ||||
pg_indexed_resources_.erase(original_resource_id); | ||||
} | ||||
} | ||||
} | ||||
} | ||||
} | ||||
|
||||
const std::vector<FixedPoint> &NodeResourceInstanceSet::Get( | ||||
|
@@ -69,6 +94,22 @@ NodeResourceInstanceSet &NodeResourceInstanceSet::Set(ResourceID resource_id, | |||
resources_.erase(resource_id); | ||||
} else { | ||||
resources_[resource_id] = std::move(instances); | ||||
|
||||
// Popluate the pg_indexed_resources_map_ | ||||
// TODO: The parsing of the resource_id String can be costly and impact the task | ||||
// creation throughput if the parting is required every time we allocate resources | ||||
// for a task and updating the available resources. The current benchmark shows no | ||||
// observable impact for now but in the furture, an idea of improvement is to add | ||||
// the placement group id as well as the bundle index inside the ResourceID class. | ||||
// And instead of parse the String, leveraging the fields in the ResourceID class | ||||
// directly. | ||||
auto data = ParsePgFormattedResource(resource_id.Binary(), | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same comment as above |
||||
/*for_wildcard_resource=*/false, | ||||
/*for_indexed_resource=*/true); | ||||
if (data) { | ||||
pg_indexed_resources_[ResourceID(data->original_resource)][data->group_id].emplace( | ||||
resource_id); | ||||
} | ||||
} | ||||
return *this; | ||||
} | ||||
|
@@ -93,21 +134,158 @@ bool NodeResourceInstanceSet::operator==(const NodeResourceInstanceSet &other) c | |||
std::optional<absl::flat_hash_map<ResourceID, std::vector<FixedPoint>>> | ||||
NodeResourceInstanceSet::TryAllocate(const ResourceSet &resource_demands) { | ||||
absl::flat_hash_map<ResourceID, std::vector<FixedPoint>> allocations; | ||||
|
||||
// During resource allocation with a placement group, no matter whether the allocation | ||||
// requirement specifies a bundle index, we generate the allocation on both | ||||
// the wildcard resource and the indexed resource. The resource_demand shouldn't be | ||||
// assigned across bundles. And If no bundle index is specified, we will iterate | ||||
// through the bundles and find the first bundle that can fit the required resources. | ||||
// In addition, for unit resources, we make sure that the allocation on the | ||||
// wildcard resource and the indexed resource are consistent, meaning the same | ||||
// instance ids should be allocated. | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you add a simple example in this comment? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. also can you update .h file and add this explanation? |
||||
// | ||||
// For example, considering the GPU resource on a host. Assuming the host has 3 GPUs | ||||
// and 1 placement group with 2 bundles. The bundle with index 1 contains 1 GPU and | ||||
// the bundle with index 2 contains 2 GPU. | ||||
// | ||||
// The current node resource can be as follows: | ||||
// resource id: total, available | ||||
// GPU: [1, 1, 1], [0, 0, 0] | ||||
// GPU_<pg_id>: [1, 1, 1], [1, 1, 1] | ||||
// GPU_1_<pg_id>: [1, 0, 0], [1, 0, 0] | ||||
// GPU_2_<pg_id>: [0, 1, 1], [0, 1, 1] | ||||
// | ||||
// Now, we want to allocate a task with 2 GPUs and in the placement group <pg_id>, | ||||
// reflecting in the following resource demand: | ||||
// GPU_<pg_id> : 2 | ||||
// | ||||
// We will iterate though all the bundles in the placement group and bundle with | ||||
// index=2 has the required capacity. So we will allocate the task to the 2 GPUs in | ||||
// bundle 2 in placement group <pg_id> and the same allocation should be reflected in | ||||
// the wildcard GPU resource. So the allocation will be: | ||||
// GPU_<pg_id> : [0, 1, 1] | ||||
// GPU_2_<pg_id> : [0, 1, 1] | ||||
// | ||||
// And as a result, after the allocation, current node resource will be: | ||||
// resource id: total, available | ||||
// GPU: [1, 1, 1], [0, 0, 0] | ||||
// GPU_<pg_id>: [1, 1, 1], [1, 0, 0] | ||||
// GPU_1_<pg_id>: [1, 0, 0], [1, 0, 0] | ||||
// GPU_2_<pg_id>: [0, 1, 1], [0, 0, 0] | ||||
|
||||
absl::flat_hash_map</* original resource id (GPU, CPU, etc.) */ ResourceID, | ||||
std::vector<std::pair</* actual resource id */ ResourceID, | ||||
PgFormattedResourceData>>> | ||||
pg_resource_map; | ||||
|
||||
for (const auto &[resource_id, demand] : resource_demands.Resources()) { | ||||
auto allocation = TryAllocate(resource_id, demand); | ||||
if (allocation) { | ||||
// Even if allocation failed we need to remember partial allocations to correctly | ||||
// free resources. | ||||
allocations[resource_id] = std::move(*allocation); | ||||
auto data = ParsePgFormattedResource(resource_id.Binary(), | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same concern as overhead! |
||||
/*for_wildcard_resource*/ true, | ||||
/*for_indexed_resource*/ true); | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. isn't this supposed to be false? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The idea here is to find all pg resources including both wildcard resources and indexed resources. And my understanding is that we need to set both "for_wildcard_resource" and "for_indexed_resource" to be So here I think we should still set it to be |
||||
|
||||
if (data) { | ||||
// Aggregate based on resource type | ||||
ResourceID original_resource_id{data->original_resource}; | ||||
pg_resource_map[original_resource_id].push_back( | ||||
std::make_pair(resource_id, data.value())); | ||||
} else { | ||||
// Allocation failed. Restore partially allocated resources. | ||||
for (const auto &[resource_id, allocation] : allocations) { | ||||
Free(resource_id, allocation); | ||||
// Directly allocate the resources if the resource is not with a placement group | ||||
auto allocation = TryAllocate(resource_id, demand); | ||||
if (allocation) { | ||||
// Even if allocation failed we need to remember partial allocations to | ||||
// correctly free resources. | ||||
allocations[resource_id] = std::move(*allocation); | ||||
} else { | ||||
// Allocation failed. Restore partially allocated resources. | ||||
for (const auto &[resource_id, allocation] : allocations) { | ||||
Free(resource_id, allocation); | ||||
} | ||||
return std::nullopt; | ||||
} | ||||
return std::nullopt; | ||||
} | ||||
} | ||||
|
||||
// Handle the resource allocation for resources with placement group | ||||
for (const auto &[original_resource_id, resource_id_vector] : pg_resource_map) { | ||||
// Assuming exactly 1 placement group and at most 1 bundle index can be specified in | ||||
// the resource requirement for a single resource type | ||||
// Also assuming the wildcard resource id will always exist in the resource_demands | ||||
// no matter how the resource requirement is specified in task | ||||
std::optional<std::vector<FixedPoint>> wildcard_allocation; | ||||
const ResourceID *wildcard_resource_id = nullptr; | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why using a pointer? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I used a pointer mainly to avoid copy of the wildcard resource id. But please let me know a better pattern for the purpose. |
||||
const std::string *pg_id = nullptr; | ||||
|
||||
// Allocate indexed resource | ||||
if (resource_id_vector.size() == 1) { | ||||
// The case where no bundle index is specified | ||||
// Iterate through the bundles with the same original resource and pg_id to find | ||||
// the first one with enough space | ||||
bool found = false; | ||||
wildcard_resource_id = &resource_id_vector[0].first; | ||||
pg_id = &resource_id_vector[0].second.group_id; | ||||
|
||||
auto pg_index_resources_it = pg_indexed_resources_.find(original_resource_id); | ||||
if (pg_index_resources_it != pg_indexed_resources_.end()) { | ||||
auto index_resources_it = pg_index_resources_it->second.find(*pg_id); | ||||
if (index_resources_it != pg_index_resources_it->second.end()) { | ||||
for (ResourceID indexed_resource_id : index_resources_it->second) { | ||||
if (Has(indexed_resource_id)) { | ||||
auto allocation = TryAllocate( | ||||
indexed_resource_id, resource_demands.Get(resource_id_vector[0].first)); | ||||
|
||||
if (allocation) { | ||||
// Found the allocation in a bundle | ||||
wildcard_allocation = *allocation; | ||||
allocations[indexed_resource_id] = std::move(*allocation); | ||||
found = true; | ||||
break; | ||||
} | ||||
} | ||||
} | ||||
} | ||||
} | ||||
|
||||
if (!found) { | ||||
// No bundle can fit the required resources, allocation failed | ||||
for (const auto &[resource_id, allocation] : allocations) { | ||||
Free(resource_id, allocation); | ||||
} | ||||
return std::nullopt; | ||||
} | ||||
} else { | ||||
// The case where the bundle index is specified | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is there no case where the demands only include indexed resources, but not wildcard resources? I assume no? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (feel like the logic assumes this to be correct. maybe add it to the docstring?) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IIUC, here: ray/src/ray/common/bundle_spec.cc Line 199 in d80b22f
At the same time, I'll add the assumption to the comment as well. |
||||
// For each resource type, both the wildcard resource and the indexed resource | ||||
// should be in the resource_demand | ||||
for (const std::pair<ResourceID, PgFormattedResourceData> &pair : | ||||
resource_id_vector) { | ||||
if (pair.second.bundle_index != -1) { | ||||
// This is the indexed resource | ||||
wildcard_allocation = TryAllocate(pair.first, resource_demands.Get(pair.first)); | ||||
|
||||
if (wildcard_allocation) { | ||||
allocations[pair.first] = *wildcard_allocation; | ||||
} else { | ||||
// The corresponding bundle cannot hold the required resources. | ||||
// Allocation failed | ||||
for (const auto &[resource_id, allocation] : allocations) { | ||||
Free(resource_id, allocation); | ||||
} | ||||
return std::nullopt; | ||||
} | ||||
} else { | ||||
// This is the wildcard resource | ||||
wildcard_resource_id = &pair.first; | ||||
} | ||||
} | ||||
} | ||||
|
||||
// Allocate wildcard resource, should be consistent with the indexed resource | ||||
RAY_CHECK(wildcard_resource_id != nullptr); | ||||
RAY_CHECK(!(*wildcard_allocation).empty()); | ||||
AllocateWithReference(*wildcard_allocation, *wildcard_resource_id); | ||||
allocations[*wildcard_resource_id] = std::move(*wildcard_allocation); | ||||
} | ||||
|
||||
return std::make_optional<absl::flat_hash_map<ResourceID, std::vector<FixedPoint>>>( | ||||
std::move(allocations)); | ||||
} | ||||
|
@@ -139,7 +317,7 @@ std::optional<std::vector<FixedPoint>> NodeResourceInstanceSet::TryAllocate( | |||
// | ||||
// As long as remaining_demand is greater than 1., | ||||
// allocate full unit-capacity instances until the remaining_demand becomes fractional. | ||||
// Then try to find the best fit for the fractional remaining_resources. Best fist means | ||||
// Then try to find the best fit for the fractional remaining_resources. Best fit means | ||||
// allocating the resource instance with the smallest available capacity greater than | ||||
// remaining_demand | ||||
if (remaining_demand >= 1.) { | ||||
|
@@ -186,6 +364,20 @@ std::optional<std::vector<FixedPoint>> NodeResourceInstanceSet::TryAllocate( | |||
return std::make_optional<std::vector<FixedPoint>>(std::move(allocation)); | ||||
} | ||||
|
||||
void NodeResourceInstanceSet::AllocateWithReference( | ||||
const std::vector<FixedPoint> &ref_allocation, ResourceID resource_id) { | ||||
std::vector<FixedPoint> available = Get(resource_id); | ||||
RAY_CHECK(!available.empty()); | ||||
RAY_CHECK_EQ(available.size(), ref_allocation.size()); | ||||
|
||||
for (size_t i = 0; i < ref_allocation.size(); i++) { | ||||
RAY_CHECK_GE(available[i], ref_allocation[i]); | ||||
available[i] -= ref_allocation[i]; | ||||
} | ||||
|
||||
Set(resource_id, std::move(available)); | ||||
} | ||||
|
||||
void NodeResourceInstanceSet::Free(ResourceID resource_id, | ||||
const std::vector<FixedPoint> &allocation) { | ||||
std::vector<FixedPoint> available = Get(resource_id); | ||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
group_id_hex?