[native] Switch to use string based arbitrator configs
tanjialiang committed Aug 23, 2024
1 parent febecd6 commit d7ca7af
Showing 4 changed files with 198 additions and 125 deletions.
162 changes: 139 additions & 23 deletions presto-docs/src/main/sphinx/presto_cpp/properties.rst
Expand Up @@ -95,14 +95,43 @@ The configuration properties of Presto C++ workers are described here, in alphab

In-memory cache.

``query.max-memory-per-node``
``runtime-metrics-collection-enabled``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
* **Type:** ``boolean``
* **Default value:** ``false``

Enables collection of worker level metrics.

``task.max-drivers-per-task``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``integer``
* **Default value:** ``number of hardware CPUs``

Number of drivers to use per task. Defaults to hardware CPUs.

``query.max-memory-per-node``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``string``
* **Default value:** ``4GB``

Max memory usage for each query.


``system-memory-gb``
^^^^^^^^^^^^^^^^^^^^

* **Type:** ``integer``
* **Default value:** ``40``

Memory allocation limit enforced by an internal memory allocator. It consists of two parts:
1) Memory used by the queries as specified in ``query-memory-gb``; 2) Memory used by the
system, such as disk spilling and cache prefetch.

Set ``system-memory-gb`` to the available machine memory of the deployment.


``query-memory-gb``
^^^^^^^^^^^^^^^^^^^

Expand All @@ -113,41 +142,128 @@ The configuration properties of Presto C++ workers are described here, in alphab
worker node. Memory for system usage such as disk spilling and cache prefetch are
not counted in it.

``query-reserved-memory-gb``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
``shared-arbitrator.reserved-capacity``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``integer``
* **Default value:** ``4``
* **Type:** ``string``
* **Default value:** ``4GB``

Specifies the total amount of memory in GB reserved for the queries on
a worker node. A query can only allocate from this reserved space if
1) the non-reserved space in ``query-memory-gb`` is used up; and 2) the amount
it tries to get is less than ``memory-pool-reserved-capacity``.
it tries to get is less than ``shared-arbitrator.memory-pool-reserved-capacity``.

``runtime-metrics-collection-enabled``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
* **Type:** ``boolean``
``shared-arbitrator.memory-pool-initial-capacity``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``string``
* **Default value:** ``128MB``

The initial memory pool capacity in bytes allocated on creation.

``shared-arbitrator.global-arbitration-enabled``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``string``
* **Default value:** ``false``

Enables collection of worker level metrics.
If true, allows the shared arbitrator to reclaim used memory across query
memory pools.

``system-memory-gb``
^^^^^^^^^^^^^^^^^^^^
``shared-arbitrator.memory-pool-reserved-capacity``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``integer``
* **Default value:** ``40``
* **Type:** ``string``
* **Default value:** ``64MB``

Memory allocation limit enforced via internal memory allocator. It consists of two parts:
1) Memory used by the queries as specified in ``query-memory-gb``; 2) Memory used by the
system, such as disk spilling and cache prefetch.
The amount of memory in bytes reserved for each query memory pool. When
a query tries to allocate memory from the reserved space whose size is
specified by ``shared-arbitrator.reserved-capacity``, it cannot allocate
more than the value specified in ``shared-arbitrator.memory-pool-reserved-capacity``.

Set ``system-memory-gb`` to the available machine memory of the deployment.
``shared-arbitrator.memory-pool-transfer-capacity``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

``task.max-drivers-per-task``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
* **Type:** ``string``
* **Default value:** ``32MB``

* **Type:** ``integer``
* **Default value:** ``number of hardware CPUs``
The minimal memory capacity in bytes transferred between memory pools
during memory arbitration.

Number of drivers to use per task. Defaults to hardware CPUs.
``shared-arbitrator.memory-reclaim-max-wait-time``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``string``
* **Default value:** ``5m``

Specifies the maximum time to wait for memory reclaim by arbitration. The
memory reclaim might fail if the maximum wait time is exceeded. If it is
zero, there is no timeout.

``shared-arbitrator.fast-exponential-growth-capacity-limit``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``string``
* **Default value:** ``512MB``

When the shared arbitrator grows a memory pool's capacity, the growth bytes
are adjusted in the following way:

* If 2 * current capacity is less than or equal to
``shared-arbitrator.fast-exponential-growth-capacity-limit``, grow
through the fast path by at least doubling the current capacity, when
conditions allow (see the NOTE below).
* If 2 * current capacity is greater than
``shared-arbitrator.fast-exponential-growth-capacity-limit``, grow
through the slow path by growing capacity by at least
``shared-arbitrator.slow-capacity-grow-pct`` * current capacity, if
allowed (see the NOTE below).

NOTE: If the originally requested growth bytes are larger than the adjusted
growth bytes, or the adjusted growth bytes reach the max capacity limit, the
adjusted growth bytes are not respected.

NOTE: Capacity growth adjustment is only enabled if both
``shared-arbitrator.fast-exponential-growth-capacity-limit`` and
``shared-arbitrator.slow-capacity-grow-pct`` are set; otherwise it is
disabled.
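The growth rule above can be sketched in a few lines of C++. This is an illustration only, not the Velox implementation: ``GrowthConfig``, ``adjustedGrowBytes`` and the ``maxCapacity`` parameter are hypothetical names, a value of zero is assumed to mean "not set", and "reaches max capacity limit" is read here as "the adjusted growth would push the pool past its max capacity".

```cpp
#include <cassert>
#include <cstdint>

constexpr uint64_t kMB = 1ULL << 20;

// Hypothetical holder for the two settings, in bytes and as a fraction.
// Assumption: zero means "not set".
struct GrowthConfig {
  uint64_t fastExponentialGrowthCapacityLimit;
  double slowCapacityGrowPct;
};

// Returns the number of bytes to grow a pool by, following the documented
// rule. Falls back to the original request whenever the adjusted value
// is not respected.
uint64_t adjustedGrowBytes(
    uint64_t currentCapacity,
    uint64_t requestBytes,
    uint64_t maxCapacity,
    const GrowthConfig& config) {
  assert(currentCapacity <= maxCapacity);
  // Adjustment is disabled unless both settings are set.
  if (config.fastExponentialGrowthCapacityLimit == 0 ||
      config.slowCapacityGrowPct == 0) {
    return requestBytes;
  }
  uint64_t targetBytes = 0;
  if (currentCapacity * 2 <= config.fastExponentialGrowthCapacityLimit) {
    // Fast path: double the current capacity.
    targetBytes = currentCapacity;
  } else {
    // Slow path: grow by a fraction of the current capacity.
    targetBytes =
        static_cast<uint64_t>(currentCapacity * config.slowCapacityGrowPct);
  }
  // First NOTE: a larger request, or hitting the max capacity, wins.
  if (requestBytes > targetBytes ||
      currentCapacity + targetBytes > maxCapacity) {
    return requestBytes;
  }
  return targetBytes;
}
```

With the documented defaults (``512MB`` and ``0.25``), a 128MB pool asking for 8MB more would take the fast path and grow by 128MB, while a 1GB pool would take the slow path and grow by 256MB.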

``shared-arbitrator.slow-capacity-grow-pct``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``string``
* **Default value:** ``0.25``

See description for ``shared-arbitrator.fast-exponential-growth-capacity-limit``.

``shared-arbitrator.memory-pool-min-free-capacity``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``string``
* **Default value:** ``128MB``

When the shared arbitrator shrinks a memory pool's capacity, the shrink bytes
are adjusted so that, after the shrink, the stricter (whichever is smaller)
of the following conditions is met, in order to better fit the pool's
current memory usage:

* Free capacity is greater than or equal to capacity *
``shared-arbitrator.memory-pool-min-free-capacity-pct``
* Free capacity is greater than or equal to
``shared-arbitrator.memory-pool-min-free-capacity``

NOTE: If the originally requested shrink bytes leave more free capacity
than the two conditions above require, the adjusted shrink bytes are not
respected.

NOTE: Capacity shrink adjustment is enabled only when both
``shared-arbitrator.memory-pool-min-free-capacity-pct`` and
``shared-arbitrator.memory-pool-min-free-capacity`` are set.
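A minimal sketch of the shrink rule follows, under stated assumptions. ``ShrinkConfig`` and ``adjustedShrinkBytes`` are hypothetical names rather than Velox APIs, zero is assumed to mean "not set", and the percentage is assumed to apply to the pool's capacity before the shrink.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

constexpr uint64_t kMB = 1ULL << 20;

// Hypothetical holder for the two settings. Assumption: zero means "not set".
struct ShrinkConfig {
  uint64_t minFreeCapacity;  // bytes
  double minFreeCapacityPct; // fraction of capacity, e.g. 0.25
};

// Returns how many bytes may be taken away from a pool so that the pool
// keeps at least the smaller of the two minimum-free thresholds.
uint64_t adjustedShrinkBytes(
    uint64_t capacity,
    uint64_t usedBytes,
    uint64_t requestBytes,
    const ShrinkConfig& config) {
  assert(usedBytes <= capacity);
  const uint64_t freeBytes = capacity - usedBytes;
  // Adjustment is enabled only when both settings are set.
  if (config.minFreeCapacity == 0 || config.minFreeCapacityPct == 0) {
    return std::min(requestBytes, freeBytes);
  }
  // "Whichever is smaller" of the two thresholds.
  const uint64_t minFreeBytes = std::min(
      static_cast<uint64_t>(capacity * config.minFreeCapacityPct),
      config.minFreeCapacity);
  if (freeBytes <= minFreeBytes) {
    return 0; // Nothing can be shrunk without violating the threshold.
  }
  // A smaller request leaves more free capacity than required and is
  // used as-is.
  return std::min(requestBytes, freeBytes - minFreeBytes);
}
```

With the documented defaults (``128MB`` and ``0.25``), a 1GB pool using 256MB has 768MB free. The threshold is min(256MB, 128MB) = 128MB, so at most 640MB would be shrunk under this reading.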

``shared-arbitrator.memory-pool-min-free-capacity-pct``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``string``
* **Default value:** ``0.25``

See description for ``shared-arbitrator.memory-pool-min-free-capacity``.
46 changes: 31 additions & 15 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Expand Up @@ -812,27 +812,43 @@ void PrestoServer::initializeVeloxMemory() {
memoryGb,
"Query memory capacity must not be larger than system memory capacity");
options.arbitratorCapacity = queryMemoryGb << 30;
const uint64_t queryReservedMemoryGb = velox::config::toCapacity(
const uint64_t sharedArbitratorReservedMemoryGb = velox::config::toCapacity(
systemConfig->sharedArbitratorReservedCapacity(),
velox::config::CapacityUnit::GIGABYTE);
VELOX_USER_CHECK_LE(
queryReservedMemoryGb,
sharedArbitratorReservedMemoryGb,
queryMemoryGb,
"Query reserved memory capacity must not be larger than query memory capacity");

// TODO(jtan6): [Config Refactor] Migrate these old settings to string based
// extra settings + grow & shrink settings.
options.arbitratorReservedCapacity = queryReservedMemoryGb << 30;
options.memoryPoolInitCapacity = systemConfig->memoryPoolInitCapacity();
options.memoryPoolReservedCapacity =
systemConfig->memoryPoolReservedCapacity();
options.memoryPoolTransferCapacity =
systemConfig->memoryPoolTransferCapacity();
options.memoryReclaimWaitMs = systemConfig->memoryReclaimWaitMs();
options.globalArbitrationEnabled =
systemConfig->memoryArbitratorGlobalArbitrationEnabled();
"Shared arbitrator reserved memory capacity must not be larger than "
"query memory capacity");

options.largestSizeClassPages = systemConfig->largestSizeClassPages();
options.arbitrationStateCheckCb = velox::exec::memoryArbitrationStateCheck;

using SharedArbitratorConfig = velox::memory::SharedArbitrator::ExtraConfig;
options.extraArbitratorConfigs = {
{std::string(SharedArbitratorConfig::kReservedCapacity),
systemConfig->sharedArbitratorReservedCapacity()},
{std::string(SharedArbitratorConfig::kMemoryPoolInitialCapacity),
systemConfig->sharedArbitratorMemoryPoolInitialCapacity()},
{std::string(SharedArbitratorConfig::kMemoryPoolReservedCapacity),
systemConfig->sharedArbitratorMemoryPoolReservedCapacity()},
{std::string(SharedArbitratorConfig::kMemoryPoolTransferCapacity),
systemConfig->sharedArbitratorMemoryPoolTransferCapacity()},
{std::string(SharedArbitratorConfig::kMemoryReclaimMaxWaitTime),
systemConfig->sharedArbitratorMemoryReclaimWaitTime()},
{std::string(SharedArbitratorConfig::kMemoryPoolMinFreeCapacity),
systemConfig->sharedArbitratorMemoryPoolMinFreeCapacity()},
{std::string(SharedArbitratorConfig::kMemoryPoolMinFreeCapacityPct),
systemConfig->sharedArbitratorMemoryPoolMinFreeCapacityPct()},
{std::string(SharedArbitratorConfig::kGlobalArbitrationEnabled),
systemConfig->sharedArbitratorGlobalArbitrationEnabled()},
{std::string(
SharedArbitratorConfig::kFastExponentialGrowthCapacityLimit),
systemConfig->sharedArbitratorFastExponentialGrowthCapacityLimit()},
{std::string(SharedArbitratorConfig::kSlowCapacityGrowPct),
systemConfig->sharedArbitratorSlowCapacityGrowPct()},
{std::string(SharedArbitratorConfig::kCheckUsageLeak),
folly::to<std::string>(systemConfig->enableMemoryLeakCheck())}};
}
memory::initializeMemoryManager(options);
PRESTO_STARTUP_LOG(INFO) << "Memory manager has been setup: "
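For readers unfamiliar with string-based settings, the sketch below shows how a consumer of a string-keyed map such as `extraArbitratorConfigs` might turn a value like "4GB" into bytes. It is a simplified stand-in, not `velox::config::toCapacity`: `parseCapacityBytes` and `capacityOr` are hypothetical helpers, only whole-number values with the units B, kB, MB, GB and TB are accepted, and the key used in the usage note is illustrative because the string values behind the `SharedArbitratorConfig` constants are not shown in this diff.

```cpp
#include <cassert>
#include <cctype>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>
#include <string_view>
#include <unordered_map>

// Hypothetical parser: "<digits><unit>" -> bytes, or nullopt if malformed.
// Overflow is not checked in this sketch.
std::optional<uint64_t> parseCapacityBytes(std::string_view text) {
  std::size_t pos = 0;
  uint64_t value = 0;
  while (pos < text.size() &&
         std::isdigit(static_cast<unsigned char>(text[pos]))) {
    value = value * 10 + static_cast<uint64_t>(text[pos] - '0');
    ++pos;
  }
  if (pos == 0) {
    return std::nullopt; // No leading digits.
  }
  const std::string_view unit = text.substr(pos);
  if (unit == "B") return value;
  if (unit == "kB") return value << 10;
  if (unit == "MB") return value << 20;
  if (unit == "GB") return value << 30;
  if (unit == "TB") return value << 40;
  return std::nullopt; // Unknown or missing unit.
}

// Hypothetical lookup: returns the parsed capacity for `key`, or
// `defaultBytes` when the key is absent or its value is malformed.
uint64_t capacityOr(
    const std::unordered_map<std::string, std::string>& configs,
    const std::string& key,
    uint64_t defaultBytes) {
  assert(!key.empty());
  const auto it = configs.find(key);
  if (it == configs.end()) {
    return defaultBytes;
  }
  return parseCapacityBytes(it->second).value_or(defaultBytes);
}
```

Given a map holding `{"shared-arbitrator.reserved-capacity", "4GB"}`, `capacityOr` would return 4 * 2^30 bytes. Keeping the values as strings lets the arbitrator own its parsing and defaults, which appears to be the motivation for passing them through unparsed.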
41 changes: 3 additions & 38 deletions presto-native-execution/presto_cpp/main/common/Configs.cpp
Expand Up @@ -185,9 +185,7 @@ SystemConfig::SystemConfig() {
BOOL_PROP(kEnableSerializedPageChecksum, true),
BOOL_PROP(kUseMmapAllocator, true),
STR_PROP(kMemoryArbitratorKind, ""),
BOOL_PROP(kMemoryArbitratorGlobalArbitrationEnabled, false),
NUM_PROP(kQueryMemoryGb, 38),
NUM_PROP(kQueryReservedMemoryGb, 4),
STR_PROP(kSharedArbitratorReservedCapacity, "4GB"),
STR_PROP(kSharedArbitratorMemoryPoolInitialCapacity, "128MB"),
STR_PROP(kSharedArbitratorMemoryPoolReservedCapacity, "64MB"),
Expand Down Expand Up @@ -509,9 +507,8 @@ std::string SystemConfig::memoryArbitratorKind() const {
return optionalProperty<std::string>(kMemoryArbitratorKind).value_or("");
}

bool SystemConfig::memoryArbitratorGlobalArbitrationEnabled() const {
return optionalProperty<bool>(kMemoryArbitratorGlobalArbitrationEnabled)
.value_or(false);
int32_t SystemConfig::queryMemoryGb() const {
return optionalProperty<int32_t>(kQueryMemoryGb).value();
}

std::string SystemConfig::sharedArbitratorGlobalArbitrationEnabled() const {
Expand All @@ -520,25 +517,11 @@ std::string SystemConfig::sharedArbitratorGlobalArbitrationEnabled() const {
.value_or("false");
}

int32_t SystemConfig::queryMemoryGb() const {
return optionalProperty<int32_t>(kQueryMemoryGb).value();
}

int32_t SystemConfig::queryReservedMemoryGb() const {
return optionalProperty<int32_t>(kQueryReservedMemoryGb).value();
}

std::string SystemConfig::sharedArbitratorReservedCapacity() const {
return optionalProperty<std::string>(kSharedArbitratorReservedCapacity)
.value();
}

uint64_t SystemConfig::memoryPoolInitCapacity() const {
static constexpr uint64_t kMemoryPoolInitCapacityDefault = 128 << 20;
return optionalProperty<uint64_t>(kMemoryPoolInitCapacity)
.value_or(kMemoryPoolInitCapacityDefault);
}

std::string SystemConfig::sharedArbitratorMemoryPoolInitialCapacity() const {
static constexpr std::string_view
kSharedArbitratorMemoryPoolInitialCapacityDefault = "128MB";
Expand All @@ -547,12 +530,6 @@ std::string SystemConfig::sharedArbitratorMemoryPoolInitialCapacity() const {
.value_or(std::string(kSharedArbitratorMemoryPoolInitialCapacityDefault));
}

uint64_t SystemConfig::memoryPoolReservedCapacity() const {
static constexpr uint64_t kMemoryPoolReservedCapacityDefault = 64 << 20;
return optionalProperty<uint64_t>(kMemoryPoolReservedCapacity)
.value_or(kMemoryPoolReservedCapacityDefault);
}

std::string SystemConfig::sharedArbitratorMemoryPoolReservedCapacity() const {
static constexpr std::string_view
kSharedArbitratorMemoryPoolReservedCapacityDefault = "64MB";
Expand All @@ -562,12 +539,6 @@ std::string SystemConfig::sharedArbitratorMemoryPoolReservedCapacity() const {
std::string(kSharedArbitratorMemoryPoolReservedCapacityDefault));
}

uint64_t SystemConfig::memoryPoolTransferCapacity() const {
static constexpr uint64_t kMemoryPoolTransferCapacityDefault = 32 << 20;
return optionalProperty<uint64_t>(kMemoryPoolTransferCapacity)
.value_or(kMemoryPoolTransferCapacityDefault);
}

std::string SystemConfig::sharedArbitratorMemoryPoolTransferCapacity() const {
static constexpr std::string_view
kSharedArbitratorMemoryPoolTransferCapacityDefault = "32MB";
Expand All @@ -577,12 +548,6 @@ std::string SystemConfig::sharedArbitratorMemoryPoolTransferCapacity() const {
std::string(kSharedArbitratorMemoryPoolTransferCapacityDefault));
}

uint64_t SystemConfig::memoryReclaimWaitMs() const {
static constexpr uint64_t kMemoryReclaimWaitMsDefault = {300'000}; // 5 mins.
return optionalProperty<uint64_t>(kMemoryReclaimWaitMs)
.value_or(kMemoryReclaimWaitMsDefault);
}

std::string SystemConfig::sharedArbitratorMemoryReclaimWaitTime() const {
static constexpr std::string_view
kSharedArbitratorMemoryReclaimMaxWaitTimeDefault = "5m";
Expand Down Expand Up @@ -620,7 +585,7 @@ std::string SystemConfig::sharedArbitratorMemoryPoolMinFreeCapacityPct() const {
static constexpr std::string_view
kSharedArbitratorMemoryPoolMinFreeCapacityPctDefault = "0.25";
return optionalProperty<std::string>(
kSharedArbitratorMemoryPoolMinFreeCapacityPctDefault)
kSharedArbitratorMemoryPoolMinFreeCapacityPct)
.value_or(
std::string(kSharedArbitratorMemoryPoolMinFreeCapacityPctDefault));
}