Skip to content

Commit

Permalink
Conform the max-txn-ops requirement of etcd. (#126)
Browse files Browse the repository at this point in the history
We don't need such flags for etcd server anymore, by splitting a large update set to many transactions. We compute etcd ops by DFS when persisting object, ensuring the correctness of segmented updates for etcd.
  • Loading branch information
sighingnow authored Jan 5, 2021
1 parent 4fdf5e9 commit 60ab766
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 6 deletions.
2 changes: 1 addition & 1 deletion python/vineyard/data/tests/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def test_dataframe_set_index(vineyard_client):


def test_dataframe_with_sparse_array(vineyard_client):
df = pd.DataFrame(np.random.randn(100, 4))
df = pd.DataFrame(np.random.randn(100, 4), columns=['x', 'y', 'z', 'a'])
df.iloc[:98] = np.nan
sdf = df.astype(pd.SparseDtype("float", np.nan))
object_id = vineyard_client.put(sdf)
Expand Down
33 changes: 31 additions & 2 deletions src/server/services/etcd_meta_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,36 @@ void EtcdMetaService::requestLock(
void EtcdMetaService::commitUpdates(
const std::vector<op_t>& changes,
callback_t<unsigned> callback_after_updated) {
// Split to many small txns to conform the requirement of max-txn-ops
// limitation (128) from etcd.
//
// The first n segments will be performed synchronously while the last
// txn will still be executed in a asynchronous manner.
size_t offset = 0;
while (offset + 127 < changes.size()) {
etcdv3::Transaction tx;
for (size_t idx = offset; idx < offset + 127; ++idx) {
auto const& op = changes[idx];
if (op.op == op_t::kPut) {
tx.setup_put(prefix_ + op.kv.key, op.kv.value);
} else if (op.op == op_t::kDel) {
tx.setup_delete(prefix_ + op.kv.key);
}
}
auto resp = etcd_->txn(tx).get();
if (resp.is_ok()) {
offset += 127;
} else {
auto status = Status::EtcdError(resp.error_code(), resp.error_message());
boost::asio::post(
server_ptr_->GetIOContext(),
boost::bind(callback_after_updated, status, resp.index()));
return;
}
}
etcdv3::Transaction tx;
for (auto const& op : changes) {
for (size_t idx = offset; idx < changes.size(); ++idx) {
auto const& op = changes[idx];
if (op.op == op_t::kPut) {
tx.setup_put(prefix_ + op.kv.key, op.kv.value);
} else if (op.op == op_t::kDel) {
Expand All @@ -113,7 +141,8 @@ void EtcdMetaService::commitUpdates(
etcd_->txn(tx).then([this, callback_after_updated](
pplx::task<etcd::Response> const& resp_task) {
auto resp = resp_task.get();
VLOG(10) << "etcd txn use " << resp.duration().count() << " microseconds";
VLOG(10) << "etcd (last) txn use " << resp.duration().count()
<< " microseconds";
auto status = Status::EtcdError(resp.error_code(), resp.error_message());
boost::asio::post(
server_ptr_->GetIOContext(),
Expand Down
1 change: 0 additions & 1 deletion src/server/util/etcd_launcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ Status EtcdLauncher::LaunchEtcdServer(
//
// c.f.: https://github.com/etcd-io/etcd/pull/12448
args.emplace_back("--log-package-levels='etcdserver=ERROR'");
args.emplace_back("--max-txn-ops=102400");
args.emplace_back("--listen-client-urls");
args.emplace_back("http://0.0.0.0:" + std::to_string(endpoint_port_));
args.emplace_back("--advertise-client-urls");
Expand Down
74 changes: 74 additions & 0 deletions test/large_meta_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/** Copyright 2020-2021 Alibaba Group Holding Limited.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#include <memory>
#include <string>
#include <thread>

#include "arrow/status.h"
#include "arrow/util/io_util.h"
#include "arrow/util/logging.h"
#include "glog/logging.h"

#include "basic/ds/array.h"
#include "basic/ds/hashmap.h"
#include "basic/ds/tuple.h"
#include "client/client.h"
#include "client/ds/object_meta.h"
#include "common/util/typename.h"

using namespace vineyard; // NOLINT(build/namespaces)

int main(int argc, char** argv) {
if (argc < 2) {
printf("usage ./large_meta_test <ipc_socket>");
return 1;
}
std::string ipc_socket = std::string(argv[1]);

Client client;
VINEYARD_CHECK_OK(client.Connect(ipc_socket));
LOG(INFO) << "Connected to IPCServer: " << ipc_socket;

const size_t element_size = 10240;

TupleBuilder tup_builder(client);
tup_builder.SetSize(element_size);
for (size_t idx = 0; idx < element_size; ++idx) {
std::vector<double> double_array = {1.0, static_cast<double>(element_size),
static_cast<double>(idx)};
auto builder = std::make_shared<ArrayBuilder<double>>(client, double_array);
tup_builder.SetValue(idx, builder);
}

auto tup = std::dynamic_pointer_cast<Tuple>(tup_builder.Seal(client));
VINEYARD_CHECK_OK(client.Persist(tup->id()));

for (size_t idx = 0; idx < element_size; ++idx) {
auto item = tup->At(idx);
CHECK_EQ(item->meta().GetTypeName(), type_name<Array<double>>());
auto arr = std::dynamic_pointer_cast<Array<double>>(item);
CHECK_EQ(arr->size(), 3);
CHECK_DOUBLE_EQ(arr->data()[0], 1.0);
CHECK_DOUBLE_EQ(arr->data()[1], static_cast<double>(element_size));
CHECK_DOUBLE_EQ(arr->data()[2], static_cast<double>(idx));
}

LOG(INFO) << "Passed large metadata tests...";

client.Disconnect();

return 0;
}
3 changes: 1 addition & 2 deletions test/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ def start_etcd():
client_port = find_port()
peer_port = find_port()
proc = start_program('etcd',
'--max-txn-ops=102400',
'--listen-peer-urls', 'http://0.0.0.0:%d' % peer_port,
'--listen-client-urls', 'http://0.0.0.0:%d' % client_port,
'--advertise-client-urls', 'http://127.0.0.1:%d' % client_port,
Expand Down Expand Up @@ -237,7 +236,7 @@ def run_single_vineyardd_tests(etcd_endpoints):
run_test('get_object_test')
run_test('hashmap_test')
run_test('id_test')
run_test('json_utils_test')
run_test('large_meta_test')
run_test('list_object_test')
run_test('name_test')
run_test('pair_test')
Expand Down

0 comments on commit 60ab766

Please sign in to comment.