Skip to content

Commit

Permalink
Add arbitration participant and operation objects to support global m…
Browse files Browse the repository at this point in the history
…emory arbitration optimization (facebookincubator#11074)

Summary:
Pull Request resolved: facebookincubator#11074

Add an arbitration participant object to provide all the required arbitration operations and state management on a
query memory pool inside the memory arbitrator, such as an arbitration queue that serializes the execution of
arbitration requests from the same query, and the serialization of the reclaim, shrink, grow and abort operations
issued by either arbitration requests or background memory arbitration.

Add an arbitration operation object to manage the execution of a memory arbitration request.

Reviewed By: tanjialiang

Differential Revision: D63055730

fbshipit-source-id: 0db85eccf6c383807eb006f1fcfad8cb0b0aa596
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Sep 24, 2024
1 parent cc46d81 commit dcaae29
Show file tree
Hide file tree
Showing 6 changed files with 2,862 additions and 0 deletions.
147 changes: 147 additions & 0 deletions velox/common/memory/ArbitrationOperation.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/common/memory/ArbitrationOperation.h"
#include <mutex>

#include "velox/common/base/Exceptions.h"
#include "velox/common/base/RuntimeMetrics.h"
#include "velox/common/memory/Memory.h"
#include "velox/common/testutil/TestValue.h"
#include "velox/common/time/Timer.h"

using facebook::velox::common::testutil::TestValue;

namespace facebook::velox::memory {
using namespace facebook::velox::memory;

// Creates an arbitration operation on 'participant' that requests
// 'requestBytes' bytes with a timeout of 'timeoutMs' milliseconds. The
// creation time is captured here and is the reference point used by
// executionTimeMs() and timeoutMs().
ArbitrationOperation::ArbitrationOperation(
ScopedArbitrationParticipant&& participant,
uint64_t requestBytes,
uint64_t timeoutMs)
: requestBytes_(requestBytes),
timeoutMs_(timeoutMs),
createTimeMs_(getCurrentTimeMs()),
participant_(std::move(participant)) {
// A zero-byte arbitration request is rejected.
VELOX_CHECK_GT(requestBytes_, 0);
}

// Verifies on destruction that the operation is not left in the running state
// and that the allocated bytes are either zero or at least the requested
// bytes.
//
// NOTE(review): a failed VELOX_CHECK presumably throws from the destructor;
// confirm that this fail-fast behavior is intended.
ArbitrationOperation::~ArbitrationOperation() {
VELOX_CHECK_NE(
state_,
State::kRunning,
"Unexpected arbitration operation state on destruction");
VELOX_CHECK(
allocatedBytes_ == 0 || allocatedBytes_ >= requestBytes_,
"Unexpected allocatedBytes_ {} vs requestBytes_ {}",
succinctBytes(allocatedBytes_),
succinctBytes(requestBytes_));
}

// Returns the human-readable name of 'state'. A value that matches none of the
// known enumerators is rendered as "unknown state: <integer value>".
std::string ArbitrationOperation::stateName(State state) {
  if (state == State::kInit) {
    return "init";
  }
  if (state == State::kWaiting) {
    return "waiting";
  }
  if (state == State::kRunning) {
    return "running";
  }
  if (state == State::kFinished) {
    return "finished";
  }
  return fmt::format("unknown state: {}", static_cast<int>(state));
}

// Moves this operation to 'state' after validating the transition:
//   - kWaiting is only reachable from kInit;
//   - kRunning is reachable from either kWaiting or kInit;
//   - kFinished is only reachable from kRunning.
// Any other target state (including kInit) is reported as an unexpected
// transition.
void ArbitrationOperation::setState(State state) {
  if (state == State::kWaiting) {
    VELOX_CHECK_EQ(state_, State::kInit);
  } else if (state == State::kRunning) {
    VELOX_CHECK(this->state_ == State::kWaiting || state_ == State::kInit);
  } else if (state == State::kFinished) {
    VELOX_CHECK_EQ(this->state_, State::kRunning);
  } else {
    VELOX_UNREACHABLE(
        "Unexpected state transition from {} to {}", state_, state);
  }
  state_ = state;
}

// Starts the execution of this operation on its participant. The operation
// must not have been started before.
void ArbitrationOperation::start() {
  VELOX_CHECK_EQ(state_, State::kInit);
  // Hand the operation over to the participant first; only after that call
  // returns is the operation marked as running.
  // NOTE(review): the participant presumably moves this operation to the
  // waiting state while it is queued behind prior operations -- confirm.
  participant_->startArbitration(this);
  setState(State::kRunning);
}

// Marks this operation as finished, records the finish time and then notifies
// the participant. setState() enforces that the operation was running.
void ArbitrationOperation::finish() {
setState(State::kFinished);
// The finish time is recorded exactly once.
VELOX_CHECK_EQ(finishTimeMs_, 0);
finishTimeMs_ = getCurrentTimeMs();
participant_->finishArbitration(this);
}

// Returns whether the participant of this operation reports itself as aborted.
// The answer is delegated to the participant and not cached here.
bool ArbitrationOperation::aborted() const {
return participant_->aborted();
}

// Returns the number of milliseconds this operation has been alive, measured
// from its creation time. For a finished operation the recorded finish time is
// the end point; otherwise the current time is used.
size_t ArbitrationOperation::executionTimeMs() const {
  if (state_ != State::kFinished) {
    // Still in flight: measure against the clock.
    const auto currentTimeMs = getCurrentTimeMs();
    VELOX_CHECK_GE(currentTimeMs, createTimeMs_);
    return currentTimeMs - createTimeMs_;
  }
  VELOX_CHECK_GE(finishTimeMs_, createTimeMs_);
  return finishTimeMs_ - createTimeMs_;
}

// Returns true if this operation has not finished and has no remaining time
// before its timeout. A finished operation never reports a timeout.
bool ArbitrationOperation::hasTimeout() const {
  if (state_ == State::kFinished) {
    return false;
  }
  // The remaining time is unsigned, so "no time left" is exactly zero.
  return timeoutMs() == 0;
}

size_t ArbitrationOperation::timeoutMs() const {
if (state_ == State::kFinished) {
return 0;
}
const auto execTimeMs = executionTimeMs();
if (execTimeMs >= timeoutMs_) {
return 0;
}
return timeoutMs_ - execTimeMs;
}

// Sets the max and min grow targets of this operation by asking the
// participant. The operation must be running and the targets must not have
// been set before.
void ArbitrationOperation::setGrowTargets() {
// We shall only set grow targets once after start execution.
VELOX_CHECK_EQ(state_, State::kRunning);
VELOX_CHECK(
maxGrowBytes_ == 0 && minGrowBytes_ == 0,
"Arbitration operation grow targets have already been set: {}/{}",
succinctBytes(maxGrowBytes_),
succinctBytes(minGrowBytes_));
// NOTE(review): 'maxGrowBytes_' and 'minGrowBytes_' are presumably
// out-parameters filled in by the participant -- confirm against
// ArbitrationParticipant::getGrowTargets().
participant_->getGrowTargets(requestBytes_, maxGrowBytes_, minGrowBytes_);
// The max grow target must cover at least the requested bytes.
VELOX_CHECK_LE(requestBytes_, maxGrowBytes_);
}

// Streams the human-readable name of 'state' (see
// ArbitrationOperation::stateName()) into 'out' and returns 'out'.
std::ostream& operator<<(std::ostream& out, ArbitrationOperation::State state) {
  return out << ArbitrationOperation::stateName(state);
}
} // namespace facebook::velox::memory
176 changes: 176 additions & 0 deletions velox/common/memory/ArbitrationOperation.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/common/base/Counters.h"
#include "velox/common/base/GTestMacros.h"
#include "velox/common/base/StatsReporter.h"
#include "velox/common/future/VeloxPromise.h"
#include "velox/common/memory/ArbitrationParticipant.h"
#include "velox/common/memory/Memory.h"

namespace facebook::velox::memory {

/// Manages the execution of a memory arbitration request within the arbitrator.
class ArbitrationOperation {
 public:
  /// Creates an arbitration operation on 'participant' that requests
  /// 'requestBytes' bytes, which must be greater than zero, with a timeout of
  /// 'timeoutMs' milliseconds measured from the creation of this object.
  ArbitrationOperation(
      ScopedArbitrationParticipant&& participant,
      uint64_t requestBytes,
      uint64_t timeoutMs);

  /// Checks that the operation is not destroyed while running and that the
  /// allocated bytes are either zero or at least the request bytes.
  ~ArbitrationOperation();

  /// The execution state of an arbitration operation. The allowed transitions
  /// are: kInit -> kWaiting, kInit/kWaiting -> kRunning and
  /// kRunning -> kFinished.
  enum class State {
    kInit,
    kWaiting,
    kRunning,
    kFinished,
  };

  /// Returns the current state of this arbitration operation.
  State state() const {
    return state_;
  }

  /// Returns the human-readable name of 'state'.
  static std::string stateName(State state);

  /// Returns the corresponding arbitration participant.
  ///
  /// NOTE: this accessor does not modify the operation, so it is
  /// const-qualified to be callable through a const operation as well.
  const ScopedArbitrationParticipant& participant() const {
    return participant_;
  }

  /// Invoked to start arbitration execution on the arbitration participant. The
  /// latter ensures the serialized execution of arbitration operations from the
  /// same query with one at a time. So this method blocks until all the prior
  /// arbitration operations finish.
  void start();

  /// Invoked to finish arbitration execution on the arbitration participant. It
  /// also resumes the next waiting arbitration operation to execute if there is
  /// one.
  void finish();

  /// Returns true if the corresponding arbitration participant has been
  /// aborted.
  bool aborted() const;

  /// Invoked to set the grow targets for this arbitration operation based on
  /// the request size.
  ///
  /// NOTE: this should be called once after the arbitration operation is
  /// started.
  void setGrowTargets();

  /// Returns the number of bytes requested by this arbitration operation.
  uint64_t requestBytes() const {
    return requestBytes_;
  }

  /// Returns the max grow bytes for this arbitration operation which could be
  /// larger than the request bytes for exponential growth.
  uint64_t maxGrowBytes() const {
    return maxGrowBytes_;
  }

  /// Returns the min grow bytes for this arbitration operation to ensure the
  /// arbitration participant has the minimum amount of memory capacity. The
  /// arbitrator might allocate memory from the reserved memory capacity pool
  /// for the min grow bytes.
  uint64_t minGrowBytes() const {
    return minGrowBytes_;
  }

  /// Returns a mutable reference to the bytes allocated by this arbitration
  /// operation, so that the caller can both read and update the value.
  uint64_t& allocatedBytes() {
    return allocatedBytes_;
  }

  /// Returns the remaining execution time for this operation before time out.
  /// If the operation has already finished, this returns zero.
  size_t timeoutMs() const;

  /// Returns true if this operation has timed out.
  bool hasTimeout() const;

  /// Returns the execution time of this arbitration operation since creation.
  size_t executionTimeMs() const;

  /// Getters/Setters of the wait time in (local) arbitration participant wait
  /// queue or (global) arbitrator request wait queue.
  ///
  /// The local wait time can only be set once and only while the operation is
  /// in the waiting state.
  void setLocalArbitrationWaitTimeUs(uint64_t waitTimeUs) {
    VELOX_CHECK_EQ(localArbitrationWaitTimeUs_, 0);
    VELOX_CHECK_EQ(state_, State::kWaiting);
    localArbitrationWaitTimeUs_ = waitTimeUs;
  }

  uint64_t localArbitrationWaitTimeUs() const {
    return localArbitrationWaitTimeUs_;
  }

  /// The global wait time can only be set once and only while the operation is
  /// in the running state.
  void setGlobalArbitrationWaitTimeUs(uint64_t waitTimeUs) {
    VELOX_CHECK_EQ(globalArbitrationWaitTimeUs_, 0);
    VELOX_CHECK_EQ(state_, State::kRunning);
    globalArbitrationWaitTimeUs_ = waitTimeUs;
  }

  uint64_t globalArbitrationWaitTimeUs() const {
    return globalArbitrationWaitTimeUs_;
  }

 private:
  // Validates and applies the transition to 'state'.
  void setState(State state);

  const uint64_t requestBytes_;
  const uint64_t timeoutMs_;

  // The creation time of this arbitration operation. It is the reference point
  // for both the execution time and the timeout.
  const uint64_t createTimeMs_;
  const ScopedArbitrationParticipant participant_;

  State state_{State::kInit};

  // The time when this operation finished, or zero if it has not finished yet.
  uint64_t finishTimeMs_{0};

  uint64_t maxGrowBytes_{0};
  uint64_t minGrowBytes_{0};

  // The actual bytes allocated from arbitrator based on the request bytes and
  // grow targets. It is either zero on failure or between 'requestBytes_' and
  // 'maxGrowBytes_' on success.
  uint64_t allocatedBytes_{0};

  // The time that waits in local arbitration queue.
  uint64_t localArbitrationWaitTimeUs_{0};

  // The time that waits for global arbitration queue.
  uint64_t globalArbitrationWaitTimeUs_{0};

  friend class ArbitrationParticipant;
};

std::ostream& operator<<(std::ostream& out, ArbitrationOperation::State state);
} // namespace facebook::velox::memory

/// Lets {fmt} format an ArbitrationOperation::State by its human-readable
/// name, as returned by ArbitrationOperation::stateName().
template <>
struct fmt::formatter<facebook::velox::memory::ArbitrationOperation::State>
    : formatter<std::string> {
  // NOTE: format() is const-qualified because newer {fmt} releases invoke it
  // on a const formatter object; a non-const format() fails to compile there.
  auto format(
      facebook::velox::memory::ArbitrationOperation::State state,
      format_context& ctx) const {
    return formatter<std::string>::format(
        facebook::velox::memory::ArbitrationOperation::stateName(state), ctx);
  }
};
Loading

0 comments on commit dcaae29

Please sign in to comment.