From e62696c25739c8fcede9d932e67f1ae990889011 Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Wed, 16 Oct 2024 23:21:58 -0700 Subject: [PATCH 1/9] Fix placement group resource assignment Signed-off-by: Mengjin Yan --- src/ray/common/bundle_spec.cc | 15 +- src/ray/common/bundle_spec.h | 14 + .../scheduling/resource_instance_set.cc | 185 ++++- .../common/scheduling/resource_instance_set.h | 24 + .../common/test/resource_instance_set_test.cc | 707 +++++++++++++++++- .../placement_group_resource_manager_test.cc | 260 +++---- 6 files changed, 1022 insertions(+), 183 deletions(-) diff --git a/src/ray/common/bundle_spec.cc b/src/ray/common/bundle_spec.cc index 8c410263e39d..6b11249db270 100644 --- a/src/ray/common/bundle_spec.cc +++ b/src/ray/common/bundle_spec.cc @@ -95,15 +95,15 @@ std::string BundleSpecification::DebugString() const { } std::string FormatPlacementGroupResource(const std::string &original_resource_name, - const PlacementGroupID &group_id, + const std::string &group_id_str, int64_t bundle_index) { std::stringstream os; if (bundle_index >= 0) { os << original_resource_name << kGroupKeyword << std::to_string(bundle_index) << "_" - << group_id.Hex(); + << group_id_str; } else { RAY_CHECK(bundle_index == -1) << "Invalid index " << bundle_index; - os << original_resource_name << kGroupKeyword << group_id.Hex(); + os << original_resource_name << kGroupKeyword << group_id_str; } std::string result = os.str(); RAY_DCHECK(GetOriginalResourceName(result) == original_resource_name) @@ -112,6 +112,13 @@ std::string FormatPlacementGroupResource(const std::string &original_resource_na return result; } +std::string FormatPlacementGroupResource(const std::string &original_resource_name, + const PlacementGroupID &group_id, + int64_t bundle_index) { + return FormatPlacementGroupResource( + original_resource_name, group_id.Hex(), bundle_index); +} + std::string GetOriginalResourceName(const std::string &resource) { auto data = ParsePgFormattedResource( resource, /*for_wildcard_resource*/ true, /*for_indexed_resource*/ true); @@ -146,6 +153,7 @@ std::optional ParsePgFormattedResource( match_groups.size() == 3) { data.original_resource = match_groups[1].str(); data.bundle_index = -1; + data.group_id = match_groups[2].str(); return data; } } @@ -157,6 +165,7 @@ std::optional ParsePgFormattedResource( match_groups.size() == 4) { data.original_resource = match_groups[1].str(); data.bundle_index = stoi(match_groups[2].str()); + data.group_id = match_groups[3].str(); return data; } } diff --git a/src/ray/common/bundle_spec.h b/src/ray/common/bundle_spec.h index dad44cd2f5db..12a6d90eddb6 100644 --- a/src/ray/common/bundle_spec.h +++ b/src/ray/common/bundle_spec.h @@ -97,8 +97,22 @@ struct PgFormattedResourceData { std::string original_resource; /// -1 if it is a wildcard resource. int64_t bundle_index; + std::string group_id; }; +/// Format a placement group resource with provided parameters. +/// +/// \param original_resource_name The original resource name of the pg resource. +/// \param group_id_str The group id in string format. +/// \param bundle_index The bundle index. If -1, generate the wildcard pg resource. +/// E.g., [original_resource_name]_group_[group_id_str]. +/// If >=0, generate the indexed pg resource. E.g., +/// [original_resource_name]_group_[bundle_index]_[group_id_str] +/// \return The corresponding formatted placement group resource string. +std::string FormatPlacementGroupResource(const std::string &original_resource_name, + const std::string &group_id_str, + int64_t bundle_index = -1); + /// Format a placement group resource, e.g., CPU -> CPU_group_i std::string FormatPlacementGroupResource(const std::string &original_resource_name, const PlacementGroupID &group_id, diff --git a/src/ray/common/scheduling/resource_instance_set.cc b/src/ray/common/scheduling/resource_instance_set.cc index 976deee39383..74cf15375b9a 100644 --- a/src/ray/common/scheduling/resource_instance_set.cc +++ b/src/ray/common/scheduling/resource_instance_set.cc @@ -16,7 +16,9 @@ #include #include +#include +#include "ray/common/bundle_spec.h" #include "ray/util/logging.h" namespace ray { @@ -43,6 +45,21 @@ bool NodeResourceInstanceSet::Has(ResourceID resource_id) const { void NodeResourceInstanceSet::Remove(ResourceID resource_id) { resources_.erase(resource_id); + + // Remove from the pg_indexed_resources_ as well + auto data = ParsePgFormattedResource(resource_id.Binary(), + /*for_wildcard_resource=*/false, + /*for_indexed_resource=*/true); + if (data) { + ResourceID original_resource_id(data->original_resource); + absl::flat_hash_set &resource_set = + pg_indexed_resources_[original_resource_id]; + + resource_set.erase(resource_id); + if (resource_set.empty()) { + pg_indexed_resources_.erase(original_resource_id); + } + } } const std::vector &NodeResourceInstanceSet::Get( @@ -69,6 +86,14 @@ NodeResourceInstanceSet &NodeResourceInstanceSet::Set(ResourceID resource_id, resources_.erase(resource_id); } else { resources_[resource_id] = std::move(instances); + + // Popluate the pg_indexed_resources_map_ + auto data = ParsePgFormattedResource(resource_id.Binary(), + /*for_wildcard_resource=*/false, + /*for_indexed_resource=*/true); + if (data) { + pg_indexed_resources_[ResourceID(data->original_resource)].emplace(resource_id); + } } return *this; } @@ -93,19 +118,121 @@ bool NodeResourceInstanceSet::operator==(const NodeResourceInstanceSet &other) c std::optional>> NodeResourceInstanceSet::TryAllocate(const ResourceSet &resource_demands) { absl::flat_hash_map> allocations; + + // During resource allocation with a placement group, no matter whether the allocation + // requirement specifies a bundle index, we need to generate the allocation on both + // the wildcard resource and the indexed resource. The resource_demand shouldn't be + // assigned across bundles. And If no bundle index is specified, we will iterate + // through the bundles and find the first bundle that can fit the required resources. + // In addition, for unit resources, we need to make sure that the allocation on the + // wildcard resource and the indexed resource are consistent, meaning the same + // instance ids should be allocated. + + // In the format of: + // key: original resource id, + // value: [resource id, parsed pg format resource data] + absl::flat_hash_map>> + pg_resource_map; + for (const auto &[resource_id, demand] : resource_demands.Resources()) { - auto allocation = TryAllocate(resource_id, demand); - if (allocation) { - // Even if allocation failed we need to remember partial allocations to correctly - // free resources. - allocations[resource_id] = std::move(*allocation); + auto data = ParsePgFormattedResource(resource_id.Binary(), + /*for_wildcard_resource*/ true, + /*for_indexed_resource*/ true); + + if (data) { + // Aggregate based on resource type + ResourceID original_resource_id{data->original_resource}; + pg_resource_map[original_resource_id].push_back( + std::make_pair(resource_id, data.value())); } else { - // Allocation failed. Restore partially allocated resources. - for (const auto &[resource_id, allocation] : allocations) { - Free(resource_id, allocation); + // Directly allocate the resources if the resource is not with a placement group + auto allocation = TryAllocate(resource_id, demand); + if (allocation) { + // Even if allocation failed we need to remember partial allocations to + // correctly free resources. + allocations[resource_id] = std::move(*allocation); + } else { + // Allocation failed. Restore partially allocated resources. + for (const auto &[resource_id, allocation] : allocations) { + Free(resource_id, allocation); + } + return std::nullopt; + } + } + } + + // Handle the resource allocation for resources with placement group + for (const auto &[original_resource_id, resource_id_vector] : pg_resource_map) { + // Assuming exactly 1 placement group and at most 1 bundle index can be specified in + // the resource requirement for a single resource type + std::vector wildcard_allocation; + const ResourceID *wildcard_resource_id = nullptr; + + // Allocate indexed resource + if (resource_id_vector.size() == 1) { + // The case where no bundle index is specified + // Iterate through the bundles to find the first one with enough space + bool found = false; + wildcard_resource_id = &resource_id_vector[0].first; + auto index_resources = pg_indexed_resources_.find(original_resource_id); + if (index_resources != pg_indexed_resources_.end()) { + for (ResourceID indexed_resource_id : index_resources->second) { + if (Has(indexed_resource_id)) { + auto allocation = TryAllocate( + indexed_resource_id, resource_demands.Get(resource_id_vector[0].first)); + + if (allocation) { + // Found the allocation in a bundle + wildcard_allocation = *allocation; + allocations[indexed_resource_id] = std::move(*allocation); + found = true; + break; + } + } + } + } + + if (!found) { + // No bundle can fit the required resources, allocation failed + for (const auto &[resource_id, allocation] : allocations) { + Free(resource_id, allocation); + } + return std::nullopt; + } + } else { + // The case where the bundle index is specified + // The each resource type, both the wildcard resource and the indexed resource + // should be in the resource_demand + for (const std::pair &pair : + resource_id_vector) { + if (pair.second.bundle_index != -1) { + // This is the indexed resource + auto allocation = TryAllocate(pair.first, resource_demands.Get(pair.first)); + + if (allocation) { + wildcard_allocation = *allocation; + allocations[pair.first] = std::move(*allocation); + } else { + // The corresponding bundle cannot hold the required resources. + // Allocation failed + for (const auto &[resource_id, allocation] : allocations) { + Free(resource_id, allocation); + } + return std::nullopt; + } + } else { + // This is the wildcard resource + wildcard_resource_id = &pair.first; + } } - return std::nullopt; } + + // Allocate wildcard resource, should be consistent with the indexed resource + RAY_CHECK(wildcard_resource_id != nullptr); + RAY_CHECK(!wildcard_allocation.empty()); + AllocateWithReference(wildcard_allocation, *wildcard_resource_id); + allocations[*wildcard_resource_id] = std::move(wildcard_allocation); } return std::make_optional>>( @@ -139,7 +266,7 @@ std::optional> NodeResourceInstanceSet::TryAllocate( // // As long as remaining_demand is greater than 1., // allocate full unit-capacity instances until the remaining_demand becomes fractional. - // Then try to find the best fit for the fractional remaining_resources. Best fist means + // Then try to find the best fit for the fractional remaining_resources. Best fit means // allocating the resource instance with the smallest available capacity greater than // remaining_demand if (remaining_demand >= 1.) { @@ -186,6 +313,20 @@ std::optional> NodeResourceInstanceSet::TryAllocate( return std::make_optional>(std::move(allocation)); } +void NodeResourceInstanceSet::AllocateWithReference( + const std::vector &ref_allocation, ResourceID resource_id) { + std::vector available = Get(resource_id); + RAY_CHECK(!available.empty()); + RAY_CHECK_EQ(available.size(), ref_allocation.size()); + + for (int i = 0; i < ref_allocation.size(); i++) { + RAY_CHECK_GE(available[i], ref_allocation[i]); + available[i] -= ref_allocation[i]; + } + + Set(resource_id, std::move(available)); +} + void NodeResourceInstanceSet::Free(ResourceID resource_id, const std::vector &allocation) { std::vector available = Get(resource_id); @@ -244,7 +385,7 @@ std::vector NodeResourceInstanceSet::Subtract( std::string NodeResourceInstanceSet::DebugString() const { std::stringstream buffer; - buffer << "{"; + buffer << "{{"; bool first = true; for (const auto &[id, quantity] : resources_) { if (!first) { @@ -253,7 +394,27 @@ std::string NodeResourceInstanceSet::DebugString() const { first = false; buffer << id.Binary() << ": " << FixedPointVectorToString(quantity); } - buffer << "}"; + buffer << "}, {"; + + first = true; + for (const auto &[original_id, indexed_ids] : pg_indexed_resources_) { + if (!first) { + buffer << ", "; + } + first = false; + + buffer << original_id.Binary() << ": {"; + bool firstInSet = true; + for (const auto &index_id : indexed_ids) { + if (!firstInSet) { + buffer << ", "; + } + firstInSet = false; + buffer << index_id.Binary(); + } + buffer << "}"; + } + buffer << "}}"; return buffer.str(); } diff --git a/src/ray/common/scheduling/resource_instance_set.h b/src/ray/common/scheduling/resource_instance_set.h index ee65ef77c842..3463638ce597 100644 --- a/src/ray/common/scheduling/resource_instance_set.h +++ b/src/ray/common/scheduling/resource_instance_set.h @@ -87,6 +87,12 @@ class NodeResourceInstanceSet { return resources_; } + /// Only for testing. + const absl::flat_hash_map> + &PgIndexedResources() const { + return pg_indexed_resources_; + } + private: /// Allocate enough capacity across the instances of a resource to satisfy "demand". /// @@ -108,8 +114,26 @@ class NodeResourceInstanceSet { std::optional> TryAllocate(ResourceID resource_id, FixedPoint demand); + /// Allocate resource to the resource_id based on a provided reference allocation. + /// The function is used for placement group allocation. Making the allocation of + /// the wildcard resource be identical to the indexed resource allocation. + /// + /// The function assumes and also verifies that (1) the resource_id exists in the + /// node; (2) the available resources with resource_id on the node can satisfy the + /// provided ref_allocation. + /// + /// \param ref_allocation: The reference allocation used to allocate the resource_id + /// \param resource_id: The id of the resource to be allocated + void AllocateWithReference(const std::vector &ref_allocation, + ResourceID resource_id); + /// Map from the resource IDs to the resource instance values. absl::flat_hash_map> resources_; + + /// Map from the original resource IDs to the actual resource IDs for all the indexed + /// placement group resources. This map should be treated as a derived map from the + /// resources_ map and should always be consistent with the _resource map. + absl::flat_hash_map> pg_indexed_resources_; }; } // namespace ray diff --git a/src/ray/common/test/resource_instance_set_test.cc b/src/ray/common/test/resource_instance_set_test.cc index 2c5d40a942cb..d2546e0a68df 100644 --- a/src/ray/common/test/resource_instance_set_test.cc +++ b/src/ray/common/test/resource_instance_set_test.cc @@ -93,35 +93,722 @@ TEST_F(NodeResourceInstanceSetTest, TestOperator) { ASSERT_FALSE(r1 == r3); } -TEST_F(NodeResourceInstanceSetTest, TestTryAllocate) { +TEST_F(NodeResourceInstanceSetTest, TestTryAllocateOneResourceWithoutPlacementGroup) { + // 1. Test non-unit resource + { + // Allocation succeed when demand is smaller than available + NodeResourceInstanceSet r1 = NodeResourceInstanceSet(NodeResourceSet({{"CPU", 2}})); + ResourceSet success_request = ResourceSet({{"CPU", FixedPoint(1)}}); + auto allocations = r1.TryAllocate(success_request); + ASSERT_EQ(allocations->size(), 1); + ASSERT_EQ((*allocations)[ResourceID("CPU")], + std::vector({FixedPoint(1)})); + ASSERT_EQ(r1.Get(ResourceID("CPU")), std::vector({FixedPoint(1)})); + + // Allocation failed when demand is larger than available + ResourceSet fail_request = ResourceSet({{"CPU", FixedPoint(2)}}); + allocations = r1.TryAllocate(fail_request); + ASSERT_FALSE(allocations); + // Make sure nothing is allocated when allocation fails. + ASSERT_EQ(r1.Get(ResourceID("CPU")), std::vector({FixedPoint(1)})); + + // Allocation succeed when demand equals available + success_request = ResourceSet({{"CPU", FixedPoint(1)}}); + allocations = r1.TryAllocate(success_request); + ASSERT_EQ(allocations->size(), 1); + ASSERT_EQ((*allocations)[ResourceID("CPU")], + std::vector({FixedPoint(1)})); + ASSERT_EQ(r1.Get(ResourceID("CPU")), std::vector({FixedPoint(0)})); + } + + // 2. Test unit resource + { + // Succees allocation with demand > 1 + NodeResourceInstanceSet r2 = NodeResourceInstanceSet(NodeResourceSet({{"GPU", 4}})); + ResourceSet success_request = ResourceSet({{"GPU", FixedPoint(2)}}); + auto allocations = r2.TryAllocate(success_request); + ASSERT_EQ(allocations->size(), 1); + ASSERT_EQ((*allocations)[ResourceID("GPU")], + std::vector( + {FixedPoint(1), FixedPoint(1), FixedPoint(0), FixedPoint(0)})); + ASSERT_EQ(r2.Get(ResourceID("GPU")), + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(1), FixedPoint(1)})); + + // Failed allocation when demand > available + ResourceSet fail_request = ResourceSet({{"GPU", FixedPoint(3)}}); + allocations = r2.TryAllocate(fail_request); + ASSERT_FALSE(allocations); + // Make sure nothing is allocated when allocation fails. + ASSERT_EQ(r2.Get(ResourceID("GPU")), + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(1), FixedPoint(1)})); + + // Success allocation with fractional demand + // Should be allocated with best fit + success_request = ResourceSet({{"GPU", FixedPoint(0.4)}}); + allocations = r2.TryAllocate(success_request); + ASSERT_EQ(allocations->size(), 1); + ASSERT_EQ((*allocations)[ResourceID("GPU")], + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(0.4), FixedPoint(0)})); + ASSERT_EQ(r2.Get(ResourceID("GPU")), + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(0.6), FixedPoint(1)})); + + success_request = ResourceSet({{"GPU", FixedPoint(0.7)}}); + allocations = r2.TryAllocate(success_request); + ASSERT_EQ(allocations->size(), 1); + ASSERT_EQ((*allocations)[ResourceID("GPU")], + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(0.7)})); + ASSERT_EQ(r2.Get(ResourceID("GPU")), + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(0.6), FixedPoint(0.3)})); + + success_request = ResourceSet({{"GPU", FixedPoint(0.3)}}); + allocations = r2.TryAllocate(success_request); + ASSERT_EQ(allocations->size(), 1); + ASSERT_EQ((*allocations)[ResourceID("GPU")], + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(0.3)})); + ASSERT_EQ(r2.Get(ResourceID("GPU")), + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(0.6), FixedPoint(0)})); + } + + // 3. Test implicit resource + { + NodeResourceInstanceSet r3 = NodeResourceInstanceSet( + NodeResourceSet(absl::flat_hash_map{})); + ResourceSet success_request = + ResourceSet({{std::string(kImplicitResourcePrefix) + "a", FixedPoint(0.3)}}); + auto allocations = r3.TryAllocate(success_request); + ASSERT_EQ(allocations->size(), 1); + ASSERT_EQ((*allocations)[ResourceID(std::string(kImplicitResourcePrefix) + "a")], + std::vector({FixedPoint(0.3)})); + ASSERT_EQ(r3.Get(ResourceID(std::string(kImplicitResourcePrefix) + "a")), + std::vector({FixedPoint(0.7)})); + } +} + +TEST_F(NodeResourceInstanceSetTest, TestTryAllocateMultipleResourcesWithoutPg) { + // Case 1: Partial failure will not allocate anything NodeResourceInstanceSet r1 = NodeResourceInstanceSet(NodeResourceSet({{"CPU", 2}, {"GPU", 2}})); NodeResourceInstanceSet r2 = NodeResourceInstanceSet(NodeResourceSet({{"CPU", 2}, {"GPU", 2}})); - // Allocation fails. - auto allocations = - r1.TryAllocate(ResourceSet({{"CPU", FixedPoint(1)}, {"GPU", FixedPoint(3)}})); + ResourceSet fail_request = + ResourceSet({{"CPU", FixedPoint(1)}, + {"GPU", FixedPoint(3)}, + {std::string(kImplicitResourcePrefix) + "a", FixedPoint(0.3)}}); + auto allocations = r1.TryAllocate(fail_request); ASSERT_FALSE(allocations); - // Make sure nothing is allocated when allocation fails. ASSERT_TRUE(r1 == r2); - allocations = r1.TryAllocate( + // Case 2: All success, will allocate all the resources + ResourceSet success_request = ResourceSet({{"CPU", FixedPoint(1)}, {"GPU", FixedPoint(1)}, - {std::string(kImplicitResourcePrefix) + "a", FixedPoint(0.3)}})); + {std::string(kImplicitResourcePrefix) + "a", FixedPoint(0.3)}}); + allocations = r1.TryAllocate(success_request); ASSERT_EQ(allocations->size(), 3); ASSERT_EQ((*allocations)[ResourceID("CPU")], std::vector({FixedPoint(1)})); + ASSERT_EQ(r1.Get(ResourceID("CPU")), std::vector({FixedPoint(1)})); ASSERT_EQ((*allocations)[ResourceID("GPU")], std::vector({FixedPoint(1), FixedPoint(0)})); - ASSERT_EQ((*allocations)[ResourceID(std::string(kImplicitResourcePrefix) + "a")], - std::vector({FixedPoint(0.3)})); - ASSERT_EQ(r1.Get(ResourceID("CPU")), std::vector({FixedPoint(1)})); ASSERT_EQ(r1.Get(ResourceID("GPU")), std::vector({FixedPoint(0), FixedPoint(1)})); + ASSERT_EQ((*allocations)[ResourceID(std::string(kImplicitResourcePrefix) + "a")], + std::vector({FixedPoint(0.3)})); ASSERT_EQ(r1.Get(ResourceID(std::string(kImplicitResourcePrefix) + "a")), std::vector({FixedPoint(0.7)})); } +TEST_F(NodeResourceInstanceSetTest, TestTryAllocateWithSinglePgResourceAndBundleIndex) { + // 1. Test non unit resource + { + // Success allocation when the index bundle have enough resources + ResourceID cpu_resource("CPU"); + ResourceID pg_cpu_wildcard_resource("CPU_group_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_cpu_index_0_resource( + "CPU_group_0_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_cpu_index_1_resource( + "CPU_group_1_4482dec0faaf5ead891ff1659a9501000000"); + NodeResourceInstanceSet r1 = NodeResourceInstanceSet( + NodeResourceSet(absl::flat_hash_map{})); + r1.Set(cpu_resource, std::vector({FixedPoint(1)})); + r1.Set(pg_cpu_wildcard_resource, std::vector({FixedPoint(4)})); + r1.Set(pg_cpu_index_0_resource, std::vector({FixedPoint(2)})); + r1.Set(pg_cpu_index_1_resource, std::vector({FixedPoint(2)})); + + ResourceSet success_request = ResourceSet({{pg_cpu_wildcard_resource, FixedPoint(1)}, + {pg_cpu_index_1_resource, FixedPoint(1)}}); + auto allocations = r1.TryAllocate(success_request); + ASSERT_EQ(allocations->size(), 2); + ASSERT_EQ((*allocations)[pg_cpu_wildcard_resource], + (*allocations)[pg_cpu_index_1_resource]); + ASSERT_EQ((*allocations)[pg_cpu_wildcard_resource], + std::vector({FixedPoint(1)})); + ASSERT_EQ(r1.Get(cpu_resource), std::vector({FixedPoint(1)})); + ASSERT_EQ(r1.Get(pg_cpu_wildcard_resource), std::vector({FixedPoint(3)})); + ASSERT_EQ(r1.Get(pg_cpu_index_0_resource), std::vector({FixedPoint(2)})); + ASSERT_EQ(r1.Get(pg_cpu_index_1_resource), std::vector({FixedPoint(1)})); + + // Failed allocation when the index bundle doesn't have enough resources, even though + // the pg still has enough resources + ResourceSet fail_request = ResourceSet({{pg_cpu_wildcard_resource, FixedPoint(2)}, + {pg_cpu_index_1_resource, FixedPoint(2)}}); + allocations = r1.TryAllocate(fail_request); + ASSERT_FALSE(allocations); + ASSERT_EQ(r1.Get(cpu_resource), std::vector({FixedPoint(1)})); + ASSERT_EQ(r1.Get(pg_cpu_wildcard_resource), std::vector({FixedPoint(3)})); + ASSERT_EQ(r1.Get(pg_cpu_index_0_resource), std::vector({FixedPoint(2)})); + ASSERT_EQ(r1.Get(pg_cpu_index_1_resource), std::vector({FixedPoint(1)})); + } + + // 2. Test unit resource + { + // Success allocation when the index bundle have enough resources + ResourceID gpu_resource("GPU"); + ResourceID pg_gpu_wildcard_resource("GPU_group_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_gpu_index_0_resource( + "GPU_group_0_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_gpu_index_1_resource( + "GPU_group_1_4482dec0faaf5ead891ff1659a9501000000"); + NodeResourceInstanceSet r2 = NodeResourceInstanceSet( + NodeResourceSet(absl::flat_hash_map{})); + r2.Set( + gpu_resource, + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(1)})); + r2.Set( + pg_gpu_wildcard_resource, + std::vector( + {FixedPoint(1), FixedPoint(1), FixedPoint(1), FixedPoint(1), FixedPoint(0)})); + r2.Set( + pg_gpu_index_0_resource, + std::vector( + {FixedPoint(1), FixedPoint(1), FixedPoint(0), FixedPoint(0), FixedPoint(0)})); + r2.Set(pg_gpu_index_1_resource, + std::vector({{FixedPoint(0), + FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)}})); + + ResourceSet success_request = ResourceSet({{pg_gpu_wildcard_resource, FixedPoint(1)}, + {pg_gpu_index_1_resource, FixedPoint(1)}}); + auto allocations = r2.TryAllocate(success_request); + ASSERT_EQ(allocations->size(), 2); + // Make sure the allocations are consistent between wildcard resource and indexed + // resource + ASSERT_EQ((*allocations)[pg_gpu_wildcard_resource], + (*allocations)[pg_gpu_index_1_resource]); + ASSERT_EQ( + (*allocations)[pg_gpu_wildcard_resource], + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(1), FixedPoint(0), FixedPoint(0)})); + ASSERT_EQ( + r2.Get(gpu_resource), + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(1)})); + ASSERT_EQ( + r2.Get(pg_gpu_wildcard_resource), + std::vector( + {FixedPoint(1), FixedPoint(1), FixedPoint(0), FixedPoint(1), FixedPoint(0)})); + ASSERT_EQ( + r2.Get(pg_gpu_index_0_resource), + std::vector( + {FixedPoint(1), FixedPoint(1), FixedPoint(0), FixedPoint(0), FixedPoint(0)})); + ASSERT_EQ( + r2.Get(pg_gpu_index_1_resource), + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(1), FixedPoint(0)})); + + // Failed allocation when the index bundle doesn't have enough resources, even though + // the pg still has enough resources + ResourceSet fail_request = ResourceSet({{pg_gpu_wildcard_resource, FixedPoint(2)}, + {pg_gpu_index_1_resource, FixedPoint(2)}}); + allocations = r2.TryAllocate(fail_request); + ASSERT_FALSE(allocations); + ASSERT_EQ( + r2.Get(gpu_resource), + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(1)})); + ASSERT_EQ( + r2.Get(pg_gpu_wildcard_resource), + std::vector( + {FixedPoint(1), FixedPoint(1), FixedPoint(0), FixedPoint(1), FixedPoint(0)})); + ASSERT_EQ( + r2.Get(pg_gpu_index_0_resource), + std::vector( + {FixedPoint(1), FixedPoint(1), FixedPoint(0), FixedPoint(0), FixedPoint(0)})); + ASSERT_EQ( + r2.Get(pg_gpu_index_1_resource), + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(1), FixedPoint(0)})); + } +} + +TEST_F(NodeResourceInstanceSetTest, TestTryAllocateWithMultiplePgResourceAndBundleIndex) { + // Case 1: Partial failure will not allocate anything + ResourceID cpu_resource("CPU"); + ResourceID pg_cpu_wildcard_resource("CPU_group_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_cpu_index_0_resource("CPU_group_0_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_cpu_index_1_resource("CPU_group_1_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID gpu_resource("GPU"); + ResourceID pg_gpu_wildcard_resource("GPU_group_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_gpu_index_0_resource("GPU_group_0_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_gpu_index_1_resource("GPU_group_1_4482dec0faaf5ead891ff1659a9501000000"); + + NodeResourceInstanceSet r1 = NodeResourceInstanceSet( + NodeResourceSet(absl::flat_hash_map{})); + r1.Set(cpu_resource, std::vector({FixedPoint(1)})); + r1.Set(pg_cpu_wildcard_resource, std::vector({FixedPoint(4)})); + r1.Set(pg_cpu_index_0_resource, std::vector({FixedPoint(2)})); + r1.Set(pg_cpu_index_1_resource, std::vector({FixedPoint(2)})); + r1.Set( + gpu_resource, + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(1)})); + r1.Set( + pg_gpu_wildcard_resource, + std::vector( + {FixedPoint(1), FixedPoint(1), FixedPoint(1), FixedPoint(1), FixedPoint(0)})); + r1.Set( + pg_gpu_index_0_resource, + std::vector( + {FixedPoint(1), FixedPoint(1), FixedPoint(0), FixedPoint(0), FixedPoint(0)})); + r1.Set( + pg_gpu_index_1_resource, + std::vector( + {{FixedPoint(0), FixedPoint(0), FixedPoint(1), FixedPoint(1), FixedPoint(0)}})); + + ResourceSet fail_request = ResourceSet({{pg_cpu_wildcard_resource, FixedPoint(1)}, + {pg_gpu_wildcard_resource, FixedPoint(3)}, + {pg_cpu_index_1_resource, FixedPoint(1)}, + {pg_gpu_index_1_resource, FixedPoint(3)}}); + auto allocations = r1.TryAllocate(fail_request); + ASSERT_FALSE(allocations); + ASSERT_EQ(r1.Get(cpu_resource), std::vector({FixedPoint(1)})); + ASSERT_EQ(r1.Get(pg_cpu_wildcard_resource), std::vector({FixedPoint(4)})); + ASSERT_EQ(r1.Get(pg_cpu_index_0_resource), std::vector({FixedPoint(2)})); + ASSERT_EQ(r1.Get(pg_cpu_index_1_resource), std::vector({FixedPoint(2)})); + ASSERT_EQ( + r1.Get(gpu_resource), + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(1)})); + ASSERT_EQ( + r1.Get(pg_gpu_wildcard_resource), + std::vector( + {FixedPoint(1), FixedPoint(1), FixedPoint(1), FixedPoint(1), FixedPoint(0)})); + ASSERT_EQ( + r1.Get(pg_gpu_index_0_resource), + std::vector( + {FixedPoint(1), FixedPoint(1), FixedPoint(0), FixedPoint(0), FixedPoint(0)})); + ASSERT_EQ( + r1.Get(pg_gpu_index_1_resource), + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(1), FixedPoint(1), FixedPoint(0)})); + + // Case 2: All success, will allocate all the resources + ResourceSet success_request = ResourceSet({{pg_cpu_wildcard_resource, FixedPoint(1)}, + {pg_gpu_wildcard_resource, FixedPoint(1)}, + {pg_cpu_index_1_resource, FixedPoint(1)}, + {pg_gpu_index_1_resource, FixedPoint(1)}}); + allocations = r1.TryAllocate(success_request); + ASSERT_EQ(allocations->size(), 4); + // Make sure the allocations are consistent between wildcard resource and indexed + // resource + ASSERT_EQ((*allocations)[pg_cpu_wildcard_resource], + (*allocations)[pg_cpu_index_1_resource]); + ASSERT_EQ((*allocations)[pg_cpu_wildcard_resource], + std::vector({FixedPoint(1)})); + ASSERT_EQ((*allocations)[pg_gpu_wildcard_resource], + (*allocations)[pg_gpu_index_1_resource]); + ASSERT_EQ( + (*allocations)[pg_gpu_wildcard_resource], + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(1), FixedPoint(0), FixedPoint(0)})); + ASSERT_EQ(r1.Get(cpu_resource), std::vector({FixedPoint(1)})); + ASSERT_EQ(r1.Get(pg_cpu_wildcard_resource), std::vector({FixedPoint(3)})); + ASSERT_EQ(r1.Get(pg_cpu_index_0_resource), std::vector({FixedPoint(2)})); + ASSERT_EQ(r1.Get(pg_cpu_index_1_resource), std::vector({FixedPoint(1)})); + ASSERT_EQ( + r1.Get(gpu_resource), + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(1)})); + ASSERT_EQ( + r1.Get(pg_gpu_wildcard_resource), + std::vector( + {FixedPoint(1), FixedPoint(1), FixedPoint(0), FixedPoint(1), FixedPoint(0)})); + ASSERT_EQ( + r1.Get(pg_gpu_index_0_resource), + std::vector( + {FixedPoint(1), FixedPoint(1), FixedPoint(0), FixedPoint(0), FixedPoint(0)})); + ASSERT_EQ( + r1.Get(pg_gpu_index_1_resource), + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(1), FixedPoint(0)})); +} + +TEST_F(NodeResourceInstanceSetTest, TestTryAllocateSinglePgResourceAndNoBundleIndex) { + // 1. Test non unit resource + { + // Success allocation when found a bundle withe enough available resources + // Make sure it is the first available bundle + ResourceID cpu_resource("CPU"); + ResourceID pg_cpu_wildcard_resource("CPU_group_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_cpu_index_0_resource( + "CPU_group_0_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_cpu_index_1_resource( + "CPU_group_1_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_cpu_index_2_resource( + "CPU_group_2_4482dec0faaf5ead891ff1659a9501000000"); + NodeResourceInstanceSet r1 = NodeResourceInstanceSet( + NodeResourceSet(absl::flat_hash_map{})); + r1.Set(cpu_resource, std::vector({FixedPoint(1)})); + r1.Set(pg_cpu_wildcard_resource, std::vector({FixedPoint(5)})); + r1.Set(pg_cpu_index_0_resource, std::vector({FixedPoint(1)})); + r1.Set(pg_cpu_index_1_resource, std::vector({FixedPoint(2)})); + r1.Set(pg_cpu_index_2_resource, std::vector({FixedPoint(2)})); + + ResourceSet success_request = + ResourceSet({{pg_cpu_wildcard_resource, FixedPoint(2)}}); + auto allocations = r1.TryAllocate(success_request); + ASSERT_EQ(allocations->size(), 2); + ASSERT_EQ((*allocations)[pg_cpu_wildcard_resource], + (*allocations)[pg_cpu_index_1_resource]); + ASSERT_EQ((*allocations)[pg_cpu_wildcard_resource], + std::vector({FixedPoint(2)})); + ASSERT_EQ(r1.Get(cpu_resource), std::vector({FixedPoint(1)})); + ASSERT_EQ(r1.Get(pg_cpu_wildcard_resource), std::vector({FixedPoint(3)})); + ASSERT_EQ(r1.Get(pg_cpu_index_0_resource), std::vector({FixedPoint(1)})); + ASSERT_EQ(r1.Get(pg_cpu_index_1_resource), std::vector({FixedPoint(0)})); + ASSERT_EQ(r1.Get(pg_cpu_index_2_resource), std::vector({FixedPoint(2)})); + + // Failed allocation when no index bundle have enough resources, even though the pg + // still has enough resources + ResourceSet fail_request = ResourceSet({ + {pg_cpu_wildcard_resource, FixedPoint(3)}, + }); + allocations = r1.TryAllocate(fail_request); + ASSERT_FALSE(allocations); + ASSERT_EQ(r1.Get(cpu_resource), std::vector({FixedPoint(1)})); + ASSERT_EQ(r1.Get(pg_cpu_wildcard_resource), std::vector({FixedPoint(3)})); + ASSERT_EQ(r1.Get(pg_cpu_index_0_resource), std::vector({FixedPoint(1)})); + ASSERT_EQ(r1.Get(pg_cpu_index_1_resource), std::vector({FixedPoint(0)})); + ASSERT_EQ(r1.Get(pg_cpu_index_2_resource), std::vector({FixedPoint(2)})); + } + + // 2. Test unit resource + { + // Success allocation when found a bundle with enough available resources + // Make sure it is the first available bundle + ResourceID gpu_resource("GPU"); + ResourceID pg_gpu_wildcard_resource("GPU_group_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_gpu_index_0_resource( + "GPU_group_0_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_gpu_index_1_resource( + "GPU_group_1_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_gpu_index_2_resource( + "GPU_group_2_4482dec0faaf5ead891ff1659a9501000000"); + NodeResourceInstanceSet r2 = NodeResourceInstanceSet( + NodeResourceSet(absl::flat_hash_map{})); + r2.Set(gpu_resource, + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1)})); + r2.Set(pg_gpu_wildcard_resource, + std::vector({FixedPoint(1), + FixedPoint(1), + FixedPoint(1), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); + r2.Set(pg_gpu_index_0_resource, + std::vector({FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + r2.Set(pg_gpu_index_1_resource, + std::vector({{FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)}})); + r2.Set(pg_gpu_index_2_resource, + std::vector({{FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)}})); + + ResourceSet success_request = + ResourceSet({{pg_gpu_wildcard_resource, FixedPoint(2)}}); + auto allocations = r2.TryAllocate(success_request); + ASSERT_EQ(allocations->size(), 2); + // Make sure the allocations are consistent between wildcard resource and indexed + // resource + ASSERT_EQ((*allocations)[pg_gpu_wildcard_resource], + (*allocations)[pg_gpu_index_1_resource]); + ASSERT_EQ((*allocations)[pg_gpu_wildcard_resource], + std::vector({FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + ASSERT_EQ(r2.Get(gpu_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1)})); + ASSERT_EQ(r2.Get(pg_gpu_wildcard_resource), + std::vector({FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); + ASSERT_EQ(r2.Get(pg_gpu_index_0_resource), + std::vector({FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + ASSERT_EQ(r2.Get(pg_gpu_index_1_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + ASSERT_EQ(r2.Get(pg_gpu_index_2_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); + + // Failed allocation when the index bundle doesn't have enough resources, even though + // the pg still has enough resources + ResourceSet fail_request = ResourceSet({{pg_gpu_wildcard_resource, FixedPoint(3)}}); + allocations = r2.TryAllocate(fail_request); + ASSERT_FALSE(allocations); + ASSERT_EQ(r2.Get(gpu_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1)})); + ASSERT_EQ(r2.Get(pg_gpu_wildcard_resource), + std::vector({FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); + ASSERT_EQ(r2.Get(pg_gpu_index_0_resource), + std::vector({FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + ASSERT_EQ(r2.Get(pg_gpu_index_1_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + ASSERT_EQ(r2.Get(pg_gpu_index_2_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); + } +} + +TEST_F(NodeResourceInstanceSetTest, TestTryAllocateMultiplePgResourceAndNoBundleIndex) { + // Case 1: Partial failure will not allocate anything + ResourceID cpu_resource("CPU"); + ResourceID pg_cpu_wildcard_resource("CPU_group_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_cpu_index_0_resource("CPU_group_0_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_cpu_index_1_resource("CPU_group_1_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_cpu_index_2_resource("CPU_group_2_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID gpu_resource("GPU"); + ResourceID pg_gpu_wildcard_resource("GPU_group_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_gpu_index_0_resource("GPU_group_0_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_gpu_index_1_resource("GPU_group_1_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_gpu_index_2_resource("GPU_group_2_4482dec0faaf5ead891ff1659a9501000000"); + + NodeResourceInstanceSet r1 = NodeResourceInstanceSet( + NodeResourceSet(absl::flat_hash_map{})); + r1.Set(cpu_resource, std::vector({FixedPoint(1)})); + r1.Set(pg_cpu_wildcard_resource, std::vector({FixedPoint(5)})); + r1.Set(pg_cpu_index_0_resource, std::vector({FixedPoint(1)})); + r1.Set(pg_cpu_index_1_resource, std::vector({FixedPoint(2)})); + r1.Set(pg_cpu_index_2_resource, std::vector({FixedPoint(2)})); + r1.Set(gpu_resource, + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1)})); + r1.Set(pg_gpu_wildcard_resource, + std::vector({FixedPoint(1), + FixedPoint(1), + FixedPoint(1), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); + r1.Set(pg_gpu_index_0_resource, + std::vector({FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + r1.Set(pg_gpu_index_1_resource, + std::vector({{FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)}})); + r1.Set(pg_gpu_index_2_resource, + std::vector({{FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)}})); + + ResourceSet fail_request = ResourceSet({{pg_cpu_wildcard_resource, FixedPoint(2)}, + {pg_gpu_wildcard_resource, FixedPoint(3)}}); + auto allocations = r1.TryAllocate(fail_request); + ASSERT_FALSE(allocations); + ASSERT_EQ(r1.Get(cpu_resource), std::vector({FixedPoint(1)})); + ASSERT_EQ(r1.Get(pg_cpu_wildcard_resource), std::vector({FixedPoint(5)})); + ASSERT_EQ(r1.Get(pg_cpu_index_0_resource), std::vector({FixedPoint(1)})); + ASSERT_EQ(r1.Get(pg_cpu_index_1_resource), std::vector({FixedPoint(2)})); + ASSERT_EQ(r1.Get(pg_cpu_index_2_resource), std::vector({FixedPoint(2)})); + ASSERT_EQ(r1.Get(gpu_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1)})); + ASSERT_EQ(r1.Get(pg_gpu_wildcard_resource), + std::vector({FixedPoint(1), + FixedPoint(1), + FixedPoint(1), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); + ASSERT_EQ(r1.Get(pg_gpu_index_0_resource), + std::vector({FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + ASSERT_EQ(r1.Get(pg_gpu_index_1_resource), + std::vector({FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + ASSERT_EQ(r1.Get(pg_gpu_index_2_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); + + // Case 2: All success, will allocate all the resources + ResourceSet success_request = ResourceSet({{pg_cpu_wildcard_resource, FixedPoint(2)}, + {pg_gpu_wildcard_resource, FixedPoint(2)}}); + allocations = r1.TryAllocate(success_request); + ASSERT_EQ(allocations->size(), 4); + // Make sure the allocations are consistent between wildcard resource and indexed + // resource + ASSERT_EQ((*allocations)[pg_cpu_wildcard_resource], + (*allocations)[pg_cpu_index_1_resource]); + ASSERT_EQ((*allocations)[pg_cpu_wildcard_resource], + std::vector({FixedPoint(2)})); + ASSERT_EQ((*allocations)[pg_gpu_wildcard_resource], + (*allocations)[pg_gpu_index_1_resource]); + ASSERT_EQ((*allocations)[pg_gpu_wildcard_resource], + std::vector({FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + ASSERT_EQ(r1.Get(cpu_resource), std::vector({FixedPoint(1)})); + ASSERT_EQ(r1.Get(pg_cpu_wildcard_resource), std::vector({FixedPoint(3)})); + ASSERT_EQ(r1.Get(pg_cpu_index_0_resource), std::vector({FixedPoint(1)})); + ASSERT_EQ(r1.Get(pg_cpu_index_1_resource), std::vector({FixedPoint(0)})); + ASSERT_EQ(r1.Get(pg_cpu_index_2_resource), std::vector({FixedPoint(2)})); + ASSERT_EQ(r1.Get(gpu_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1)})); + ASSERT_EQ(r1.Get(pg_gpu_wildcard_resource), + std::vector({FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); + ASSERT_EQ(r1.Get(pg_gpu_index_0_resource), + std::vector({FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + ASSERT_EQ(r1.Get(pg_gpu_index_1_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + ASSERT_EQ(r1.Get(pg_gpu_index_2_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); +} + TEST_F(NodeResourceInstanceSetTest, TestFree) { NodeResourceInstanceSet r1; r1.Set(ResourceID("GPU"), std::vector({FixedPoint(1), FixedPoint(0.3)})); diff --git a/src/ray/raylet/placement_group_resource_manager_test.cc b/src/ray/raylet/placement_group_resource_manager_test.cc index e650898a6e92..c6e57e45abdd 100644 --- a/src/ray/raylet/placement_group_resource_manager_test.cc +++ b/src/ray/raylet/placement_group_resource_manager_test.cc @@ -78,6 +78,8 @@ class NewPlacementGroupResourceManagerTest : public ::testing::Test { TEST_F(NewPlacementGroupResourceManagerTest, TestGetOriginalResourceNameFromWildcardResource) { + scheduling::ResourceID resourceId = scheduling::ResourceID("GPU"); + ASSERT_EQ(resourceId, ResourceID::GPU()); ASSERT_EQ(GetOriginalResourceNameFromWildcardResource( "CPU_group_0_4482dec0faaf5ead891ff1659a9501000000"), ""); @@ -191,25 +193,20 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewPrepareBundleDuringDraining) ASSERT_FALSE(new_placement_group_resource_manager_->PrepareBundles(bundle2_specs)); // Prepared bundles can still be committed. new_placement_group_resource_manager_->CommitBundles(bundle1_specs); - absl::flat_hash_map remaining_resources = { + absl::flat_hash_map remaining_resources_total = { {"CPU_group_" + group1_id.Hex(), 1.0}, {"CPU_group_1_" + group1_id.Hex(), 1.0}, {"CPU", 2.0}, {"bundle_group_1_" + group1_id.Hex(), 1000}, {"bundle_group_" + group1_id.Hex(), 1000}}; - auto remaining_resource_scheduler = - std::make_shared(io_context, - scheduling::NodeID("remaining"), - remaining_resources, - is_node_available_fn_); - std::shared_ptr resource_instances = - std::make_shared(); - ASSERT_TRUE( - remaining_resource_scheduler->GetLocalResourceManager().AllocateLocalTaskResources( - unit_resource, resource_instances)); + absl::flat_hash_map remaining_resources_avail = { + {"CPU_group_" + group1_id.Hex(), 1.0}, + {"CPU_group_1_" + group1_id.Hex(), 1.0}, + {"CPU", 1.0}, + {"bundle_group_1_" + group1_id.Hex(), 1000}, + {"bundle_group_" + group1_id.Hex(), 1000}}; auto remaining_resource_instance = - remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - scheduling::NodeID("remaining")); + ResourceMapToNodeResources(remaining_resources_total, remaining_resources_avail); CheckRemainingResourceCorrect(remaining_resource_instance); } @@ -225,25 +222,20 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewCommitBundleResource) { ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundles(bundle_specs)); new_placement_group_resource_manager_->CommitBundles(bundle_specs); /// 4. check remaining resources is correct. - absl::flat_hash_map remaining_resources = { + absl::flat_hash_map remaining_resources_total = { {"CPU_group_" + group_id.Hex(), 1.0}, {"CPU_group_1_" + group_id.Hex(), 1.0}, {"CPU", 1.0}, {"bundle_group_1_" + group_id.Hex(), 1000}, {"bundle_group_" + group_id.Hex(), 1000}}; - auto remaining_resource_scheduler = - std::make_shared(io_context, - scheduling::NodeID("remaining"), - remaining_resources, - is_node_available_fn_); - std::shared_ptr resource_instances = - std::make_shared(); - ASSERT_TRUE( - remaining_resource_scheduler->GetLocalResourceManager().AllocateLocalTaskResources( - unit_resource, resource_instances)); + absl::flat_hash_map remaining_resources_avail = { + {"CPU_group_" + group_id.Hex(), 1.0}, + {"CPU_group_1_" + group_id.Hex(), 1.0}, + {"CPU", 0.0}, + {"bundle_group_1_" + group_id.Hex(), 1000}, + {"bundle_group_" + group_id.Hex(), 1000}}; auto remaining_resource_instance = - remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - scheduling::NodeID("remaining")); + ResourceMapToNodeResources(remaining_resources_total, remaining_resources_avail); CheckRemainingResourceCorrect(remaining_resource_instance); } @@ -263,11 +255,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewReturnBundleResource) { /// 4. return bundle resource. ASSERT_TRUE(new_placement_group_resource_manager_->ReturnBundle(bundle_spec).ok()); /// 5. check remaining resources is correct. - auto remaining_resource_scheduler = std::make_shared( - io_context, scheduling::NodeID("remaining"), unit_resource, is_node_available_fn_); auto remaining_resource_instance = - remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - scheduling::NodeID("remaining")); + ResourceMapToNodeResources(unit_resource, unit_resource); CheckRemainingResourceCorrect(remaining_resource_instance); } @@ -292,7 +281,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndRetu new_placement_group_resource_manager_->CommitBundles( ConvertSingleSpecToVectorPtrs(second_bundle_spec)); /// 4. check remaining resources is correct after commit phase. - absl::flat_hash_map remaining_resources = { + absl::flat_hash_map remaining_resources_total = { {"CPU_group_" + group_id.Hex(), 2.0}, {"CPU_group_1_" + group_id.Hex(), 1.0}, {"CPU_group_2_" + group_id.Hex(), 1.0}, @@ -300,58 +289,41 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndRetu {"bundle_group_1_" + group_id.Hex(), 1000}, {"bundle_group_2_" + group_id.Hex(), 1000}, {"bundle_group_" + group_id.Hex(), 2000}}; - auto remaining_resource_scheduler = - std::make_shared(io_context, - scheduling::NodeID("remaining"), - remaining_resources, - is_node_available_fn_); - std::shared_ptr resource_instances = - std::make_shared(); - ASSERT_TRUE( - remaining_resource_scheduler->GetLocalResourceManager().AllocateLocalTaskResources( - init_unit_resource, resource_instances)); + absl::flat_hash_map remaining_resources_avail = { + {"CPU_group_" + group_id.Hex(), 2.0}, + {"CPU_group_1_" + group_id.Hex(), 1.0}, + {"CPU_group_2_" + group_id.Hex(), 1.0}, + {"CPU", 0.0}, + {"bundle_group_1_" + group_id.Hex(), 1000}, + {"bundle_group_2_" + group_id.Hex(), 1000}, + {"bundle_group_" + group_id.Hex(), 2000}}; auto remaining_resource_instance = - remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - scheduling::NodeID("remaining")); - + ResourceMapToNodeResources(remaining_resources_total, remaining_resources_avail); CheckRemainingResourceCorrect(remaining_resource_instance); /// 5. return second bundle. ASSERT_TRUE( new_placement_group_resource_manager_->ReturnBundle(second_bundle_spec).ok()); /// 6. check remaining resources is correct after return second bundle. - remaining_resources = {{"CPU_group_" + group_id.Hex(), 2.0}, - {"CPU_group_1_" + group_id.Hex(), 1.0}, - {"CPU", 2.0}, - {"bundle_group_1_" + group_id.Hex(), 1000}, - {"bundle_group_" + group_id.Hex(), 2000}}; - remaining_resource_scheduler = - std::make_shared(io_context, - scheduling::NodeID("remaining"), - remaining_resources, - is_node_available_fn_); - ASSERT_TRUE( - remaining_resource_scheduler->GetLocalResourceManager().AllocateLocalTaskResources( - {{"CPU_group_" + group_id.Hex(), 1.0}, - {"CPU", 1.0}, - {"bundle_group_" + group_id.Hex(), 1000}}, - resource_instances)); + remaining_resources_total = {{"CPU_group_" + group_id.Hex(), 2.0}, + {"CPU_group_1_" + group_id.Hex(), 1.0}, + {"CPU", 2.0}, + {"bundle_group_1_" + group_id.Hex(), 1000}, + {"bundle_group_" + group_id.Hex(), 2000}}; + remaining_resources_avail = {{"CPU_group_" + group_id.Hex(), 1.0}, + {"CPU_group_1_" + group_id.Hex(), 1.0}, + {"CPU", 1.0}, + {"bundle_group_1_" + group_id.Hex(), 1000}, + {"bundle_group_" + group_id.Hex(), 1000}}; remaining_resource_instance = - remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - scheduling::NodeID("remaining")); + ResourceMapToNodeResources(remaining_resources_total, remaining_resources_avail); CheckRemainingResourceCorrect(remaining_resource_instance); /// 7. return first bundle. ASSERT_TRUE( new_placement_group_resource_manager_->ReturnBundle(first_bundle_spec).ok()); /// 8. check remaining resources is correct after all bundle returned. - remaining_resources = {{"CPU", 2.0}}; - remaining_resource_scheduler = - std::make_shared(io_context, - scheduling::NodeID("remaining"), - remaining_resources, - is_node_available_fn_); + remaining_resources_total = {{"CPU", 2.0}}; remaining_resource_instance = - remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - scheduling::NodeID("remaining")); + ResourceMapToNodeResources(remaining_resources_total, remaining_resources_total); CheckRemainingResourceCorrect(remaining_resource_instance); } @@ -370,20 +342,10 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithMultiPrepare) ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundles(bundle_specs)); } /// 4. check remaining resources is correct. - absl::flat_hash_map remaining_resources = {{"CPU", 3.0}}; - auto remaining_resource_scheduler = - std::make_shared(io_context, - scheduling::NodeID("remaining"), - remaining_resources, - is_node_available_fn_); - std::shared_ptr resource_instances = - std::make_shared(); - ASSERT_TRUE( - remaining_resource_scheduler->GetLocalResourceManager().AllocateLocalTaskResources( - unit_resource, resource_instances)); + absl::flat_hash_map remaining_resources_total = {{"CPU", 3.0}}; + absl::flat_hash_map remaining_resources_avail = {{"CPU", 2.0}}; auto remaining_resource_instance = - remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - scheduling::NodeID("remaining")); + ResourceMapToNodeResources(remaining_resources_total, remaining_resources_avail); CheckRemainingResourceCorrect(remaining_resource_instance); } @@ -405,25 +367,20 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithRandomOrder) ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundles( ConvertSingleSpecToVectorPtrs(bundle_spec))); /// 4. check remaining resources is correct. - absl::flat_hash_map remaining_resources = { + absl::flat_hash_map remaining_resources_total = { {"CPU_group_" + group_id.Hex(), 1.0}, {"CPU_group_1_" + group_id.Hex(), 1.0}, {"CPU", 3.0}, {"bundle_group_1_" + group_id.Hex(), 1000}, {"bundle_group_" + group_id.Hex(), 1000}}; - auto remaining_resource_scheduler = - std::make_shared(io_context, - scheduling::NodeID("remaining"), - remaining_resources, - is_node_available_fn_); - std::shared_ptr resource_instances = - std::make_shared(); - ASSERT_TRUE( - remaining_resource_scheduler->GetLocalResourceManager().AllocateLocalTaskResources( - unit_resource, resource_instances)); + absl::flat_hash_map remaining_resources_avail = { + {"CPU_group_" + group_id.Hex(), 1.0}, + {"CPU_group_1_" + group_id.Hex(), 1.0}, + {"CPU", 2.0}, + {"bundle_group_1_" + group_id.Hex(), 1000}, + {"bundle_group_" + group_id.Hex(), 1000}}; auto remaining_resource_instance = - remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - scheduling::NodeID("remaining")); + ResourceMapToNodeResources(remaining_resources_total, remaining_resources_avail); CheckRemainingResourceCorrect(remaining_resource_instance); ASSERT_TRUE(new_placement_group_resource_manager_->ReturnBundle(bundle_spec).ok()); // 5. prepare bundle -> commit bundle -> commit bundle. @@ -443,14 +400,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithRandomOrder) new_placement_group_resource_manager_->CommitBundles( ConvertSingleSpecToVectorPtrs(bundle_spec)); // 8. check remaining resources is correct. - remaining_resource_scheduler = - std::make_shared(io_context, - scheduling::NodeID("remaining"), - available_resource, - is_node_available_fn_); remaining_resource_instance = - remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - scheduling::NodeID("remaining")); + ResourceMapToNodeResources(available_resource, available_resource); CheckRemainingResourceCorrect(remaining_resource_instance); } @@ -470,14 +421,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestPreparedResourceBatched) { ASSERT_FALSE(new_placement_group_resource_manager_->PrepareBundles(bundle_specs)); // 4. check remaining resources is correct. absl::flat_hash_map remaining_resources = {{"CPU", 3.0}}; - auto remaining_resource_scheduler = - std::make_shared(io_context, - scheduling::NodeID("remaining"), - remaining_resources, - is_node_available_fn_); auto remaining_resource_instance = - remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - scheduling::NodeID("remaining")); + ResourceMapToNodeResources(remaining_resources, remaining_resources); CheckRemainingResourceCorrect(remaining_resource_instance); // 5. re-init the local available resource with 4 CPUs. available_resource = {std::make_pair("CPU", 4.0)}; @@ -488,32 +433,32 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestPreparedResourceBatched) { ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundles(bundle_specs)); new_placement_group_resource_manager_->CommitBundles(bundle_specs); // 7. re-check remaining resources is correct. - remaining_resources = {{"CPU_group_" + group_id.Hex(), 4.0}, - {"CPU_group_1_" + group_id.Hex(), 1.0}, - {"CPU_group_2_" + group_id.Hex(), 1.0}, - {"CPU_group_3_" + group_id.Hex(), 1.0}, - {"CPU_group_4_" + group_id.Hex(), 1.0}, - {"CPU", 4.0}, - {"bundle_group_1_" + group_id.Hex(), 1000}, - {"bundle_group_2_" + group_id.Hex(), 1000}, - {"bundle_group_3_" + group_id.Hex(), 1000}, - {"bundle_group_4_" + group_id.Hex(), 1000}, - {"bundle_group_" + group_id.Hex(), 4000}}; - remaining_resource_scheduler = - std::make_shared(io_context, - scheduling::NodeID("remaining"), - remaining_resources, - is_node_available_fn_); - std::shared_ptr resource_instances = - std::make_shared(); - absl::flat_hash_map allocating_resource; - allocating_resource.insert({"CPU", 4.0}); - ASSERT_TRUE( - remaining_resource_scheduler->GetLocalResourceManager().AllocateLocalTaskResources( - allocating_resource, resource_instances)); + absl::flat_hash_map remaining_resources_total = { + {"CPU_group_" + group_id.Hex(), 4.0}, + {"CPU_group_1_" + group_id.Hex(), 1.0}, + {"CPU_group_2_" + group_id.Hex(), 1.0}, + {"CPU_group_3_" + group_id.Hex(), 1.0}, + {"CPU_group_4_" + group_id.Hex(), 1.0}, + {"CPU", 4.0}, + {"bundle_group_1_" + group_id.Hex(), 1000}, + {"bundle_group_2_" + group_id.Hex(), 1000}, + {"bundle_group_3_" + group_id.Hex(), 1000}, + {"bundle_group_4_" + group_id.Hex(), 1000}, + {"bundle_group_" + group_id.Hex(), 4000}}; + absl::flat_hash_map remaining_resources_avail = { + {"CPU_group_" + group_id.Hex(), 4.0}, + {"CPU_group_1_" + group_id.Hex(), 1.0}, + {"CPU_group_2_" + group_id.Hex(), 1.0}, + {"CPU_group_3_" + group_id.Hex(), 1.0}, + {"CPU_group_4_" + group_id.Hex(), 1.0}, + {"CPU", 0.0}, + {"bundle_group_1_" + group_id.Hex(), 1000}, + {"bundle_group_2_" + group_id.Hex(), 1000}, + {"bundle_group_3_" + group_id.Hex(), 1000}, + {"bundle_group_4_" + group_id.Hex(), 1000}, + {"bundle_group_" + group_id.Hex(), 4000}}; remaining_resource_instance = - remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - scheduling::NodeID("remaining")); + ResourceMapToNodeResources(remaining_resources_total, remaining_resources_avail); RAY_LOG(INFO) << "The current local resource view: " << cluster_resource_scheduler_->DebugString(); CheckRemainingResourceCorrect(remaining_resource_instance); @@ -523,11 +468,11 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestCommiteResourceBatched) { // 1. create a placement group spec with 4 bundles and each required 1 CPU. auto group_id = PlacementGroupID::Of(JobID::FromInt(1)); absl::flat_hash_map unit_resource; - unit_resource.insert({"CPU", 1.0}); + unit_resource.insert({"GPU", 2.0}); auto bundle_specs = Mocker::GenBundleSpecifications(group_id, unit_resource, 4); // 2. init local available resource with 4 CPUs. absl::flat_hash_map available_resource = { - std::make_pair("CPU", 4.0)}; + std::make_pair("GPU", 10.0)}; InitLocalAvailableResource(available_resource); // 3. prepare resources for the four bundles and make sure it succeeds. ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundles(bundle_specs)); @@ -536,33 +481,32 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestCommiteResourceBatched) { // make sure it keeps Idempotency. new_placement_group_resource_manager_->CommitBundles(bundle_specs); // 5. check remaining resources is correct. - absl::flat_hash_map remaining_resources = { - {"CPU_group_" + group_id.Hex(), 4.0}, - {"CPU_group_1_" + group_id.Hex(), 1.0}, - {"CPU_group_2_" + group_id.Hex(), 1.0}, - {"CPU_group_3_" + group_id.Hex(), 1.0}, - {"CPU_group_4_" + group_id.Hex(), 1.0}, - {"CPU", 4.0}, + absl::flat_hash_map remaining_resources_total = { + {"GPU_group_" + group_id.Hex(), 8.0}, + {"GPU_group_1_" + group_id.Hex(), 2.0}, + {"GPU_group_2_" + group_id.Hex(), 2.0}, + {"GPU_group_3_" + group_id.Hex(), 2.0}, + {"GPU_group_4_" + group_id.Hex(), 2.0}, + {"GPU", 10.0}, + {"bundle_group_1_" + group_id.Hex(), 1000}, + {"bundle_group_2_" + group_id.Hex(), 1000}, + {"bundle_group_3_" + group_id.Hex(), 1000}, + {"bundle_group_4_" + group_id.Hex(), 1000}, + {"bundle_group_" + group_id.Hex(), 4000}}; + absl::flat_hash_map remaining_resources_avail = { + {"GPU_group_" + group_id.Hex(), 8.0}, + {"GPU_group_1_" + group_id.Hex(), 2.0}, + {"GPU_group_2_" + group_id.Hex(), 2.0}, + {"GPU_group_3_" + group_id.Hex(), 2.0}, + {"GPU_group_4_" + group_id.Hex(), 2.0}, + {"GPU", 2.0}, {"bundle_group_1_" + group_id.Hex(), 1000}, {"bundle_group_2_" + group_id.Hex(), 1000}, {"bundle_group_3_" + group_id.Hex(), 1000}, {"bundle_group_4_" + group_id.Hex(), 1000}, {"bundle_group_" + group_id.Hex(), 4000}}; - auto remaining_resource_scheduler = - std::make_shared(io_context, - scheduling::NodeID("remaining"), - remaining_resources, - is_node_available_fn_); - std::shared_ptr resource_instances = - std::make_shared(); - absl::flat_hash_map allocating_resource; - allocating_resource.insert({"CPU", 4.0}); - ASSERT_TRUE( - remaining_resource_scheduler->GetLocalResourceManager().AllocateLocalTaskResources( - allocating_resource, resource_instances)); auto remaining_resource_instance = - remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - scheduling::NodeID("remaining")); + ResourceMapToNodeResources(remaining_resources_total, remaining_resources_avail); RAY_LOG(INFO) << "The current local resource view: " << cluster_resource_scheduler_->DebugString(); CheckRemainingResourceCorrect(remaining_resource_instance); From b26680936c2adfd6c850eb5cf92d15a368dc6abd Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Thu, 17 Oct 2024 13:01:40 -0700 Subject: [PATCH 2/9] Fix CI test build issue Signed-off-by: Mengjin Yan --- src/ray/common/scheduling/resource_instance_set.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/common/scheduling/resource_instance_set.cc b/src/ray/common/scheduling/resource_instance_set.cc index 74cf15375b9a..8bebc3a1812b 100644 --- a/src/ray/common/scheduling/resource_instance_set.cc +++ b/src/ray/common/scheduling/resource_instance_set.cc @@ -319,7 +319,7 @@ void NodeResourceInstanceSet::AllocateWithReference( RAY_CHECK(!available.empty()); RAY_CHECK_EQ(available.size(), ref_allocation.size()); - for (int i = 0; i < ref_allocation.size(); i++) { + for (size_t i = 0; i < ref_allocation.size(); i++) { RAY_CHECK_GE(available[i], ref_allocation[i]); available[i] -= ref_allocation[i]; } From e9ced9f3ac96e9c0c2be28e44d1e31531d07d970 Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Fri, 25 Oct 2024 09:50:12 -0700 Subject: [PATCH 3/9] Fix the bug where the pg_id is ignored when allocating indexed resources in the case where the bundle id is not specified Signed-off-by: Mengjin Yan --- .../scheduling/resource_instance_set.cc | 72 ++++++++++++------- .../common/scheduling/resource_instance_set.h | 16 +++-- 2 files changed, 58 insertions(+), 30 deletions(-) diff --git a/src/ray/common/scheduling/resource_instance_set.cc b/src/ray/common/scheduling/resource_instance_set.cc index 8bebc3a1812b..547eb8f8132f 100644 --- a/src/ray/common/scheduling/resource_instance_set.cc +++ b/src/ray/common/scheduling/resource_instance_set.cc @@ -52,11 +52,15 @@ void NodeResourceInstanceSet::Remove(ResourceID resource_id) { /*for_indexed_resource=*/true); if (data) { ResourceID original_resource_id(data->original_resource); - absl::flat_hash_set &resource_set = + absl::flat_hash_map> &pg_resource_map = pg_indexed_resources_[original_resource_id]; + absl::flat_hash_set &resource_set = pg_resource_map[data->group_id]; resource_set.erase(resource_id); if (resource_set.empty()) { + pg_resource_map.erase(data->group_id); + } + if (pg_resource_map.empty()) { pg_indexed_resources_.erase(original_resource_id); } } @@ -92,7 +96,8 @@ NodeResourceInstanceSet &NodeResourceInstanceSet::Set(ResourceID resource_id, /*for_wildcard_resource=*/false, /*for_indexed_resource=*/true); if (data) { - pg_indexed_resources_[ResourceID(data->original_resource)].emplace(resource_id); + pg_indexed_resources_[ResourceID(data->original_resource)][data->group_id].emplace( + resource_id); } } return *this; @@ -168,26 +173,33 @@ NodeResourceInstanceSet::TryAllocate(const ResourceSet &resource_demands) { // the resource requirement for a single resource type std::vector wildcard_allocation; const ResourceID *wildcard_resource_id = nullptr; + const std::string *pg_id = nullptr; // Allocate indexed resource if (resource_id_vector.size() == 1) { // The case where no bundle index is specified - // Iterate through the bundles to find the first one with enough space + // Iterate through the bundles with the same original resource and pg_id to find + // the first one with enough space bool found = false; wildcard_resource_id = &resource_id_vector[0].first; - auto index_resources = pg_indexed_resources_.find(original_resource_id); - if (index_resources != pg_indexed_resources_.end()) { - for (ResourceID indexed_resource_id : index_resources->second) { - if (Has(indexed_resource_id)) { - auto allocation = TryAllocate( - indexed_resource_id, resource_demands.Get(resource_id_vector[0].first)); - - if (allocation) { - // Found the allocation in a bundle - wildcard_allocation = *allocation; - allocations[indexed_resource_id] = std::move(*allocation); - found = true; - break; + pg_id = &resource_id_vector[0].second.group_id; + + auto pg_index_resources = pg_indexed_resources_.find(original_resource_id); + if (pg_index_resources != pg_indexed_resources_.end()) { + auto index_resources = pg_index_resources->second.find(*pg_id); + if (index_resources != pg_index_resources->second.end()) { + for (ResourceID indexed_resource_id : index_resources->second) { + if (Has(indexed_resource_id)) { + auto allocation = TryAllocate( + indexed_resource_id, resource_demands.Get(resource_id_vector[0].first)); + + if (allocation) { + // Found the allocation in a bundle + wildcard_allocation = *allocation; + allocations[indexed_resource_id] = std::move(*allocation); + found = true; + break; + } } } } @@ -202,7 +214,7 @@ NodeResourceInstanceSet::TryAllocate(const ResourceSet &resource_demands) { } } else { // The case where the bundle index is specified - // The each resource type, both the wildcard resource and the indexed resource + // For each resource type, both the wildcard resource and the indexed resource // should be in the resource_demand for (const std::pair &pair : resource_id_vector) { @@ -385,7 +397,7 @@ std::vector NodeResourceInstanceSet::Subtract( std::string NodeResourceInstanceSet::DebugString() const { std::stringstream buffer; - buffer << "{{"; + buffer << "{resources_:{"; bool first = true; for (const auto &[id, quantity] : resources_) { if (!first) { @@ -394,23 +406,33 @@ std::string NodeResourceInstanceSet::DebugString() const { first = false; buffer << id.Binary() << ": " << FixedPointVectorToString(quantity); } - buffer << "}, {"; + buffer << "}, pg_indexed_resources_:{"; first = true; - for (const auto &[original_id, indexed_ids] : pg_indexed_resources_) { + for (const auto &[original_id, pg_resource_id_map] : pg_indexed_resources_) { if (!first) { buffer << ", "; } first = false; buffer << original_id.Binary() << ": {"; - bool firstInSet = true; - for (const auto &index_id : indexed_ids) { - if (!firstInSet) { + bool firstInMap = true; + for (const auto &[pg_id, indexed_ids] : pg_resource_id_map) { + if (!firstInMap) { buffer << ", "; } - firstInSet = false; - buffer << index_id.Binary(); + firstInMap = false; + + buffer << pg_id << ": {"; + bool firstInSet = true; + for (const auto &index_id : indexed_ids) { + if (!firstInSet) { + buffer << ", "; + } + firstInSet = false; + buffer << index_id.Binary(); + } + buffer << "}"; } buffer << "}"; } diff --git a/src/ray/common/scheduling/resource_instance_set.h b/src/ray/common/scheduling/resource_instance_set.h index 3463638ce597..4a3b833a25e3 100644 --- a/src/ray/common/scheduling/resource_instance_set.h +++ b/src/ray/common/scheduling/resource_instance_set.h @@ -88,7 +88,9 @@ class NodeResourceInstanceSet { } /// Only for testing. - const absl::flat_hash_map> + const absl::flat_hash_map< + ResourceID, + absl::flat_hash_map>> &PgIndexedResources() const { return pg_indexed_resources_; } @@ -130,10 +132,14 @@ class NodeResourceInstanceSet { /// Map from the resource IDs to the resource instance values. absl::flat_hash_map> resources_; - /// Map from the original resource IDs to the actual resource IDs for all the indexed - /// placement group resources. This map should be treated as a derived map from the - /// resources_ map and should always be consistent with the _resource map. - absl::flat_hash_map> pg_indexed_resources_; + /// This is a derived map from the resources_ map. The map aggregates all the current + /// placement group indexed resources in resources_ by their original resource id and + /// pd id. The key of the map is the original resource id. The value is a map of pg id + /// to the corresponding placement group indexed resource ids. This map should always + /// be consistent with the _resource map. + absl::flat_hash_map>> + pg_indexed_resources_; }; } // namespace ray From 795ea6cf5ff1f236d0d5935c86825fb5a58c4380 Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Mon, 28 Oct 2024 13:17:27 -0700 Subject: [PATCH 4/9] Update src/ray/common/scheduling/resource_instance_set.cc Co-authored-by: SangBin Cho Signed-off-by: Mengjin Yan --- src/ray/common/scheduling/resource_instance_set.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/common/scheduling/resource_instance_set.cc b/src/ray/common/scheduling/resource_instance_set.cc index 547eb8f8132f..26f9d4ba09fd 100644 --- a/src/ray/common/scheduling/resource_instance_set.cc +++ b/src/ray/common/scheduling/resource_instance_set.cc @@ -184,7 +184,7 @@ NodeResourceInstanceSet::TryAllocate(const ResourceSet &resource_demands) { wildcard_resource_id = &resource_id_vector[0].first; pg_id = &resource_id_vector[0].second.group_id; - auto pg_index_resources = pg_indexed_resources_.find(original_resource_id); + auto pg_index_resources_it = pg_indexed_resources_.find(original_resource_id); if (pg_index_resources != pg_indexed_resources_.end()) { auto index_resources = pg_index_resources->second.find(*pg_id); if (index_resources != pg_index_resources->second.end()) { From 8bd2969c156deb613397d30d6df8bcab6aafbfba Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Mon, 28 Oct 2024 13:17:39 -0700 Subject: [PATCH 5/9] Update src/ray/common/scheduling/resource_instance_set.cc Co-authored-by: SangBin Cho Signed-off-by: Mengjin Yan --- src/ray/common/scheduling/resource_instance_set.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/common/scheduling/resource_instance_set.cc b/src/ray/common/scheduling/resource_instance_set.cc index 26f9d4ba09fd..2426c7ee0a8f 100644 --- a/src/ray/common/scheduling/resource_instance_set.cc +++ b/src/ray/common/scheduling/resource_instance_set.cc @@ -186,7 +186,7 @@ NodeResourceInstanceSet::TryAllocate(const ResourceSet &resource_demands) { auto pg_index_resources_it = pg_indexed_resources_.find(original_resource_id); if (pg_index_resources != pg_indexed_resources_.end()) { - auto index_resources = pg_index_resources->second.find(*pg_id); + auto index_resources_it = pg_index_resources->second.find(*pg_id); if (index_resources != pg_index_resources->second.end()) { for (ResourceID indexed_resource_id : index_resources->second) { if (Has(indexed_resource_id)) { From e0c0b153008513f17a394e6d96217c7edb1fc4df Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Wed, 30 Oct 2024 13:31:43 -0700 Subject: [PATCH 6/9] Fix comments Signed-off-by: Mengjin Yan --- .../scheduling/resource_instance_set.cc | 131 ++++++++++-------- .../common/scheduling/resource_instance_set.h | 46 ++++-- 2 files changed, 108 insertions(+), 69 deletions(-) diff --git a/src/ray/common/scheduling/resource_instance_set.cc b/src/ray/common/scheduling/resource_instance_set.cc index 2426c7ee0a8f..3b31e1bdb9d1 100644 --- a/src/ray/common/scheduling/resource_instance_set.cc +++ b/src/ray/common/scheduling/resource_instance_set.cc @@ -52,16 +52,20 @@ void NodeResourceInstanceSet::Remove(ResourceID resource_id) { /*for_indexed_resource=*/true); if (data) { ResourceID original_resource_id(data->original_resource); - absl::flat_hash_map> &pg_resource_map = - pg_indexed_resources_[original_resource_id]; - absl::flat_hash_set &resource_set = pg_resource_map[data->group_id]; - resource_set.erase(resource_id); - if (resource_set.empty()) { - pg_resource_map.erase(data->group_id); - } - if (pg_resource_map.empty()) { - pg_indexed_resources_.erase(original_resource_id); + auto pg_resource_map_it = pg_indexed_resources_.find(original_resource_id); + if (pg_resource_map_it != pg_indexed_resources_.end()) { + auto resource_set_it = pg_resource_map_it->second.find(data->group_id); + + if (resource_set_it != pg_resource_map_it->second.end()) { + resource_set_it->second.erase(resource_id); + if (resource_set_it->second.empty()) { + pg_resource_map_it->second.erase(data->group_id); + } + if (pg_resource_map_it->second.empty()) { + pg_indexed_resources_.erase(original_resource_id); + } + } } } } @@ -92,6 +96,13 @@ NodeResourceInstanceSet &NodeResourceInstanceSet::Set(ResourceID resource_id, resources_[resource_id] = std::move(instances); // Popluate the pg_indexed_resources_map_ + // TODO: The parsing of the resource_id String can be costly and impact the task + // creation throughput if the parting is required every time we allocate resources + // for a task and updating the available resources. The current benchmark shows no + // observable impact for now but in the furture, an idea of improvement is to add + // the placement group id as well as the bundle index inside the ResourceID class. + // And instead of parse the String, leveraging the fields in the ResourceID class + // directly. auto data = ParsePgFormattedResource(resource_id.Binary(), /*for_wildcard_resource=*/false, /*for_indexed_resource=*/true); @@ -125,19 +136,46 @@ NodeResourceInstanceSet::TryAllocate(const ResourceSet &resource_demands) { absl::flat_hash_map> allocations; // During resource allocation with a placement group, no matter whether the allocation - // requirement specifies a bundle index, we need to generate the allocation on both + // requirement specifies a bundle index, we generate the allocation on both // the wildcard resource and the indexed resource. The resource_demand shouldn't be // assigned across bundles. And If no bundle index is specified, we will iterate // through the bundles and find the first bundle that can fit the required resources. - // In addition, for unit resources, we need to make sure that the allocation on the + // In addition, for unit resources, we make sure that the allocation on the // wildcard resource and the indexed resource are consistent, meaning the same // instance ids should be allocated. - - // In the format of: - // key: original resource id, - // value: [resource id, parsed pg format resource data] - absl::flat_hash_map>> + // + // For example, considering the GPU resource on a host. Assuming the host has 3 GPUs + // and 1 placement group with 2 bundles. The bundle with index 1 contains 1 GPU and + // the bundle with index 2 contains 2 GPU. + // + // The current node resource can be as follows: + // resource id: total, available + // GPU: [1, 1, 1], [0, 0, 0] + // GPU_: [1, 1, 1], [1, 1, 1] + // GPU_1_: [1, 0, 0], [1, 0, 0] + // GPU_2_: [0, 1, 1], [0, 1, 1] + // + // Now, we want to allocate a task with 2 GPUs and in the placement group , + // reflecting in the following resource demand: + // GPU_ : 2 + // + // We will iterate though all the bundles in the placement group and bundle with + // index=2 has the required capacity. So we will allocate the task to the 2 GPUs in + // bundle 2 in placement group and the same allocation should be reflected in + // the wildcard GPU resource. So the allocation will be: + // GPU_ : [0, 1, 1] + // GPU_2_ : [0, 1, 1] + // + // And as a result, after the allocation, current node resource will be: + // resource id: total, available + // GPU: [1, 1, 1], [0, 0, 0] + // GPU_: [1, 1, 1], [1, 0, 0] + // GPU_1_: [1, 0, 0], [1, 0, 0] + // GPU_2_: [0, 1, 1], [0, 0, 0] + + absl::flat_hash_map>> pg_resource_map; for (const auto &[resource_id, demand] : resource_demands.Resources()) { @@ -171,7 +209,9 @@ NodeResourceInstanceSet::TryAllocate(const ResourceSet &resource_demands) { for (const auto &[original_resource_id, resource_id_vector] : pg_resource_map) { // Assuming exactly 1 placement group and at most 1 bundle index can be specified in // the resource requirement for a single resource type - std::vector wildcard_allocation; + // Also assuming the wildcard resource id will always exist in the resource_demands + // no matter how the resource requirement is specified in task + std::optional> wildcard_allocation; const ResourceID *wildcard_resource_id = nullptr; const std::string *pg_id = nullptr; @@ -185,10 +225,10 @@ NodeResourceInstanceSet::TryAllocate(const ResourceSet &resource_demands) { pg_id = &resource_id_vector[0].second.group_id; auto pg_index_resources_it = pg_indexed_resources_.find(original_resource_id); - if (pg_index_resources != pg_indexed_resources_.end()) { - auto index_resources_it = pg_index_resources->second.find(*pg_id); - if (index_resources != pg_index_resources->second.end()) { - for (ResourceID indexed_resource_id : index_resources->second) { + if (pg_index_resources_it != pg_indexed_resources_.end()) { + auto index_resources_it = pg_index_resources_it->second.find(*pg_id); + if (index_resources_it != pg_index_resources_it->second.end()) { + for (ResourceID indexed_resource_id : index_resources_it->second) { if (Has(indexed_resource_id)) { auto allocation = TryAllocate( indexed_resource_id, resource_demands.Get(resource_id_vector[0].first)); @@ -220,11 +260,10 @@ NodeResourceInstanceSet::TryAllocate(const ResourceSet &resource_demands) { resource_id_vector) { if (pair.second.bundle_index != -1) { // This is the indexed resource - auto allocation = TryAllocate(pair.first, resource_demands.Get(pair.first)); + wildcard_allocation = TryAllocate(pair.first, resource_demands.Get(pair.first)); - if (allocation) { - wildcard_allocation = *allocation; - allocations[pair.first] = std::move(*allocation); + if (wildcard_allocation) { + allocations[pair.first] = *wildcard_allocation; } else { // The corresponding bundle cannot hold the required resources. // Allocation failed @@ -242,9 +281,9 @@ NodeResourceInstanceSet::TryAllocate(const ResourceSet &resource_demands) { // Allocate wildcard resource, should be consistent with the indexed resource RAY_CHECK(wildcard_resource_id != nullptr); - RAY_CHECK(!wildcard_allocation.empty()); - AllocateWithReference(wildcard_allocation, *wildcard_resource_id); - allocations[*wildcard_resource_id] = std::move(wildcard_allocation); + RAY_CHECK(!(*wildcard_allocation).empty()); + AllocateWithReference(*wildcard_allocation, *wildcard_resource_id); + allocations[*wildcard_resource_id] = std::move(*wildcard_allocation); } return std::make_optional>>( @@ -397,7 +436,7 @@ std::vector NodeResourceInstanceSet::Subtract( std::string NodeResourceInstanceSet::DebugString() const { std::stringstream buffer; - buffer << "{resources_:{"; + buffer << "{"; bool first = true; for (const auto &[id, quantity] : resources_) { if (!first) { @@ -406,37 +445,7 @@ std::string NodeResourceInstanceSet::DebugString() const { first = false; buffer << id.Binary() << ": " << FixedPointVectorToString(quantity); } - buffer << "}, pg_indexed_resources_:{"; - - first = true; - for (const auto &[original_id, pg_resource_id_map] : pg_indexed_resources_) { - if (!first) { - buffer << ", "; - } - first = false; - - buffer << original_id.Binary() << ": {"; - bool firstInMap = true; - for (const auto &[pg_id, indexed_ids] : pg_resource_id_map) { - if (!firstInMap) { - buffer << ", "; - } - firstInMap = false; - - buffer << pg_id << ": {"; - bool firstInSet = true; - for (const auto &index_id : indexed_ids) { - if (!firstInSet) { - buffer << ", "; - } - firstInSet = false; - buffer << index_id.Binary(); - } - buffer << "}"; - } - buffer << "}"; - } - buffer << "}}"; + buffer << "}"; return buffer.str(); } diff --git a/src/ray/common/scheduling/resource_instance_set.h b/src/ray/common/scheduling/resource_instance_set.h index 4a3b833a25e3..debb67112f4d 100644 --- a/src/ray/common/scheduling/resource_instance_set.h +++ b/src/ray/common/scheduling/resource_instance_set.h @@ -87,14 +87,6 @@ class NodeResourceInstanceSet { return resources_; } - /// Only for testing. - const absl::flat_hash_map< - ResourceID, - absl::flat_hash_map>> - &PgIndexedResources() const { - return pg_indexed_resources_; - } - private: /// Allocate enough capacity across the instances of a resource to satisfy "demand". /// @@ -109,6 +101,44 @@ class NodeResourceInstanceSet { /// Thus, we will allocate a bunch of full instances and /// at most a fractional instance. /// + /// During resource allocation with a placement group, no matter whether the + /// allocation requirement specifies a bundle index, we generate the + /// allocation on both the wildcard resource and the indexed resource. The + /// resource_demand won't be assigned across bundles. And If no bundle index is + /// specified, we will iterate through the bundles and find the first bundle that can + /// fit the required resources. In addition, for unit resources, we make sure + /// that the allocation on the wildcard resource and the indexed resource are + /// consistent, meaning the same instance ids should be allocated. + /// + /// For example, considering the GPU resource on a host. Assuming the host has 3 GPUs + /// and 1 placement group with 2 bundles. The bundle with index 1 contains 1 GPU and + /// the bundle with index 2 contains 2 GPU. + /// + /// The current node resource can be as follows: + /// resource id: total, available + /// GPU: [1, 1, 1], [0, 0, 0] + /// GPU_: [1, 1, 1], [1, 1, 1] + /// GPU_1_: [1, 0, 0], [1, 0, 0] + /// GPU_2_: [0, 1, 1], [0, 1, 1] + /// + /// Now, we want to allocate a task with 2 GPUs and in the placement group , + /// reflecting in the following resource demand: + /// GPU_ : 2 + /// + /// We will iterate though all the bundles in the placement group and bundle with + /// index=2 has the required capacity. So we will allocate the task to the 2 GPUs in + /// bundle 2 in placement group and the same allocation should be reflected in + /// the wildcard GPU resource. So the allocation will be: + /// GPU_ : [0, 1, 1] + /// GPU_2_ : [0, 1, 1] + /// + /// And as a result, after the allocation, current node resource will be: + /// resource id: total, available + /// GPU: [1, 1, 1], [0, 0, 0] + /// GPU_: [1, 1, 1], [1, 0, 0] + /// GPU_1_: [1, 0, 0], [1, 0, 0] + /// GPU_2_: [0, 1, 1], [0, 0, 0] + /// /// \param resource_id: The id of the resource to be allocated. /// \param demand: The resource amount to be allocated. /// From 4377f8b69ce719ea1036c7efec4b9c141bda87ad Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Thu, 31 Oct 2024 17:23:14 -0700 Subject: [PATCH 7/9] Update src/ray/common/scheduling/resource_instance_set.h Co-authored-by: Jiajun Yao Signed-off-by: Mengjin Yan --- src/ray/common/scheduling/resource_instance_set.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/common/scheduling/resource_instance_set.h b/src/ray/common/scheduling/resource_instance_set.h index debb67112f4d..f8a557dc2083 100644 --- a/src/ray/common/scheduling/resource_instance_set.h +++ b/src/ray/common/scheduling/resource_instance_set.h @@ -166,7 +166,7 @@ class NodeResourceInstanceSet { /// placement group indexed resources in resources_ by their original resource id and /// pd id. The key of the map is the original resource id. The value is a map of pg id /// to the corresponding placement group indexed resource ids. This map should always - /// be consistent with the _resource map. + /// be consistent with the resources_ map. absl::flat_hash_map>> pg_indexed_resources_; From af502640a0801692d682738b6626e1c43575edce Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Thu, 31 Oct 2024 17:34:11 -0700 Subject: [PATCH 8/9] Fix comment Signed-off-by: Mengjin Yan --- src/ray/common/bundle_spec.cc | 6 +++--- src/ray/common/bundle_spec.h | 2 +- .../common/scheduling/resource_instance_set.cc | 16 +++++++++------- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/src/ray/common/bundle_spec.cc b/src/ray/common/bundle_spec.cc index 6b11249db270..833ec7c5a82e 100644 --- a/src/ray/common/bundle_spec.cc +++ b/src/ray/common/bundle_spec.cc @@ -95,15 +95,15 @@ std::string BundleSpecification::DebugString() const { } std::string FormatPlacementGroupResource(const std::string &original_resource_name, - const std::string &group_id_str, + const std::string &group_id_hex, int64_t bundle_index) { std::stringstream os; if (bundle_index >= 0) { os << original_resource_name << kGroupKeyword << std::to_string(bundle_index) << "_" - << group_id_str; + << group_id_hex; } else { RAY_CHECK(bundle_index == -1) << "Invalid index " << bundle_index; - os << original_resource_name << kGroupKeyword << group_id_str; + os << original_resource_name << kGroupKeyword << group_id_hex; } std::string result = os.str(); RAY_DCHECK(GetOriginalResourceName(result) == original_resource_name) diff --git a/src/ray/common/bundle_spec.h b/src/ray/common/bundle_spec.h index 12a6d90eddb6..1330d4676fa3 100644 --- a/src/ray/common/bundle_spec.h +++ b/src/ray/common/bundle_spec.h @@ -110,7 +110,7 @@ struct PgFormattedResourceData { /// [original_resource_name]_group_[bundle_index]_[group_id_str] /// \return The corresponding formatted placement group resource string. std::string FormatPlacementGroupResource(const std::string &original_resource_name, - const std::string &group_id_str, + const std::string &group_id_hex, int64_t bundle_index = -1); /// Format a placement group resource, e.g., CPU -> CPU_group_i diff --git a/src/ray/common/scheduling/resource_instance_set.cc b/src/ray/common/scheduling/resource_instance_set.cc index 3b31e1bdb9d1..20bbb04f84ca 100644 --- a/src/ray/common/scheduling/resource_instance_set.cc +++ b/src/ray/common/scheduling/resource_instance_set.cc @@ -96,13 +96,15 @@ NodeResourceInstanceSet &NodeResourceInstanceSet::Set(ResourceID resource_id, resources_[resource_id] = std::move(instances); // Popluate the pg_indexed_resources_map_ - // TODO: The parsing of the resource_id String can be costly and impact the task - // creation throughput if the parting is required every time we allocate resources - // for a task and updating the available resources. The current benchmark shows no - // observable impact for now but in the furture, an idea of improvement is to add - // the placement group id as well as the bundle index inside the ResourceID class. - // And instead of parse the String, leveraging the fields in the ResourceID class - // directly. + // TODO (myan): The parsing of the resource_id String can be costly and impact the + // task creation throughput if the parting is required every time we allocate + // resources for a task and updating the available resources. The current benchmark + // shows no observable impact for now. But in the furture, ideas of improvement are: + // (1) to add the placement group id as well as the bundle index inside the + // ResourceID class. And instead of parse the String, leveraging the fields in the + // ResourceID class directly; (2) to update the pg resource id format to start with + // a special prefix so that we can do "startwith" instead of regex match which is + // less costly auto data = ParsePgFormattedResource(resource_id.Binary(), /*for_wildcard_resource=*/false, /*for_indexed_resource=*/true); From 444423aff8243d6d81f4d30aa0006791aac3cb82 Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Mon, 4 Nov 2024 22:58:23 -0800 Subject: [PATCH 9/9] Fix test flakiness Signed-off-by: Mengjin Yan --- .../scheduling/resource_instance_set.cc | 10 +- .../common/test/resource_instance_set_test.cc | 371 ++++++++++++------ 2 files changed, 253 insertions(+), 128 deletions(-) diff --git a/src/ray/common/scheduling/resource_instance_set.cc b/src/ray/common/scheduling/resource_instance_set.cc index 20bbb04f84ca..c9f73d09a727 100644 --- a/src/ray/common/scheduling/resource_instance_set.cc +++ b/src/ray/common/scheduling/resource_instance_set.cc @@ -232,13 +232,11 @@ NodeResourceInstanceSet::TryAllocate(const ResourceSet &resource_demands) { if (index_resources_it != pg_index_resources_it->second.end()) { for (ResourceID indexed_resource_id : index_resources_it->second) { if (Has(indexed_resource_id)) { - auto allocation = TryAllocate( - indexed_resource_id, resource_demands.Get(resource_id_vector[0].first)); - - if (allocation) { + wildcard_allocation = TryAllocate( + indexed_resource_id, resource_demands.Get(*wildcard_resource_id)); + if (wildcard_allocation) { // Found the allocation in a bundle - wildcard_allocation = *allocation; - allocations[indexed_resource_id] = std::move(*allocation); + allocations[indexed_resource_id] = *wildcard_allocation; found = true; break; } diff --git a/src/ray/common/test/resource_instance_set_test.cc b/src/ray/common/test/resource_instance_set_test.cc index d2546e0a68df..ba969f54509c 100644 --- a/src/ray/common/test/resource_instance_set_test.cc +++ b/src/ray/common/test/resource_instance_set_test.cc @@ -383,7 +383,7 @@ TEST_F(NodeResourceInstanceSetTest, TestTryAllocateWithMultiplePgResourceAndBund r1.Set( pg_gpu_index_1_resource, std::vector( - {{FixedPoint(0), FixedPoint(0), FixedPoint(1), FixedPoint(1), FixedPoint(0)}})); + {FixedPoint(0), FixedPoint(0), FixedPoint(1), FixedPoint(1), FixedPoint(0)})); ResourceSet fail_request = ResourceSet({{pg_cpu_wildcard_resource, FixedPoint(1)}, {pg_gpu_wildcard_resource, FixedPoint(3)}, @@ -457,7 +457,6 @@ TEST_F(NodeResourceInstanceSetTest, TestTryAllocateSinglePgResourceAndNoBundleIn // 1. Test non unit resource { // Success allocation when found a bundle withe enough available resources - // Make sure it is the first available bundle ResourceID cpu_resource("CPU"); ResourceID pg_cpu_wildcard_resource("CPU_group_4482dec0faaf5ead891ff1659a9501000000"); ResourceID pg_cpu_index_0_resource( @@ -478,34 +477,56 @@ TEST_F(NodeResourceInstanceSetTest, TestTryAllocateSinglePgResourceAndNoBundleIn ResourceSet({{pg_cpu_wildcard_resource, FixedPoint(2)}}); auto allocations = r1.TryAllocate(success_request); ASSERT_EQ(allocations->size(), 2); - ASSERT_EQ((*allocations)[pg_cpu_wildcard_resource], - (*allocations)[pg_cpu_index_1_resource]); ASSERT_EQ((*allocations)[pg_cpu_wildcard_resource], std::vector({FixedPoint(2)})); + if (allocations->find(pg_cpu_index_1_resource) != allocations->end()) { + ASSERT_EQ((*allocations)[pg_cpu_index_1_resource], + std::vector({FixedPoint(2)})); + } else { + ASSERT_EQ((*allocations)[pg_cpu_index_2_resource], + std::vector({FixedPoint(2)})); + } ASSERT_EQ(r1.Get(cpu_resource), std::vector({FixedPoint(1)})); ASSERT_EQ(r1.Get(pg_cpu_wildcard_resource), std::vector({FixedPoint(3)})); ASSERT_EQ(r1.Get(pg_cpu_index_0_resource), std::vector({FixedPoint(1)})); - ASSERT_EQ(r1.Get(pg_cpu_index_1_resource), std::vector({FixedPoint(0)})); - ASSERT_EQ(r1.Get(pg_cpu_index_2_resource), std::vector({FixedPoint(2)})); + if (allocations->find(pg_cpu_index_1_resource) != allocations->end()) { + ASSERT_EQ(r1.Get(pg_cpu_index_1_resource), + std::vector({FixedPoint(0)})); + ASSERT_EQ(r1.Get(pg_cpu_index_2_resource), + std::vector({FixedPoint(2)})); + } else { + ASSERT_EQ(r1.Get(pg_cpu_index_1_resource), + std::vector({FixedPoint(2)})); + ASSERT_EQ(r1.Get(pg_cpu_index_2_resource), + std::vector({FixedPoint(0)})); + } // Failed allocation when no index bundle have enough resources, even though the pg // still has enough resources ResourceSet fail_request = ResourceSet({ {pg_cpu_wildcard_resource, FixedPoint(3)}, }); - allocations = r1.TryAllocate(fail_request); - ASSERT_FALSE(allocations); + auto failed_allocations = r1.TryAllocate(fail_request); + ASSERT_FALSE(failed_allocations); ASSERT_EQ(r1.Get(cpu_resource), std::vector({FixedPoint(1)})); ASSERT_EQ(r1.Get(pg_cpu_wildcard_resource), std::vector({FixedPoint(3)})); ASSERT_EQ(r1.Get(pg_cpu_index_0_resource), std::vector({FixedPoint(1)})); - ASSERT_EQ(r1.Get(pg_cpu_index_1_resource), std::vector({FixedPoint(0)})); - ASSERT_EQ(r1.Get(pg_cpu_index_2_resource), std::vector({FixedPoint(2)})); + if (allocations->find(pg_cpu_index_1_resource) != allocations->end()) { + ASSERT_EQ(r1.Get(pg_cpu_index_1_resource), + std::vector({FixedPoint(0)})); + ASSERT_EQ(r1.Get(pg_cpu_index_2_resource), + std::vector({FixedPoint(2)})); + } else { + ASSERT_EQ(r1.Get(pg_cpu_index_1_resource), + std::vector({FixedPoint(2)})); + ASSERT_EQ(r1.Get(pg_cpu_index_2_resource), + std::vector({FixedPoint(0)})); + } } // 2. Test unit resource { // Success allocation when found a bundle with enough available resources - // Make sure it is the first available bundle ResourceID gpu_resource("GPU"); ResourceID pg_gpu_wildcard_resource("GPU_group_4482dec0faaf5ead891ff1659a9501000000"); ResourceID pg_gpu_index_0_resource( @@ -538,19 +559,19 @@ TEST_F(NodeResourceInstanceSetTest, TestTryAllocateSinglePgResourceAndNoBundleIn FixedPoint(0), FixedPoint(0)})); r2.Set(pg_gpu_index_1_resource, - std::vector({{FixedPoint(0), - FixedPoint(1), - FixedPoint(1), - FixedPoint(0), - FixedPoint(0), - FixedPoint(0)}})); + std::vector({FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); r2.Set(pg_gpu_index_2_resource, - std::vector({{FixedPoint(0), - FixedPoint(0), - FixedPoint(0), - FixedPoint(1), - FixedPoint(1), - FixedPoint(0)}})); + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); ResourceSet success_request = ResourceSet({{pg_gpu_wildcard_resource, FixedPoint(2)}}); @@ -558,15 +579,27 @@ TEST_F(NodeResourceInstanceSetTest, TestTryAllocateSinglePgResourceAndNoBundleIn ASSERT_EQ(allocations->size(), 2); // Make sure the allocations are consistent between wildcard resource and indexed // resource - ASSERT_EQ((*allocations)[pg_gpu_wildcard_resource], - (*allocations)[pg_gpu_index_1_resource]); - ASSERT_EQ((*allocations)[pg_gpu_wildcard_resource], - std::vector({FixedPoint(0), - FixedPoint(1), - FixedPoint(1), - FixedPoint(0), - FixedPoint(0), - FixedPoint(0)})); + if (allocations->find(pg_gpu_index_1_resource) != allocations->end()) { + ASSERT_EQ((*allocations)[pg_gpu_wildcard_resource], + (*allocations)[pg_gpu_index_1_resource]); + ASSERT_EQ((*allocations)[pg_gpu_wildcard_resource], + std::vector({FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + } else { + ASSERT_EQ((*allocations)[pg_gpu_wildcard_resource], + (*allocations)[pg_gpu_index_2_resource]); + ASSERT_EQ((*allocations)[pg_gpu_wildcard_resource], + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); + } ASSERT_EQ(r2.Get(gpu_resource), std::vector({FixedPoint(0), FixedPoint(0), @@ -574,13 +607,6 @@ TEST_F(NodeResourceInstanceSetTest, TestTryAllocateSinglePgResourceAndNoBundleIn FixedPoint(0), FixedPoint(0), FixedPoint(1)})); - ASSERT_EQ(r2.Get(pg_gpu_wildcard_resource), - std::vector({FixedPoint(1), - FixedPoint(0), - FixedPoint(0), - FixedPoint(1), - FixedPoint(1), - FixedPoint(0)})); ASSERT_EQ(r2.Get(pg_gpu_index_0_resource), std::vector({FixedPoint(1), FixedPoint(0), @@ -588,26 +614,57 @@ TEST_F(NodeResourceInstanceSetTest, TestTryAllocateSinglePgResourceAndNoBundleIn FixedPoint(0), FixedPoint(0), FixedPoint(0)})); - ASSERT_EQ(r2.Get(pg_gpu_index_1_resource), - std::vector({FixedPoint(0), - FixedPoint(0), - FixedPoint(0), - FixedPoint(0), - FixedPoint(0), - FixedPoint(0)})); - ASSERT_EQ(r2.Get(pg_gpu_index_2_resource), - std::vector({FixedPoint(0), - FixedPoint(0), - FixedPoint(0), - FixedPoint(1), - FixedPoint(1), - FixedPoint(0)})); + if (allocations->find(pg_gpu_index_1_resource) != allocations->end()) { + ASSERT_EQ(r2.Get(pg_gpu_wildcard_resource), + std::vector({FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); + ASSERT_EQ(r2.Get(pg_gpu_index_1_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + ASSERT_EQ(r2.Get(pg_gpu_index_2_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); + } else { + ASSERT_EQ(r2.Get(pg_gpu_wildcard_resource), + std::vector({FixedPoint(1), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + ASSERT_EQ(r2.Get(pg_gpu_index_1_resource), + std::vector({FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + ASSERT_EQ(r2.Get(pg_gpu_index_2_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + } // Failed allocation when the index bundle doesn't have enough resources, even though // the pg still has enough resources ResourceSet fail_request = ResourceSet({{pg_gpu_wildcard_resource, FixedPoint(3)}}); - allocations = r2.TryAllocate(fail_request); - ASSERT_FALSE(allocations); + auto failed_allocations = r2.TryAllocate(fail_request); + ASSERT_FALSE(failed_allocations); ASSERT_EQ(r2.Get(gpu_resource), std::vector({FixedPoint(0), FixedPoint(0), @@ -615,13 +672,6 @@ TEST_F(NodeResourceInstanceSetTest, TestTryAllocateSinglePgResourceAndNoBundleIn FixedPoint(0), FixedPoint(0), FixedPoint(1)})); - ASSERT_EQ(r2.Get(pg_gpu_wildcard_resource), - std::vector({FixedPoint(1), - FixedPoint(0), - FixedPoint(0), - FixedPoint(1), - FixedPoint(1), - FixedPoint(0)})); ASSERT_EQ(r2.Get(pg_gpu_index_0_resource), std::vector({FixedPoint(1), FixedPoint(0), @@ -629,20 +679,51 @@ TEST_F(NodeResourceInstanceSetTest, TestTryAllocateSinglePgResourceAndNoBundleIn FixedPoint(0), FixedPoint(0), FixedPoint(0)})); - ASSERT_EQ(r2.Get(pg_gpu_index_1_resource), - std::vector({FixedPoint(0), - FixedPoint(0), - FixedPoint(0), - FixedPoint(0), - FixedPoint(0), - FixedPoint(0)})); - ASSERT_EQ(r2.Get(pg_gpu_index_2_resource), - std::vector({FixedPoint(0), - FixedPoint(0), - FixedPoint(0), - FixedPoint(1), - FixedPoint(1), - FixedPoint(0)})); + if (allocations->find(pg_gpu_index_1_resource) != allocations->end()) { + ASSERT_EQ(r2.Get(pg_gpu_wildcard_resource), + std::vector({FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); + ASSERT_EQ(r2.Get(pg_gpu_index_1_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + ASSERT_EQ(r2.Get(pg_gpu_index_2_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); + } else { + ASSERT_EQ(r2.Get(pg_gpu_wildcard_resource), + std::vector({FixedPoint(1), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + ASSERT_EQ(r2.Get(pg_gpu_index_1_resource), + std::vector({FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + ASSERT_EQ(r2.Get(pg_gpu_index_2_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + } } } @@ -688,19 +769,19 @@ TEST_F(NodeResourceInstanceSetTest, TestTryAllocateMultiplePgResourceAndNoBundle FixedPoint(0), FixedPoint(0)})); r1.Set(pg_gpu_index_1_resource, - std::vector({{FixedPoint(0), - FixedPoint(1), - FixedPoint(1), - FixedPoint(0), - FixedPoint(0), - FixedPoint(0)}})); + std::vector({FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); r1.Set(pg_gpu_index_2_resource, - std::vector({{FixedPoint(0), - FixedPoint(0), - FixedPoint(0), - FixedPoint(1), - FixedPoint(1), - FixedPoint(0)}})); + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); ResourceSet fail_request = ResourceSet({{pg_cpu_wildcard_resource, FixedPoint(2)}, {pg_gpu_wildcard_resource, FixedPoint(3)}}); @@ -754,24 +835,46 @@ TEST_F(NodeResourceInstanceSetTest, TestTryAllocateMultiplePgResourceAndNoBundle ASSERT_EQ(allocations->size(), 4); // Make sure the allocations are consistent between wildcard resource and indexed // resource - ASSERT_EQ((*allocations)[pg_cpu_wildcard_resource], - (*allocations)[pg_cpu_index_1_resource]); + if (allocations->find(pg_cpu_index_1_resource) != allocations->end()) { + ASSERT_EQ((*allocations)[pg_cpu_wildcard_resource], + (*allocations)[pg_cpu_index_1_resource]); + } else { + ASSERT_EQ((*allocations)[pg_cpu_wildcard_resource], + (*allocations)[pg_cpu_index_2_resource]); + } ASSERT_EQ((*allocations)[pg_cpu_wildcard_resource], std::vector({FixedPoint(2)})); - ASSERT_EQ((*allocations)[pg_gpu_wildcard_resource], - (*allocations)[pg_gpu_index_1_resource]); - ASSERT_EQ((*allocations)[pg_gpu_wildcard_resource], - std::vector({FixedPoint(0), - FixedPoint(1), - FixedPoint(1), - FixedPoint(0), - FixedPoint(0), - FixedPoint(0)})); + if (allocations->find(pg_gpu_index_1_resource) != allocations->end()) { + ASSERT_EQ((*allocations)[pg_gpu_wildcard_resource], + std::vector({FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + ASSERT_EQ((*allocations)[pg_gpu_wildcard_resource], + (*allocations)[pg_gpu_index_1_resource]); + } else { + ASSERT_EQ((*allocations)[pg_gpu_wildcard_resource], + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); + ASSERT_EQ((*allocations)[pg_gpu_wildcard_resource], + (*allocations)[pg_gpu_index_2_resource]); + } ASSERT_EQ(r1.Get(cpu_resource), std::vector({FixedPoint(1)})); ASSERT_EQ(r1.Get(pg_cpu_wildcard_resource), std::vector({FixedPoint(3)})); ASSERT_EQ(r1.Get(pg_cpu_index_0_resource), std::vector({FixedPoint(1)})); - ASSERT_EQ(r1.Get(pg_cpu_index_1_resource), std::vector({FixedPoint(0)})); - ASSERT_EQ(r1.Get(pg_cpu_index_2_resource), std::vector({FixedPoint(2)})); + if (allocations->find(pg_cpu_index_1_resource) != allocations->end()) { + ASSERT_EQ(r1.Get(pg_cpu_index_1_resource), std::vector({FixedPoint(0)})); + ASSERT_EQ(r1.Get(pg_cpu_index_2_resource), std::vector({FixedPoint(2)})); + } else { + ASSERT_EQ(r1.Get(pg_cpu_index_1_resource), std::vector({FixedPoint(2)})); + ASSERT_EQ(r1.Get(pg_cpu_index_2_resource), std::vector({FixedPoint(0)})); + } ASSERT_EQ(r1.Get(gpu_resource), std::vector({FixedPoint(0), FixedPoint(0), @@ -779,13 +882,6 @@ TEST_F(NodeResourceInstanceSetTest, TestTryAllocateMultiplePgResourceAndNoBundle FixedPoint(0), FixedPoint(0), FixedPoint(1)})); - ASSERT_EQ(r1.Get(pg_gpu_wildcard_resource), - std::vector({FixedPoint(1), - FixedPoint(0), - FixedPoint(0), - FixedPoint(1), - FixedPoint(1), - FixedPoint(0)})); ASSERT_EQ(r1.Get(pg_gpu_index_0_resource), std::vector({FixedPoint(1), FixedPoint(0), @@ -793,20 +889,51 @@ TEST_F(NodeResourceInstanceSetTest, TestTryAllocateMultiplePgResourceAndNoBundle FixedPoint(0), FixedPoint(0), FixedPoint(0)})); - ASSERT_EQ(r1.Get(pg_gpu_index_1_resource), - std::vector({FixedPoint(0), - FixedPoint(0), - FixedPoint(0), - FixedPoint(0), - FixedPoint(0), - FixedPoint(0)})); - ASSERT_EQ(r1.Get(pg_gpu_index_2_resource), - std::vector({FixedPoint(0), - FixedPoint(0), - FixedPoint(0), - FixedPoint(1), - FixedPoint(1), - FixedPoint(0)})); + if (allocations->find(pg_gpu_index_1_resource) != allocations->end()) { + ASSERT_EQ(r1.Get(pg_gpu_wildcard_resource), + std::vector({FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); + ASSERT_EQ(r1.Get(pg_gpu_index_1_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + ASSERT_EQ(r1.Get(pg_gpu_index_2_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); + } else { + ASSERT_EQ(r1.Get(pg_gpu_wildcard_resource), + std::vector({FixedPoint(1), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + ASSERT_EQ(r1.Get(pg_gpu_index_1_resource), + std::vector({FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + ASSERT_EQ(r1.Get(pg_gpu_index_2_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + } } TEST_F(NodeResourceInstanceSetTest, TestFree) {