Split block and evict task (ray-project#8)
* Separated block-task and evict-task handling

* Block tasks when OOM

* Added a command to replay the bug

* Fixed crash in raylet on duplicate object

* Fixed bug in block tasks: did not block when only the block-task callback was called

* Merged block tasks and evict tasks into a single callback and removed the evict callback

* Get lowest priority, but the object does not have a priority yet

* Added debug logs to see where to set the priority

* Logging without compile errors

* Commit for migration

* Working version, but still needs testing

* Block tasks works, but two new problems emerged

* Added microbenchmark

* Deadlock cases 1 and 2 induced by not spilling: (1) workers are stuck with producers, and (2) consumers depend on multiple objects

* Polished debug messages

* Typo fix

* Removed scripts and added code triggered by the block/evict thresholds

* Changed get_lowest_pri to prevent a potential segfault

* Polished the call to the block_task callback

* Changed flags to bool for readability and added a check to avoid calling the block-task callback twice

* Changed the object_block callback function's arguments back to bool

* Fixed an unchanged function argument for the blockTask callback

* Typo fix

Co-authored-by: Stephanie Wang <swang@cs.berkeley.edu>
Co-authored-by: Ubuntu <ubuntu@ip-172-31-62-69.us-west-2.compute.internal>
3 people authored Mar 3, 2022
1 parent d73b014 commit 7de7aff
Showing 23 changed files with 362 additions and 204 deletions.
13 changes: 13 additions & 0 deletions src/ray/common/ray_config_def.h
@@ -376,6 +376,19 @@ RAY_CONFIG(int64_t, min_spilling_size, 100 * 1024 * 1024)
/// take more than this percentage of the available memory.
RAY_CONFIG(float, object_spilling_threshold, 1.0)

RAY_CONFIG(float, block_tasks_threshold, 1.0)

RAY_CONFIG(float, evict_tasks_threshold, 1.0)

// Whether to use BlockTasks
RAY_CONFIG(bool, enable_BlockTasks, false)

// Whether to block Spill at BlockTasks
RAY_CONFIG(bool, enable_BlockTasksSpill, false)

// Whether to use EvictTasks when spill required
RAY_CONFIG(bool, enable_EvictTasks, false)

/// Maximum number of objects that can be fused into a single file.
RAY_CONFIG(int64_t, max_fused_object_count, 2000)

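These RAY_CONFIG entries generate the usual accessors on RayConfig (e.g. RayConfig::instance().block_tasks_threshold()). The sketch below only illustrates how the object store side might consult them to decide between back-pressuring new tasks and evicting running ones; the helper name, its inputs, and the decision logic are assumptions, not code from this commit.

#include "ray/common/ray_config.h"

// Hypothetical helper, not part of this diff: translate current plasma usage
// into the two new kinds of memory-pressure reactions.
struct MemoryPressureAction {
  bool block_tasks = false;  // stop scheduling new tasks
  bool evict_tasks = false;  // preempt running tasks to free memory
};

MemoryPressureAction CheckMemoryPressure(int64_t bytes_used, int64_t capacity) {
  MemoryPressureAction action;
  if (capacity <= 0) {
    return action;
  }
  const float usage = static_cast<float>(bytes_used) / static_cast<float>(capacity);
  if (RayConfig::instance().enable_BlockTasks() &&
      usage >= RayConfig::instance().block_tasks_threshold()) {
    action.block_tasks = true;
  }
  if (RayConfig::instance().enable_EvictTasks() &&
      usage >= RayConfig::instance().evict_tasks_threshold()) {
    action.evict_tasks = true;
  }
  return action;
}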
4 changes: 4 additions & 0 deletions src/ray/common/task/task_priority.h
@@ -48,6 +48,10 @@ struct Priority {
return score[depth];
}

int GetDepth() {
return (int)score.size();
}

void SetScore(int64_t depth, int s) {
extend(depth + 1);
RAY_CHECK(score[depth] >= s);
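GetDepth() simply reports the current length of the lexicographic score vector. A minimal, hypothetical use follows; only GetDepth() itself comes from this hunk, while the include paths and logging macro are taken from the surrounding codebase.

#include "ray/common/task/task_priority.h"
#include "ray/util/logging.h"

// Sketch: a deeper score vector corresponds to a more deeply nested task,
// so callers can use the depth as a quick proxy for nesting level.
void LogPriorityDepth(ray::Priority &priority) {
  RAY_LOG(DEBUG) << "Priority has " << priority.GetDepth() << " levels";
}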
75 changes: 33 additions & 42 deletions src/ray/core_worker/core_worker.cc
@@ -40,8 +40,7 @@ void BuildCommonTaskSpec(
const std::unordered_map<std::string, double> &required_resources,
const std::unordered_map<std::string, double> &required_placement_resources,
const BundleID &bundle_id, bool placement_group_capture_child_tasks,
const std::string debugger_breakpoint,
const Priority &priority,
const std::string debugger_breakpoint, const Priority &priority,
const std::string &serialized_runtime_env,
const std::vector<std::string> &runtime_env_uris,
const std::string &concurrency_group_name = "") {
@@ -50,9 +49,7 @@ void BuildCommonTaskSpec(
task_id, name, function.GetLanguage(), function.GetFunctionDescriptor(), job_id,
current_task_id, task_index, caller_id, address, num_returns, required_resources,
required_placement_resources, bundle_id, placement_group_capture_child_tasks,
debugger_breakpoint,
priority,
serialized_runtime_env, runtime_env_uris,
debugger_breakpoint, priority, serialized_runtime_env, runtime_env_uris,
concurrency_group_name);
// Set task arguments.
for (const auto &arg : args) {
@@ -684,9 +681,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
rpc_address_, local_raylet_client_, core_worker_client_pool_, raylet_client_factory,
std::move(lease_policy), memory_store_, task_manager_, local_raylet_id,
RayConfig::instance().worker_lease_timeout_milliseconds(), actor_creator_,
/*get_task_priority=*/[](const TaskSpecification &spec) {
return spec.GetPriority();
},
/*get_task_priority=*/
[](const TaskSpecification &spec) { return spec.GetPriority(); },
RayConfig::instance().max_tasks_in_flight_per_worker(),
boost::asio::steady_timer(io_service_));
auto report_locality_data_callback =
@@ -1216,9 +1212,7 @@ Status CoreWorker::CreateOwned(const std::shared_ptr<Buffer> &metadata,
if (status.ok()) {
status = plasma_store_provider_->Create(metadata, data_size, *object_id,
/* owner_address = */ rpc_address_,
Priority(),
data,
created_by_worker);
Priority(), data, created_by_worker);
}
if (!status.ok() || !data) {
if (owned_by_us) {
@@ -1235,8 +1229,8 @@ Status CoreWorker::CreateExisting(const std::shared_ptr<Buffer> &metadata,
Status CoreWorker::CreateExisting(const std::shared_ptr<Buffer> &metadata,
const size_t data_size, const ObjectID &object_id,
const rpc::Address &owner_address,
const Priority &priority,
std::shared_ptr<Buffer> *data, bool created_by_worker) {
const Priority &priority, std::shared_ptr<Buffer> *data,
bool created_by_worker) {
if (options_.is_local_mode) {
return Status::NotImplemented(
"Creating an object with a pre-existing ObjectID is not supported in local "
@@ -1670,16 +1664,15 @@ std::vector<rpc::ObjectReference> CoreWorker::SubmitTask(
? function.GetFunctionDescriptor()->DefaultTaskName()
: task_options.name;
// TODO(ekl) offload task building onto a thread pool for performance
BuildCommonTaskSpec(builder, worker_context_.GetCurrentJobID(), task_id, task_name,
worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(),
rpc_address_, function, args, task_options.num_returns,
constrained_resources, required_resources, placement_options,
placement_group_capture_child_tasks, debugger_breakpoint,
Priority(),
task_options.serialized_runtime_env, task_options.runtime_env_uris);
BuildCommonTaskSpec(
builder, worker_context_.GetCurrentJobID(), task_id, task_name,
worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), rpc_address_,
function, args, task_options.num_returns, constrained_resources, required_resources,
placement_options, placement_group_capture_child_tasks, debugger_breakpoint,
Priority(), task_options.serialized_runtime_env, task_options.runtime_env_uris);
builder.SetNormalTaskSpec(max_retries, retry_exceptions);
TaskSpecification task_spec = builder.Build();
//priority = task_manager_->GenerateTaskPriority(task_spec);
// priority = task_manager_->GenerateTaskPriority(task_spec);
RAY_LOG(DEBUG) << "Submit task " << task_spec.DebugString();
std::vector<rpc::ObjectReference> returned_refs;
if (options_.is_local_mode) {
@@ -1689,7 +1682,6 @@ std::vector<rpc::ObjectReference> CoreWorker::SubmitTask(
CurrentCallSite(), max_retries);
io_service_.post(
[this, task_spec]() {
//(Jae) This is the reason why tasks are not placed with priority
RAY_UNUSED(direct_task_submitter_->SubmitTask(task_spec));
},
"CoreWorker.SubmitTask");
@@ -1734,8 +1726,7 @@ Status CoreWorker::CreateActor(const RayFunction &function,
new_placement_resources, actor_creation_options.placement_options,
actor_creation_options.placement_group_capture_child_tasks,
"", /* debugger_breakpoint */
Priority(),
actor_creation_options.serialized_runtime_env,
Priority(), actor_creation_options.serialized_runtime_env,
actor_creation_options.runtime_env_uris);

auto actor_handle = std::make_unique<ActorHandle>(
@@ -1917,11 +1908,10 @@ std::vector<rpc::ObjectReference> CoreWorker::SubmitActorTask(
worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(),
rpc_address_, function, args, num_returns, task_options.resources,
required_resources, std::make_pair(PlacementGroupID::Nil(), -1),
true, /* placement_group_capture_child_tasks */
"", /* debugger_breakpoint */
Priority(),
"{}", /* serialized_runtime_env */
{}, /* runtime_env_uris */
true, /* placement_group_capture_child_tasks */
"", /* debugger_breakpoint */
Priority(), "{}", /* serialized_runtime_env */
{}, /* runtime_env_uris */
task_options.concurrency_group_name);
// NOTE: placement_group_capture_child_tasks and runtime_env will
// be ignored in the actor because we should always follow the actor's option.
@@ -2144,9 +2134,10 @@ Status CoreWorker::AllocateReturnObject(const ObjectID &object_id,
const std::vector<ObjectID> &contained_object_ids,
int64_t &task_output_inlined_bytes,
std::shared_ptr<RayObject> *return_object) {
auto spec = worker_context_.GetCurrentTask();
rpc::Address owner_address(options_.is_local_mode
? rpc::Address()
: worker_context_.GetCurrentTask()->CallerAddress());
: spec->CallerAddress());

bool object_already_exists = false;
std::shared_ptr<Buffer> data_buffer;
@@ -2169,8 +2160,7 @@ Status CoreWorker::AllocateReturnObject(const ObjectID &object_id,
task_output_inlined_bytes += static_cast<int64_t>(data_size);
} else {
RAY_RETURN_NOT_OK(CreateExisting(metadata, data_size, object_id, owner_address,
Priority(),
&data_buffer,
spec->GetPriority(), &data_buffer,
/*created_by_worker=*/true));
object_already_exists = !data_buffer;
}
@@ -2180,6 +2170,8 @@ Status CoreWorker::AllocateReturnObject(const ObjectID &object_id,
auto contained_refs = GetObjectRefs(contained_object_ids);
*return_object =
std::make_shared<RayObject>(data_buffer, metadata, std::move(contained_refs));
} else {
RAY_LOG(DEBUG) << "Return object already exists " << object_id;
}

return Status::OK();
@@ -3124,16 +3116,15 @@ void CoreWorker::HandleExit(const rpc::ExitRequest &request, rpc::ExitReply *rep
// any object pinning RPCs in flight.
bool is_idle = !own_objects && pins_in_flight == 0;
reply->set_success(is_idle);
send_reply_callback(
Status::OK(),
[this, is_idle]() {
// If the worker is idle, we exit.
if (is_idle) {
Exit(rpc::WorkerExitType::IDLE_EXIT);
}
},
// We need to kill it regardless if the RPC failed.
[this]() { Exit(rpc::WorkerExitType::INTENDED_EXIT); });
send_reply_callback(Status::OK(),
[this, is_idle]() {
// If the worker is idle, we exit.
if (is_idle) {
Exit(rpc::WorkerExitType::IDLE_EXIT);
}
},
// We need to kill it regardless if the RPC failed.
[this]() { Exit(rpc::WorkerExitType::INTENDED_EXIT); });
}

void CoreWorker::HandleAssignObjectOwner(const rpc::AssignObjectOwnerRequest &request,
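Most of the churn in this file is formatting reflow; the substantive change is that AllocateReturnObject now creates return objects with the executing task's priority instead of a default-constructed Priority(). The fragment below is a condensed paraphrase of the hunks above, not standalone code; the CreateExisting signature is taken from this diff.

// Condensed paraphrase of the new AllocateReturnObject path.
auto spec = worker_context_.GetCurrentTask();        // currently executing task
rpc::Address owner_address = spec->CallerAddress();  // owner of the return object
// The return object inherits the task's priority, so the plasma store can
// compare object priorities when deciding what to block or evict under pressure.
RAY_RETURN_NOT_OK(CreateExisting(metadata, data_size, object_id, owner_address,
                                 spec->GetPriority(), &data_buffer,
                                 /*created_by_worker=*/true));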
6 changes: 1 addition & 5 deletions src/ray/core_worker/transport/direct_actor_transport.cc
@@ -506,11 +506,7 @@ void CoreWorkerDirectTaskReceiver::HandleTask(
return_object->set_object_id(id.Binary());

// The object is nullptr if it already existed in the object store.
if (!return_objects[i]) {
RAY_LOG(INFO) << "Failed to create task return object " << id
<< " in the object store, returning an error to the application.";
return_objects[i] = std::make_shared<RayObject>(rpc::ErrorType::OBJECT_LOST);
}
RAY_CHECK(return_objects[i]) << id;

const auto &result = return_objects[i];
return_object->set_size(result->GetSize());
8 changes: 6 additions & 2 deletions src/ray/object_manager/common.h
@@ -29,7 +29,8 @@ using SpillObjectsCallback = std::function<bool()>;

/// Callback when the creation of object(s) is blocked. The priority is the
/// highest priority of a blocked object.
using ObjectCreationBlockedCallback = std::function<void(const ray::Priority &priority)>;
using ObjectCreationBlockedCallback = std::function<void(const ray::Priority &priority,
bool BlockTasks, bool EvictTasks)>;

using SetShouldSpillCallback = std::function<void(bool should_spill)>;

@@ -53,6 +54,8 @@ struct ObjectInfo {
int owner_port;
/// Owner's worker ID.
WorkerID owner_worker_id;
// Priority of the object. Used for blockTasks() memory backpressure
ray::Priority priority;

int64_t GetObjectSize() const { return data_size + metadata_size; }

@@ -62,7 +65,8 @@ struct ObjectInfo {
(owner_raylet_id == other.owner_raylet_id) &&
(owner_ip_address == other.owner_ip_address) &&
(owner_port == other.owner_port) &&
(owner_worker_id == other.owner_worker_id));
(owner_worker_id == other.owner_worker_id) &&
(priority == other.priority));
}
};

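ObjectCreationBlockedCallback now carries, alongside the highest blocked priority, two flags saying whether the reaction should be to block new tasks, evict running ones, or both. A hypothetical subscriber is sketched below; only the callback signature comes from this header, and the commented-out calls inside the handler are illustrative names, not real APIs.

#include <functional>

#include "ray/common/task/task_priority.h"

// Hypothetical handler for the widened callback.
std::function<void(const ray::Priority &, bool, bool)> on_creation_blocked =
    [](const ray::Priority &priority, bool block_tasks, bool evict_tasks) {
      if (block_tasks) {
        // Stop dispatching new tasks that are lower priority than the
        // highest-priority blocked object.
        // e.g. cluster_task_manager->BlockTasks(priority);   // hypothetical
      }
      if (evict_tasks) {
        // Additionally preempt running tasks so their memory can be reused
        // for the blocked, higher-priority object.
        // e.g. worker_pool->EvictTasks(priority);            // hypothetical
      }
    };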
2 changes: 2 additions & 0 deletions src/ray/object_manager/plasma/common.h
@@ -109,6 +109,8 @@ class LocalObject {

const plasma::flatbuf::ObjectSource &GetSource() const { return source; }

ray::Priority &GetPriority() { return object_info.priority; }

void ToPlasmaObject(PlasmaObject *object, bool check_sealed) const {
RAY_DCHECK(object != nullptr);
if (check_sealed) {
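GetPriority() exposes the priority that now rides along in ObjectInfo, which is what the "get lowest priority" scan mentioned in the commit messages would compare when picking a victim. A hypothetical scan follows; the map type and the direction of Priority's ordering are assumptions, and only LocalObject::GetPriority() comes from this header.

#include <memory>

#include "absl/container/flat_hash_map.h"
#include "ray/common/id.h"
#include "ray/object_manager/plasma/common.h"

// Sketch: pick the least important local object as a block/evict victim.
// Returning nullptr for an empty store is one way a get_lowest_pri helper
// could avoid dereferencing a missing entry.
plasma::LocalObject *FindLowestPriorityObject(
    absl::flat_hash_map<ray::ObjectID, std::unique_ptr<plasma::LocalObject>> &objects) {
  plasma::LocalObject *lowest = nullptr;
  for (auto &entry : objects) {
    plasma::LocalObject *obj = entry.second.get();
    // Assumes a "greater" Priority compares as less important.
    if (lowest == nullptr || lowest->GetPriority() < obj->GetPriority()) {
      lowest = obj;
    }
  }
  return lowest;
}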