Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Make Placement Group Wildcard and Indexed Resource Assignments Consistent #48088

Merged
merged 18 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions src/ray/common/bundle_spec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,15 @@ std::string BundleSpecification::DebugString() const {
}

std::string FormatPlacementGroupResource(const std::string &original_resource_name,
const PlacementGroupID &group_id,
const std::string &group_id_hex,
int64_t bundle_index) {
std::stringstream os;
if (bundle_index >= 0) {
os << original_resource_name << kGroupKeyword << std::to_string(bundle_index) << "_"
<< group_id.Hex();
<< group_id_hex;
} else {
RAY_CHECK(bundle_index == -1) << "Invalid index " << bundle_index;
os << original_resource_name << kGroupKeyword << group_id.Hex();
os << original_resource_name << kGroupKeyword << group_id_hex;
}
std::string result = os.str();
RAY_DCHECK(GetOriginalResourceName(result) == original_resource_name)
Expand All @@ -112,6 +112,13 @@ std::string FormatPlacementGroupResource(const std::string &original_resource_na
return result;
}

std::string FormatPlacementGroupResource(const std::string &original_resource_name,
const PlacementGroupID &group_id,
int64_t bundle_index) {
return FormatPlacementGroupResource(
original_resource_name, group_id.Hex(), bundle_index);
}

std::string GetOriginalResourceName(const std::string &resource) {
auto data = ParsePgFormattedResource(
resource, /*for_wildcard_resource*/ true, /*for_indexed_resource*/ true);
Expand Down Expand Up @@ -146,6 +153,7 @@ std::optional<PgFormattedResourceData> ParsePgFormattedResource(
match_groups.size() == 3) {
data.original_resource = match_groups[1].str();
data.bundle_index = -1;
data.group_id = match_groups[2].str();
return data;
}
}
Expand All @@ -157,6 +165,7 @@ std::optional<PgFormattedResourceData> ParsePgFormattedResource(
match_groups.size() == 4) {
data.original_resource = match_groups[1].str();
data.bundle_index = stoi(match_groups[2].str());
data.group_id = match_groups[3].str();
return data;
}
}
Expand Down
14 changes: 14 additions & 0 deletions src/ray/common/bundle_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,22 @@ struct PgFormattedResourceData {
std::string original_resource;
/// -1 if it is a wildcard resource.
int64_t bundle_index;
std::string group_id;
};

/// Format a placement group resource with provided parameters.
///
/// \param original_resource_name The original resource name of the pg resource.
/// \param group_id_str The group id in string format.
/// \param bundle_index The bundle index. If -1, generate the wildcard pg resource.
/// E.g., [original_resource_name]_group_[group_id_str].
/// If >=0, generate the indexed pg resource. E.g.,
/// [original_resource_name]_group_[bundle_index]_[group_id_str]
/// \return The corresponding formatted placement group resource string.
std::string FormatPlacementGroupResource(const std::string &original_resource_name,
const std::string &group_id_hex,
int64_t bundle_index = -1);

/// Format a placement group resource, e.g., CPU -> CPU_group_i
std::string FormatPlacementGroupResource(const std::string &original_resource_name,
const PlacementGroupID &group_id,
Expand Down
212 changes: 202 additions & 10 deletions src/ray/common/scheduling/resource_instance_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

#include <cmath>
#include <sstream>
#include <utility>

#include "ray/common/bundle_spec.h"
#include "ray/util/logging.h"

namespace ray {
Expand All @@ -43,6 +45,29 @@ bool NodeResourceInstanceSet::Has(ResourceID resource_id) const {

void NodeResourceInstanceSet::Remove(ResourceID resource_id) {
resources_.erase(resource_id);

// Remove from the pg_indexed_resources_ as well
auto data = ParsePgFormattedResource(resource_id.Binary(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect this will add significant overhead to the scheduler (because parsing string is so much more expensive). is there any way to do this only when we know resource id is pg?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline. And I checked the code. The Remove function is only called in the DeleteLocalResource function in LocalResourceManager when returning the pg bundle. The overhead added shouldn't impact the schedule of the task.

At the same time, the concerns on the Set and TryAllocate are valid. One potential idea I can think of is adding a boolean in the ResourceID to indicate whether it is a pg or not. Will further explore the idea.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the overhead will be smaller if PG resources starts with a prefix so we can check using startswith instead of regex.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sense!

At same time, I think the downside is: we need to make sure any custom resources won't start with the same prefix and we still need to do the parsing for all the placement group allocation.

Also, as the release didn't show noticeable performance regression, the code can be checked in as it is. I'll add the idea to the TODO for the future improvement.

/*for_wildcard_resource=*/false,
/*for_indexed_resource=*/true);
if (data) {
ResourceID original_resource_id(data->original_resource);

auto pg_resource_map_it = pg_indexed_resources_.find(original_resource_id);
if (pg_resource_map_it != pg_indexed_resources_.end()) {
auto resource_set_it = pg_resource_map_it->second.find(data->group_id);

if (resource_set_it != pg_resource_map_it->second.end()) {
resource_set_it->second.erase(resource_id);
if (resource_set_it->second.empty()) {
pg_resource_map_it->second.erase(data->group_id);
}
if (pg_resource_map_it->second.empty()) {
pg_indexed_resources_.erase(original_resource_id);
}
}
}
}
}

const std::vector<FixedPoint> &NodeResourceInstanceSet::Get(
Expand All @@ -69,6 +94,24 @@ NodeResourceInstanceSet &NodeResourceInstanceSet::Set(ResourceID resource_id,
resources_.erase(resource_id);
} else {
resources_[resource_id] = std::move(instances);

// Popluate the pg_indexed_resources_map_
// TODO (myan): The parsing of the resource_id String can be costly and impact the
// task creation throughput if the parting is required every time we allocate
// resources for a task and updating the available resources. The current benchmark
// shows no observable impact for now. But in the furture, ideas of improvement are:
// (1) to add the placement group id as well as the bundle index inside the
// ResourceID class. And instead of parse the String, leveraging the fields in the
// ResourceID class directly; (2) to update the pg resource id format to start with
// a special prefix so that we can do "startwith" instead of regex match which is
// less costly
auto data = ParsePgFormattedResource(resource_id.Binary(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment as above

/*for_wildcard_resource=*/false,
/*for_indexed_resource=*/true);
if (data) {
pg_indexed_resources_[ResourceID(data->original_resource)][data->group_id].emplace(
resource_id);
}
}
return *this;
}
Expand All @@ -93,21 +136,156 @@ bool NodeResourceInstanceSet::operator==(const NodeResourceInstanceSet &other) c
std::optional<absl::flat_hash_map<ResourceID, std::vector<FixedPoint>>>
NodeResourceInstanceSet::TryAllocate(const ResourceSet &resource_demands) {
absl::flat_hash_map<ResourceID, std::vector<FixedPoint>> allocations;

// During resource allocation with a placement group, no matter whether the allocation
// requirement specifies a bundle index, we generate the allocation on both
// the wildcard resource and the indexed resource. The resource_demand shouldn't be
// assigned across bundles. And If no bundle index is specified, we will iterate
// through the bundles and find the first bundle that can fit the required resources.
// In addition, for unit resources, we make sure that the allocation on the
// wildcard resource and the indexed resource are consistent, meaning the same
// instance ids should be allocated.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a simple example in this comment?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also can you update .h file and add this explanation?

//
// For example, considering the GPU resource on a host. Assuming the host has 3 GPUs
// and 1 placement group with 2 bundles. The bundle with index 1 contains 1 GPU and
// the bundle with index 2 contains 2 GPU.
//
// The current node resource can be as follows:
// resource id: total, available
// GPU: [1, 1, 1], [0, 0, 0]
// GPU_<pg_id>: [1, 1, 1], [1, 1, 1]
// GPU_1_<pg_id>: [1, 0, 0], [1, 0, 0]
// GPU_2_<pg_id>: [0, 1, 1], [0, 1, 1]
//
// Now, we want to allocate a task with 2 GPUs and in the placement group <pg_id>,
// reflecting in the following resource demand:
// GPU_<pg_id> : 2
//
// We will iterate though all the bundles in the placement group and bundle with
// index=2 has the required capacity. So we will allocate the task to the 2 GPUs in
// bundle 2 in placement group <pg_id> and the same allocation should be reflected in
// the wildcard GPU resource. So the allocation will be:
// GPU_<pg_id> : [0, 1, 1]
// GPU_2_<pg_id> : [0, 1, 1]
//
// And as a result, after the allocation, current node resource will be:
// resource id: total, available
// GPU: [1, 1, 1], [0, 0, 0]
// GPU_<pg_id>: [1, 1, 1], [1, 0, 0]
// GPU_1_<pg_id>: [1, 0, 0], [1, 0, 0]
// GPU_2_<pg_id>: [0, 1, 1], [0, 0, 0]

absl::flat_hash_map</* original resource id (GPU, CPU, etc.) */ ResourceID,
std::vector<std::pair</* actual resource id */ ResourceID,
PgFormattedResourceData>>>
pg_resource_map;

for (const auto &[resource_id, demand] : resource_demands.Resources()) {
auto allocation = TryAllocate(resource_id, demand);
if (allocation) {
// Even if allocation failed we need to remember partial allocations to correctly
// free resources.
allocations[resource_id] = std::move(*allocation);
auto data = ParsePgFormattedResource(resource_id.Binary(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same concern as overhead!

/*for_wildcard_resource*/ true,
/*for_indexed_resource*/ true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't this supposed to be false?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea here is to find all pg resources including both wildcard resources and indexed resources.

And my understanding is that we need to set both "for_wildcard_resource" and "for_indexed_resource" to be true to find the resource ids for both.

So here I think we should still set it to be true.


if (data) {
// Aggregate based on resource type
ResourceID original_resource_id{data->original_resource};
pg_resource_map[original_resource_id].push_back(
std::make_pair(resource_id, data.value()));
} else {
// Allocation failed. Restore partially allocated resources.
for (const auto &[resource_id, allocation] : allocations) {
Free(resource_id, allocation);
// Directly allocate the resources if the resource is not with a placement group
auto allocation = TryAllocate(resource_id, demand);
if (allocation) {
// Even if allocation failed we need to remember partial allocations to
// correctly free resources.
allocations[resource_id] = std::move(*allocation);
} else {
// Allocation failed. Restore partially allocated resources.
for (const auto &[resource_id, allocation] : allocations) {
Free(resource_id, allocation);
}
return std::nullopt;
}
return std::nullopt;
}
}

// Handle the resource allocation for resources with placement group
for (const auto &[original_resource_id, resource_id_vector] : pg_resource_map) {
// Assuming exactly 1 placement group and at most 1 bundle index can be specified in
// the resource requirement for a single resource type
// Also assuming the wildcard resource id will always exist in the resource_demands
// no matter how the resource requirement is specified in task
std::optional<std::vector<FixedPoint>> wildcard_allocation;
const ResourceID *wildcard_resource_id = nullptr;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why using a pointer?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used a pointer mainly to avoid copy of the wildcard resource id. But please let me know a better pattern for the purpose.

const std::string *pg_id = nullptr;

// Allocate indexed resource
if (resource_id_vector.size() == 1) {
// The case where no bundle index is specified
// Iterate through the bundles with the same original resource and pg_id to find
// the first one with enough space
bool found = false;
wildcard_resource_id = &resource_id_vector[0].first;
pg_id = &resource_id_vector[0].second.group_id;

auto pg_index_resources_it = pg_indexed_resources_.find(original_resource_id);
if (pg_index_resources_it != pg_indexed_resources_.end()) {
auto index_resources_it = pg_index_resources_it->second.find(*pg_id);
if (index_resources_it != pg_index_resources_it->second.end()) {
for (ResourceID indexed_resource_id : index_resources_it->second) {
if (Has(indexed_resource_id)) {
wildcard_allocation = TryAllocate(
indexed_resource_id, resource_demands.Get(*wildcard_resource_id));
if (wildcard_allocation) {
// Found the allocation in a bundle
allocations[indexed_resource_id] = *wildcard_allocation;
found = true;
break;
}
}
}
}
}

if (!found) {
// No bundle can fit the required resources, allocation failed
for (const auto &[resource_id, allocation] : allocations) {
Free(resource_id, allocation);
}
return std::nullopt;
}
} else {
// The case where the bundle index is specified
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there no case where the demands only include indexed resources, but not wildcard resources? I assume no?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(feel like the logic assumes this to be correct. maybe add it to the docstring?)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, here:

// Note that all nodes that have placement group have a special
it will always include the wildcard resource.

At the same time, I'll add the assumption to the comment as well.

// For each resource type, both the wildcard resource and the indexed resource
// should be in the resource_demand
for (const std::pair<ResourceID, PgFormattedResourceData> &pair :
resource_id_vector) {
if (pair.second.bundle_index != -1) {
// This is the indexed resource
wildcard_allocation = TryAllocate(pair.first, resource_demands.Get(pair.first));

if (wildcard_allocation) {
allocations[pair.first] = *wildcard_allocation;
} else {
// The corresponding bundle cannot hold the required resources.
// Allocation failed
for (const auto &[resource_id, allocation] : allocations) {
Free(resource_id, allocation);
}
return std::nullopt;
}
} else {
// This is the wildcard resource
wildcard_resource_id = &pair.first;
}
}
}

// Allocate wildcard resource, should be consistent with the indexed resource
RAY_CHECK(wildcard_resource_id != nullptr);
RAY_CHECK(!(*wildcard_allocation).empty());
AllocateWithReference(*wildcard_allocation, *wildcard_resource_id);
allocations[*wildcard_resource_id] = std::move(*wildcard_allocation);
}

return std::make_optional<absl::flat_hash_map<ResourceID, std::vector<FixedPoint>>>(
std::move(allocations));
}
Expand Down Expand Up @@ -139,7 +317,7 @@ std::optional<std::vector<FixedPoint>> NodeResourceInstanceSet::TryAllocate(
//
// As long as remaining_demand is greater than 1.,
// allocate full unit-capacity instances until the remaining_demand becomes fractional.
// Then try to find the best fit for the fractional remaining_resources. Best fist means
// Then try to find the best fit for the fractional remaining_resources. Best fit means
// allocating the resource instance with the smallest available capacity greater than
// remaining_demand
if (remaining_demand >= 1.) {
Expand Down Expand Up @@ -186,6 +364,20 @@ std::optional<std::vector<FixedPoint>> NodeResourceInstanceSet::TryAllocate(
return std::make_optional<std::vector<FixedPoint>>(std::move(allocation));
}

void NodeResourceInstanceSet::AllocateWithReference(
const std::vector<FixedPoint> &ref_allocation, ResourceID resource_id) {
std::vector<FixedPoint> available = Get(resource_id);
RAY_CHECK(!available.empty());
RAY_CHECK_EQ(available.size(), ref_allocation.size());

for (size_t i = 0; i < ref_allocation.size(); i++) {
RAY_CHECK_GE(available[i], ref_allocation[i]);
available[i] -= ref_allocation[i];
}

Set(resource_id, std::move(available));
}

void NodeResourceInstanceSet::Free(ResourceID resource_id,
const std::vector<FixedPoint> &allocation) {
std::vector<FixedPoint> available = Get(resource_id);
Expand Down
Loading