diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index a072f6d0b..569366ffb 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -55,6 +55,7 @@ option(BUILD_SHARED_LIBS "Build libwholegraph shared libraries" ON)
 option(CMAKE_CUDA_LINEINFO "Enable the -lineinfo option for nvcc (useful for cuda-memcheck / profiler" OFF)
 option(BUILD_TESTS "Configure CMake to build tests" ON)
 option(CUDA_STATIC_RUNTIME "Statically link the CUDA toolkit runtime and libraries" OFF)
+option(BUILD_BENCHMARKS "Configure CMake to build benchmarks" ON)
 
 ##############################################################################
 # - Set options based on user defined one -----------------------------------
@@ -203,6 +204,11 @@ if(BUILD_TESTS AND CMAKE_PROJECT_NAME STREQUAL PROJECT_NAME)
   add_subdirectory(tests)
 endif()
 
+# optionally build benchmarks
+if (BUILD_BENCHMARKS AND CMAKE_PROJECT_NAME STREQUAL PROJECT_NAME)
+  add_subdirectory(bench)
+endif()
+
 ##############################################################################
 # - code checker -------------------------------------------------------------
 
diff --git a/cpp/bench/CMakeLists.txt b/cpp/bench/CMakeLists.txt
new file mode 100644
index 000000000..9296f90fe
--- /dev/null
+++ b/cpp/bench/CMakeLists.txt
@@ -0,0 +1,70 @@
+#=============================================================================
+# Copyright (c) 2023, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#=============================================================================
+
+# option(BUILD_BENCHMARKS "Build wholegraph C++ benchmark tests" ON)
+message(VERBOSE "WHOLEGRAPH: Building wholegraph C++ benchmarks: ${BUILD_BENCHMARKS}")
+
+function(ConfigureBench)
+
+  set(options OPTIONAL)
+  set(oneValueArgs NAME)
+  set(multiValueArgs PATH TARGETS CONFIGURATIONS)
+  cmake_parse_arguments(ConfigureBench "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN})
+
+  set(BENCH_NAME ${ConfigureBench_NAME})
+
+  add_executable(${BENCH_NAME} ${ConfigureBench_PATH})
+
+  target_include_directories(${BENCH_NAME} PRIVATE "$<BUILD_INTERFACE:${WHOLEGRAPH_SOURCE_DIR}>/src")
+  target_link_libraries(
+    ${BENCH_NAME}
+    PRIVATE wholegraph
+            raft::raft
+            rmm::rmm
+            pthread
+  )
+
+  set_target_properties(
+    ${BENCH_NAME}
+    PROPERTIES # set target compile options
+               INSTALL_RPATH "\$ORIGIN/../../../lib"
+               CXX_STANDARD 17
+               CXX_STANDARD_REQUIRED ON
+               CUDA_ARCHITECTURES "${CMAKE_CUDA_ARCHITECTURES}"
+               POSITION_INDEPENDENT_CODE ON
+               RUNTIME_OUTPUT_DIRECTORY "$<BUILD_INTERFACE:${WHOLEGRAPH_BINARY_DIR}/gbench>"
+               INTERFACE_POSITION_INDEPENDENT_CODE ON
+  )
+  target_compile_options(${BENCH_NAME} PUBLIC
+                         "$<$<COMPILE_LANGUAGE:CXX>:-Wall;-Werror;-Wno-error=deprecated-declarations>")
+
+  install(
+    TARGETS ${BENCH_NAME}
+    COMPONENT testing
+    DESTINATION bin/gbench/libwholegraph
+    EXCLUDE_FROM_ALL
+  )
+
+endfunction()
+
+if(BUILD_BENCHMARKS)
+  ConfigureBench(
+    NAME GATHER_SCATTER_BENCH
+    PATH wholememory_ops/gather_scatter_bench.cu
+         common/wholegraph_benchmark.cpp
+  )
+endif()
diff --git a/cpp/bench/common/wholegraph_benchmark.cpp b/cpp/bench/common/wholegraph_benchmark.cpp
new file mode 100644
index 000000000..c2f4c008d
--- /dev/null
+++ b/cpp/bench/common/wholegraph_benchmark.cpp
@@ -0,0 +1,132 @@
+/*
+ * Copyright (c) 2023, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "wholegraph_benchmark.hpp"
+
+#include "wholememory/communicator.hpp"
+
+#include <algorithm>
+#include <cstdint>
+#include <cstdio>
+#include <experimental/random>
+#include <functional>
+#include <vector>
+
+#include <cuda_runtime_api.h>
+#include <sys/time.h>
+#include <wholememory/tensor_description.h>
+#include <wholememory/wholememory.h>
+
+namespace wholegraph::bench {
+
+template <typename IndexT>
+void host_get_random_integer_indices(void* indices,
+                                     wholememory_array_description_t indice_desc,
+                                     int64_t max_indices)
+{
+  IndexT* indices_ptr = static_cast<IndexT*>(indices);
+  std::experimental::reseed();
+  for (int64_t i = 0; i < indice_desc.size; i++) {
+    IndexT random_index = std::experimental::randint<IndexT>(0, max_indices - 1);
+    indices_ptr[i + indice_desc.storage_offset] = random_index;
+  }
+}
+
+void host_random_init_integer_indices(void* indices,
+                                      wholememory_array_description_t indices_desc,
+                                      int64_t max_indices)
+{
+  if (indices_desc.dtype == WHOLEMEMORY_DT_INT) {
+    host_get_random_integer_indices<int>(indices, indices_desc, max_indices);
+  } else {
+    host_get_random_integer_indices<int64_t>(indices, indices_desc, max_indices);
+  }
+}
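
// Portability note: std::experimental::reseed() / std::experimental::randint()
// above come from the Library Fundamentals TS header <experimental/random>,
// which not every standard library ships. A drop-in sketch built on plain
// <random> follows; `portable_randint` is a hypothetical name, not part of
// this patch.
#include <random>

template <typename IndexT>
IndexT portable_randint(IndexT lo, IndexT hi)
{
  // one engine per thread, uniformly distributed integers in [lo, hi]
  static thread_local std::mt19937_64 engine{std::random_device{}()};
  std::uniform_int_distribution<IndexT> dist(lo, hi);
  return dist(engine);
}
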
+
+void MultiProcessMeasurePerformance(std::function<void()> run_fn,
+                                    wholememory_comm_t& wm_comm,
+                                    const PerformanceMeter& meter,
+                                    const std::function<void()>& barrier_fn)
+{
+  barrier_fn();
+  // warm up
+  struct timeval tv_warmup_s;
+  gettimeofday(&tv_warmup_s, nullptr);
+  int64_t target_warmup_time = 1000LL * 1000LL * meter.warmup_seconds;
+  while (true) {
+    struct timeval tv_warmup_c;
+    gettimeofday(&tv_warmup_c, nullptr);
+    int64_t time_warmup = TIME_DIFF_US(tv_warmup_s, tv_warmup_c);
+    if (time_warmup >= target_warmup_time) break;
+    run_fn();
+    WHOLEMEMORY_CHECK_NOTHROW(cudaDeviceSynchronize() == cudaSuccess);
+  }
+  WHOLEMEMORY_CHECK_NOTHROW(cudaDeviceSynchronize() == cudaSuccess);
+  barrier_fn();
+
+  // run
+  struct timeval tv_run_s, tv_run_e;
+  int64_t max_run_us = 1000LL * 1000LL * meter.max_run_seconds;
+  gettimeofday(&tv_run_s, nullptr);
+  int real_run_count = 0;
+  for (int i = 0; i < meter.run_count; i++) {
+    run_fn();
+    real_run_count++;
+    struct timeval tv_run_c;
+    gettimeofday(&tv_run_c, nullptr);
+    int64_t time_run_used = TIME_DIFF_US(tv_run_s, tv_run_c);
+    if (time_run_used >= max_run_us || real_run_count >= meter.run_count) break;
+    if (meter.sync) { WHOLEMEMORY_CHECK_NOTHROW(cudaDeviceSynchronize() == cudaSuccess); }
+  }
+  WHOLEMEMORY_CHECK_NOTHROW(cudaDeviceSynchronize() == cudaSuccess);
+  gettimeofday(&tv_run_e, nullptr);
+  int64_t real_time_used_us = TIME_DIFF_US(tv_run_s, tv_run_e);
+  double single_run_time_us = real_time_used_us;
+  single_run_time_us /= real_run_count;
+  barrier_fn();
+
+  for (size_t i = 0; i < meter.metrics_.size(); i++) {
+    double metric_value = meter.metrics_[i].value;
+    if (meter.metrics_[i].invert) {
+      metric_value *= single_run_time_us;
+      metric_value /= 1e6;
+    } else {
+      metric_value /= single_run_time_us;
+      metric_value *= 1e6;
+    }
+
+    std::vector<double> recv_vec(wm_comm->world_size);
+    wm_comm->host_allgather(&metric_value, recv_vec.data(), 1, WHOLEMEMORY_DT_DOUBLE);
+    double min_metric, max_metric, avg_metric;
+    min_metric = max_metric = recv_vec[0];
+    avg_metric = 0.0;
+    for (int j = 0; j < wm_comm->world_size; j++) {
+      min_metric = std::min(min_metric, recv_vec[j]);
+      max_metric = std::max(max_metric, recv_vec[j]);
+      avg_metric += recv_vec[j];
+    }
+    avg_metric /= wm_comm->world_size;
+    if (wm_comm->world_rank == 0) {
+      fprintf(stderr,
+              "== Metric: %20s: min=%.2lf %s, max=%.2lf %s, avg=%.2lf %s\n",
+              meter.metrics_[i].name.c_str(),
+              min_metric,
+              meter.metrics_[i].unit.c_str(),
+              max_metric,
+              meter.metrics_[i].unit.c_str(),
+              avg_metric,
+              meter.metrics_[i].unit.c_str());
+    }
+  }
+}
+
+}  // namespace wholegraph::bench
diff --git a/cpp/bench/common/wholegraph_benchmark.hpp b/cpp/bench/common/wholegraph_benchmark.hpp
new file mode 100644
index 000000000..7ac85ba59
--- /dev/null
+++ b/cpp/bench/common/wholegraph_benchmark.hpp
@@ -0,0 +1,108 @@
+/*
+ * Copyright (c) 2023, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <functional>
+#include <string>
+#include <vector>
+
+#include <cstdint>
+#include <sys/time.h>
+#include <unistd.h>
+
+#include "error.hpp"
+
+#include <wholememory/tensor_description.h>
+#include <wholememory/wholememory.h>
+
+namespace wholegraph::bench {
+
+#define TIME_DIFF_US(TVS, TVE) \
+  ((TVE.tv_sec - TVS.tv_sec) * 1000ULL * 1000ULL + (TVE.tv_usec - TVS.tv_usec))
+
+void host_random_init_integer_indices(void* indices,
+                                      wholememory_array_description_t indices_desc,
+                                      int64_t max_indices);
+
+struct Metric {
+  Metric(const std::string& metrics_name,
+         const std::string& metrics_unit,
+         const double metrics_value,
+         bool inv)
+  {
+    name   = metrics_name;
+    unit   = metrics_unit;
+    value  = metrics_value;
+    invert = inv;
+  }
+  std::string name;
+  std::string unit;
+  double value;
+  bool invert;
+};
+
+struct PerformanceMeter {
+  PerformanceMeter& SetSync()
+  {
+    sync = true;
+    return *this;
+  }
+  bool sync = false;
+
+  PerformanceMeter& SetWarmupTime(float w)
+  {
+    warmup_seconds = w;
+    return *this;
+  }
+  float warmup_seconds = 0.05f;
+
+  std::vector<Metric> metrics_;
+
+  PerformanceMeter& AddMetrics(const std::string& metrics_name,
+                               const std::string& unit,
+                               double value,
+                               bool inv = false)
+  {
+    metrics_.emplace_back(metrics_name, unit, value, inv);
+    return *this;
+  }
+
+  PerformanceMeter& SetRunCount(int count)
+  {
+    run_count = count;
+    return *this;
+  }
+  int run_count = 100;
+
+  PerformanceMeter& SetMaxRunSeconds(float sec)
+  {
+    max_run_seconds = sec;
+    return *this;
+  }
+  float max_run_seconds = 10;
+
+  PerformanceMeter& SetName(const std::string& n)
+  {
+    name = n;
+    return *this;
+  }
+  std::string name;
+};
+
+void MultiProcessMeasurePerformance(std::function<void()> run_fn,
+                                    wholememory_comm_t& wm_comm,
+                                    const PerformanceMeter& meter,
+                                    const std::function<void()>& barrier_fn);
+
+}  // namespace wholegraph::bench
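For context, the intended call pattern for the two pieces declared above is sketched below: PerformanceMeter's builder-style setters feed MultiProcessMeasurePerformance, which times `run_fn` across all ranks and prints min/max/avg over the communicator. With inv == false the meter divides the per-run metric value by the measured per-run time (a throughput); with inv == true it multiplies (a cost per unit of work). `bench_one_op` and `do_one_run` are illustrative names, not part of this patch; the real caller is gather_scatter_bench.cu, added next.

    #include "wholegraph_benchmark.hpp"

    using namespace wholegraph::bench;

    void bench_one_op(wholememory_comm_t& wm_comm,
                      std::function<void()> do_one_run,
                      double bytes_per_run)
    {
      PerformanceMeter meter;
      meter.SetName("example_op")
        .AddMetrics("Bandwidth", "GB/s", bytes_per_run / 1e9)  // work done by ONE run
        .SetRunCount(20)          // up to 20 timed runs ...
        .SetMaxRunSeconds(10.0f)  // ... or stop after 10 s, whichever comes first
        .SetWarmupTime(0.05f);    // ~50 ms of untimed warm-up iterations
      auto barrier = [&wm_comm] {
        WHOLEMEMORY_CHECK_NOTHROW(wholememory_communicator_barrier(wm_comm) ==
                                  WHOLEMEMORY_SUCCESS);
      };
      MultiProcessMeasurePerformance(do_one_run, wm_comm, meter, barrier);
    }
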
diff --git a/cpp/bench/wholememory_ops/gather_scatter_bench.cu b/cpp/bench/wholememory_ops/gather_scatter_bench.cu
new file mode 100644
index 000000000..d9ad7e072
--- /dev/null
+++ b/cpp/bench/wholememory_ops/gather_scatter_bench.cu
@@ -0,0 +1,453 @@
+/*
+ * Copyright (c) 2019-2023, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <getopt.h>
+#include <cstdio>
+#include <cstdlib>
+
+#include <cstring>
+#include <functional>
+#include <string>
+
+#include <wholememory/tensor_description.h>
+#include <wholememory/wholememory.h>
+#include <wholememory/wholememory_op.h>
+
+#include "../common/wholegraph_benchmark.hpp"
+#include "parallel_utils.hpp"
+#include "wholememory/communicator.hpp"
+#include "wholememory/env_func_ptrs.hpp"
+#include "wholememory/initialize.hpp"
+
+#include "../../tests/wholememory/wholememory_test_utils.hpp"
+
+namespace wholegraph::bench::gather_scatter {
+
+typedef struct GatherScatterBenchParam {
+  wholememory_matrix_description_t get_embedding_desc() const
+  {
+    int64_t embedding_entry_count = get_embedding_entry_count();
+    int64_t matrix_sizes[2] = {embedding_entry_count, embedding_dim};
+    return wholememory_create_matrix_desc(
+      matrix_sizes, embedding_stride, embedding_storage_offset, embedding_type);
+  }
+  wholememory_array_description_t get_indices_desc() const
+  {
+    int64_t indices_count = get_indices_count();
+    return wholememory_create_array_desc(indices_count, indices_storage_offset, indices_type);
+  }
+  wholememory_matrix_description_t get_output_desc() const
+  {
+    int64_t indices_count = get_indices_count();
+    int64_t output_sizes[2] = {indices_count, embedding_dim};
+    return wholememory_create_matrix_desc(
+      output_sizes, output_stride, output_storage_offset, output_type);
+  }
+
+  int64_t get_embedding_granularity() const
+  {
+    return embedding_stride * wholememory_dtype_get_element_size(embedding_type);
+  }
+
+  int64_t get_embedding_table_size() const { return embedding_table_size; }
+  int64_t get_gather_size() const { return gather_size; }
+
+  wholememory_memory_type_t get_memory_type() const { return memory_type; }
+
+  wholememory_memory_location_t get_memory_location() const { return memory_location; }
+  int get_loop_count() const { return loop_count; }
+  std::string get_test_type() const { return test_type; }
+
+  int64_t get_embedding_dim() const { return embedding_dim; }
+  wholememory_dtype_t get_embedding_type() const { return embedding_type; }
+
+  GatherScatterBenchParam& set_memory_type(wholememory_memory_type_t new_memory_type)
+  {
+    memory_type = new_memory_type;
+    return *this;
+  }
+  GatherScatterBenchParam& set_memory_location(wholememory_memory_location_t new_memory_location)
+  {
+    memory_location = new_memory_location;
+    return *this;
+  }
+  GatherScatterBenchParam& set_embedding_table_size(int64_t new_embedding_table_size)
+  {
+    int64_t entry_size = wholememory_dtype_get_element_size(embedding_type) * get_embedding_dim();
+    embedding_table_size = (new_embedding_table_size + entry_size - 1) / entry_size * entry_size;
+    return *this;
+  }
+  GatherScatterBenchParam& set_gather_size(int64_t new_gather_size)
+  {
+    int64_t entry_size = wholememory_dtype_get_element_size(embedding_type) * get_embedding_dim();
+    gather_size = (new_gather_size + entry_size - 1) / entry_size * entry_size;
+    return *this;
+  }
+  GatherScatterBenchParam& set_embedding_dim(int64_t new_embedding_dim)
+  {
+    embedding_dim = new_embedding_dim;
+    if (embedding_stride != embedding_dim) embedding_stride = embedding_dim;
+    if (output_stride != embedding_dim) output_stride = embedding_dim;
+    int64_t entry_size = wholememory_dtype_get_element_size(embedding_type) * embedding_dim;
+    embedding_table_size = (embedding_table_size + entry_size - 1) / entry_size * entry_size;
+    gather_size = (gather_size + entry_size - 1) / entry_size * entry_size;
+    return *this;
+  }
+
+  GatherScatterBenchParam& set_loop_count(int new_loop_count)
+  {
+    loop_count = new_loop_count;
+    return *this;
+  }
+  GatherScatterBenchParam& set_test_type(std::string new_test_type)
+  {
+    test_type = new_test_type;
+    return *this;
+  }
+
+ private:
+  int64_t get_embedding_entry_count() const
+  {
+    return embedding_table_size / wholememory_dtype_get_element_size(embedding_type) /
+           embedding_dim;
+  }
+  int64_t get_indices_count() const
+  {
+    return gather_size / wholememory_dtype_get_element_size(embedding_type) / embedding_dim;
+  }
+
+  GatherScatterBenchParam& set_embedding_stride(int64_t new_embedding_stride)
+  {
+    embedding_stride = new_embedding_stride;
+    return *this;
+  }
+  GatherScatterBenchParam& set_output_stride(int64_t new_output_stride)
+  {
+    output_stride = new_output_stride;
+    return *this;
+  }
+  GatherScatterBenchParam& set_embedding_type(wholememory_dtype_t new_embedding_type)
+  {
+    embedding_type = new_embedding_type;
+    return *this;
+  }
+  GatherScatterBenchParam& set_indices_type(wholememory_dtype_t new_indices_type)
+  {
+    indices_type = new_indices_type;
+    return *this;
+  }
+  GatherScatterBenchParam& set_output_type(wholememory_dtype_t new_output_type)
+  {
+    output_type = new_output_type;
+    return *this;
+  }
+
+  wholememory_memory_type_t memory_type = WHOLEMEMORY_MT_CHUNKED;
+  wholememory_memory_location_t memory_location = WHOLEMEMORY_ML_DEVICE;
+  int64_t embedding_table_size = 1024000LL;
+  int64_t gather_size = 1024;
+  int64_t embedding_dim = 32;
+  int loop_count = 20;
+  std::string test_type = "gather";  // gather or scatter
+
+  int64_t embedding_stride = 32;
+  int64_t output_stride = 32;
+  wholememory_dtype_t embedding_type = WHOLEMEMORY_DT_FLOAT;
+  wholememory_dtype_t indices_type = WHOLEMEMORY_DT_INT64;
+  wholememory_dtype_t output_type = WHOLEMEMORY_DT_FLOAT;
+  int64_t embedding_storage_offset = 0;
+  int64_t indices_storage_offset = 0;
+  int64_t output_storage_offset = 0;
+} GatherScatterBenchParam;
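
// Worked example (illustrative values, not part of the patch) of the rounding
// done by set_embedding_table_size / set_gather_size above: byte counts are
// rounded UP to a whole number of embedding entries, where one entry is
// element_size * embedding_dim bytes, so the benchmark never moves a
// fraction of a row.
static int64_t rounding_example()
{
  int64_t element_size  = 4;                             // WHOLEMEMORY_DT_FLOAT
  int64_t embedding_dim = 32;                            // the default above
  int64_t entry_size    = element_size * embedding_dim;  // 128 bytes per row

  int64_t requested = 1000;  // bytes asked for on the command line
  int64_t rounded   = (requested + entry_size - 1) / entry_size * entry_size;
  return rounded;            // == 1024: eight 128-byte rows, get_indices_count() == 8
}
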
+
+std::string get_memory_type_string(wholememory_memory_type_t memory_type)
+{
+  std::string str;
+  switch (memory_type) {
+    case WHOLEMEMORY_MT_NONE: str = "WHOLEMEMORY_MT_NONE"; break;
+    case WHOLEMEMORY_MT_CONTINUOUS: str = "WHOLEMEMORY_MT_CONTINUOUS"; break;
+    case WHOLEMEMORY_MT_CHUNKED: str = "WHOLEMEMORY_MT_CHUNKED"; break;
+    case WHOLEMEMORY_MT_DISTRIBUTED: str = "WHOLEMEMORY_MT_DISTRIBUTED"; break;
+    default: break;
+  }
+  return str;
+}
+
+std::string get_memory_location_string(wholememory_memory_location_t memory_location)
+{
+  std::string str;
+  switch (memory_location) {
+    case WHOLEMEMORY_ML_NONE: str = "WHOLEMEMORY_ML_NONE"; break;
+    case WHOLEMEMORY_ML_DEVICE: str = "WHOLEMEMORY_ML_DEVICE"; break;
+    case WHOLEMEMORY_ML_HOST: str = "WHOLEMEMORY_ML_HOST"; break;
+    default: break;
+  }
+  return str;
+}
+
+void gather_scatter_benchmark(GatherScatterBenchParam& params)
+{
+  int g_dev_count = ForkGetDeviceCount();
+  WHOLEMEMORY_CHECK_NOTHROW(g_dev_count >= 1);
+  std::vector<std::array<int, 2>> pipes;
+  CreatePipes(&pipes, g_dev_count);
+  MultiProcessRun(
+    g_dev_count,
+    [&params, &pipes](int world_rank, int world_size) {
+      WHOLEMEMORY_CHECK_NOTHROW(wholememory_init(0) == WHOLEMEMORY_SUCCESS);
+
+      WM_CUDA_CHECK_NO_THROW(cudaSetDevice(world_rank));
+
+      wholememory_comm_t wm_comm = create_communicator_by_pipes(pipes, world_rank, world_size);
+
+      auto embedding_desc = params.get_embedding_desc();
+      auto indices_desc = params.get_indices_desc();
+      auto output_desc = params.get_output_desc();
+      std::string test_type = params.get_test_type();
+      size_t embedding_entry_size = params.get_embedding_granularity();
+
+      wholememory_tensor_t embedding_tensor;
+      wholememory_tensor_description_t embedding_tensor_desc;
+      wholememory_copy_matrix_desc_to_tensor(&embedding_tensor_desc, &embedding_desc);
+      WHOLEMEMORY_CHECK_NOTHROW(wholememory_create_tensor(&embedding_tensor,
+                                                          &embedding_tensor_desc,
+                                                          wm_comm,
+                                                          params.get_memory_type(),
+                                                          params.get_memory_location()) ==
+                                WHOLEMEMORY_SUCCESS);
+
+      cudaStream_t stream;
+      WM_CUDA_CHECK_NO_THROW(cudaStreamCreate(&stream));
+
+      void *dev_indices = nullptr, *dev_gather_buffer = nullptr;
+      void* host_indices = nullptr;
+      size_t gather_buffer_size = params.get_gather_size();
+      size_t indices_buffer_size = wholememory_get_memory_size_from_array(&indices_desc);
+
+      WM_CUDA_CHECK_NO_THROW(cudaMallocHost(&host_indices, indices_buffer_size));
+      WM_CUDA_CHECK_NO_THROW(cudaMalloc(&dev_indices, indices_buffer_size));
+      WM_CUDA_CHECK_NO_THROW(cudaMalloc(&dev_gather_buffer, gather_buffer_size));
+
+      wholegraph::bench::host_random_init_integer_indices(
+        host_indices, indices_desc, embedding_desc.sizes[0]);
+      WM_CUDA_CHECK_NO_THROW(cudaMemcpyAsync(dev_indices,
+                                             host_indices,
+                                             wholememory_get_memory_size_from_array(&indices_desc),
+                                             cudaMemcpyHostToDevice,
+                                             stream));
+      WM_CUDA_CHECK_NO_THROW(cudaStreamSynchronize(stream));
+      WHOLEMEMORY_CHECK_NOTHROW(wholememory_communicator_barrier(wm_comm) == WHOLEMEMORY_SUCCESS);
+
+      wholememory_tensor_t indices_tensor, output_tensor;
+      wholememory_tensor_description_t indices_tensor_desc, output_tensor_desc;
+      wholememory_copy_array_desc_to_tensor(&indices_tensor_desc, &indices_desc);
+      wholememory_copy_matrix_desc_to_tensor(&output_tensor_desc, &output_desc);
+      WHOLEMEMORY_CHECK_NOTHROW(
+        wholememory_make_tensor_from_pointer(&indices_tensor, dev_indices, &indices_tensor_desc) ==
+        WHOLEMEMORY_SUCCESS);
+      WHOLEMEMORY_CHECK_NOTHROW(wholememory_make_tensor_from_pointer(
+                                  &output_tensor, dev_gather_buffer, &output_tensor_desc) ==
+                                WHOLEMEMORY_SUCCESS);
+      WM_CUDA_CHECK_NO_THROW(cudaStreamSynchronize(stream));
+      WHOLEMEMORY_CHECK_NOTHROW(wholememory_communicator_barrier(wm_comm) == WHOLEMEMORY_SUCCESS);
+
+      const auto barrier_fn = [&wm_comm]() -> void {
+        WHOLEMEMORY_CHECK_NOTHROW(wholememory_communicator_barrier(wm_comm) == WHOLEMEMORY_SUCCESS);
+      };
+
+      double emb_size_mb = (double)params.get_embedding_table_size() / 1024.0 / 1024.0;
+      double gather_size_mb = (double)params.get_gather_size() / 1024.0 / 1024.0;
+      if (world_rank == 0) {
+        printf(
+          "%s, world_size=%d, memoryType=%s, memoryLocation=%s, elt_size=%ld, embeddingDim=%ld, "
+          "embeddingTableSize=%.2lf MB, gatherSize=%.2lf MB\n",
+          test_type.c_str(),
+          world_size,
+          get_memory_type_string(params.get_memory_type()).c_str(),
+          get_memory_location_string(params.get_memory_location()).c_str(),
+          wholememory_dtype_get_element_size(params.get_embedding_type()),
+          params.get_embedding_dim(),
+          emb_size_mb,
+          gather_size_mb);
+      }
+
+      PerformanceMeter meter;
+      meter.AddMetrics("Bandwidth", "GB/s", gather_buffer_size / 1000.0 / 1000.0 / 1000.0, false)
+        .SetMaxRunSeconds(1000)
+        .SetRunCount(params.get_loop_count());
+
+      if (test_type.compare("gather") == 0) {
+        MultiProcessMeasurePerformance(
+          [&] {
+            wholememory_gather(embedding_tensor,
+                               indices_tensor,
+                               output_tensor,
+                               wholememory::get_cached_env_func(),
+                               stream);
+          },
+          wm_comm,
+          meter,
+          barrier_fn);
+
+      } else if (test_type.compare("scatter") == 0) {
+        MultiProcessMeasurePerformance(
+          [&] {
+            wholememory_scatter(output_tensor,
+                                indices_tensor,
+                                embedding_tensor,
+                                wholememory::get_cached_env_func(),
+                                stream);
+          },
+          wm_comm,
+          meter,
+          barrier_fn);
+      } else {
+        printf("Invalid test function, should be: gather or scatter\n");
+        exit(EXIT_FAILURE);
+      }
+
+      wholememory::drop_cached_env_func();
+
+      WHOLEMEMORY_CHECK_NOTHROW(wholememory_destroy_tensor(indices_tensor) == WHOLEMEMORY_SUCCESS);
+      WHOLEMEMORY_CHECK_NOTHROW(wholememory_destroy_tensor(output_tensor) == WHOLEMEMORY_SUCCESS);
+
+      WM_CUDA_CHECK_NO_THROW(cudaFreeHost(host_indices));
+      WM_CUDA_CHECK_NO_THROW(cudaFree(dev_indices));
+      WM_CUDA_CHECK_NO_THROW(cudaFree(dev_gather_buffer));
+
+      WHOLEMEMORY_CHECK_NOTHROW(wholememory_destroy_tensor(embedding_tensor) ==
+                                WHOLEMEMORY_SUCCESS);
+
+      WHOLEMEMORY_CHECK_NOTHROW(wholememory::destroy_all_communicators() == WHOLEMEMORY_SUCCESS);
+
+      WHOLEMEMORY_CHECK_NOTHROW(wholememory_finalize() == WHOLEMEMORY_SUCCESS);
+    },
+    true);
+}
+
+}  // namespace wholegraph::bench::gather_scatter
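
// Illustrative arithmetic (assumed numbers, not benchmark output): the value
// handed to AddMetrics above is the gather buffer size in decimal GB, and with
// invert == false MultiProcessMeasurePerformance divides it by the measured
// per-run time, so the printed metric is GB moved per second.
static double example_bandwidth_gb_per_s()
{
  double gather_bytes       = 1024.0 * 1024.0 * 1024.0;                 // assume 1 GiB moved per run
  double metric_value       = gather_bytes / 1000.0 / 1000.0 / 1000.0;  // ~1.074, as in AddMetrics above
  double single_run_time_us = 5000.0;                                   // assume 5 ms measured per run
  return metric_value / single_run_time_us * 1e6;                       // ~214.7 GB/s
}
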
+
+int main(int argc, char** argv)
+{
+  wholegraph::bench::gather_scatter::GatherScatterBenchParam params;
+  const char* optstr = "ht:l:e:g:d:c:f:";
+  struct option opts[] = {
+    {"help", no_argument, NULL, 'h'},
+    {"memory_type",
+     required_argument,
+     NULL,
+     't'},  // 0: None, 1: Continuous, 2: Chunked, 3: Distributed
+    {"memory_location", required_argument, NULL, 'l'},  // 0: None, 1: Device, 2: Host
+    {"embedding_table_size", required_argument, NULL, 'e'},
+    {"gather_size", required_argument, NULL, 'g'},
+    {"embedding_dim", required_argument, NULL, 'd'},
+    {"loop_count", required_argument, NULL, 'c'},
+    {"test_type", required_argument, NULL, 'f'},  // test_type: gather or scatter
+    {NULL, 0, NULL, 0}                            // zeroed terminator required by getopt_long
+  };
+
+  const char* usage =
+    "Usage: %s [options]\n"
+    "Options:\n"
+    "  -h, --help                   display this help and exit\n"
+    "  -t, --memory_type            specify wholememory type, 0: None, 1: Continuous, 2: Chunked, 3: "
+    "Distributed\n"
+    "  -l, --memory_location        specify wholememory location, 0: None, 1: Device, 2: Host\n"
+    "  -e, --embedding_table_size   specify embedding table size\n"
+    "  -g, --gather_size            specify gather size\n"
+    "  -d, --embedding_dim          specify embedding dimension\n"
+    "  -c, --loop_count             specify loop count\n"
+    "  -f, --test_type              specify test type: gather or scatter\n";
+
+  int c;
+  bool has_option = false;
+  while ((c = getopt_long(argc, argv, optstr, opts, NULL)) != -1) {
+    has_option = true;
+    switch (c) {
+      char* endptr;
+      long val;
+      case 'h': printf(usage, argv[0]); exit(EXIT_SUCCESS);
+      case 't':
+        val = strtol(optarg, &endptr, 10);
+        if (*endptr != '\0' || val < 0 || val > 3) {
+          printf("Invalid argument for option -t\n");
+          printf(usage, argv[0]);
+          exit(EXIT_FAILURE);
+        }
+        params.set_memory_type(static_cast<wholememory_memory_type_t>(val));
+        break;
+      case 'l':
+        val = strtol(optarg, &endptr, 10);
+        if (*endptr != '\0' || val < 0 || val > 2) {
+          printf("Invalid argument for option -l\n");
+          printf(usage, argv[0]);
+          exit(EXIT_FAILURE);
+        }
+        params.set_memory_location(static_cast<wholememory_memory_location_t>(val));
+        break;
+      case 'e':
+        val = std::stoll(optarg);
+        if (val < 0) {
+          printf("Negative value, invalid argument for option -e\n");
+          printf(usage, argv[0]);
+          exit(EXIT_FAILURE);
+        }
+        params.set_embedding_table_size(val);
+        break;
+      case 'g':
+        val = std::stoll(optarg);
+        if (val < 0) {
+          printf("Negative value, invalid argument for option -g\n");
+          printf(usage, argv[0]);
+          exit(EXIT_FAILURE);
+        }
+        params.set_gather_size(val);
+        break;
+      case 'd':
+        val = std::stoll(optarg);
+        if (val < 0) {
+          printf("Negative value, invalid argument for option -d\n");
+          printf(usage, argv[0]);
+          exit(EXIT_FAILURE);
+        }
+        params.set_embedding_dim(val);
+        break;
+      case 'c':
+        val = std::stoi(optarg);
+        if (val < 0) {
+          printf("Negative value, invalid argument for option -c\n");
+          printf(usage, argv[0]);
+          exit(EXIT_FAILURE);
+        }
+        params.set_loop_count(val);
+        break;
+      case 'f':
+        if (strcmp(optarg, "gather") == 0) {
+          params.set_test_type("gather");
+        } else if (strcmp(optarg, "scatter") == 0) {
+          params.set_test_type("scatter");
+        } else {
+          printf("Invalid argument for option -f\n");
+          printf(usage, argv[0]);
+          exit(EXIT_FAILURE);
+        }
+        break;
+      default:
+        printf("Invalid or unrecognized option\n");
+        printf(usage, argv[0]);
+        exit(EXIT_FAILURE);
+    }
+  }
+  if (!has_option) { printf("No option or argument is passed, use the default param\n"); }
+  wholegraph::bench::gather_scatter::gather_scatter_benchmark(params);
+  return 0;
+}
\ No newline at end of file
diff --git a/cpp/src/wholememory/env_func_ptrs.cpp b/cpp/src/wholememory/env_func_ptrs.cpp
index a9ff5dd0d..0b65ddc7d 100644
--- a/cpp/src/wholememory/env_func_ptrs.cpp
+++ b/cpp/src/wholememory/env_func_ptrs.cpp
@@ -15,7 +15,10 @@
  */
 
 #include <wholememory/env_func_ptrs.h>
+#include <memory>
 #include <mutex>
+#include <queue>
+#include <vector>
 
 #include "cuda_macros.hpp"
 #include "error.hpp"
@@ -101,29 +104,273 @@ wholememory_env_func_t* get_default_env_func() { return &default_env_func; }
 
+class ChunkedMemoryPool {
+ public:
+  ChunkedMemoryPool();
+  virtual ~ChunkedMemoryPool();
+  void* CachedMalloc(size_t size);
+  void CachedFree(void* ptr, size_t size);
+  void EmptyCache();
+  virtual void* MallocFnImpl(size_t size) = 0;
+  virtual void FreeFnImpl(void* ptr) = 0;
+
+ private:
+  static constexpr int kBucketCount = 64;
+  std::vector<std::unique_ptr<std::mutex>> mutexes_;
+  std::vector<std::queue<void*>> sized_pool_;
+};
+
+static size_t GetChunkIndex(size_t size)
+{
+  if (size == 0) return 0;
+  int power = 0;
+  size_t shifted_size = size;
+  while (shifted_size) {
+    shifted_size >>= 1;
+    power++;
+  }
+  if ((size & (size - 1)) == 0) {
+    return power - 1;
+  } else {
+    return power;
+  }
+}
+
+ChunkedMemoryPool::ChunkedMemoryPool()
+{
+  sized_pool_.resize(kBucketCount);
+  mutexes_.resize(kBucketCount);
+  for (int i = 0; i < kBucketCount; i++) {
+    mutexes_[i] = std::make_unique<std::mutex>();
+  }
+}
+ChunkedMemoryPool::~ChunkedMemoryPool() {}
+
+void* ChunkedMemoryPool::CachedMalloc(size_t size)
+{
+  size_t chunked_index = GetChunkIndex(size);
+  std::unique_lock<std::mutex> mlock(*mutexes_[chunked_index]);
+  if (!sized_pool_[chunked_index].empty()) {
+    void* ptr = sized_pool_[chunked_index].front();
+    sized_pool_[chunked_index].pop();
+    return ptr;
+  } else {
+    return MallocFnImpl(1ULL << chunked_index);
+  }
+}
+void ChunkedMemoryPool::CachedFree(void* ptr, size_t size)
+{
+  size_t chunked_index = GetChunkIndex(size);
+  std::unique_lock<std::mutex> mlock(*mutexes_[chunked_index]);
+  sized_pool_[chunked_index].push(ptr);
+}
+void ChunkedMemoryPool::EmptyCache()
+{
+  for (int i = 0; i < kBucketCount; i++) {
+    std::unique_lock<std::mutex> mlock(*mutexes_[i]);
+    while (!sized_pool_[i].empty()) {
+      FreeFnImpl(sized_pool_[i].front());
+      sized_pool_[i].pop();
+    }
+  }
+}
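
// Standalone check of GetChunkIndex's mapping (the helper duplicates the
// logic above so it compiles on its own; illustrative, not part of the
// patch): bucket k serves sizes in (2^(k-1), 2^k], and CachedMalloc backs
// bucket k with a chunk of exactly (1ULL << k) bytes.
#include <cassert>

static size_t chunk_index_copy(size_t size)
{
  if (size == 0) return 0;
  int power = 0;
  for (size_t s = size; s; s >>= 1) { power++; }
  return ((size & (size - 1)) == 0) ? power - 1 : power;
}

static void chunk_index_checks()
{
  assert(chunk_index_copy(1) == 0);      // 1 B   -> bucket 0, chunk 1 B
  assert(chunk_index_copy(1024) == 10);  // exact power of two keeps its bucket, chunk 1 KiB
  assert(chunk_index_copy(1025) == 11);  // anything else rounds up, chunk 2 KiB
}
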
+
+class DeviceChunkedMemoryPool : public ChunkedMemoryPool {
+ public:
+  explicit DeviceChunkedMemoryPool(int device_id);
+  ~DeviceChunkedMemoryPool();
+  void* MallocFnImpl(size_t size) override;
+  void FreeFnImpl(void* ptr) override;
+
+ protected:
+  int device_id_ = -1;
+};
+DeviceChunkedMemoryPool::DeviceChunkedMemoryPool(int device_id) : device_id_(device_id) {}
+DeviceChunkedMemoryPool::~DeviceChunkedMemoryPool() {}
+void* DeviceChunkedMemoryPool::MallocFnImpl(size_t size)
+{
+  int old_dev;
+  void* ptr;
+  WM_CUDA_CHECK(cudaGetDevice(&old_dev));
+  WM_CUDA_CHECK(cudaSetDevice(device_id_));
+  WM_CUDA_CHECK(cudaMalloc(&ptr, size));
+  WM_CUDA_CHECK(cudaSetDevice(old_dev));
+  return ptr;
+}
+void DeviceChunkedMemoryPool::FreeFnImpl(void* ptr)
+{
+  int old_dev;
+  WM_CUDA_CHECK(cudaGetDevice(&old_dev));
+  WM_CUDA_CHECK(cudaSetDevice(device_id_));
+  WM_CUDA_CHECK(cudaFree(ptr));
+  WM_CUDA_CHECK(cudaSetDevice(old_dev));
+}
+
+class PinnedChunkedMemoryPool : public ChunkedMemoryPool {
+ public:
+  PinnedChunkedMemoryPool() = default;
+  ~PinnedChunkedMemoryPool() = default;
+  void* MallocFnImpl(size_t size) override;
+  void FreeFnImpl(void* ptr) override;
+};
+void* PinnedChunkedMemoryPool::MallocFnImpl(size_t size)
+{
+  void* ptr;
+  WM_CUDA_CHECK(cudaMallocHost(&ptr, size));
+  return ptr;
+}
+void PinnedChunkedMemoryPool::FreeFnImpl(void* ptr) { WM_CUDA_CHECK(cudaFreeHost(ptr)); }
+
+class HostChunkedMemoryPool : public ChunkedMemoryPool {
+ public:
+  HostChunkedMemoryPool() = default;
+  ~HostChunkedMemoryPool() = default;
+  void* MallocFnImpl(size_t size) override;
+  void FreeFnImpl(void* ptr) override;
+};
+void* HostChunkedMemoryPool::MallocFnImpl(size_t size) { return malloc(size); }
+void HostChunkedMemoryPool::FreeFnImpl(void* ptr) { free(ptr); }
+
 class CachedAllocator {
  public:
-  CachedAllocator() { WHOLEMEMORY_FAIL_NOTHROW("Not implemented."); };
-  ~CachedAllocator() { DropCaches(); }
-  void* MallocHost() { return nullptr; }
-  void* MallocDevice() { return nullptr; }
-  void* MallocPinned() { return nullptr; }
-  void FreeHost() {}
-  void FreeDevice() {}
-  void FreePinned() {}
-  void DropCaches() {}
+  void* MallocHost(size_t size);
+  void* MallocDevice(size_t size);
+  void* MallocPinned(size_t size);
+  void FreeHost(void* ptr, size_t size);
+  void FreeDevice(void* ptr, size_t size);
+  void FreePinned(void* ptr, size_t size);
+  void DropCaches();
+  static CachedAllocator* GetInst();
 
  private:
-  std::mutex mu_;
+  CachedAllocator()
+  {
+    device_chunked_mem_pools_.resize(kMaxSupportedDeviceCount);
+    for (int i = 0; i < kMaxSupportedDeviceCount; i++) {
+      device_chunked_mem_pools_[i] = std::make_unique<DeviceChunkedMemoryPool>(i);
+    }
+    pinned_chunked_mem_pool_ = std::make_unique<PinnedChunkedMemoryPool>();
+    host_chunked_mem_pool_ = std::make_unique<HostChunkedMemoryPool>();
+  }
+  ~CachedAllocator() {}
+  CachedAllocator(const CachedAllocator& ca) = delete;
+  const CachedAllocator& operator=(const CachedAllocator& ca) = delete;
+
+  static CachedAllocator ca_inst_;
+  std::vector<std::unique_ptr<DeviceChunkedMemoryPool>> device_chunked_mem_pools_;
+  std::unique_ptr<PinnedChunkedMemoryPool> pinned_chunked_mem_pool_;
+  std::unique_ptr<HostChunkedMemoryPool> host_chunked_mem_pool_;
+  static constexpr int kMaxSupportedDeviceCount = 16;
 };
 
-#define K_MAX_DEVICE_COUNT (16)
+CachedAllocator CachedAllocator::ca_inst_;
+CachedAllocator* CachedAllocator::GetInst() { return &ca_inst_; }
 
+void* CachedAllocator::MallocHost(size_t size)
+{
+  return host_chunked_mem_pool_->CachedMalloc(size);
+}
+void CachedAllocator::FreeHost(void* ptr, size_t size)
+{
+  host_chunked_mem_pool_->CachedFree(ptr, size);
+}
+void* CachedAllocator::MallocDevice(size_t size)
+{
+  int dev_id;
+  WM_CUDA_CHECK(cudaGetDevice(&dev_id));
+  return device_chunked_mem_pools_[dev_id]->CachedMalloc(size);
+}
+void CachedAllocator::FreeDevice(void* ptr, size_t size)
+{
+  int dev_id;
+  WM_CUDA_CHECK(cudaGetDevice(&dev_id));
+  device_chunked_mem_pools_[dev_id]->CachedFree(ptr, size);
+}
+void* CachedAllocator::MallocPinned(size_t size)
+{
+  return pinned_chunked_mem_pool_->CachedMalloc(size);
+}
+void CachedAllocator::FreePinned(void* ptr, size_t size)
+{
+  pinned_chunked_mem_pool_->CachedFree(ptr, size);
+}
+void CachedAllocator::DropCaches()
+{
+  for (int i = 0; i < kMaxSupportedDeviceCount; i++) {
+    device_chunked_mem_pools_[i]->EmptyCache();
+  }
+  pinned_chunked_mem_pool_->EmptyCache();
+  host_chunked_mem_pool_->EmptyCache();
+}
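
// Illustrative round trip (not part of the patch): the free lists are keyed
// by power-of-two bucket, so a free followed by a same-bucket malloc reuses
// the chunk instead of calling cudaMalloc again.
static void cached_allocator_example()
{
  CachedAllocator* alloc = CachedAllocator::GetInst();

  void* p = alloc->MallocDevice(1000);  // bucket 10: backed by one 1024-byte cudaMalloc
  alloc->FreeDevice(p, 1000);           // parked in bucket 10, not cudaFree'd
  void* q = alloc->MallocDevice(600);   // bucket 10 again: reuses the chunk, q == p
  alloc->FreeDevice(q, 600);
  alloc->DropCaches();                  // only now is the chunk actually cudaFree'd
}
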
+
+void* cached_malloc_func(wholememory_tensor_description_t* tensor_description,
+                         wholememory_memory_allocation_type_t memory_allocation_type,
+                         void* memory_context,
+                         void* /*global_context*/)
+{
+  auto* default_memory_context = static_cast<default_memory_context_t*>(memory_context);
+  void* ptr = nullptr;
+  CachedAllocator* cached_inst = CachedAllocator::GetInst();
+  int devid;
+  WM_CUDA_CHECK(cudaGetDevice(&devid));
+  try {
+    if (memory_allocation_type == WHOLEMEMORY_MA_HOST) {
+      ptr = cached_inst->MallocHost(wholememory_get_memory_size_from_tensor(tensor_description));
+      if (ptr == nullptr) { WHOLEMEMORY_FAIL_NOTHROW("cached malloc host returned nullptr.\n"); }
+    } else if (memory_allocation_type == WHOLEMEMORY_MA_PINNED) {
+      ptr = cached_inst->MallocPinned(wholememory_get_memory_size_from_tensor(tensor_description));
+      if (ptr == nullptr) { WHOLEMEMORY_FAIL_NOTHROW("cached malloc pinned returned nullptr.\n"); }
+    } else if (memory_allocation_type == WHOLEMEMORY_MA_DEVICE) {
+      ptr = cached_inst->MallocDevice(wholememory_get_memory_size_from_tensor(tensor_description));
+      if (ptr == nullptr) { WHOLEMEMORY_FAIL_NOTHROW("cached malloc device returned nullptr.\n"); }
+    } else {
+      WHOLEMEMORY_FAIL_NOTHROW("memory_allocation_type incorrect.\n");
+    }
+  } catch (wholememory::cuda_error& wce) {
+    WHOLEMEMORY_FAIL_NOTHROW("cudaMalloc failed, %s.\n", wce.what());
+  }
+  default_memory_context->desc = *tensor_description;
+  default_memory_context->ptr = ptr;
+  default_memory_context->allocation_type = memory_allocation_type;
+  return ptr;
+}
+
+void cached_free_func(void* memory_context, void* /*global_context*/)
+{
+  CachedAllocator* cached_inst = CachedAllocator::GetInst();
+  auto* default_memory_context = static_cast<default_memory_context_t*>(memory_context);
+  auto memory_allocation_type = default_memory_context->allocation_type;
+  if (memory_allocation_type == WHOLEMEMORY_MA_HOST) {
+    cached_inst->FreeHost(default_memory_context->ptr,
+                          wholememory_get_memory_size_from_tensor(&default_memory_context->desc));
+  } else if (memory_allocation_type == WHOLEMEMORY_MA_PINNED) {
+    cached_inst->FreePinned(default_memory_context->ptr,
+                            wholememory_get_memory_size_from_tensor(&default_memory_context->desc));
+  } else if (memory_allocation_type == WHOLEMEMORY_MA_DEVICE) {
+    cached_inst->FreeDevice(default_memory_context->ptr,
+                            wholememory_get_memory_size_from_tensor(&default_memory_context->desc));
+  } else {
+    WHOLEMEMORY_FAIL_NOTHROW("memory_allocation_type incorrect.\n");
+  }
+  wholememory_initialize_tensor_desc(&default_memory_context->desc);
+  default_memory_context->ptr = nullptr;
+  default_memory_context->allocation_type = WHOLEMEMORY_MA_NONE;
+}
 
-static CachedAllocator* p_cached_allocators[K_MAX_DEVICE_COUNT] = {nullptr};
+static wholememory_env_func_t cached_env_func = {
+  .temporary_fns =
+    {
+      .create_memory_context_fn = default_create_memory_context_func,
+      .destroy_memory_context_fn = default_destroy_memory_context_func,
+      .malloc_fn = cached_malloc_func,
+      .free_fn = cached_free_func,
+      .global_context = nullptr,
+    },
+  .output_fns = {
+    .malloc_fn = cached_malloc_func,
+    .free_fn = cached_free_func,
+    .global_context = nullptr,
+  }};
 
-wholememory_env_func_t* get_cached_env_func() { WHOLEMEMORY_FAIL_NOTHROW("Not implemented."); }
+wholememory_env_func_t* get_cached_env_func() { return &cached_env_func; }
 
-void drop_env_func_cache() { WHOLEMEMORY_FAIL_NOTHROW("Not implemented."); }
+void drop_cached_env_func() { CachedAllocator::GetInst()->DropCaches(); }
 
 }  // namespace wholememory
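Sketch of the intended call pattern for the new env funcs, mirroring what gather_scatter_bench.cu does above: route an op's temporary and output allocations through the cached allocator, then drop the cache when done. `run_gathers` is an illustrative name; header paths are the ones assumed earlier in this patch.

    #include <wholememory/wholememory_op.h>

    #include "wholememory/env_func_ptrs.hpp"

    void run_gathers(wholememory_tensor_t embedding,
                     wholememory_tensor_t indices,
                     wholememory_tensor_t output,
                     cudaStream_t stream,
                     int iterations)
    {
      for (int i = 0; i < iterations; i++) {
        // buffers the op allocates are served from, and returned to, the pools
        wholememory_gather(embedding, indices, output, wholememory::get_cached_env_func(), stream);
      }
      wholememory::drop_cached_env_func();  // hand all cached chunks back to CUDA / the OS
    }
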
diff --git a/cpp/src/wholememory/env_func_ptrs.hpp b/cpp/src/wholememory/env_func_ptrs.hpp
index 590f28991..42c37d171 100644
--- a/cpp/src/wholememory/env_func_ptrs.hpp
+++ b/cpp/src/wholememory/env_func_ptrs.hpp
@@ -48,6 +48,6 @@ wholememory_env_func_t* get_cached_env_func();
 /**
  * @brief : drop all caches of inside cached allocator of current CUDA device
  */
-void drop_env_func_cache();
+void drop_cached_env_func();
 
 }  // namespace wholememory
diff --git a/cpp/tests/wholememory/wholememory_test_utils.hpp b/cpp/tests/wholememory/wholememory_test_utils.hpp
index 0e0bc3c39..890fcd8b5 100644
--- a/cpp/tests/wholememory/wholememory_test_utils.hpp
+++ b/cpp/tests/wholememory/wholememory_test_utils.hpp
@@ -15,8 +15,6 @@
  */
 #pragma once
 
-#include <gtest/gtest.h>
-
 #include "parallel_utils.hpp"
 #include "wholememory/communicator.hpp"
 
@@ -25,13 +23,15 @@ wholememory_comm_t create_communicator_by_pipes(const std::vector<std::array<int, 2>>& pipes,