Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Make Placement Group Wildcard and Indexed Resource Assignments Consistent #48088

Merged
merged 18 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions src/ray/common/bundle_spec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,15 @@ std::string BundleSpecification::DebugString() const {
}

std::string FormatPlacementGroupResource(const std::string &original_resource_name,
const PlacementGroupID &group_id,
const std::string &group_id_str,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

group_id_hex?

int64_t bundle_index) {
std::stringstream os;
if (bundle_index >= 0) {
os << original_resource_name << kGroupKeyword << std::to_string(bundle_index) << "_"
<< group_id.Hex();
<< group_id_str;
} else {
RAY_CHECK(bundle_index == -1) << "Invalid index " << bundle_index;
os << original_resource_name << kGroupKeyword << group_id.Hex();
os << original_resource_name << kGroupKeyword << group_id_str;
}
std::string result = os.str();
RAY_DCHECK(GetOriginalResourceName(result) == original_resource_name)
Expand All @@ -112,6 +112,13 @@ std::string FormatPlacementGroupResource(const std::string &original_resource_na
return result;
}

std::string FormatPlacementGroupResource(const std::string &original_resource_name,
const PlacementGroupID &group_id,
int64_t bundle_index) {
return FormatPlacementGroupResource(
original_resource_name, group_id.Hex(), bundle_index);
}

std::string GetOriginalResourceName(const std::string &resource) {
auto data = ParsePgFormattedResource(
resource, /*for_wildcard_resource*/ true, /*for_indexed_resource*/ true);
Expand Down Expand Up @@ -146,6 +153,7 @@ std::optional<PgFormattedResourceData> ParsePgFormattedResource(
match_groups.size() == 3) {
data.original_resource = match_groups[1].str();
data.bundle_index = -1;
data.group_id = match_groups[2].str();
return data;
}
}
Expand All @@ -157,6 +165,7 @@ std::optional<PgFormattedResourceData> ParsePgFormattedResource(
match_groups.size() == 4) {
data.original_resource = match_groups[1].str();
data.bundle_index = stoi(match_groups[2].str());
data.group_id = match_groups[3].str();
return data;
}
}
Expand Down
14 changes: 14 additions & 0 deletions src/ray/common/bundle_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,22 @@ struct PgFormattedResourceData {
std::string original_resource;
/// -1 if it is a wildcard resource.
int64_t bundle_index;
std::string group_id;
};

/// Format a placement group resource with provided parameters.
///
/// \param original_resource_name The original resource name of the pg resource.
/// \param group_id_str The group id in string format.
/// \param bundle_index The bundle index. If -1, generate the wildcard pg resource.
/// E.g., [original_resource_name]_group_[group_id_str].
/// If >=0, generate the indexed pg resource. E.g.,
/// [original_resource_name]_group_[bundle_index]_[group_id_str]
/// \return The corresponding formatted placement group resource string.
std::string FormatPlacementGroupResource(const std::string &original_resource_name,
const std::string &group_id_str,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

group_id_hex?

int64_t bundle_index = -1);

/// Format a placement group resource, e.g., CPU -> CPU_group_i
std::string FormatPlacementGroupResource(const std::string &original_resource_name,
const PlacementGroupID &group_id,
Expand Down
207 changes: 195 additions & 12 deletions src/ray/common/scheduling/resource_instance_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

#include <cmath>
#include <sstream>
#include <utility>

#include "ray/common/bundle_spec.h"
#include "ray/util/logging.h"

namespace ray {
Expand All @@ -43,6 +45,25 @@ bool NodeResourceInstanceSet::Has(ResourceID resource_id) const {

void NodeResourceInstanceSet::Remove(ResourceID resource_id) {
resources_.erase(resource_id);

// Remove from the pg_indexed_resources_ as well
auto data = ParsePgFormattedResource(resource_id.Binary(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect this will add significant overhead to the scheduler (because parsing string is so much more expensive). is there any way to do this only when we know resource id is pg?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline. And I checked the code. The Remove function is only called in the DeleteLocalResource function in LocalResourceManager when returning the pg bundle. The overhead added shouldn't impact the schedule of the task.

At the same time, the concerns on the Set and TryAllocate are valid. One potential idea I can think of is adding a boolean in the ResourceID to indicate whether it is a pg or not. Will further explore the idea.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the overhead will be smaller if PG resources starts with a prefix so we can check using startswith instead of regex.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sense!

At same time, I think the downside is: we need to make sure any custom resources won't start with the same prefix and we still need to do the parsing for all the placement group allocation.

Also, as the release didn't show noticeable performance regression, the code can be checked in as it is. I'll add the idea to the TODO for the future improvement.

/*for_wildcard_resource=*/false,
/*for_indexed_resource=*/true);
if (data) {
ResourceID original_resource_id(data->original_resource);
absl::flat_hash_map<std::string, absl::flat_hash_set<ResourceID>> &pg_resource_map =
pg_indexed_resources_[original_resource_id];
absl::flat_hash_set<ResourceID> &resource_set = pg_resource_map[data->group_id];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

avoid using []!


resource_set.erase(resource_id);
if (resource_set.empty()) {
pg_resource_map.erase(data->group_id);
}
if (pg_resource_map.empty()) {
pg_indexed_resources_.erase(original_resource_id);
}
}
}

const std::vector<FixedPoint> &NodeResourceInstanceSet::Get(
Expand All @@ -69,6 +90,15 @@ NodeResourceInstanceSet &NodeResourceInstanceSet::Set(ResourceID resource_id,
resources_.erase(resource_id);
} else {
resources_[resource_id] = std::move(instances);

// Popluate the pg_indexed_resources_map_
auto data = ParsePgFormattedResource(resource_id.Binary(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment as above

/*for_wildcard_resource=*/false,
/*for_indexed_resource=*/true);
if (data) {
pg_indexed_resources_[ResourceID(data->original_resource)][data->group_id].emplace(
resource_id);
}
}
return *this;
}
Expand All @@ -93,21 +123,130 @@ bool NodeResourceInstanceSet::operator==(const NodeResourceInstanceSet &other) c
std::optional<absl::flat_hash_map<ResourceID, std::vector<FixedPoint>>>
NodeResourceInstanceSet::TryAllocate(const ResourceSet &resource_demands) {
absl::flat_hash_map<ResourceID, std::vector<FixedPoint>> allocations;

// During resource allocation with a placement group, no matter whether the allocation
// requirement specifies a bundle index, we need to generate the allocation on both
// the wildcard resource and the indexed resource. The resource_demand shouldn't be
// assigned across bundles. And If no bundle index is specified, we will iterate
// through the bundles and find the first bundle that can fit the required resources.
// In addition, for unit resources, we need to make sure that the allocation on the
// wildcard resource and the indexed resource are consistent, meaning the same
// instance ids should be allocated.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a simple example in this comment?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also can you update .h file and add this explanation?


// In the format of:
// key: original resource id,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need this comment? (already in the type?)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment was mainly to emphasize the ResourceID in the key of the map is the original ResourceID (CPU, GPU, etc. without pg_id and bundle index) and the ResourceID in the value is the actual ResourceID (with pg_id and bundle index). I'll update the comment to only include the differentiation.

// value: [resource id, parsed pg format resource data]
absl::flat_hash_map<ResourceID,
std::vector<std::pair<ResourceID, PgFormattedResourceData>>>
pg_resource_map;

for (const auto &[resource_id, demand] : resource_demands.Resources()) {
auto allocation = TryAllocate(resource_id, demand);
if (allocation) {
// Even if allocation failed we need to remember partial allocations to correctly
// free resources.
allocations[resource_id] = std::move(*allocation);
auto data = ParsePgFormattedResource(resource_id.Binary(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same concern as overhead!

/*for_wildcard_resource*/ true,
/*for_indexed_resource*/ true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't this supposed to be false?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea here is to find all pg resources including both wildcard resources and indexed resources.

And my understanding is that we need to set both "for_wildcard_resource" and "for_indexed_resource" to be true to find the resource ids for both.

So here I think we should still set it to be true.


if (data) {
// Aggregate based on resource type
ResourceID original_resource_id{data->original_resource};
pg_resource_map[original_resource_id].push_back(
std::make_pair(resource_id, data.value()));
} else {
// Allocation failed. Restore partially allocated resources.
for (const auto &[resource_id, allocation] : allocations) {
Free(resource_id, allocation);
// Directly allocate the resources if the resource is not with a placement group
auto allocation = TryAllocate(resource_id, demand);
if (allocation) {
// Even if allocation failed we need to remember partial allocations to
// correctly free resources.
allocations[resource_id] = std::move(*allocation);
} else {
// Allocation failed. Restore partially allocated resources.
for (const auto &[resource_id, allocation] : allocations) {
Free(resource_id, allocation);
}
return std::nullopt;
}
return std::nullopt;
}
}

// Handle the resource allocation for resources with placement group
for (const auto &[original_resource_id, resource_id_vector] : pg_resource_map) {
// Assuming exactly 1 placement group and at most 1 bundle index can be specified in
// the resource requirement for a single resource type
std::vector<FixedPoint> wildcard_allocation;
const ResourceID *wildcard_resource_id = nullptr;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why using a pointer?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used a pointer mainly to avoid copy of the wildcard resource id. But please let me know a better pattern for the purpose.

const std::string *pg_id = nullptr;

// Allocate indexed resource
if (resource_id_vector.size() == 1) {
// The case where no bundle index is specified
// Iterate through the bundles with the same original resource and pg_id to find
// the first one with enough space
bool found = false;
wildcard_resource_id = &resource_id_vector[0].first;
pg_id = &resource_id_vector[0].second.group_id;

auto pg_index_resources = pg_indexed_resources_.find(original_resource_id);
MengjinYan marked this conversation as resolved.
Show resolved Hide resolved
if (pg_index_resources != pg_indexed_resources_.end()) {
auto index_resources = pg_index_resources->second.find(*pg_id);
MengjinYan marked this conversation as resolved.
Show resolved Hide resolved
if (index_resources != pg_index_resources->second.end()) {
for (ResourceID indexed_resource_id : index_resources->second) {
if (Has(indexed_resource_id)) {
auto allocation = TryAllocate(
indexed_resource_id, resource_demands.Get(resource_id_vector[0].first));

if (allocation) {
// Found the allocation in a bundle
wildcard_allocation = *allocation;
allocations[indexed_resource_id] = std::move(*allocation);
found = true;
break;
}
}
}
}
}

if (!found) {
// No bundle can fit the required resources, allocation failed
for (const auto &[resource_id, allocation] : allocations) {
Free(resource_id, allocation);
}
return std::nullopt;
}
} else {
// The case where the bundle index is specified
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there no case where the demands only include indexed resources, but not wildcard resources? I assume no?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(feel like the logic assumes this to be correct. maybe add it to the docstring?)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, here:

// Note that all nodes that have placement group have a special
it will always include the wildcard resource.

At the same time, I'll add the assumption to the comment as well.

// For each resource type, both the wildcard resource and the indexed resource
// should be in the resource_demand
for (const std::pair<ResourceID, PgFormattedResourceData> &pair :
resource_id_vector) {
if (pair.second.bundle_index != -1) {
// This is the indexed resource
auto allocation = TryAllocate(pair.first, resource_demands.Get(pair.first));

if (allocation) {
wildcard_allocation = *allocation;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe do this only once to avoid repetitive copy?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! Currently, the same allocation will used for both wildcard resource and indexed resource.

Instead of doing a copy here and 2 moves for both wildcard and index resource, I think I can be a copy for the indexed resource and a move for the wildcard resource,

allocations[pair.first] = std::move(*allocation);
} else {
// The corresponding bundle cannot hold the required resources.
// Allocation failed
for (const auto &[resource_id, allocation] : allocations) {
Free(resource_id, allocation);
}
return std::nullopt;
}
} else {
// This is the wildcard resource
wildcard_resource_id = &pair.first;
}
}
}

// Allocate wildcard resource, should be consistent with the indexed resource
RAY_CHECK(wildcard_resource_id != nullptr);
RAY_CHECK(!wildcard_allocation.empty());
AllocateWithReference(wildcard_allocation, *wildcard_resource_id);
allocations[*wildcard_resource_id] = std::move(wildcard_allocation);
}

return std::make_optional<absl::flat_hash_map<ResourceID, std::vector<FixedPoint>>>(
std::move(allocations));
}
Expand Down Expand Up @@ -139,7 +278,7 @@ std::optional<std::vector<FixedPoint>> NodeResourceInstanceSet::TryAllocate(
//
// As long as remaining_demand is greater than 1.,
// allocate full unit-capacity instances until the remaining_demand becomes fractional.
// Then try to find the best fit for the fractional remaining_resources. Best fist means
// Then try to find the best fit for the fractional remaining_resources. Best fit means
// allocating the resource instance with the smallest available capacity greater than
// remaining_demand
if (remaining_demand >= 1.) {
Expand Down Expand Up @@ -186,6 +325,20 @@ std::optional<std::vector<FixedPoint>> NodeResourceInstanceSet::TryAllocate(
return std::make_optional<std::vector<FixedPoint>>(std::move(allocation));
}

void NodeResourceInstanceSet::AllocateWithReference(
const std::vector<FixedPoint> &ref_allocation, ResourceID resource_id) {
std::vector<FixedPoint> available = Get(resource_id);
RAY_CHECK(!available.empty());
RAY_CHECK_EQ(available.size(), ref_allocation.size());

for (size_t i = 0; i < ref_allocation.size(); i++) {
RAY_CHECK_GE(available[i], ref_allocation[i]);
available[i] -= ref_allocation[i];
}

Set(resource_id, std::move(available));
}

void NodeResourceInstanceSet::Free(ResourceID resource_id,
const std::vector<FixedPoint> &allocation) {
std::vector<FixedPoint> available = Get(resource_id);
Expand Down Expand Up @@ -244,7 +397,7 @@ std::vector<FixedPoint> NodeResourceInstanceSet::Subtract(

std::string NodeResourceInstanceSet::DebugString() const {
std::stringstream buffer;
buffer << "{";
buffer << "{resources_:{";
bool first = true;
for (const auto &[id, quantity] : resources_) {
if (!first) {
Expand All @@ -253,7 +406,37 @@ std::string NodeResourceInstanceSet::DebugString() const {
first = false;
buffer << id.Binary() << ": " << FixedPointVectorToString(quantity);
}
buffer << "}";
buffer << "}, pg_indexed_resources_:{";

first = true;
for (const auto &[original_id, pg_resource_id_map] : pg_indexed_resources_) {
if (!first) {
buffer << ", ";
}
first = false;

buffer << original_id.Binary() << ": {";
bool firstInMap = true;
for (const auto &[pg_id, indexed_ids] : pg_resource_id_map) {
if (!firstInMap) {
buffer << ", ";
}
firstInMap = false;

buffer << pg_id << ": {";
bool firstInSet = true;
for (const auto &index_id : indexed_ids) {
if (!firstInSet) {
buffer << ", ";
}
firstInSet = false;
buffer << index_id.Binary();
}
buffer << "}";
}
buffer << "}";
}
buffer << "}}";
return buffer.str();
}

Expand Down
30 changes: 30 additions & 0 deletions src/ray/common/scheduling/resource_instance_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,14 @@ class NodeResourceInstanceSet {
return resources_;
}

/// Only for testing.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it really used? I couldn't find it

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. It is not used. I was to follow the above function for resources_.

At the same time, I think it makes sense to remove the function and add it when it is needed in the test

const absl::flat_hash_map<
ResourceID,
absl::flat_hash_map<std::string, absl::flat_hash_set<ResourceID>>>
&PgIndexedResources() const {
return pg_indexed_resources_;
}

private:
/// Allocate enough capacity across the instances of a resource to satisfy "demand".
///
Expand All @@ -108,8 +116,30 @@ class NodeResourceInstanceSet {
std::optional<std::vector<FixedPoint>> TryAllocate(ResourceID resource_id,
FixedPoint demand);

/// Allocate resource to the resource_id based on a provided reference allocation.
/// The function is used for placement group allocation. Making the allocation of
/// the wildcard resource be identical to the indexed resource allocation.
///
/// The function assumes and also verifies that (1) the resource_id exists in the
/// node; (2) the available resources with resource_id on the node can satisfy the
/// provided ref_allocation.
///
/// \param ref_allocation: The reference allocation used to allocate the resource_id
/// \param resource_id: The id of the resource to be allocated
void AllocateWithReference(const std::vector<FixedPoint> &ref_allocation,
ResourceID resource_id);

/// Map from the resource IDs to the resource instance values.
absl::flat_hash_map<ResourceID, std::vector<FixedPoint>> resources_;

/// This is a derived map from the resources_ map. The map aggregates all the current
/// placement group indexed resources in resources_ by their original resource id and
/// pd id. The key of the map is the original resource id. The value is a map of pg id
/// to the corresponding placement group indexed resource ids. This map should always
/// be consistent with the _resource map.
MengjinYan marked this conversation as resolved.
Show resolved Hide resolved
absl::flat_hash_map<ResourceID,
absl::flat_hash_map<std::string, absl::flat_hash_set<ResourceID>>>
pg_indexed_resources_;
};

} // namespace ray
Loading