[Core] Make Placement Group Wildcard and Indexed Resource Assignments Consistent #48088

Merged
merged 18 commits into from
Nov 7, 2024
Changes from 6 commits
15 changes: 12 additions & 3 deletions src/ray/common/bundle_spec.cc
@@ -95,15 +95,15 @@ std::string BundleSpecification::DebugString() const {
}

std::string FormatPlacementGroupResource(const std::string &original_resource_name,
const PlacementGroupID &group_id,
const std::string &group_id_str,
Collaborator

group_id_hex?

int64_t bundle_index) {
std::stringstream os;
if (bundle_index >= 0) {
os << original_resource_name << kGroupKeyword << std::to_string(bundle_index) << "_"
<< group_id.Hex();
<< group_id_str;
} else {
RAY_CHECK(bundle_index == -1) << "Invalid index " << bundle_index;
os << original_resource_name << kGroupKeyword << group_id.Hex();
os << original_resource_name << kGroupKeyword << group_id_str;
}
std::string result = os.str();
RAY_DCHECK(GetOriginalResourceName(result) == original_resource_name)
@@ -112,6 +112,13 @@ std::string FormatPlacementGroupResource(const std::string &original_resource_na
return result;
}

std::string FormatPlacementGroupResource(const std::string &original_resource_name,
const PlacementGroupID &group_id,
int64_t bundle_index) {
return FormatPlacementGroupResource(
original_resource_name, group_id.Hex(), bundle_index);
}
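
For illustration, the two naming schemes produced by the string-based overload can be sketched as follows. This is a minimal standalone sketch rather than the PR's code: `FormatPgResourceSketch` and `kGroupKeywordSketch` are hypothetical names, and the `_group_` keyword value is taken from the format strings in the header documentation.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical stand-in for kGroupKeyword; the value is assumed from the
// documented format [original_resource_name]_group_[group_id_str].
const std::string kGroupKeywordSketch = "_group_";

// bundle_index == -1 yields the wildcard name; a non-negative index yields
// the indexed name.
std::string FormatPgResourceSketch(const std::string &original_resource_name,
                                   const std::string &group_id_str,
                                   int64_t bundle_index = -1) {
  assert(bundle_index >= -1 && "bundle_index must be -1 or non-negative");
  if (bundle_index >= 0) {
    return original_resource_name + kGroupKeywordSketch +
           std::to_string(bundle_index) + "_" + group_id_str;
  }
  return original_resource_name + kGroupKeywordSketch + group_id_str;
}
```

Under these assumptions, `FormatPgResourceSketch("CPU", "abc123")` gives the wildcard name `CPU_group_abc123`, and passing a bundle index of 0 gives `CPU_group_0_abc123`.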

std::string GetOriginalResourceName(const std::string &resource) {
auto data = ParsePgFormattedResource(
resource, /*for_wildcard_resource*/ true, /*for_indexed_resource*/ true);
@@ -146,6 +153,7 @@ std::optional<PgFormattedResourceData> ParsePgFormattedResource(
match_groups.size() == 3) {
data.original_resource = match_groups[1].str();
data.bundle_index = -1;
data.group_id = match_groups[2].str();
return data;
}
}
@@ -157,6 +165,7 @@ std::optional<PgFormattedResourceData> ParsePgFormattedResource(
match_groups.size() == 4) {
data.original_resource = match_groups[1].str();
data.bundle_index = stoi(match_groups[2].str());
data.group_id = match_groups[3].str();
return data;
}
}
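
A rough sketch of how a parser can return the group id alongside the original resource name and bundle index. The struct mirrors the fields of `PgFormattedResourceData`, but `ParsePgResourceSketch` and both regular expressions are assumptions made for this example; Ray's actual patterns may differ.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <regex>
#include <string>

// Mirrors the fields of PgFormattedResourceData shown in the diff.
struct PgDataSketch {
  std::string original_resource;
  int64_t bundle_index;  // -1 for a wildcard resource
  std::string group_id;
};

// Both patterns are illustrative assumptions, not Ray's actual regexes.
std::optional<PgDataSketch> ParsePgResourceSketch(const std::string &resource) {
  static const std::regex indexed_pattern("^(.+)_group_([0-9]+)_([0-9a-zA-Z]+)$");
  static const std::regex wildcard_pattern("^(.+)_group_([0-9a-zA-Z]+)$");
  std::smatch m;
  if (std::regex_match(resource, m, indexed_pattern)) {
    // Three capture groups: original name, bundle index, group id.
    return PgDataSketch{
        m[1].str(), static_cast<int64_t>(std::stoll(m[2].str())), m[3].str()};
  }
  if (std::regex_match(resource, m, wildcard_pattern)) {
    // Two capture groups: original name, group id.
    return PgDataSketch{m[1].str(), -1, m[2].str()};
  }
  return std::nullopt;
}
```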
14 changes: 14 additions & 0 deletions src/ray/common/bundle_spec.h
@@ -97,8 +97,22 @@ struct PgFormattedResourceData {
std::string original_resource;
/// -1 if it is a wildcard resource.
int64_t bundle_index;
std::string group_id;
};

/// Format a placement group resource with provided parameters.
///
/// \param original_resource_name The original resource name of the pg resource.
/// \param group_id_str The group id in string format.
/// \param bundle_index The bundle index. If -1, generate the wildcard pg resource.
/// E.g., [original_resource_name]_group_[group_id_str].
/// If >=0, generate the indexed pg resource. E.g.,
/// [original_resource_name]_group_[bundle_index]_[group_id_str]
/// \return The corresponding formatted placement group resource string.
std::string FormatPlacementGroupResource(const std::string &original_resource_name,
const std::string &group_id_str,
Collaborator

group_id_hex?

int64_t bundle_index = -1);

/// Format a placement group resource, e.g., CPU -> CPU_group_i
std::string FormatPlacementGroupResource(const std::string &original_resource_name,
const PlacementGroupID &group_id,
185 changes: 173 additions & 12 deletions src/ray/common/scheduling/resource_instance_set.cc
@@ -16,7 +16,9 @@

#include <cmath>
#include <sstream>
#include <utility>

#include "ray/common/bundle_spec.h"
#include "ray/util/logging.h"

namespace ray {
@@ -43,6 +45,21 @@ bool NodeResourceInstanceSet::Has(ResourceID resource_id) const {

void NodeResourceInstanceSet::Remove(ResourceID resource_id) {
resources_.erase(resource_id);

// Remove from the pg_indexed_resources_ as well
auto data = ParsePgFormattedResource(resource_id.Binary(),
Contributor

I suspect this will add significant overhead to the scheduler (because parsing string is so much more expensive). is there any way to do this only when we know resource id is pg?

Collaborator Author

Discussed offline, and I checked the code. The Remove function is only called from the DeleteLocalResource function in LocalResourceManager when returning the pg bundle, so the added overhead shouldn't affect task scheduling.

At the same time, the concerns about Set and TryAllocate are valid. One idea is to add a boolean to ResourceID indicating whether it is a pg resource. I will explore this further.

Collaborator

I think the overhead will be smaller if PG resources start with a prefix, so we can check using startswith instead of regex.

Collaborator Author

Makes sense!

At the same time, I think the downside is that we need to make sure no custom resource starts with the same prefix, and we still need to do the parsing for every placement group allocation.

Also, as the release didn't show a noticeable performance regression, the code can be checked in as it is. I'll add the idea to the TODO list for future improvement.

/*for_wildcard_resource=*/false,
/*for_indexed_resource=*/true);
if (data) {
ResourceID original_resource_id(data->original_resource);
absl::flat_hash_set<ResourceID> &resource_set =
Contributor

I think if you access the data with brackets and the key doesn't exist, it creates a new entry.

Suggested change
absl::flat_hash_set<ResourceID> &resource_set =
auto it = pg_indexed_resources_.find(original_resource_id);
if (it != pg_indexed_resources_.end()) {
it->second.erase(...)
}

Collaborator Author

Good point! Will update.

pg_indexed_resources_[original_resource_id];

resource_set.erase(resource_id);
if (resource_set.empty()) {
pg_indexed_resources_.erase(original_resource_id);
}
}
}
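
The review suggestion in this hunk (look up with `find` so that a missing key is not created by `operator[]`) could look roughly like the sketch below. Standard containers and `std::string` stand in for `absl::flat_hash_map`, `absl::flat_hash_set`, and `ResourceID`, and `EraseIndexedSketch` is a hypothetical name.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>
#include <unordered_set>

// std containers and std::string are stand-ins chosen for this sketch.
using IndexedMapSketch =
    std::unordered_map<std::string, std::unordered_set<std::string>>;

// Erases indexed_id from the set stored under original_id without creating
// an empty entry when original_id is absent. The key is dropped once its
// set becomes empty.
void EraseIndexedSketch(IndexedMapSketch &map, const std::string &original_id,
                        const std::string &indexed_id) {
  auto it = map.find(original_id);
  if (it == map.end()) {
    return;  // Nothing to remove, and no entry is created.
  }
  it->second.erase(indexed_id);
  if (it->second.empty()) {
    map.erase(it);
  }
}
```

Using `find` keeps the auxiliary map free of empty sets for resources that were never indexed, which is what the bracket-based lookup would otherwise leave behind until the follow-up `erase`.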

const std::vector<FixedPoint> &NodeResourceInstanceSet::Get(
@@ -69,6 +86,14 @@ NodeResourceInstanceSet &NodeResourceInstanceSet::Set(ResourceID resource_id,
resources_.erase(resource_id);
} else {
resources_[resource_id] = std::move(instances);

// Populate the pg_indexed_resources_ map
auto data = ParsePgFormattedResource(resource_id.Binary(),
Contributor

same comment as above

/*for_wildcard_resource=*/false,
/*for_indexed_resource=*/true);
if (data) {
pg_indexed_resources_[ResourceID(data->original_resource)].emplace(resource_id);
Contributor

same comment as above

Collaborator Author

Actually, for this particular case, I'd like to create a default (empty) set when the resource id doesn't exist, so I'm wondering whether square brackets are okay here. I don't have a lot of context on C++ best practices, so please let me know what the best pattern is.

}
}
return *this;
}
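
On the parsing overhead raised in the review: one possible mitigation, sketched here as an assumption and not something this PR implements, is a cheap substring test that rules out most non-placement-group names before any regex parsing. `MightBePgResourceSketch` is a hypothetical helper, and it checks for the `_group_` keyword rather than the prefix proposed in the thread.

```cpp
#include <cassert>
#include <string>

// Hypothetical pre-filter: a placement group resource name must contain the
// "_group_" keyword, so names without it can skip the regex parse entirely.
// A name that passes this check still needs the full parse to confirm it,
// since a custom resource name may contain the keyword too.
bool MightBePgResourceSketch(const std::string &resource_name) {
  return resource_name.find("_group_") != std::string::npos;
}
```

The check is only a filter: a custom resource such as `my_group_resource` passes it, so the downside noted in the thread (custom names colliding with the marker) still applies.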
@@ -93,19 +118,121 @@ bool NodeResourceInstanceSet::operator==(const NodeResourceInstanceSet &other) c
std::optional<absl::flat_hash_map<ResourceID, std::vector<FixedPoint>>>
NodeResourceInstanceSet::TryAllocate(const ResourceSet &resource_demands) {
absl::flat_hash_map<ResourceID, std::vector<FixedPoint>> allocations;

// During resource allocation with a placement group, no matter whether the allocation
// requirement specifies a bundle index, we need to generate the allocation on both
// the wildcard resource and the indexed resource. A resource demand must not be
// split across bundles. If no bundle index is specified, we iterate through the
// bundles and pick the first bundle that can fit the required resources.
// In addition, for unit resources, we need to make sure that the allocations on the
// wildcard resource and the indexed resource are consistent, meaning the same
// instance ids are allocated.
Contributor

can you add a simple example in this comment?

Contributor

also can you update .h file and add this explanation?


// In the format of:
// key: original resource id,
Contributor

do we need this comment? (already in the type?)

Collaborator Author

The comment was mainly to emphasize the ResourceID in the key of the map is the original ResourceID (CPU, GPU, etc. without pg_id and bundle index) and the ResourceID in the value is the actual ResourceID (with pg_id and bundle index). I'll update the comment to only include the differentiation.

// value: [resource id, parsed pg format resource data]
absl::flat_hash_map<ResourceID,
std::vector<std::pair<ResourceID, PgFormattedResourceData>>>
pg_resource_map;

for (const auto &[resource_id, demand] : resource_demands.Resources()) {
auto allocation = TryAllocate(resource_id, demand);
if (allocation) {
// Even if allocation failed we need to remember partial allocations to correctly
// free resources.
allocations[resource_id] = std::move(*allocation);
auto data = ParsePgFormattedResource(resource_id.Binary(),
Contributor

same concern about overhead!

/*for_wildcard_resource*/ true,
/*for_indexed_resource*/ true);
Contributor

isn't this supposed to be false?

Collaborator Author

The idea here is to find all pg resources including both wildcard resources and indexed resources.

And my understanding is that we need to set both "for_wildcard_resource" and "for_indexed_resource" to be true to find the resource ids for both.

So here I think we should still set it to be true.


if (data) {
// Aggregate based on resource type
ResourceID original_resource_id{data->original_resource};
pg_resource_map[original_resource_id].push_back(
std::make_pair(resource_id, data.value()));
} else {
// Allocation failed. Restore partially allocated resources.
for (const auto &[resource_id, allocation] : allocations) {
Free(resource_id, allocation);
// Directly allocate the resources if the resource is not with a placement group
auto allocation = TryAllocate(resource_id, demand);
if (allocation) {
// Even if allocation failed we need to remember partial allocations to
// correctly free resources.
allocations[resource_id] = std::move(*allocation);
} else {
// Allocation failed. Restore partially allocated resources.
for (const auto &[resource_id, allocation] : allocations) {
Free(resource_id, allocation);
}
return std::nullopt;
}
}
}

// Handle the resource allocation for resources with placement group
for (const auto &[original_resource_id, resource_id_vector] : pg_resource_map) {
// Assuming exactly 1 placement group and at most 1 bundle index can be specified in
// the resource requirement for a single resource type
std::vector<FixedPoint> wildcard_allocation;
const ResourceID *wildcard_resource_id = nullptr;
Contributor

why using a pointer?

Collaborator Author

I used a pointer mainly to avoid copying the wildcard resource id, but please let me know if there is a better pattern for this.


// Allocate indexed resource
if (resource_id_vector.size() == 1) {
// The case where no bundle index is specified
// Iterate through the bundles to find the first one with enough space
bool found = false;
wildcard_resource_id = &resource_id_vector[0].first;
auto index_resources = pg_indexed_resources_.find(original_resource_id);
if (index_resources != pg_indexed_resources_.end()) {
for (ResourceID indexed_resource_id : index_resources->second) {
if (Has(indexed_resource_id)) {
auto allocation = TryAllocate(
indexed_resource_id, resource_demands.Get(resource_id_vector[0].first));

if (allocation) {
// Found the allocation in a bundle
wildcard_allocation = *allocation;
allocations[indexed_resource_id] = std::move(*allocation);
found = true;
break;
}
}
}
}

if (!found) {
// No bundle can fit the required resources, allocation failed
for (const auto &[resource_id, allocation] : allocations) {
Free(resource_id, allocation);
}
return std::nullopt;
}
} else {
// The case where the bundle index is specified
Contributor

is there no case where the demands only include indexed resources, but not wildcard resources? I assume no?

Contributor

(feel like the logic assumes this to be correct. maybe add it to the docstring?)

Collaborator Author

IIUC, here:

// Note that all nodes that have placement group have a special
it will always include the wildcard resource.

At the same time, I'll add the assumption to the comment as well.

// For each resource type, both the wildcard resource and the indexed resource
// should be in the resource_demand
for (const std::pair<ResourceID, PgFormattedResourceData> &pair :
resource_id_vector) {
if (pair.second.bundle_index != -1) {
// This is the indexed resource
auto allocation = TryAllocate(pair.first, resource_demands.Get(pair.first));

if (allocation) {
wildcard_allocation = *allocation;
Contributor

maybe do this only once to avoid repetitive copy?

Collaborator Author

Good point! Currently, the same allocation will be used for both the wildcard resource and the indexed resource.

Instead of doing a copy here and 2 moves for both the wildcard and indexed resources, I think it can be a copy for the indexed resource and a move for the wildcard resource.

allocations[pair.first] = std::move(*allocation);
} else {
// The corresponding bundle cannot hold the required resources.
// Allocation failed
for (const auto &[resource_id, allocation] : allocations) {
Free(resource_id, allocation);
}
return std::nullopt;
}
} else {
// This is the wildcard resource
wildcard_resource_id = &pair.first;
}
}
return std::nullopt;
}

// Allocate wildcard resource, should be consistent with the indexed resource
RAY_CHECK(wildcard_resource_id != nullptr);
RAY_CHECK(!wildcard_allocation.empty());
AllocateWithReference(wildcard_allocation, *wildcard_resource_id);
allocations[*wildcard_resource_id] = std::move(wildcard_allocation);
}

return std::make_optional<absl::flat_hash_map<ResourceID, std::vector<FixedPoint>>>(
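
To make the intended behavior concrete, here is a simplified model of the case where no bundle index is specified: pick the first bundle that fits the whole demand, then deduct the same instance slots from the wildcard resource. It assumes whole-unit demands, integer capacities, and that the wildcard and indexed resources use instance vectors of equal length; all names are hypothetical and none of this is the PR's code.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <vector>

// Simplified stand-in for per-instance capacities (whole units only).
using Instances = std::vector<int>;

// Takes `demand` whole units from `available`, one per free instance.
// Returns the per-instance allocation, or nullopt (leaving `available`
// untouched) if the demand cannot be met.
std::optional<Instances> TryAllocateUnits(Instances &available, int demand) {
  Instances allocation(available.size(), 0);
  int remaining = demand;
  for (std::size_t i = 0; i < available.size() && remaining > 0; ++i) {
    if (available[i] >= 1) {
      allocation[i] = 1;
      --remaining;
    }
  }
  if (remaining > 0) {
    return std::nullopt;
  }
  for (std::size_t i = 0; i < available.size(); ++i) {
    available[i] -= allocation[i];
  }
  return allocation;
}

// First fit over bundles: the first bundle that can hold the whole demand
// wins, and the same instance slots are deducted from the wildcard resource.
// Returns the index of the chosen bundle, or nullopt if none fits.
std::optional<std::size_t> AllocateFirstFit(std::vector<Instances> &bundles,
                                            Instances &wildcard, int demand) {
  for (std::size_t b = 0; b < bundles.size(); ++b) {
    auto allocation = TryAllocateUnits(bundles[b], demand);
    if (!allocation) {
      continue;  // This bundle cannot hold the demand; never split it.
    }
    assert(wildcard.size() == allocation->size());
    for (std::size_t i = 0; i < allocation->size(); ++i) {
      assert(wildcard[i] >= (*allocation)[i]);
      wildcard[i] -= (*allocation)[i];
    }
    return b;
  }
  return std::nullopt;
}
```

With bundles `{1,0,0}` and `{0,1,1}` and a wildcard of `{1,1,1}`, a demand of 2 skips the first bundle, takes both slots of the second, and leaves the wildcard at `{1,0,0}`, so the wildcard and indexed views agree on which instances were used.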
@@ -139,7 +266,7 @@ std::optional<std::vector<FixedPoint>> NodeResourceInstanceSet::TryAllocate(
//
// As long as remaining_demand is greater than 1.,
// allocate full unit-capacity instances until the remaining_demand becomes fractional.
// Then try to find the best fit for the fractional remaining_resources. Best fist means
// Then try to find the best fit for the fractional remaining_resources. Best fit means
// allocating the resource instance with the smallest available capacity greater than
// remaining_demand
if (remaining_demand >= 1.) {
@@ -186,6 +313,20 @@ std::optional<std::vector<FixedPoint>> NodeResourceInstanceSet::TryAllocate(
return std::make_optional<std::vector<FixedPoint>>(std::move(allocation));
}

void NodeResourceInstanceSet::AllocateWithReference(
const std::vector<FixedPoint> &ref_allocation, ResourceID resource_id) {
std::vector<FixedPoint> available = Get(resource_id);
RAY_CHECK(!available.empty());
RAY_CHECK_EQ(available.size(), ref_allocation.size());

for (size_t i = 0; i < ref_allocation.size(); i++) {
RAY_CHECK_GE(available[i], ref_allocation[i]);
available[i] -= ref_allocation[i];
}

Set(resource_id, std::move(available));
}

void NodeResourceInstanceSet::Free(ResourceID resource_id,
const std::vector<FixedPoint> &allocation) {
std::vector<FixedPoint> available = Get(resource_id);
@@ -244,7 +385,7 @@ std::vector<FixedPoint> NodeResourceInstanceSet::Subtract(

std::string NodeResourceInstanceSet::DebugString() const {
std::stringstream buffer;
buffer << "{";
buffer << "{{";
bool first = true;
for (const auto &[id, quantity] : resources_) {
if (!first) {
@@ -253,7 +394,27 @@ std::string NodeResourceInstanceSet::DebugString() const {
first = false;
buffer << id.Binary() << ": " << FixedPointVectorToString(quantity);
}
buffer << "}";
buffer << "}, {";

first = true;
for (const auto &[original_id, indexed_ids] : pg_indexed_resources_) {
Contributor

is this useful btw? feel like it may just spam debugstring

Collaborator Author

I used it when testing the change in this PR to make sure the information in the auxiliary map is correct. But that makes sense; it might not be interesting once the feature is fully tested and stable.

if (!first) {
buffer << ", ";
}
first = false;

buffer << original_id.Binary() << ": {";
bool firstInSet = true;
for (const auto &index_id : indexed_ids) {
if (!firstInSet) {
buffer << ", ";
}
firstInSet = false;
buffer << index_id.Binary();
}
buffer << "}";
}
buffer << "}}";
return buffer.str();
}
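
The extended debug output has the shape `{{<resources>}, {<original id>: {<indexed ids>}, ...}}`. The second half can be sketched as below. `std::map` and `std::set` are used only to make the iteration order deterministic in this example (the real members are absl hash containers), and `IndexedMapToStringSketch` is a hypothetical name.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <sstream>
#include <string>

// Renders the auxiliary map as "{orig: {id, id}, orig: {id}}".
// Ordered containers are an assumption made for reproducible output.
std::string IndexedMapToStringSketch(
    const std::map<std::string, std::set<std::string>> &pg_indexed) {
  std::stringstream buffer;
  buffer << "{";
  bool first = true;
  for (const auto &[original_id, indexed_ids] : pg_indexed) {
    if (!first) {
      buffer << ", ";
    }
    first = false;
    buffer << original_id << ": {";
    bool first_in_set = true;
    for (const auto &indexed_id : indexed_ids) {
      if (!first_in_set) {
        buffer << ", ";
      }
      first_in_set = false;
      buffer << indexed_id;
    }
    buffer << "}";
  }
  buffer << "}";
  return buffer.str();
}
```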

24 changes: 24 additions & 0 deletions src/ray/common/scheduling/resource_instance_set.h
@@ -87,6 +87,12 @@ class NodeResourceInstanceSet {
return resources_;
}

/// Only for testing.
Contributor

is it really used? I couldn't find it

Collaborator Author

Right, it is not used. I was following the pattern of the accessor above for resources_.

At the same time, I think it makes sense to remove the function and add it back when a test needs it.

const absl::flat_hash_map<ResourceID, absl::flat_hash_set<ResourceID>>
&PgIndexedResources() const {
return pg_indexed_resources_;
}

private:
/// Allocate enough capacity across the instances of a resource to satisfy "demand".
///
@@ -108,8 +114,26 @@ class NodeResourceInstanceSet {
std::optional<std::vector<FixedPoint>> TryAllocate(ResourceID resource_id,
FixedPoint demand);

/// Allocate resource to the resource_id based on a provided reference allocation.
/// The function is used for placement group allocation, making the allocation of
/// the wildcard resource identical to the indexed resource allocation.
///
/// The function assumes and also verifies that (1) the resource_id exists in the
/// node; (2) the available resources with resource_id on the node can satisfy the
/// provided ref_allocation.
///
/// \param ref_allocation: The reference allocation used to allocate the resource_id
/// \param resource_id: The id of the resource to be allocated
void AllocateWithReference(const std::vector<FixedPoint> &ref_allocation,
ResourceID resource_id);

/// Map from the resource IDs to the resource instance values.
absl::flat_hash_map<ResourceID, std::vector<FixedPoint>> resources_;

/// Map from the original resource IDs to the actual resource IDs for all the indexed
/// placement group resources. This map should be treated as derived from the
/// resources_ map and should always be consistent with it.
absl::flat_hash_map<ResourceID, absl::flat_hash_set<ResourceID>> pg_indexed_resources_;
};

} // namespace ray