Skip to content

Commit

Permalink
[CELEBORN-1809][CIP-14] Add partitionLocation to cppClient
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
This PR adds PartitionLocation to cppClient as a component of the protocol module.

### Why are the changes needed?
To support the PartitionLocation communication message.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Compilation and UTs.

Closes #3035 from HolyLow/issue/celeborn-1809-add-partition-location-to-cppClient.

Authored-by: HolyLow <jiaming.xie7@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
  • Loading branch information
HolyLow authored and FMX committed Dec 27, 2024
1 parent eb59c17 commit 4714e91
Show file tree
Hide file tree
Showing 7 changed files with 458 additions and 1 deletion.
3 changes: 2 additions & 1 deletion cpp/celeborn/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
add_subdirectory(utils)
add_subdirectory(proto)
add_subdirectory(memory)
add_subdirectory(utils)
add_subdirectory(conf)
add_subdirectory(protocol)
30 changes: 30 additions & 0 deletions cpp/celeborn/protocol/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Library holding the protocol-layer types (currently PartitionLocation).
add_library(protocol PartitionLocation.cpp)

# Presumably needed so that headers generated into the build tree resolve
# when included by path - TODO confirm.
target_include_directories(protocol PUBLIC ${CMAKE_BINARY_DIR})

# PartitionLocation.cpp includes celeborn/utils headers and uses
# CELEBORN_CHECK, so `utils` is linked here rather than relying on every
# consumer of `protocol` to add it.
target_link_libraries(
  protocol
  memory
  proto
  utils
  ${FOLLY_WITH_DEPENDENCIES}
  ${GLOG}
  ${GFLAGS_LIBRARIES}
)

# Unit tests are only configured when CELEBORN_BUILD_TESTS is enabled.
if(CELEBORN_BUILD_TESTS)
  add_subdirectory(tests)
endif()
91 changes: 91 additions & 0 deletions cpp/celeborn/protocol/PartitionLocation.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "celeborn/protocol/PartitionLocation.h"

#include "celeborn/protocol/StatusCode.h"
#include "celeborn/utils/CelebornUtils.h"
#include "celeborn/utils/Exceptions.h"

namespace celeborn {
// Converts a PbStorageInfo protobuf message into a heap-allocated StorageInfo.
//
// Every field is copied one-to-one from `pb`. The storage type is converted
// with a plain static_cast, so no range validation happens here.
//
// Returns an owning pointer to the new StorageInfo.
std::unique_ptr<StorageInfo> StorageInfo::fromPb(const PbStorageInfo& pb) {
  auto result = std::make_unique<StorageInfo>();
  result->type = static_cast<Type>(pb.type());
  result->mountPoint = pb.mountpoint();
  result->finalResult = pb.finalresult();
  result->filePath = pb.filepath();
  result->availableStorageTypes = pb.availablestoragetypes();
  // Return the local by name: its type matches the return type exactly, so
  // copy elision or an implicit move applies. Wrapping it in std::move would
  // only inhibit elision.
  return result;
}

// Converts a PbPartitionLocation (and its optional peer) into an immutable
// PartitionLocation.
//
// When `pb` carries a peer, both sides are converted and linked so that the
// returned object is the PRIMARY one and the REPLICA hangs off its
// replicaPeer. The modes of the two sides are checked with CELEBORN_CHECK.
// When `pb` has no peer, the location itself must be PRIMARY.
std::unique_ptr<const PartitionLocation> PartitionLocation::fromPb(
    const PbPartitionLocation& pb) {
  auto result = fromPbWithoutPeer(pb);
  if (pb.has_peer()) {
    auto peer = fromPbWithoutPeer(pb.peer());
    if (result->mode != PRIMARY) {
      // `pb` describes the replica side: make the peer the owner and return
      // the peer instead.
      CELEBORN_CHECK(
          peer->mode == PRIMARY, "REPLICA's peer mode should be PRIMARY");
      peer->replicaPeer = std::move(result);
      result = std::move(peer);
    } else {
      // `pb` describes the primary side: it takes ownership of the replica.
      CELEBORN_CHECK(
          peer->mode == REPLICA, "PRIMARY's peer mode should be REPLICA");
      result->replicaPeer = std::move(peer);
    }
  }
  CELEBORN_CHECK(result->mode == PRIMARY, "non-peer's mode should be PRIMARY");
  // `result` is unique_ptr<PartitionLocation> while the return type is
  // unique_ptr<const PartitionLocation>, so this return performs a conversion.
  return std::move(result);
}

// Deep-copying constructor.
//
// Scalar fields and the host string are copied directly. The two owned
// pointers (replicaPeer and storageInfo) are cloned into fresh allocations
// when present and left null otherwise.
PartitionLocation::PartitionLocation(const PartitionLocation& other)
    : id(other.id),
      epoch(other.epoch),
      host(other.host),
      rpcPort(other.rpcPort),
      pushPort(other.pushPort),
      fetchPort(other.fetchPort),
      replicatePort(other.replicatePort),
      mode(other.mode),
      replicaPeer(
          other.replicaPeer
              ? std::make_unique<PartitionLocation>(*other.replicaPeer)
              : nullptr),
      // storageInfo is null on a default-constructed PartitionLocation, so
      // the dereference must be guarded just like replicaPeer above.
      storageInfo(
          other.storageInfo
              ? std::make_unique<StorageInfo>(*other.storageInfo)
              : nullptr) {}

// Converts the fields of a single PbPartitionLocation, ignoring any peer it
// may carry.
//
// The returned object always has a null replicaPeer; linking peers is left
// to the caller. The mode is converted with a plain static_cast, so no range
// validation happens here.
//
// Returns an owning, mutable pointer to the new PartitionLocation.
std::unique_ptr<PartitionLocation> PartitionLocation::fromPbWithoutPeer(
    const PbPartitionLocation& pb) {
  auto result = std::make_unique<PartitionLocation>();
  result->id = pb.id();
  result->epoch = pb.epoch();
  result->host = pb.host();
  result->rpcPort = pb.rpcport();
  result->pushPort = pb.pushport();
  result->fetchPort = pb.fetchport();
  result->replicatePort = pb.replicateport();
  result->mode = static_cast<Mode>(pb.mode());
  result->replicaPeer = nullptr;
  result->storageInfo = StorageInfo::fromPb(pb.storageinfo());
  // Return the local by name: its type matches the return type exactly, so
  // copy elision or an implicit move applies. Wrapping it in std::move would
  // only inhibit elision.
  return result;
}

// Converts a raw integer status code into a StatusCode.
//
// Both bounds are checked with CELEBORN_CHECK before the cast.
//
// NOTE(review): the upper bound is inclusive, so a code equal to
// StatusCode::TAIL is accepted and returned as TAIL. TAIL is the last
// enumerator, declared right after NO_SPLIT, and looks like an end marker
// rather than a real status - confirm whether `<` was intended.
StatusCode toStatusCode(int32_t code) {
CELEBORN_CHECK(code >= 0);
CELEBORN_CHECK(code <= StatusCode::TAIL);
return static_cast<StatusCode>(code);
}
} // namespace celeborn
95 changes: 95 additions & 0 deletions cpp/celeborn/protocol/PartitionLocation.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <memory>

#include "celeborn/proto/TransportMessagesCpp.pb.h"
#include "celeborn/utils/Exceptions.h"

namespace celeborn {
// Storage description attached to a PartitionLocation: storage type, mount
// point, file path, committed flag and an available-storage-types value.
struct StorageInfo {
  // Builds a StorageInfo from its protobuf representation.
  static std::unique_ptr<StorageInfo> fromPb(const PbStorageInfo& pb);

  StorageInfo() = default;

  StorageInfo(const StorageInfo& other) = default;

  // Storage medium. fromPb() converts the protobuf type value with a plain
  // cast, so these numeric values have to line up with the protobuf ones.
  enum Type {
    MEMORY = 0,
    HDD = 1,
    SSD = 2,
    HDFS = 3,
    OSS = 4,
    S3 = 5,
  };

  // Bit masks, presumably tested against availableStorageTypes - TODO
  // confirm against the callers.
  // Declared constexpr so that (from C++17 on) they are inline variables
  // with a definition; a plain `static const int` member has none and fails
  // to link as soon as it is odr-used, e.g. bound to a `const int&`.
  static constexpr int MEMORY_MASK = 0b1;
  static constexpr int LOCAL_DISK_MASK = 0b10;
  static constexpr int HDFS_MASK = 0b100;
  static constexpr int OSS_MASK = 0b1000;
  static constexpr int S3_MASK = 0b10000;
  static constexpr int ALL_TYPES_AVAILABLE_MASK = 0;

  Type type{MEMORY};
  std::string mountPoint;
  // if a file is committed, field "finalResult" will be true
  bool finalResult{false};
  std::string filePath;
  int availableStorageTypes{0};
};

// Location of a partition: the host and its rpc/push/fetch/replicate ports,
// the mode (PRIMARY or REPLICA), the storage info and an optional replica
// peer.
struct PartitionLocation {
  enum Mode {
    PRIMARY = 0,
    REPLICA = 1,
  };

  // Scalar members carry in-class initializers so that a default-initialized
  // PartitionLocation never holds indeterminate values. The defaults match
  // what value-initialization already produced.
  int id{0};
  int epoch{0};
  std::string host;
  int rpcPort{0};
  int pushPort{0};
  int fetchPort{0};
  int replicatePort{0};
  Mode mode{PRIMARY};
  // Only primary PartitionLocation might have replicaPeer.
  // Replica PartitionLocation would not have replicaPeer.
  // The lifecycle of Replica is bound to that of Primary.
  std::unique_ptr<PartitionLocation> replicaPeer{nullptr};
  // Null until assigned; fromPb() always fills it in.
  std::unique_ptr<StorageInfo> storageInfo;
  // TODO: RoaringBitmap is not supported yet.
  // RoaringBitmap mapIdBitMap;

  // Converts `pb` (and its peer, if any) into an immutable PRIMARY
  // PartitionLocation with the REPLICA attached as replicaPeer.
  static std::unique_ptr<const PartitionLocation> fromPb(
      const PbPartitionLocation& pb);

  PartitionLocation() = default;

  // Deep copy: the owned replicaPeer and storageInfo are cloned.
  PartitionLocation(const PartitionLocation& other);

  // Returns "<id>-<epoch>-<mode>", with mode rendered as its numeric value.
  std::string filename() const {
    return std::to_string(id) + "-" + std::to_string(epoch) + "-" +
        std::to_string(mode);
  }

 private:
  // Converts the fields of `pb` only; the peer is not followed.
  static std::unique_ptr<PartitionLocation> fromPbWithoutPeer(
      const PbPartitionLocation& pb);
};
} // namespace celeborn
95 changes: 95 additions & 0 deletions cpp/celeborn/protocol/StatusCode.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cstdint>

namespace celeborn {
// Status codes with explicitly assigned numeric values.
//
// TAIL is the last enumerator and takes the value following NO_SPLIT; it is
// used by toStatusCode() as the upper bound of its range check.
enum StatusCode {
  // 1/0 Status
  SUCCESS = 0,
  PARTIAL_SUCCESS = 1,
  REQUEST_FAILED = 2,

  // Specific Status
  SHUFFLE_ALREADY_REGISTERED = 3,
  SHUFFLE_NOT_REGISTERED = 4,
  RESERVE_SLOTS_FAILED = 5,
  SLOT_NOT_AVAILABLE = 6,
  WORKER_NOT_FOUND = 7,
  PARTITION_NOT_FOUND = 8,
  REPLICA_PARTITION_NOT_FOUND = 9,
  DELETE_FILES_FAILED = 10,
  PARTITION_EXISTS = 11,
  REVIVE_FAILED = 12,
  REPLICATE_DATA_FAILED = 13,
  NUM_MAPPER_ZERO = 14,
  MAP_ENDED = 15,
  STAGE_ENDED = 16,

  // push data fail causes
  PUSH_DATA_FAIL_NON_CRITICAL_CAUSE = 17,
  PUSH_DATA_WRITE_FAIL_REPLICA = 18,
  PUSH_DATA_WRITE_FAIL_PRIMARY = 19,
  PUSH_DATA_FAIL_PARTITION_NOT_FOUND = 20,

  HARD_SPLIT = 21,
  SOFT_SPLIT = 22,

  STAGE_END_TIME_OUT = 23,
  SHUFFLE_DATA_LOST = 24,
  WORKER_SHUTDOWN = 25,
  NO_AVAILABLE_WORKING_DIR = 26,
  WORKER_EXCLUDED = 27,
  WORKER_UNKNOWN = 28,

  COMMIT_FILE_EXCEPTION = 29,

  // Rate limit statuses
  PUSH_DATA_SUCCESS_PRIMARY_CONGESTED = 30,
  PUSH_DATA_SUCCESS_REPLICA_CONGESTED = 31,

  PUSH_DATA_HANDSHAKE_FAIL_REPLICA = 32,
  PUSH_DATA_HANDSHAKE_FAIL_PRIMARY = 33,
  REGION_START_FAIL_REPLICA = 34,
  REGION_START_FAIL_PRIMARY = 35,
  REGION_FINISH_FAIL_REPLICA = 36,
  REGION_FINISH_FAIL_PRIMARY = 37,

  PUSH_DATA_CREATE_CONNECTION_FAIL_PRIMARY = 38,
  PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA = 39,
  PUSH_DATA_CONNECTION_EXCEPTION_PRIMARY = 40,
  PUSH_DATA_CONNECTION_EXCEPTION_REPLICA = 41,
  PUSH_DATA_TIMEOUT_PRIMARY = 42,
  PUSH_DATA_TIMEOUT_REPLICA = 43,
  PUSH_DATA_PRIMARY_WORKER_EXCLUDED = 44,
  PUSH_DATA_REPLICA_WORKER_EXCLUDED = 45,

  FETCH_DATA_TIMEOUT = 46,
  REVIVE_INITIALIZED = 47,
  DESTROY_SLOTS_MOCK_FAILURE = 48,
  COMMIT_FILES_MOCK_FAILURE = 49,
  PUSH_DATA_FAIL_NON_CRITICAL_CAUSE_REPLICA = 50,
  OPEN_STREAM_FAILED = 51,
  SEGMENT_START_FAIL_REPLICA = 52,
  SEGMENT_START_FAIL_PRIMARY = 53,
  NO_SPLIT = 54,

  TAIL
};

// Converts a raw integer code into a StatusCode after range-checking it.
StatusCode toStatusCode(int32_t code);
} // namespace celeborn
34 changes: 34 additions & 0 deletions cpp/celeborn/protocol/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Unit-test executable for the protocol library.
add_executable(celeborn_protocol_test PartitionLocationTest.cpp)

# Register the executable with CTest.
add_test(NAME celeborn_protocol_test COMMAND celeborn_protocol_test)

# Attach the include directory to the test target itself. The protocol
# library already declares its own include directories in its own
# CMakeLists.txt, so it should not be modified from here.
target_include_directories(celeborn_protocol_test PRIVATE ${CMAKE_BINARY_DIR})

target_link_libraries(
  celeborn_protocol_test
  PRIVATE
    memory
    proto
    protocol
    utils
    ${FOLLY_WITH_DEPENDENCIES}
    ${GLOG}
    ${GFLAGS_LIBRARIES}
    GTest::gtest
    GTest::gmock
    GTest::gtest_main)
Loading

0 comments on commit 4714e91

Please sign in to comment.