Skip to content

Commit

Permalink
[Core] Make Placement Group Wildcard and Indexed Resource Assignments…
Browse files Browse the repository at this point in the history
… Consistent (#48088)

Signed-off-by: Mengjin Yan <mengjinyan3@gmail.com>
  • Loading branch information
MengjinYan authored Nov 7, 2024
1 parent d3e4a51 commit 8deca45
Show file tree
Hide file tree
Showing 6 changed files with 1,214 additions and 181 deletions.
15 changes: 12 additions & 3 deletions src/ray/common/bundle_spec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,15 @@ std::string BundleSpecification::DebugString() const {
}

std::string FormatPlacementGroupResource(const std::string &original_resource_name,
const PlacementGroupID &group_id,
const std::string &group_id_hex,
int64_t bundle_index) {
std::stringstream os;
if (bundle_index >= 0) {
os << original_resource_name << kGroupKeyword << std::to_string(bundle_index) << "_"
<< group_id.Hex();
<< group_id_hex;
} else {
RAY_CHECK(bundle_index == -1) << "Invalid index " << bundle_index;
os << original_resource_name << kGroupKeyword << group_id.Hex();
os << original_resource_name << kGroupKeyword << group_id_hex;
}
std::string result = os.str();
RAY_DCHECK(GetOriginalResourceName(result) == original_resource_name)
Expand All @@ -112,6 +112,13 @@ std::string FormatPlacementGroupResource(const std::string &original_resource_na
return result;
}

std::string FormatPlacementGroupResource(const std::string &original_resource_name,
const PlacementGroupID &group_id,
int64_t bundle_index) {
return FormatPlacementGroupResource(
original_resource_name, group_id.Hex(), bundle_index);
}

std::string GetOriginalResourceName(const std::string &resource) {
auto data = ParsePgFormattedResource(
resource, /*for_wildcard_resource*/ true, /*for_indexed_resource*/ true);
Expand Down Expand Up @@ -146,6 +153,7 @@ std::optional<PgFormattedResourceData> ParsePgFormattedResource(
match_groups.size() == 3) {
data.original_resource = match_groups[1].str();
data.bundle_index = -1;
data.group_id = match_groups[2].str();
return data;
}
}
Expand All @@ -157,6 +165,7 @@ std::optional<PgFormattedResourceData> ParsePgFormattedResource(
match_groups.size() == 4) {
data.original_resource = match_groups[1].str();
data.bundle_index = stoi(match_groups[2].str());
data.group_id = match_groups[3].str();
return data;
}
}
Expand Down
14 changes: 14 additions & 0 deletions src/ray/common/bundle_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,22 @@ struct PgFormattedResourceData {
std::string original_resource;
/// -1 if it is a wildcard resource.
int64_t bundle_index;
std::string group_id;
};

/// Format a placement group resource with provided parameters.
///
/// \param original_resource_name The original resource name of the pg resource.
/// \param group_id_hex The placement group id as a hex string (e.g. the value
/// returned by PlacementGroupID::Hex()).
/// \param bundle_index The bundle index. If -1 (the default), generate the wildcard
/// pg resource. E.g., [original_resource_name]_group_[group_id_hex].
/// If >=0, generate the indexed pg resource. E.g.,
/// [original_resource_name]_group_[bundle_index]_[group_id_hex]
/// \return The corresponding formatted placement group resource string.
std::string FormatPlacementGroupResource(const std::string &original_resource_name,
const std::string &group_id_hex,
int64_t bundle_index = -1);

/// Format a placement group resource, e.g., CPU -> CPU_group_i
std::string FormatPlacementGroupResource(const std::string &original_resource_name,
const PlacementGroupID &group_id,
Expand Down
212 changes: 202 additions & 10 deletions src/ray/common/scheduling/resource_instance_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

#include <cmath>
#include <sstream>
#include <utility>

#include "ray/common/bundle_spec.h"
#include "ray/util/logging.h"

namespace ray {
Expand All @@ -43,6 +45,29 @@ bool NodeResourceInstanceSet::Has(ResourceID resource_id) const {

// Drops `resource_id` from this node's resource set. When the id parses as an indexed
// placement group resource, its entry in pg_indexed_resources_ is removed as well, and
// any per-group or per-resource container left empty by that removal is pruned.
void NodeResourceInstanceSet::Remove(ResourceID resource_id) {
  resources_.erase(resource_id);

  // Only indexed placement group resources are tracked in pg_indexed_resources_.
  const auto parsed = ParsePgFormattedResource(resource_id.Binary(),
                                               /*for_wildcard_resource=*/false,
                                               /*for_indexed_resource=*/true);
  if (!parsed) {
    return;
  }

  const ResourceID base_resource_id(parsed->original_resource);
  auto by_resource_it = pg_indexed_resources_.find(base_resource_id);
  if (by_resource_it == pg_indexed_resources_.end()) {
    return;
  }

  auto &groups = by_resource_it->second;
  auto group_it = groups.find(parsed->group_id);
  if (group_it == groups.end()) {
    return;
  }

  auto &indexed_ids = group_it->second;
  indexed_ids.erase(resource_id);

  // Prune bottom-up: first the now-empty id set, then the now-empty group map.
  // `indexed_ids` / `groups` must not be touched after their owner is erased.
  if (indexed_ids.empty()) {
    groups.erase(parsed->group_id);
  }
  if (groups.empty()) {
    pg_indexed_resources_.erase(base_resource_id);
  }
}

const std::vector<FixedPoint> &NodeResourceInstanceSet::Get(
Expand All @@ -69,6 +94,24 @@ NodeResourceInstanceSet &NodeResourceInstanceSet::Set(ResourceID resource_id,
resources_.erase(resource_id);
} else {
resources_[resource_id] = std::move(instances);

// Populate pg_indexed_resources_.
// TODO (myan): Parsing the resource_id string can be costly and may impact the
// task creation throughput if parsing is required every time we allocate
// resources for a task and update the available resources. The current benchmark
// shows no observable impact for now. In the future, possible improvements are:
// (1) add the placement group id and the bundle index to the ResourceID class,
// and use those fields directly instead of parsing the string; (2) change the
// pg resource id format to start with a special prefix so that we can do a
// "starts with" check, which is less costly than a regex match.
auto data = ParsePgFormattedResource(resource_id.Binary(),
/*for_wildcard_resource=*/false,
/*for_indexed_resource=*/true);
if (data) {
pg_indexed_resources_[ResourceID(data->original_resource)][data->group_id].emplace(
resource_id);
}
}
return *this;
}
Expand All @@ -93,21 +136,156 @@ bool NodeResourceInstanceSet::operator==(const NodeResourceInstanceSet &other) c
std::optional<absl::flat_hash_map<ResourceID, std::vector<FixedPoint>>>
NodeResourceInstanceSet::TryAllocate(const ResourceSet &resource_demands) {
absl::flat_hash_map<ResourceID, std::vector<FixedPoint>> allocations;

// During resource allocation with a placement group, no matter whether the allocation
// requirement specifies a bundle index, we generate the allocation on both
// the wildcard resource and the indexed resource. The resource_demand shouldn't be
// assigned across bundles. If no bundle index is specified, we iterate
// through the bundles and pick the first bundle that can fit the required resources.
// In addition, for unit resources, we make sure that the allocations on the
// wildcard resource and the indexed resource are consistent, meaning the same
// instance ids are allocated.
//
// For example, consider the GPU resource on a host. Assume the host has 3 GPUs
// and 1 placement group with 2 bundles. The bundle with index 1 contains 1 GPU and
// the bundle with index 2 contains 2 GPUs.
//
// The current node resource can be as follows:
// resource id: total, available
// GPU: [1, 1, 1], [0, 0, 0]
// GPU_<pg_id>: [1, 1, 1], [1, 1, 1]
// GPU_1_<pg_id>: [1, 0, 0], [1, 0, 0]
// GPU_2_<pg_id>: [0, 1, 1], [0, 1, 1]
//
// Now, we want to allocate a task that needs 2 GPUs in the placement group <pg_id>,
// which is reflected in the following resource demand:
// GPU_<pg_id> : 2
//
// We iterate through all the bundles in the placement group and find that the bundle
// with index=2 has the required capacity. So we allocate the task to the 2 GPUs in
// bundle 2 of placement group <pg_id>, and the same allocation is reflected in
// the wildcard GPU resource. So the allocation will be:
// GPU_<pg_id> : [0, 1, 1]
// GPU_2_<pg_id> : [0, 1, 1]
//
// And as a result, after the allocation, current node resource will be:
// resource id: total, available
// GPU: [1, 1, 1], [0, 0, 0]
// GPU_<pg_id>: [1, 1, 1], [1, 0, 0]
// GPU_1_<pg_id>: [1, 0, 0], [1, 0, 0]
// GPU_2_<pg_id>: [0, 1, 1], [0, 0, 0]

absl::flat_hash_map</* original resource id (GPU, CPU, etc.) */ ResourceID,
std::vector<std::pair</* actual resource id */ ResourceID,
PgFormattedResourceData>>>
pg_resource_map;

for (const auto &[resource_id, demand] : resource_demands.Resources()) {
auto allocation = TryAllocate(resource_id, demand);
if (allocation) {
// Even if allocation failed we need to remember partial allocations to correctly
// free resources.
allocations[resource_id] = std::move(*allocation);
auto data = ParsePgFormattedResource(resource_id.Binary(),
/*for_wildcard_resource*/ true,
/*for_indexed_resource*/ true);

if (data) {
// Aggregate based on resource type
ResourceID original_resource_id{data->original_resource};
pg_resource_map[original_resource_id].push_back(
std::make_pair(resource_id, data.value()));
} else {
// Allocation failed. Restore partially allocated resources.
for (const auto &[resource_id, allocation] : allocations) {
Free(resource_id, allocation);
// Directly allocate the resources if the resource is not with a placement group
auto allocation = TryAllocate(resource_id, demand);
if (allocation) {
// Even if allocation failed we need to remember partial allocations to
// correctly free resources.
allocations[resource_id] = std::move(*allocation);
} else {
// Allocation failed. Restore partially allocated resources.
for (const auto &[resource_id, allocation] : allocations) {
Free(resource_id, allocation);
}
return std::nullopt;
}
return std::nullopt;
}
}

// Handle the resource allocation for resources with placement group
for (const auto &[original_resource_id, resource_id_vector] : pg_resource_map) {
// Assuming exactly 1 placement group and at most 1 bundle index can be specified in
// the resource requirement for a single resource type
// Also assuming the wildcard resource id will always exist in the resource_demands
// no matter how the resource requirement is specified in task
std::optional<std::vector<FixedPoint>> wildcard_allocation;
const ResourceID *wildcard_resource_id = nullptr;
const std::string *pg_id = nullptr;

// Allocate indexed resource
if (resource_id_vector.size() == 1) {
// The case where no bundle index is specified
// Iterate through the bundles with the same original resource and pg_id to find
// the first one with enough space
bool found = false;
wildcard_resource_id = &resource_id_vector[0].first;
pg_id = &resource_id_vector[0].second.group_id;

auto pg_index_resources_it = pg_indexed_resources_.find(original_resource_id);
if (pg_index_resources_it != pg_indexed_resources_.end()) {
auto index_resources_it = pg_index_resources_it->second.find(*pg_id);
if (index_resources_it != pg_index_resources_it->second.end()) {
for (ResourceID indexed_resource_id : index_resources_it->second) {
if (Has(indexed_resource_id)) {
wildcard_allocation = TryAllocate(
indexed_resource_id, resource_demands.Get(*wildcard_resource_id));
if (wildcard_allocation) {
// Found the allocation in a bundle
allocations[indexed_resource_id] = *wildcard_allocation;
found = true;
break;
}
}
}
}
}

if (!found) {
// No bundle can fit the required resources, allocation failed
for (const auto &[resource_id, allocation] : allocations) {
Free(resource_id, allocation);
}
return std::nullopt;
}
} else {
// The case where the bundle index is specified
// For each resource type, both the wildcard resource and the indexed resource
// should be in the resource_demand
for (const std::pair<ResourceID, PgFormattedResourceData> &pair :
resource_id_vector) {
if (pair.second.bundle_index != -1) {
// This is the indexed resource
wildcard_allocation = TryAllocate(pair.first, resource_demands.Get(pair.first));

if (wildcard_allocation) {
allocations[pair.first] = *wildcard_allocation;
} else {
// The corresponding bundle cannot hold the required resources.
// Allocation failed
for (const auto &[resource_id, allocation] : allocations) {
Free(resource_id, allocation);
}
return std::nullopt;
}
} else {
// This is the wildcard resource
wildcard_resource_id = &pair.first;
}
}
}

// Allocate wildcard resource, should be consistent with the indexed resource
RAY_CHECK(wildcard_resource_id != nullptr);
RAY_CHECK(!(*wildcard_allocation).empty());
AllocateWithReference(*wildcard_allocation, *wildcard_resource_id);
allocations[*wildcard_resource_id] = std::move(*wildcard_allocation);
}

return std::make_optional<absl::flat_hash_map<ResourceID, std::vector<FixedPoint>>>(
std::move(allocations));
}
Expand Down Expand Up @@ -139,7 +317,7 @@ std::optional<std::vector<FixedPoint>> NodeResourceInstanceSet::TryAllocate(
//
// As long as remaining_demand is greater than 1.,
// allocate full unit-capacity instances until the remaining_demand becomes fractional.
// Then try to find the best fit for the fractional remaining_resources. Best fist means
// Then try to find the best fit for the fractional remaining_resources. Best fit means
// allocating the resource instance with the smallest available capacity greater than
// remaining_demand
if (remaining_demand >= 1.) {
Expand Down Expand Up @@ -186,6 +364,20 @@ std::optional<std::vector<FixedPoint>> NodeResourceInstanceSet::TryAllocate(
return std::make_optional<std::vector<FixedPoint>>(std::move(allocation));
}

// Deducts `ref_allocation` element-wise from the available instances of `resource_id`
// and stores the result back with Set().
//
// TryAllocate(const ResourceSet &) calls this to mirror an allocation made on an
// indexed placement group resource onto the corresponding wildcard resource, so both
// record the same per-instance amounts.
//
// Check-fails if Get(resource_id) is empty, if it differs in length from
// `ref_allocation`, or if any instance has less available than `ref_allocation`
// requests for it.
void NodeResourceInstanceSet::AllocateWithReference(
const std::vector<FixedPoint> &ref_allocation, ResourceID resource_id) {
// Work on a copy; the updated vector is written back through Set() below.
std::vector<FixedPoint> available = Get(resource_id);
RAY_CHECK(!available.empty());
// The reference allocation must line up instance-by-instance with this resource.
RAY_CHECK_EQ(available.size(), ref_allocation.size());

for (size_t i = 0; i < ref_allocation.size(); i++) {
// Never drive an instance's availability negative.
RAY_CHECK_GE(available[i], ref_allocation[i]);
available[i] -= ref_allocation[i];
}

Set(resource_id, std::move(available));
}

void NodeResourceInstanceSet::Free(ResourceID resource_id,
const std::vector<FixedPoint> &allocation) {
std::vector<FixedPoint> available = Get(resource_id);
Expand Down
Loading

0 comments on commit 8deca45

Please sign in to comment.