diff --git a/src/ray/common/bundle_spec.cc b/src/ray/common/bundle_spec.cc index 8c410263e39d..833ec7c5a82e 100644 --- a/src/ray/common/bundle_spec.cc +++ b/src/ray/common/bundle_spec.cc @@ -95,15 +95,15 @@ std::string BundleSpecification::DebugString() const { } std::string FormatPlacementGroupResource(const std::string &original_resource_name, - const PlacementGroupID &group_id, + const std::string &group_id_hex, int64_t bundle_index) { std::stringstream os; if (bundle_index >= 0) { os << original_resource_name << kGroupKeyword << std::to_string(bundle_index) << "_" - << group_id.Hex(); + << group_id_hex; } else { RAY_CHECK(bundle_index == -1) << "Invalid index " << bundle_index; - os << original_resource_name << kGroupKeyword << group_id.Hex(); + os << original_resource_name << kGroupKeyword << group_id_hex; } std::string result = os.str(); RAY_DCHECK(GetOriginalResourceName(result) == original_resource_name) @@ -112,6 +112,13 @@ std::string FormatPlacementGroupResource(const std::string &original_resource_na return result; } +std::string FormatPlacementGroupResource(const std::string &original_resource_name, + const PlacementGroupID &group_id, + int64_t bundle_index) { + return FormatPlacementGroupResource( + original_resource_name, group_id.Hex(), bundle_index); +} + std::string GetOriginalResourceName(const std::string &resource) { auto data = ParsePgFormattedResource( resource, /*for_wildcard_resource*/ true, /*for_indexed_resource*/ true); @@ -146,6 +153,7 @@ std::optional ParsePgFormattedResource( match_groups.size() == 3) { data.original_resource = match_groups[1].str(); data.bundle_index = -1; + data.group_id = match_groups[2].str(); return data; } } @@ -157,6 +165,7 @@ std::optional ParsePgFormattedResource( match_groups.size() == 4) { data.original_resource = match_groups[1].str(); data.bundle_index = stoi(match_groups[2].str()); + data.group_id = match_groups[3].str(); return data; } } diff --git a/src/ray/common/bundle_spec.h b/src/ray/common/bundle_spec.h index dad44cd2f5db..1330d4676fa3 100644 --- a/src/ray/common/bundle_spec.h +++ b/src/ray/common/bundle_spec.h @@ -97,8 +97,22 @@ struct PgFormattedResourceData { std::string original_resource; /// -1 if it is a wildcard resource. int64_t bundle_index; + std::string group_id; }; +/// Format a placement group resource with provided parameters. +/// +/// \param original_resource_name The original resource name of the pg resource. +/// \param group_id_str The group id in string format. +/// \param bundle_index The bundle index. If -1, generate the wildcard pg resource. +/// E.g., [original_resource_name]_group_[group_id_str]. +/// If >=0, generate the indexed pg resource. E.g., +/// [original_resource_name]_group_[bundle_index]_[group_id_str] +/// \return The corresponding formatted placement group resource string. +std::string FormatPlacementGroupResource(const std::string &original_resource_name, + const std::string &group_id_hex, + int64_t bundle_index = -1); + /// Format a placement group resource, e.g., CPU -> CPU_group_i std::string FormatPlacementGroupResource(const std::string &original_resource_name, const PlacementGroupID &group_id, diff --git a/src/ray/common/scheduling/resource_instance_set.cc b/src/ray/common/scheduling/resource_instance_set.cc index 976deee39383..c9f73d09a727 100644 --- a/src/ray/common/scheduling/resource_instance_set.cc +++ b/src/ray/common/scheduling/resource_instance_set.cc @@ -16,7 +16,9 @@ #include #include +#include +#include "ray/common/bundle_spec.h" #include "ray/util/logging.h" namespace ray { @@ -43,6 +45,29 @@ bool NodeResourceInstanceSet::Has(ResourceID resource_id) const { void NodeResourceInstanceSet::Remove(ResourceID resource_id) { resources_.erase(resource_id); + + // Remove from the pg_indexed_resources_ as well + auto data = ParsePgFormattedResource(resource_id.Binary(), + /*for_wildcard_resource=*/false, + /*for_indexed_resource=*/true); + if (data) { + ResourceID original_resource_id(data->original_resource); + + auto pg_resource_map_it = pg_indexed_resources_.find(original_resource_id); + if (pg_resource_map_it != pg_indexed_resources_.end()) { + auto resource_set_it = pg_resource_map_it->second.find(data->group_id); + + if (resource_set_it != pg_resource_map_it->second.end()) { + resource_set_it->second.erase(resource_id); + if (resource_set_it->second.empty()) { + pg_resource_map_it->second.erase(data->group_id); + } + if (pg_resource_map_it->second.empty()) { + pg_indexed_resources_.erase(original_resource_id); + } + } + } + } } const std::vector &NodeResourceInstanceSet::Get( @@ -69,6 +94,24 @@ NodeResourceInstanceSet &NodeResourceInstanceSet::Set(ResourceID resource_id, resources_.erase(resource_id); } else { resources_[resource_id] = std::move(instances); + + // Popluate the pg_indexed_resources_map_ + // TODO (myan): The parsing of the resource_id String can be costly and impact the + // task creation throughput if the parting is required every time we allocate + // resources for a task and updating the available resources. The current benchmark + // shows no observable impact for now. But in the furture, ideas of improvement are: + // (1) to add the placement group id as well as the bundle index inside the + // ResourceID class. And instead of parse the String, leveraging the fields in the + // ResourceID class directly; (2) to update the pg resource id format to start with + // a special prefix so that we can do "startwith" instead of regex match which is + // less costly + auto data = ParsePgFormattedResource(resource_id.Binary(), + /*for_wildcard_resource=*/false, + /*for_indexed_resource=*/true); + if (data) { + pg_indexed_resources_[ResourceID(data->original_resource)][data->group_id].emplace( + resource_id); + } } return *this; } @@ -93,21 +136,156 @@ bool NodeResourceInstanceSet::operator==(const NodeResourceInstanceSet &other) c std::optional>> NodeResourceInstanceSet::TryAllocate(const ResourceSet &resource_demands) { absl::flat_hash_map> allocations; + + // During resource allocation with a placement group, no matter whether the allocation + // requirement specifies a bundle index, we generate the allocation on both + // the wildcard resource and the indexed resource. The resource_demand shouldn't be + // assigned across bundles. And If no bundle index is specified, we will iterate + // through the bundles and find the first bundle that can fit the required resources. + // In addition, for unit resources, we make sure that the allocation on the + // wildcard resource and the indexed resource are consistent, meaning the same + // instance ids should be allocated. + // + // For example, considering the GPU resource on a host. Assuming the host has 3 GPUs + // and 1 placement group with 2 bundles. The bundle with index 1 contains 1 GPU and + // the bundle with index 2 contains 2 GPU. + // + // The current node resource can be as follows: + // resource id: total, available + // GPU: [1, 1, 1], [0, 0, 0] + // GPU_: [1, 1, 1], [1, 1, 1] + // GPU_1_: [1, 0, 0], [1, 0, 0] + // GPU_2_: [0, 1, 1], [0, 1, 1] + // + // Now, we want to allocate a task with 2 GPUs and in the placement group , + // reflecting in the following resource demand: + // GPU_ : 2 + // + // We will iterate though all the bundles in the placement group and bundle with + // index=2 has the required capacity. So we will allocate the task to the 2 GPUs in + // bundle 2 in placement group and the same allocation should be reflected in + // the wildcard GPU resource. So the allocation will be: + // GPU_ : [0, 1, 1] + // GPU_2_ : [0, 1, 1] + // + // And as a result, after the allocation, current node resource will be: + // resource id: total, available + // GPU: [1, 1, 1], [0, 0, 0] + // GPU_: [1, 1, 1], [1, 0, 0] + // GPU_1_: [1, 0, 0], [1, 0, 0] + // GPU_2_: [0, 1, 1], [0, 0, 0] + + absl::flat_hash_map>> + pg_resource_map; + for (const auto &[resource_id, demand] : resource_demands.Resources()) { - auto allocation = TryAllocate(resource_id, demand); - if (allocation) { - // Even if allocation failed we need to remember partial allocations to correctly - // free resources. - allocations[resource_id] = std::move(*allocation); + auto data = ParsePgFormattedResource(resource_id.Binary(), + /*for_wildcard_resource*/ true, + /*for_indexed_resource*/ true); + + if (data) { + // Aggregate based on resource type + ResourceID original_resource_id{data->original_resource}; + pg_resource_map[original_resource_id].push_back( + std::make_pair(resource_id, data.value())); } else { - // Allocation failed. Restore partially allocated resources. - for (const auto &[resource_id, allocation] : allocations) { - Free(resource_id, allocation); + // Directly allocate the resources if the resource is not with a placement group + auto allocation = TryAllocate(resource_id, demand); + if (allocation) { + // Even if allocation failed we need to remember partial allocations to + // correctly free resources. + allocations[resource_id] = std::move(*allocation); + } else { + // Allocation failed. Restore partially allocated resources. + for (const auto &[resource_id, allocation] : allocations) { + Free(resource_id, allocation); + } + return std::nullopt; } - return std::nullopt; } } + // Handle the resource allocation for resources with placement group + for (const auto &[original_resource_id, resource_id_vector] : pg_resource_map) { + // Assuming exactly 1 placement group and at most 1 bundle index can be specified in + // the resource requirement for a single resource type + // Also assuming the wildcard resource id will always exist in the resource_demands + // no matter how the resource requirement is specified in task + std::optional> wildcard_allocation; + const ResourceID *wildcard_resource_id = nullptr; + const std::string *pg_id = nullptr; + + // Allocate indexed resource + if (resource_id_vector.size() == 1) { + // The case where no bundle index is specified + // Iterate through the bundles with the same original resource and pg_id to find + // the first one with enough space + bool found = false; + wildcard_resource_id = &resource_id_vector[0].first; + pg_id = &resource_id_vector[0].second.group_id; + + auto pg_index_resources_it = pg_indexed_resources_.find(original_resource_id); + if (pg_index_resources_it != pg_indexed_resources_.end()) { + auto index_resources_it = pg_index_resources_it->second.find(*pg_id); + if (index_resources_it != pg_index_resources_it->second.end()) { + for (ResourceID indexed_resource_id : index_resources_it->second) { + if (Has(indexed_resource_id)) { + wildcard_allocation = TryAllocate( + indexed_resource_id, resource_demands.Get(*wildcard_resource_id)); + if (wildcard_allocation) { + // Found the allocation in a bundle + allocations[indexed_resource_id] = *wildcard_allocation; + found = true; + break; + } + } + } + } + } + + if (!found) { + // No bundle can fit the required resources, allocation failed + for (const auto &[resource_id, allocation] : allocations) { + Free(resource_id, allocation); + } + return std::nullopt; + } + } else { + // The case where the bundle index is specified + // For each resource type, both the wildcard resource and the indexed resource + // should be in the resource_demand + for (const std::pair &pair : + resource_id_vector) { + if (pair.second.bundle_index != -1) { + // This is the indexed resource + wildcard_allocation = TryAllocate(pair.first, resource_demands.Get(pair.first)); + + if (wildcard_allocation) { + allocations[pair.first] = *wildcard_allocation; + } else { + // The corresponding bundle cannot hold the required resources. + // Allocation failed + for (const auto &[resource_id, allocation] : allocations) { + Free(resource_id, allocation); + } + return std::nullopt; + } + } else { + // This is the wildcard resource + wildcard_resource_id = &pair.first; + } + } + } + + // Allocate wildcard resource, should be consistent with the indexed resource + RAY_CHECK(wildcard_resource_id != nullptr); + RAY_CHECK(!(*wildcard_allocation).empty()); + AllocateWithReference(*wildcard_allocation, *wildcard_resource_id); + allocations[*wildcard_resource_id] = std::move(*wildcard_allocation); + } + return std::make_optional>>( std::move(allocations)); } @@ -139,7 +317,7 @@ std::optional> NodeResourceInstanceSet::TryAllocate( // // As long as remaining_demand is greater than 1., // allocate full unit-capacity instances until the remaining_demand becomes fractional. - // Then try to find the best fit for the fractional remaining_resources. Best fist means + // Then try to find the best fit for the fractional remaining_resources. Best fit means // allocating the resource instance with the smallest available capacity greater than // remaining_demand if (remaining_demand >= 1.) { @@ -186,6 +364,20 @@ std::optional> NodeResourceInstanceSet::TryAllocate( return std::make_optional>(std::move(allocation)); } +void NodeResourceInstanceSet::AllocateWithReference( + const std::vector &ref_allocation, ResourceID resource_id) { + std::vector available = Get(resource_id); + RAY_CHECK(!available.empty()); + RAY_CHECK_EQ(available.size(), ref_allocation.size()); + + for (size_t i = 0; i < ref_allocation.size(); i++) { + RAY_CHECK_GE(available[i], ref_allocation[i]); + available[i] -= ref_allocation[i]; + } + + Set(resource_id, std::move(available)); +} + void NodeResourceInstanceSet::Free(ResourceID resource_id, const std::vector &allocation) { std::vector available = Get(resource_id); diff --git a/src/ray/common/scheduling/resource_instance_set.h b/src/ray/common/scheduling/resource_instance_set.h index ee65ef77c842..f8a557dc2083 100644 --- a/src/ray/common/scheduling/resource_instance_set.h +++ b/src/ray/common/scheduling/resource_instance_set.h @@ -101,6 +101,44 @@ class NodeResourceInstanceSet { /// Thus, we will allocate a bunch of full instances and /// at most a fractional instance. /// + /// During resource allocation with a placement group, no matter whether the + /// allocation requirement specifies a bundle index, we generate the + /// allocation on both the wildcard resource and the indexed resource. The + /// resource_demand won't be assigned across bundles. And If no bundle index is + /// specified, we will iterate through the bundles and find the first bundle that can + /// fit the required resources. In addition, for unit resources, we make sure + /// that the allocation on the wildcard resource and the indexed resource are + /// consistent, meaning the same instance ids should be allocated. + /// + /// For example, considering the GPU resource on a host. Assuming the host has 3 GPUs + /// and 1 placement group with 2 bundles. The bundle with index 1 contains 1 GPU and + /// the bundle with index 2 contains 2 GPU. + /// + /// The current node resource can be as follows: + /// resource id: total, available + /// GPU: [1, 1, 1], [0, 0, 0] + /// GPU_: [1, 1, 1], [1, 1, 1] + /// GPU_1_: [1, 0, 0], [1, 0, 0] + /// GPU_2_: [0, 1, 1], [0, 1, 1] + /// + /// Now, we want to allocate a task with 2 GPUs and in the placement group , + /// reflecting in the following resource demand: + /// GPU_ : 2 + /// + /// We will iterate though all the bundles in the placement group and bundle with + /// index=2 has the required capacity. So we will allocate the task to the 2 GPUs in + /// bundle 2 in placement group and the same allocation should be reflected in + /// the wildcard GPU resource. So the allocation will be: + /// GPU_ : [0, 1, 1] + /// GPU_2_ : [0, 1, 1] + /// + /// And as a result, after the allocation, current node resource will be: + /// resource id: total, available + /// GPU: [1, 1, 1], [0, 0, 0] + /// GPU_: [1, 1, 1], [1, 0, 0] + /// GPU_1_: [1, 0, 0], [1, 0, 0] + /// GPU_2_: [0, 1, 1], [0, 0, 0] + /// /// \param resource_id: The id of the resource to be allocated. /// \param demand: The resource amount to be allocated. /// @@ -108,8 +146,30 @@ class NodeResourceInstanceSet { std::optional> TryAllocate(ResourceID resource_id, FixedPoint demand); + /// Allocate resource to the resource_id based on a provided reference allocation. + /// The function is used for placement group allocation. Making the allocation of + /// the wildcard resource be identical to the indexed resource allocation. + /// + /// The function assumes and also verifies that (1) the resource_id exists in the + /// node; (2) the available resources with resource_id on the node can satisfy the + /// provided ref_allocation. + /// + /// \param ref_allocation: The reference allocation used to allocate the resource_id + /// \param resource_id: The id of the resource to be allocated + void AllocateWithReference(const std::vector &ref_allocation, + ResourceID resource_id); + /// Map from the resource IDs to the resource instance values. absl::flat_hash_map> resources_; + + /// This is a derived map from the resources_ map. The map aggregates all the current + /// placement group indexed resources in resources_ by their original resource id and + /// pd id. The key of the map is the original resource id. The value is a map of pg id + /// to the corresponding placement group indexed resource ids. This map should always + /// be consistent with the resources_ map. + absl::flat_hash_map>> + pg_indexed_resources_; }; } // namespace ray diff --git a/src/ray/common/test/resource_instance_set_test.cc b/src/ray/common/test/resource_instance_set_test.cc index 2c5d40a942cb..ba969f54509c 100644 --- a/src/ray/common/test/resource_instance_set_test.cc +++ b/src/ray/common/test/resource_instance_set_test.cc @@ -93,35 +93,849 @@ TEST_F(NodeResourceInstanceSetTest, TestOperator) { ASSERT_FALSE(r1 == r3); } -TEST_F(NodeResourceInstanceSetTest, TestTryAllocate) { +TEST_F(NodeResourceInstanceSetTest, TestTryAllocateOneResourceWithoutPlacementGroup) { + // 1. Test non-unit resource + { + // Allocation succeed when demand is smaller than available + NodeResourceInstanceSet r1 = NodeResourceInstanceSet(NodeResourceSet({{"CPU", 2}})); + ResourceSet success_request = ResourceSet({{"CPU", FixedPoint(1)}}); + auto allocations = r1.TryAllocate(success_request); + ASSERT_EQ(allocations->size(), 1); + ASSERT_EQ((*allocations)[ResourceID("CPU")], + std::vector({FixedPoint(1)})); + ASSERT_EQ(r1.Get(ResourceID("CPU")), std::vector({FixedPoint(1)})); + + // Allocation failed when demand is larger than available + ResourceSet fail_request = ResourceSet({{"CPU", FixedPoint(2)}}); + allocations = r1.TryAllocate(fail_request); + ASSERT_FALSE(allocations); + // Make sure nothing is allocated when allocation fails. + ASSERT_EQ(r1.Get(ResourceID("CPU")), std::vector({FixedPoint(1)})); + + // Allocation succeed when demand equals available + success_request = ResourceSet({{"CPU", FixedPoint(1)}}); + allocations = r1.TryAllocate(success_request); + ASSERT_EQ(allocations->size(), 1); + ASSERT_EQ((*allocations)[ResourceID("CPU")], + std::vector({FixedPoint(1)})); + ASSERT_EQ(r1.Get(ResourceID("CPU")), std::vector({FixedPoint(0)})); + } + + // 2. Test unit resource + { + // Succees allocation with demand > 1 + NodeResourceInstanceSet r2 = NodeResourceInstanceSet(NodeResourceSet({{"GPU", 4}})); + ResourceSet success_request = ResourceSet({{"GPU", FixedPoint(2)}}); + auto allocations = r2.TryAllocate(success_request); + ASSERT_EQ(allocations->size(), 1); + ASSERT_EQ((*allocations)[ResourceID("GPU")], + std::vector( + {FixedPoint(1), FixedPoint(1), FixedPoint(0), FixedPoint(0)})); + ASSERT_EQ(r2.Get(ResourceID("GPU")), + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(1), FixedPoint(1)})); + + // Failed allocation when demand > available + ResourceSet fail_request = ResourceSet({{"GPU", FixedPoint(3)}}); + allocations = r2.TryAllocate(fail_request); + ASSERT_FALSE(allocations); + // Make sure nothing is allocated when allocation fails. + ASSERT_EQ(r2.Get(ResourceID("GPU")), + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(1), FixedPoint(1)})); + + // Success allocation with fractional demand + // Should be allocated with best fit + success_request = ResourceSet({{"GPU", FixedPoint(0.4)}}); + allocations = r2.TryAllocate(success_request); + ASSERT_EQ(allocations->size(), 1); + ASSERT_EQ((*allocations)[ResourceID("GPU")], + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(0.4), FixedPoint(0)})); + ASSERT_EQ(r2.Get(ResourceID("GPU")), + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(0.6), FixedPoint(1)})); + + success_request = ResourceSet({{"GPU", FixedPoint(0.7)}}); + allocations = r2.TryAllocate(success_request); + ASSERT_EQ(allocations->size(), 1); + ASSERT_EQ((*allocations)[ResourceID("GPU")], + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(0.7)})); + ASSERT_EQ(r2.Get(ResourceID("GPU")), + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(0.6), FixedPoint(0.3)})); + + success_request = ResourceSet({{"GPU", FixedPoint(0.3)}}); + allocations = r2.TryAllocate(success_request); + ASSERT_EQ(allocations->size(), 1); + ASSERT_EQ((*allocations)[ResourceID("GPU")], + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(0.3)})); + ASSERT_EQ(r2.Get(ResourceID("GPU")), + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(0.6), FixedPoint(0)})); + } + + // 3. Test implicit resource + { + NodeResourceInstanceSet r3 = NodeResourceInstanceSet( + NodeResourceSet(absl::flat_hash_map{})); + ResourceSet success_request = + ResourceSet({{std::string(kImplicitResourcePrefix) + "a", FixedPoint(0.3)}}); + auto allocations = r3.TryAllocate(success_request); + ASSERT_EQ(allocations->size(), 1); + ASSERT_EQ((*allocations)[ResourceID(std::string(kImplicitResourcePrefix) + "a")], + std::vector({FixedPoint(0.3)})); + ASSERT_EQ(r3.Get(ResourceID(std::string(kImplicitResourcePrefix) + "a")), + std::vector({FixedPoint(0.7)})); + } +} + +TEST_F(NodeResourceInstanceSetTest, TestTryAllocateMultipleResourcesWithoutPg) { + // Case 1: Partial failure will not allocate anything NodeResourceInstanceSet r1 = NodeResourceInstanceSet(NodeResourceSet({{"CPU", 2}, {"GPU", 2}})); NodeResourceInstanceSet r2 = NodeResourceInstanceSet(NodeResourceSet({{"CPU", 2}, {"GPU", 2}})); - // Allocation fails. - auto allocations = - r1.TryAllocate(ResourceSet({{"CPU", FixedPoint(1)}, {"GPU", FixedPoint(3)}})); + ResourceSet fail_request = + ResourceSet({{"CPU", FixedPoint(1)}, + {"GPU", FixedPoint(3)}, + {std::string(kImplicitResourcePrefix) + "a", FixedPoint(0.3)}}); + auto allocations = r1.TryAllocate(fail_request); ASSERT_FALSE(allocations); - // Make sure nothing is allocated when allocation fails. ASSERT_TRUE(r1 == r2); - allocations = r1.TryAllocate( + // Case 2: All success, will allocate all the resources + ResourceSet success_request = ResourceSet({{"CPU", FixedPoint(1)}, {"GPU", FixedPoint(1)}, - {std::string(kImplicitResourcePrefix) + "a", FixedPoint(0.3)}})); + {std::string(kImplicitResourcePrefix) + "a", FixedPoint(0.3)}}); + allocations = r1.TryAllocate(success_request); ASSERT_EQ(allocations->size(), 3); ASSERT_EQ((*allocations)[ResourceID("CPU")], std::vector({FixedPoint(1)})); + ASSERT_EQ(r1.Get(ResourceID("CPU")), std::vector({FixedPoint(1)})); ASSERT_EQ((*allocations)[ResourceID("GPU")], std::vector({FixedPoint(1), FixedPoint(0)})); - ASSERT_EQ((*allocations)[ResourceID(std::string(kImplicitResourcePrefix) + "a")], - std::vector({FixedPoint(0.3)})); - ASSERT_EQ(r1.Get(ResourceID("CPU")), std::vector({FixedPoint(1)})); ASSERT_EQ(r1.Get(ResourceID("GPU")), std::vector({FixedPoint(0), FixedPoint(1)})); + ASSERT_EQ((*allocations)[ResourceID(std::string(kImplicitResourcePrefix) + "a")], + std::vector({FixedPoint(0.3)})); ASSERT_EQ(r1.Get(ResourceID(std::string(kImplicitResourcePrefix) + "a")), std::vector({FixedPoint(0.7)})); } +TEST_F(NodeResourceInstanceSetTest, TestTryAllocateWithSinglePgResourceAndBundleIndex) { + // 1. Test non unit resource + { + // Success allocation when the index bundle have enough resources + ResourceID cpu_resource("CPU"); + ResourceID pg_cpu_wildcard_resource("CPU_group_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_cpu_index_0_resource( + "CPU_group_0_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_cpu_index_1_resource( + "CPU_group_1_4482dec0faaf5ead891ff1659a9501000000"); + NodeResourceInstanceSet r1 = NodeResourceInstanceSet( + NodeResourceSet(absl::flat_hash_map{})); + r1.Set(cpu_resource, std::vector({FixedPoint(1)})); + r1.Set(pg_cpu_wildcard_resource, std::vector({FixedPoint(4)})); + r1.Set(pg_cpu_index_0_resource, std::vector({FixedPoint(2)})); + r1.Set(pg_cpu_index_1_resource, std::vector({FixedPoint(2)})); + + ResourceSet success_request = ResourceSet({{pg_cpu_wildcard_resource, FixedPoint(1)}, + {pg_cpu_index_1_resource, FixedPoint(1)}}); + auto allocations = r1.TryAllocate(success_request); + ASSERT_EQ(allocations->size(), 2); + ASSERT_EQ((*allocations)[pg_cpu_wildcard_resource], + (*allocations)[pg_cpu_index_1_resource]); + ASSERT_EQ((*allocations)[pg_cpu_wildcard_resource], + std::vector({FixedPoint(1)})); + ASSERT_EQ(r1.Get(cpu_resource), std::vector({FixedPoint(1)})); + ASSERT_EQ(r1.Get(pg_cpu_wildcard_resource), std::vector({FixedPoint(3)})); + ASSERT_EQ(r1.Get(pg_cpu_index_0_resource), std::vector({FixedPoint(2)})); + ASSERT_EQ(r1.Get(pg_cpu_index_1_resource), std::vector({FixedPoint(1)})); + + // Failed allocation when the index bundle doesn't have enough resources, even though + // the pg still has enough resources + ResourceSet fail_request = ResourceSet({{pg_cpu_wildcard_resource, FixedPoint(2)}, + {pg_cpu_index_1_resource, FixedPoint(2)}}); + allocations = r1.TryAllocate(fail_request); + ASSERT_FALSE(allocations); + ASSERT_EQ(r1.Get(cpu_resource), std::vector({FixedPoint(1)})); + ASSERT_EQ(r1.Get(pg_cpu_wildcard_resource), std::vector({FixedPoint(3)})); + ASSERT_EQ(r1.Get(pg_cpu_index_0_resource), std::vector({FixedPoint(2)})); + ASSERT_EQ(r1.Get(pg_cpu_index_1_resource), std::vector({FixedPoint(1)})); + } + + // 2. Test unit resource + { + // Success allocation when the index bundle have enough resources + ResourceID gpu_resource("GPU"); + ResourceID pg_gpu_wildcard_resource("GPU_group_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_gpu_index_0_resource( + "GPU_group_0_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_gpu_index_1_resource( + "GPU_group_1_4482dec0faaf5ead891ff1659a9501000000"); + NodeResourceInstanceSet r2 = NodeResourceInstanceSet( + NodeResourceSet(absl::flat_hash_map{})); + r2.Set( + gpu_resource, + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(1)})); + r2.Set( + pg_gpu_wildcard_resource, + std::vector( + {FixedPoint(1), FixedPoint(1), FixedPoint(1), FixedPoint(1), FixedPoint(0)})); + r2.Set( + pg_gpu_index_0_resource, + std::vector( + {FixedPoint(1), FixedPoint(1), FixedPoint(0), FixedPoint(0), FixedPoint(0)})); + r2.Set(pg_gpu_index_1_resource, + std::vector({{FixedPoint(0), + FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)}})); + + ResourceSet success_request = ResourceSet({{pg_gpu_wildcard_resource, FixedPoint(1)}, + {pg_gpu_index_1_resource, FixedPoint(1)}}); + auto allocations = r2.TryAllocate(success_request); + ASSERT_EQ(allocations->size(), 2); + // Make sure the allocations are consistent between wildcard resource and indexed + // resource + ASSERT_EQ((*allocations)[pg_gpu_wildcard_resource], + (*allocations)[pg_gpu_index_1_resource]); + ASSERT_EQ( + (*allocations)[pg_gpu_wildcard_resource], + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(1), FixedPoint(0), FixedPoint(0)})); + ASSERT_EQ( + r2.Get(gpu_resource), + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(1)})); + ASSERT_EQ( + r2.Get(pg_gpu_wildcard_resource), + std::vector( + {FixedPoint(1), FixedPoint(1), FixedPoint(0), FixedPoint(1), FixedPoint(0)})); + ASSERT_EQ( + r2.Get(pg_gpu_index_0_resource), + std::vector( + {FixedPoint(1), FixedPoint(1), FixedPoint(0), FixedPoint(0), FixedPoint(0)})); + ASSERT_EQ( + r2.Get(pg_gpu_index_1_resource), + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(1), FixedPoint(0)})); + + // Failed allocation when the index bundle doesn't have enough resources, even though + // the pg still has enough resources + ResourceSet fail_request = ResourceSet({{pg_gpu_wildcard_resource, FixedPoint(2)}, + {pg_gpu_index_1_resource, FixedPoint(2)}}); + allocations = r2.TryAllocate(fail_request); + ASSERT_FALSE(allocations); + ASSERT_EQ( + r2.Get(gpu_resource), + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(1)})); + ASSERT_EQ( + r2.Get(pg_gpu_wildcard_resource), + std::vector( + {FixedPoint(1), FixedPoint(1), FixedPoint(0), FixedPoint(1), FixedPoint(0)})); + ASSERT_EQ( + r2.Get(pg_gpu_index_0_resource), + std::vector( + {FixedPoint(1), FixedPoint(1), FixedPoint(0), FixedPoint(0), FixedPoint(0)})); + ASSERT_EQ( + r2.Get(pg_gpu_index_1_resource), + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(1), FixedPoint(0)})); + } +} + +TEST_F(NodeResourceInstanceSetTest, TestTryAllocateWithMultiplePgResourceAndBundleIndex) { + // Case 1: Partial failure will not allocate anything + ResourceID cpu_resource("CPU"); + ResourceID pg_cpu_wildcard_resource("CPU_group_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_cpu_index_0_resource("CPU_group_0_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_cpu_index_1_resource("CPU_group_1_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID gpu_resource("GPU"); + ResourceID pg_gpu_wildcard_resource("GPU_group_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_gpu_index_0_resource("GPU_group_0_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_gpu_index_1_resource("GPU_group_1_4482dec0faaf5ead891ff1659a9501000000"); + + NodeResourceInstanceSet r1 = NodeResourceInstanceSet( + NodeResourceSet(absl::flat_hash_map{})); + r1.Set(cpu_resource, std::vector({FixedPoint(1)})); + r1.Set(pg_cpu_wildcard_resource, std::vector({FixedPoint(4)})); + r1.Set(pg_cpu_index_0_resource, std::vector({FixedPoint(2)})); + r1.Set(pg_cpu_index_1_resource, std::vector({FixedPoint(2)})); + r1.Set( + gpu_resource, + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(1)})); + r1.Set( + pg_gpu_wildcard_resource, + std::vector( + {FixedPoint(1), FixedPoint(1), FixedPoint(1), FixedPoint(1), FixedPoint(0)})); + r1.Set( + pg_gpu_index_0_resource, + std::vector( + {FixedPoint(1), FixedPoint(1), FixedPoint(0), FixedPoint(0), FixedPoint(0)})); + r1.Set( + pg_gpu_index_1_resource, + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(1), FixedPoint(1), FixedPoint(0)})); + + ResourceSet fail_request = ResourceSet({{pg_cpu_wildcard_resource, FixedPoint(1)}, + {pg_gpu_wildcard_resource, FixedPoint(3)}, + {pg_cpu_index_1_resource, FixedPoint(1)}, + {pg_gpu_index_1_resource, FixedPoint(3)}}); + auto allocations = r1.TryAllocate(fail_request); + ASSERT_FALSE(allocations); + ASSERT_EQ(r1.Get(cpu_resource), std::vector({FixedPoint(1)})); + ASSERT_EQ(r1.Get(pg_cpu_wildcard_resource), std::vector({FixedPoint(4)})); + ASSERT_EQ(r1.Get(pg_cpu_index_0_resource), std::vector({FixedPoint(2)})); + ASSERT_EQ(r1.Get(pg_cpu_index_1_resource), std::vector({FixedPoint(2)})); + ASSERT_EQ( + r1.Get(gpu_resource), + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(1)})); + ASSERT_EQ( + r1.Get(pg_gpu_wildcard_resource), + std::vector( + {FixedPoint(1), FixedPoint(1), FixedPoint(1), FixedPoint(1), FixedPoint(0)})); + ASSERT_EQ( + r1.Get(pg_gpu_index_0_resource), + std::vector( + {FixedPoint(1), FixedPoint(1), FixedPoint(0), FixedPoint(0), FixedPoint(0)})); + ASSERT_EQ( + r1.Get(pg_gpu_index_1_resource), + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(1), FixedPoint(1), FixedPoint(0)})); + + // Case 2: All success, will allocate all the resources + ResourceSet success_request = ResourceSet({{pg_cpu_wildcard_resource, FixedPoint(1)}, + {pg_gpu_wildcard_resource, FixedPoint(1)}, + {pg_cpu_index_1_resource, FixedPoint(1)}, + {pg_gpu_index_1_resource, FixedPoint(1)}}); + allocations = r1.TryAllocate(success_request); + ASSERT_EQ(allocations->size(), 4); + // Make sure the allocations are consistent between wildcard resource and indexed + // resource + ASSERT_EQ((*allocations)[pg_cpu_wildcard_resource], + (*allocations)[pg_cpu_index_1_resource]); + ASSERT_EQ((*allocations)[pg_cpu_wildcard_resource], + std::vector({FixedPoint(1)})); + ASSERT_EQ((*allocations)[pg_gpu_wildcard_resource], + (*allocations)[pg_gpu_index_1_resource]); + ASSERT_EQ( + (*allocations)[pg_gpu_wildcard_resource], + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(1), FixedPoint(0), FixedPoint(0)})); + ASSERT_EQ(r1.Get(cpu_resource), std::vector({FixedPoint(1)})); + ASSERT_EQ(r1.Get(pg_cpu_wildcard_resource), std::vector({FixedPoint(3)})); + ASSERT_EQ(r1.Get(pg_cpu_index_0_resource), std::vector({FixedPoint(2)})); + ASSERT_EQ(r1.Get(pg_cpu_index_1_resource), std::vector({FixedPoint(1)})); + ASSERT_EQ( + r1.Get(gpu_resource), + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(1)})); + ASSERT_EQ( + r1.Get(pg_gpu_wildcard_resource), + std::vector( + {FixedPoint(1), FixedPoint(1), FixedPoint(0), FixedPoint(1), FixedPoint(0)})); + ASSERT_EQ( + r1.Get(pg_gpu_index_0_resource), + std::vector( + {FixedPoint(1), FixedPoint(1), FixedPoint(0), FixedPoint(0), FixedPoint(0)})); + ASSERT_EQ( + r1.Get(pg_gpu_index_1_resource), + std::vector( + {FixedPoint(0), FixedPoint(0), FixedPoint(0), FixedPoint(1), FixedPoint(0)})); +} + +TEST_F(NodeResourceInstanceSetTest, TestTryAllocateSinglePgResourceAndNoBundleIndex) { + // 1. Test non unit resource + { + // Success allocation when found a bundle withe enough available resources + ResourceID cpu_resource("CPU"); + ResourceID pg_cpu_wildcard_resource("CPU_group_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_cpu_index_0_resource( + "CPU_group_0_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_cpu_index_1_resource( + "CPU_group_1_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_cpu_index_2_resource( + "CPU_group_2_4482dec0faaf5ead891ff1659a9501000000"); + NodeResourceInstanceSet r1 = NodeResourceInstanceSet( + NodeResourceSet(absl::flat_hash_map{})); + r1.Set(cpu_resource, std::vector({FixedPoint(1)})); + r1.Set(pg_cpu_wildcard_resource, std::vector({FixedPoint(5)})); + r1.Set(pg_cpu_index_0_resource, std::vector({FixedPoint(1)})); + r1.Set(pg_cpu_index_1_resource, std::vector({FixedPoint(2)})); + r1.Set(pg_cpu_index_2_resource, std::vector({FixedPoint(2)})); + + ResourceSet success_request = + ResourceSet({{pg_cpu_wildcard_resource, FixedPoint(2)}}); + auto allocations = r1.TryAllocate(success_request); + ASSERT_EQ(allocations->size(), 2); + ASSERT_EQ((*allocations)[pg_cpu_wildcard_resource], + std::vector({FixedPoint(2)})); + if (allocations->find(pg_cpu_index_1_resource) != allocations->end()) { + ASSERT_EQ((*allocations)[pg_cpu_index_1_resource], + std::vector({FixedPoint(2)})); + } else { + ASSERT_EQ((*allocations)[pg_cpu_index_2_resource], + std::vector({FixedPoint(2)})); + } + ASSERT_EQ(r1.Get(cpu_resource), std::vector({FixedPoint(1)})); + ASSERT_EQ(r1.Get(pg_cpu_wildcard_resource), std::vector({FixedPoint(3)})); + ASSERT_EQ(r1.Get(pg_cpu_index_0_resource), std::vector({FixedPoint(1)})); + if (allocations->find(pg_cpu_index_1_resource) != allocations->end()) { + ASSERT_EQ(r1.Get(pg_cpu_index_1_resource), + std::vector({FixedPoint(0)})); + ASSERT_EQ(r1.Get(pg_cpu_index_2_resource), + std::vector({FixedPoint(2)})); + } else { + ASSERT_EQ(r1.Get(pg_cpu_index_1_resource), + std::vector({FixedPoint(2)})); + ASSERT_EQ(r1.Get(pg_cpu_index_2_resource), + std::vector({FixedPoint(0)})); + } + + // Failed allocation when no index bundle have enough resources, even though the pg + // still has enough resources + ResourceSet fail_request = ResourceSet({ + {pg_cpu_wildcard_resource, FixedPoint(3)}, + }); + auto failed_allocations = r1.TryAllocate(fail_request); + ASSERT_FALSE(failed_allocations); + ASSERT_EQ(r1.Get(cpu_resource), std::vector({FixedPoint(1)})); + ASSERT_EQ(r1.Get(pg_cpu_wildcard_resource), std::vector({FixedPoint(3)})); + ASSERT_EQ(r1.Get(pg_cpu_index_0_resource), std::vector({FixedPoint(1)})); + if (allocations->find(pg_cpu_index_1_resource) != allocations->end()) { + ASSERT_EQ(r1.Get(pg_cpu_index_1_resource), + std::vector({FixedPoint(0)})); + ASSERT_EQ(r1.Get(pg_cpu_index_2_resource), + std::vector({FixedPoint(2)})); + } else { + ASSERT_EQ(r1.Get(pg_cpu_index_1_resource), + std::vector({FixedPoint(2)})); + ASSERT_EQ(r1.Get(pg_cpu_index_2_resource), + std::vector({FixedPoint(0)})); + } + } + + // 2. Test unit resource + { + // Success allocation when found a bundle with enough available resources + ResourceID gpu_resource("GPU"); + ResourceID pg_gpu_wildcard_resource("GPU_group_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_gpu_index_0_resource( + "GPU_group_0_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_gpu_index_1_resource( + "GPU_group_1_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_gpu_index_2_resource( + "GPU_group_2_4482dec0faaf5ead891ff1659a9501000000"); + NodeResourceInstanceSet r2 = NodeResourceInstanceSet( + NodeResourceSet(absl::flat_hash_map{})); + r2.Set(gpu_resource, + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1)})); + r2.Set(pg_gpu_wildcard_resource, + std::vector({FixedPoint(1), + FixedPoint(1), + FixedPoint(1), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); + r2.Set(pg_gpu_index_0_resource, + std::vector({FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + r2.Set(pg_gpu_index_1_resource, + std::vector({FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + r2.Set(pg_gpu_index_2_resource, + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); + + ResourceSet success_request = + ResourceSet({{pg_gpu_wildcard_resource, FixedPoint(2)}}); + auto allocations = r2.TryAllocate(success_request); + ASSERT_EQ(allocations->size(), 2); + // Make sure the allocations are consistent between wildcard resource and indexed + // resource + if (allocations->find(pg_gpu_index_1_resource) != allocations->end()) { + ASSERT_EQ((*allocations)[pg_gpu_wildcard_resource], + (*allocations)[pg_gpu_index_1_resource]); + ASSERT_EQ((*allocations)[pg_gpu_wildcard_resource], + std::vector({FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + } else { + ASSERT_EQ((*allocations)[pg_gpu_wildcard_resource], + (*allocations)[pg_gpu_index_2_resource]); + ASSERT_EQ((*allocations)[pg_gpu_wildcard_resource], + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); + } + ASSERT_EQ(r2.Get(gpu_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1)})); + ASSERT_EQ(r2.Get(pg_gpu_index_0_resource), + std::vector({FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + if (allocations->find(pg_gpu_index_1_resource) != allocations->end()) { + ASSERT_EQ(r2.Get(pg_gpu_wildcard_resource), + std::vector({FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); + ASSERT_EQ(r2.Get(pg_gpu_index_1_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + ASSERT_EQ(r2.Get(pg_gpu_index_2_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); + } else { + ASSERT_EQ(r2.Get(pg_gpu_wildcard_resource), + std::vector({FixedPoint(1), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + ASSERT_EQ(r2.Get(pg_gpu_index_1_resource), + std::vector({FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + ASSERT_EQ(r2.Get(pg_gpu_index_2_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + } + + // Failed allocation when the index bundle doesn't have enough resources, even though + // the pg still has enough resources + ResourceSet fail_request = ResourceSet({{pg_gpu_wildcard_resource, FixedPoint(3)}}); + auto failed_allocations = r2.TryAllocate(fail_request); + ASSERT_FALSE(failed_allocations); + ASSERT_EQ(r2.Get(gpu_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1)})); + ASSERT_EQ(r2.Get(pg_gpu_index_0_resource), + std::vector({FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + if (allocations->find(pg_gpu_index_1_resource) != allocations->end()) { + ASSERT_EQ(r2.Get(pg_gpu_wildcard_resource), + std::vector({FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); + ASSERT_EQ(r2.Get(pg_gpu_index_1_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + ASSERT_EQ(r2.Get(pg_gpu_index_2_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); + } else { + ASSERT_EQ(r2.Get(pg_gpu_wildcard_resource), + std::vector({FixedPoint(1), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + ASSERT_EQ(r2.Get(pg_gpu_index_1_resource), + std::vector({FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + ASSERT_EQ(r2.Get(pg_gpu_index_2_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + } + } +} + +TEST_F(NodeResourceInstanceSetTest, TestTryAllocateMultiplePgResourceAndNoBundleIndex) { + // Case 1: Partial failure will not allocate anything + ResourceID cpu_resource("CPU"); + ResourceID pg_cpu_wildcard_resource("CPU_group_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_cpu_index_0_resource("CPU_group_0_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_cpu_index_1_resource("CPU_group_1_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_cpu_index_2_resource("CPU_group_2_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID gpu_resource("GPU"); + ResourceID pg_gpu_wildcard_resource("GPU_group_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_gpu_index_0_resource("GPU_group_0_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_gpu_index_1_resource("GPU_group_1_4482dec0faaf5ead891ff1659a9501000000"); + ResourceID pg_gpu_index_2_resource("GPU_group_2_4482dec0faaf5ead891ff1659a9501000000"); + + NodeResourceInstanceSet r1 = NodeResourceInstanceSet( + NodeResourceSet(absl::flat_hash_map{})); + r1.Set(cpu_resource, std::vector({FixedPoint(1)})); + r1.Set(pg_cpu_wildcard_resource, std::vector({FixedPoint(5)})); + r1.Set(pg_cpu_index_0_resource, std::vector({FixedPoint(1)})); + r1.Set(pg_cpu_index_1_resource, std::vector({FixedPoint(2)})); + r1.Set(pg_cpu_index_2_resource, std::vector({FixedPoint(2)})); + r1.Set(gpu_resource, + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1)})); + r1.Set(pg_gpu_wildcard_resource, + std::vector({FixedPoint(1), + FixedPoint(1), + FixedPoint(1), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); + r1.Set(pg_gpu_index_0_resource, + std::vector({FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + r1.Set(pg_gpu_index_1_resource, + std::vector({FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + r1.Set(pg_gpu_index_2_resource, + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); + + ResourceSet fail_request = ResourceSet({{pg_cpu_wildcard_resource, FixedPoint(2)}, + {pg_gpu_wildcard_resource, FixedPoint(3)}}); + auto allocations = r1.TryAllocate(fail_request); + ASSERT_FALSE(allocations); + ASSERT_EQ(r1.Get(cpu_resource), std::vector({FixedPoint(1)})); + ASSERT_EQ(r1.Get(pg_cpu_wildcard_resource), std::vector({FixedPoint(5)})); + ASSERT_EQ(r1.Get(pg_cpu_index_0_resource), std::vector({FixedPoint(1)})); + ASSERT_EQ(r1.Get(pg_cpu_index_1_resource), std::vector({FixedPoint(2)})); + ASSERT_EQ(r1.Get(pg_cpu_index_2_resource), std::vector({FixedPoint(2)})); + ASSERT_EQ(r1.Get(gpu_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1)})); + ASSERT_EQ(r1.Get(pg_gpu_wildcard_resource), + std::vector({FixedPoint(1), + FixedPoint(1), + FixedPoint(1), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); + ASSERT_EQ(r1.Get(pg_gpu_index_0_resource), + std::vector({FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + ASSERT_EQ(r1.Get(pg_gpu_index_1_resource), + std::vector({FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + ASSERT_EQ(r1.Get(pg_gpu_index_2_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); + + // Case 2: All success, will allocate all the resources + ResourceSet success_request = ResourceSet({{pg_cpu_wildcard_resource, FixedPoint(2)}, + {pg_gpu_wildcard_resource, FixedPoint(2)}}); + allocations = r1.TryAllocate(success_request); + ASSERT_EQ(allocations->size(), 4); + // Make sure the allocations are consistent between wildcard resource and indexed + // resource + if (allocations->find(pg_cpu_index_1_resource) != allocations->end()) { + ASSERT_EQ((*allocations)[pg_cpu_wildcard_resource], + (*allocations)[pg_cpu_index_1_resource]); + } else { + ASSERT_EQ((*allocations)[pg_cpu_wildcard_resource], + (*allocations)[pg_cpu_index_2_resource]); + } + ASSERT_EQ((*allocations)[pg_cpu_wildcard_resource], + std::vector({FixedPoint(2)})); + if (allocations->find(pg_gpu_index_1_resource) != allocations->end()) { + ASSERT_EQ((*allocations)[pg_gpu_wildcard_resource], + std::vector({FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + ASSERT_EQ((*allocations)[pg_gpu_wildcard_resource], + (*allocations)[pg_gpu_index_1_resource]); + } else { + ASSERT_EQ((*allocations)[pg_gpu_wildcard_resource], + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); + ASSERT_EQ((*allocations)[pg_gpu_wildcard_resource], + (*allocations)[pg_gpu_index_2_resource]); + } + ASSERT_EQ(r1.Get(cpu_resource), std::vector({FixedPoint(1)})); + ASSERT_EQ(r1.Get(pg_cpu_wildcard_resource), std::vector({FixedPoint(3)})); + ASSERT_EQ(r1.Get(pg_cpu_index_0_resource), std::vector({FixedPoint(1)})); + if (allocations->find(pg_cpu_index_1_resource) != allocations->end()) { + ASSERT_EQ(r1.Get(pg_cpu_index_1_resource), std::vector({FixedPoint(0)})); + ASSERT_EQ(r1.Get(pg_cpu_index_2_resource), std::vector({FixedPoint(2)})); + } else { + ASSERT_EQ(r1.Get(pg_cpu_index_1_resource), std::vector({FixedPoint(2)})); + ASSERT_EQ(r1.Get(pg_cpu_index_2_resource), std::vector({FixedPoint(0)})); + } + ASSERT_EQ(r1.Get(gpu_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1)})); + ASSERT_EQ(r1.Get(pg_gpu_index_0_resource), + std::vector({FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + if (allocations->find(pg_gpu_index_1_resource) != allocations->end()) { + ASSERT_EQ(r1.Get(pg_gpu_wildcard_resource), + std::vector({FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); + ASSERT_EQ(r1.Get(pg_gpu_index_1_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + ASSERT_EQ(r1.Get(pg_gpu_index_2_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0)})); + } else { + ASSERT_EQ(r1.Get(pg_gpu_wildcard_resource), + std::vector({FixedPoint(1), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + ASSERT_EQ(r1.Get(pg_gpu_index_1_resource), + std::vector({FixedPoint(0), + FixedPoint(1), + FixedPoint(1), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + ASSERT_EQ(r1.Get(pg_gpu_index_2_resource), + std::vector({FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0), + FixedPoint(0)})); + } +} + TEST_F(NodeResourceInstanceSetTest, TestFree) { NodeResourceInstanceSet r1; r1.Set(ResourceID("GPU"), std::vector({FixedPoint(1), FixedPoint(0.3)})); diff --git a/src/ray/raylet/placement_group_resource_manager_test.cc b/src/ray/raylet/placement_group_resource_manager_test.cc index e650898a6e92..c6e57e45abdd 100644 --- a/src/ray/raylet/placement_group_resource_manager_test.cc +++ b/src/ray/raylet/placement_group_resource_manager_test.cc @@ -78,6 +78,8 @@ class NewPlacementGroupResourceManagerTest : public ::testing::Test { TEST_F(NewPlacementGroupResourceManagerTest, TestGetOriginalResourceNameFromWildcardResource) { + scheduling::ResourceID resourceId = scheduling::ResourceID("GPU"); + ASSERT_EQ(resourceId, ResourceID::GPU()); ASSERT_EQ(GetOriginalResourceNameFromWildcardResource( "CPU_group_0_4482dec0faaf5ead891ff1659a9501000000"), ""); @@ -191,25 +193,20 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewPrepareBundleDuringDraining) ASSERT_FALSE(new_placement_group_resource_manager_->PrepareBundles(bundle2_specs)); // Prepared bundles can still be committed. new_placement_group_resource_manager_->CommitBundles(bundle1_specs); - absl::flat_hash_map remaining_resources = { + absl::flat_hash_map remaining_resources_total = { {"CPU_group_" + group1_id.Hex(), 1.0}, {"CPU_group_1_" + group1_id.Hex(), 1.0}, {"CPU", 2.0}, {"bundle_group_1_" + group1_id.Hex(), 1000}, {"bundle_group_" + group1_id.Hex(), 1000}}; - auto remaining_resource_scheduler = - std::make_shared(io_context, - scheduling::NodeID("remaining"), - remaining_resources, - is_node_available_fn_); - std::shared_ptr resource_instances = - std::make_shared(); - ASSERT_TRUE( - remaining_resource_scheduler->GetLocalResourceManager().AllocateLocalTaskResources( - unit_resource, resource_instances)); + absl::flat_hash_map remaining_resources_avail = { + {"CPU_group_" + group1_id.Hex(), 1.0}, + {"CPU_group_1_" + group1_id.Hex(), 1.0}, + {"CPU", 1.0}, + {"bundle_group_1_" + group1_id.Hex(), 1000}, + {"bundle_group_" + group1_id.Hex(), 1000}}; auto remaining_resource_instance = - remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - scheduling::NodeID("remaining")); + ResourceMapToNodeResources(remaining_resources_total, remaining_resources_avail); CheckRemainingResourceCorrect(remaining_resource_instance); } @@ -225,25 +222,20 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewCommitBundleResource) { ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundles(bundle_specs)); new_placement_group_resource_manager_->CommitBundles(bundle_specs); /// 4. check remaining resources is correct. - absl::flat_hash_map remaining_resources = { + absl::flat_hash_map remaining_resources_total = { {"CPU_group_" + group_id.Hex(), 1.0}, {"CPU_group_1_" + group_id.Hex(), 1.0}, {"CPU", 1.0}, {"bundle_group_1_" + group_id.Hex(), 1000}, {"bundle_group_" + group_id.Hex(), 1000}}; - auto remaining_resource_scheduler = - std::make_shared(io_context, - scheduling::NodeID("remaining"), - remaining_resources, - is_node_available_fn_); - std::shared_ptr resource_instances = - std::make_shared(); - ASSERT_TRUE( - remaining_resource_scheduler->GetLocalResourceManager().AllocateLocalTaskResources( - unit_resource, resource_instances)); + absl::flat_hash_map remaining_resources_avail = { + {"CPU_group_" + group_id.Hex(), 1.0}, + {"CPU_group_1_" + group_id.Hex(), 1.0}, + {"CPU", 0.0}, + {"bundle_group_1_" + group_id.Hex(), 1000}, + {"bundle_group_" + group_id.Hex(), 1000}}; auto remaining_resource_instance = - remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - scheduling::NodeID("remaining")); + ResourceMapToNodeResources(remaining_resources_total, remaining_resources_avail); CheckRemainingResourceCorrect(remaining_resource_instance); } @@ -263,11 +255,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewReturnBundleResource) { /// 4. return bundle resource. ASSERT_TRUE(new_placement_group_resource_manager_->ReturnBundle(bundle_spec).ok()); /// 5. check remaining resources is correct. - auto remaining_resource_scheduler = std::make_shared( - io_context, scheduling::NodeID("remaining"), unit_resource, is_node_available_fn_); auto remaining_resource_instance = - remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - scheduling::NodeID("remaining")); + ResourceMapToNodeResources(unit_resource, unit_resource); CheckRemainingResourceCorrect(remaining_resource_instance); } @@ -292,7 +281,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndRetu new_placement_group_resource_manager_->CommitBundles( ConvertSingleSpecToVectorPtrs(second_bundle_spec)); /// 4. check remaining resources is correct after commit phase. - absl::flat_hash_map remaining_resources = { + absl::flat_hash_map remaining_resources_total = { {"CPU_group_" + group_id.Hex(), 2.0}, {"CPU_group_1_" + group_id.Hex(), 1.0}, {"CPU_group_2_" + group_id.Hex(), 1.0}, @@ -300,58 +289,41 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndRetu {"bundle_group_1_" + group_id.Hex(), 1000}, {"bundle_group_2_" + group_id.Hex(), 1000}, {"bundle_group_" + group_id.Hex(), 2000}}; - auto remaining_resource_scheduler = - std::make_shared(io_context, - scheduling::NodeID("remaining"), - remaining_resources, - is_node_available_fn_); - std::shared_ptr resource_instances = - std::make_shared(); - ASSERT_TRUE( - remaining_resource_scheduler->GetLocalResourceManager().AllocateLocalTaskResources( - init_unit_resource, resource_instances)); + absl::flat_hash_map remaining_resources_avail = { + {"CPU_group_" + group_id.Hex(), 2.0}, + {"CPU_group_1_" + group_id.Hex(), 1.0}, + {"CPU_group_2_" + group_id.Hex(), 1.0}, + {"CPU", 0.0}, + {"bundle_group_1_" + group_id.Hex(), 1000}, + {"bundle_group_2_" + group_id.Hex(), 1000}, + {"bundle_group_" + group_id.Hex(), 2000}}; auto remaining_resource_instance = - remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - scheduling::NodeID("remaining")); - + ResourceMapToNodeResources(remaining_resources_total, remaining_resources_avail); CheckRemainingResourceCorrect(remaining_resource_instance); /// 5. return second bundle. ASSERT_TRUE( new_placement_group_resource_manager_->ReturnBundle(second_bundle_spec).ok()); /// 6. check remaining resources is correct after return second bundle. - remaining_resources = {{"CPU_group_" + group_id.Hex(), 2.0}, - {"CPU_group_1_" + group_id.Hex(), 1.0}, - {"CPU", 2.0}, - {"bundle_group_1_" + group_id.Hex(), 1000}, - {"bundle_group_" + group_id.Hex(), 2000}}; - remaining_resource_scheduler = - std::make_shared(io_context, - scheduling::NodeID("remaining"), - remaining_resources, - is_node_available_fn_); - ASSERT_TRUE( - remaining_resource_scheduler->GetLocalResourceManager().AllocateLocalTaskResources( - {{"CPU_group_" + group_id.Hex(), 1.0}, - {"CPU", 1.0}, - {"bundle_group_" + group_id.Hex(), 1000}}, - resource_instances)); + remaining_resources_total = {{"CPU_group_" + group_id.Hex(), 2.0}, + {"CPU_group_1_" + group_id.Hex(), 1.0}, + {"CPU", 2.0}, + {"bundle_group_1_" + group_id.Hex(), 1000}, + {"bundle_group_" + group_id.Hex(), 2000}}; + remaining_resources_avail = {{"CPU_group_" + group_id.Hex(), 1.0}, + {"CPU_group_1_" + group_id.Hex(), 1.0}, + {"CPU", 1.0}, + {"bundle_group_1_" + group_id.Hex(), 1000}, + {"bundle_group_" + group_id.Hex(), 1000}}; remaining_resource_instance = - remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - scheduling::NodeID("remaining")); + ResourceMapToNodeResources(remaining_resources_total, remaining_resources_avail); CheckRemainingResourceCorrect(remaining_resource_instance); /// 7. return first bundle. ASSERT_TRUE( new_placement_group_resource_manager_->ReturnBundle(first_bundle_spec).ok()); /// 8. check remaining resources is correct after all bundle returned. - remaining_resources = {{"CPU", 2.0}}; - remaining_resource_scheduler = - std::make_shared(io_context, - scheduling::NodeID("remaining"), - remaining_resources, - is_node_available_fn_); + remaining_resources_total = {{"CPU", 2.0}}; remaining_resource_instance = - remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - scheduling::NodeID("remaining")); + ResourceMapToNodeResources(remaining_resources_total, remaining_resources_total); CheckRemainingResourceCorrect(remaining_resource_instance); } @@ -370,20 +342,10 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithMultiPrepare) ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundles(bundle_specs)); } /// 4. check remaining resources is correct. - absl::flat_hash_map remaining_resources = {{"CPU", 3.0}}; - auto remaining_resource_scheduler = - std::make_shared(io_context, - scheduling::NodeID("remaining"), - remaining_resources, - is_node_available_fn_); - std::shared_ptr resource_instances = - std::make_shared(); - ASSERT_TRUE( - remaining_resource_scheduler->GetLocalResourceManager().AllocateLocalTaskResources( - unit_resource, resource_instances)); + absl::flat_hash_map remaining_resources_total = {{"CPU", 3.0}}; + absl::flat_hash_map remaining_resources_avail = {{"CPU", 2.0}}; auto remaining_resource_instance = - remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - scheduling::NodeID("remaining")); + ResourceMapToNodeResources(remaining_resources_total, remaining_resources_avail); CheckRemainingResourceCorrect(remaining_resource_instance); } @@ -405,25 +367,20 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithRandomOrder) ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundles( ConvertSingleSpecToVectorPtrs(bundle_spec))); /// 4. check remaining resources is correct. - absl::flat_hash_map remaining_resources = { + absl::flat_hash_map remaining_resources_total = { {"CPU_group_" + group_id.Hex(), 1.0}, {"CPU_group_1_" + group_id.Hex(), 1.0}, {"CPU", 3.0}, {"bundle_group_1_" + group_id.Hex(), 1000}, {"bundle_group_" + group_id.Hex(), 1000}}; - auto remaining_resource_scheduler = - std::make_shared(io_context, - scheduling::NodeID("remaining"), - remaining_resources, - is_node_available_fn_); - std::shared_ptr resource_instances = - std::make_shared(); - ASSERT_TRUE( - remaining_resource_scheduler->GetLocalResourceManager().AllocateLocalTaskResources( - unit_resource, resource_instances)); + absl::flat_hash_map remaining_resources_avail = { + {"CPU_group_" + group_id.Hex(), 1.0}, + {"CPU_group_1_" + group_id.Hex(), 1.0}, + {"CPU", 2.0}, + {"bundle_group_1_" + group_id.Hex(), 1000}, + {"bundle_group_" + group_id.Hex(), 1000}}; auto remaining_resource_instance = - remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - scheduling::NodeID("remaining")); + ResourceMapToNodeResources(remaining_resources_total, remaining_resources_avail); CheckRemainingResourceCorrect(remaining_resource_instance); ASSERT_TRUE(new_placement_group_resource_manager_->ReturnBundle(bundle_spec).ok()); // 5. prepare bundle -> commit bundle -> commit bundle. @@ -443,14 +400,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithRandomOrder) new_placement_group_resource_manager_->CommitBundles( ConvertSingleSpecToVectorPtrs(bundle_spec)); // 8. check remaining resources is correct. - remaining_resource_scheduler = - std::make_shared(io_context, - scheduling::NodeID("remaining"), - available_resource, - is_node_available_fn_); remaining_resource_instance = - remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - scheduling::NodeID("remaining")); + ResourceMapToNodeResources(available_resource, available_resource); CheckRemainingResourceCorrect(remaining_resource_instance); } @@ -470,14 +421,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestPreparedResourceBatched) { ASSERT_FALSE(new_placement_group_resource_manager_->PrepareBundles(bundle_specs)); // 4. check remaining resources is correct. absl::flat_hash_map remaining_resources = {{"CPU", 3.0}}; - auto remaining_resource_scheduler = - std::make_shared(io_context, - scheduling::NodeID("remaining"), - remaining_resources, - is_node_available_fn_); auto remaining_resource_instance = - remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - scheduling::NodeID("remaining")); + ResourceMapToNodeResources(remaining_resources, remaining_resources); CheckRemainingResourceCorrect(remaining_resource_instance); // 5. re-init the local available resource with 4 CPUs. available_resource = {std::make_pair("CPU", 4.0)}; @@ -488,32 +433,32 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestPreparedResourceBatched) { ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundles(bundle_specs)); new_placement_group_resource_manager_->CommitBundles(bundle_specs); // 7. re-check remaining resources is correct. - remaining_resources = {{"CPU_group_" + group_id.Hex(), 4.0}, - {"CPU_group_1_" + group_id.Hex(), 1.0}, - {"CPU_group_2_" + group_id.Hex(), 1.0}, - {"CPU_group_3_" + group_id.Hex(), 1.0}, - {"CPU_group_4_" + group_id.Hex(), 1.0}, - {"CPU", 4.0}, - {"bundle_group_1_" + group_id.Hex(), 1000}, - {"bundle_group_2_" + group_id.Hex(), 1000}, - {"bundle_group_3_" + group_id.Hex(), 1000}, - {"bundle_group_4_" + group_id.Hex(), 1000}, - {"bundle_group_" + group_id.Hex(), 4000}}; - remaining_resource_scheduler = - std::make_shared(io_context, - scheduling::NodeID("remaining"), - remaining_resources, - is_node_available_fn_); - std::shared_ptr resource_instances = - std::make_shared(); - absl::flat_hash_map allocating_resource; - allocating_resource.insert({"CPU", 4.0}); - ASSERT_TRUE( - remaining_resource_scheduler->GetLocalResourceManager().AllocateLocalTaskResources( - allocating_resource, resource_instances)); + absl::flat_hash_map remaining_resources_total = { + {"CPU_group_" + group_id.Hex(), 4.0}, + {"CPU_group_1_" + group_id.Hex(), 1.0}, + {"CPU_group_2_" + group_id.Hex(), 1.0}, + {"CPU_group_3_" + group_id.Hex(), 1.0}, + {"CPU_group_4_" + group_id.Hex(), 1.0}, + {"CPU", 4.0}, + {"bundle_group_1_" + group_id.Hex(), 1000}, + {"bundle_group_2_" + group_id.Hex(), 1000}, + {"bundle_group_3_" + group_id.Hex(), 1000}, + {"bundle_group_4_" + group_id.Hex(), 1000}, + {"bundle_group_" + group_id.Hex(), 4000}}; + absl::flat_hash_map remaining_resources_avail = { + {"CPU_group_" + group_id.Hex(), 4.0}, + {"CPU_group_1_" + group_id.Hex(), 1.0}, + {"CPU_group_2_" + group_id.Hex(), 1.0}, + {"CPU_group_3_" + group_id.Hex(), 1.0}, + {"CPU_group_4_" + group_id.Hex(), 1.0}, + {"CPU", 0.0}, + {"bundle_group_1_" + group_id.Hex(), 1000}, + {"bundle_group_2_" + group_id.Hex(), 1000}, + {"bundle_group_3_" + group_id.Hex(), 1000}, + {"bundle_group_4_" + group_id.Hex(), 1000}, + {"bundle_group_" + group_id.Hex(), 4000}}; remaining_resource_instance = - remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - scheduling::NodeID("remaining")); + ResourceMapToNodeResources(remaining_resources_total, remaining_resources_avail); RAY_LOG(INFO) << "The current local resource view: " << cluster_resource_scheduler_->DebugString(); CheckRemainingResourceCorrect(remaining_resource_instance); @@ -523,11 +468,11 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestCommiteResourceBatched) { // 1. create a placement group spec with 4 bundles and each required 1 CPU. auto group_id = PlacementGroupID::Of(JobID::FromInt(1)); absl::flat_hash_map unit_resource; - unit_resource.insert({"CPU", 1.0}); + unit_resource.insert({"GPU", 2.0}); auto bundle_specs = Mocker::GenBundleSpecifications(group_id, unit_resource, 4); // 2. init local available resource with 4 CPUs. absl::flat_hash_map available_resource = { - std::make_pair("CPU", 4.0)}; + std::make_pair("GPU", 10.0)}; InitLocalAvailableResource(available_resource); // 3. prepare resources for the four bundles and make sure it succeeds. ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundles(bundle_specs)); @@ -536,33 +481,32 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestCommiteResourceBatched) { // make sure it keeps Idempotency. new_placement_group_resource_manager_->CommitBundles(bundle_specs); // 5. check remaining resources is correct. - absl::flat_hash_map remaining_resources = { - {"CPU_group_" + group_id.Hex(), 4.0}, - {"CPU_group_1_" + group_id.Hex(), 1.0}, - {"CPU_group_2_" + group_id.Hex(), 1.0}, - {"CPU_group_3_" + group_id.Hex(), 1.0}, - {"CPU_group_4_" + group_id.Hex(), 1.0}, - {"CPU", 4.0}, + absl::flat_hash_map remaining_resources_total = { + {"GPU_group_" + group_id.Hex(), 8.0}, + {"GPU_group_1_" + group_id.Hex(), 2.0}, + {"GPU_group_2_" + group_id.Hex(), 2.0}, + {"GPU_group_3_" + group_id.Hex(), 2.0}, + {"GPU_group_4_" + group_id.Hex(), 2.0}, + {"GPU", 10.0}, + {"bundle_group_1_" + group_id.Hex(), 1000}, + {"bundle_group_2_" + group_id.Hex(), 1000}, + {"bundle_group_3_" + group_id.Hex(), 1000}, + {"bundle_group_4_" + group_id.Hex(), 1000}, + {"bundle_group_" + group_id.Hex(), 4000}}; + absl::flat_hash_map remaining_resources_avail = { + {"GPU_group_" + group_id.Hex(), 8.0}, + {"GPU_group_1_" + group_id.Hex(), 2.0}, + {"GPU_group_2_" + group_id.Hex(), 2.0}, + {"GPU_group_3_" + group_id.Hex(), 2.0}, + {"GPU_group_4_" + group_id.Hex(), 2.0}, + {"GPU", 2.0}, {"bundle_group_1_" + group_id.Hex(), 1000}, {"bundle_group_2_" + group_id.Hex(), 1000}, {"bundle_group_3_" + group_id.Hex(), 1000}, {"bundle_group_4_" + group_id.Hex(), 1000}, {"bundle_group_" + group_id.Hex(), 4000}}; - auto remaining_resource_scheduler = - std::make_shared(io_context, - scheduling::NodeID("remaining"), - remaining_resources, - is_node_available_fn_); - std::shared_ptr resource_instances = - std::make_shared(); - absl::flat_hash_map allocating_resource; - allocating_resource.insert({"CPU", 4.0}); - ASSERT_TRUE( - remaining_resource_scheduler->GetLocalResourceManager().AllocateLocalTaskResources( - allocating_resource, resource_instances)); auto remaining_resource_instance = - remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources( - scheduling::NodeID("remaining")); + ResourceMapToNodeResources(remaining_resources_total, remaining_resources_avail); RAY_LOG(INFO) << "The current local resource view: " << cluster_resource_scheduler_->DebugString(); CheckRemainingResourceCorrect(remaining_resource_instance);