From 4902d06cd16550ff9f179ce7253174718e085377 Mon Sep 17 00:00:00 2001 From: Oleg Loginov Date: Tue, 20 Feb 2018 20:04:10 +0300 Subject: [PATCH] ENG-2964: Allow flushing all tablets of a given table Summary: Implementing ability to flush tablets mem-stores. Test Plan: yb-admin-test Reviewers: sergei, timur, mikhail, pritam.damania Reviewed By: mikhail, pritam.damania Subscribers: bogdan, ybase, bharat Differential Revision: https://phabricator.dev.yugabyte.com/D4191 --- src/yb/common/entity_ids.h | 2 + src/yb/master/CMakeLists.txt | 32 ++-- src/yb/master/async_flush_tablets_task.cc | 109 ++++++++++++ src/yb/master/async_flush_tablets_task.h | 53 ++++++ src/yb/master/catalog_manager.cc | 46 +++++ src/yb/master/catalog_manager.h | 17 +- src/yb/master/flush_manager.cc | 194 ++++++++++++++++++++++ src/yb/master/flush_manager.h | 85 ++++++++++ src/yb/master/master.cc | 2 + src/yb/master/master.h | 4 + src/yb/master/master.proto | 27 +++ src/yb/master/master_service.cc | 13 ++ src/yb/master/master_service.h | 8 + src/yb/master/master_service_base.cc | 4 + src/yb/master/master_service_base.h | 2 + src/yb/rocksdb/db.h | 6 + src/yb/rocksdb/db/compacted_db_impl.h | 10 +- src/yb/rocksdb/db/db_impl.cc | 65 ++++---- src/yb/rocksdb/db/db_impl.h | 2 + src/yb/rocksdb/db/db_test.cc | 8 +- src/yb/rocksdb/utilities/stackable_db.h | 5 + src/yb/server/monitored_task.h | 1 + src/yb/tablet/tablet-test.cc | 27 +++ src/yb/tablet/tablet.cc | 5 + src/yb/tablet/tablet.h | 2 + src/yb/tools/yb-admin_cli.cc | 17 ++ src/yb/tools/yb-admin_client.cc | 166 ++++++++++++------ src/yb/tools/yb-admin_client.h | 2 + src/yb/tserver/tablet_service.cc | 77 +++++++-- src/yb/tserver/tablet_service.h | 8 + src/yb/tserver/ts_tablet_manager.cc | 49 +++--- src/yb/tserver/tserver_admin.proto | 20 +++ 32 files changed, 919 insertions(+), 149 deletions(-) create mode 100644 src/yb/master/async_flush_tablets_task.cc create mode 100644 src/yb/master/async_flush_tablets_task.h create mode 100644 
src/yb/master/flush_manager.cc create mode 100644 src/yb/master/flush_manager.h diff --git a/src/yb/common/entity_ids.h b/src/yb/common/entity_ids.h index 875ac8ca7ef2..823169971059 100644 --- a/src/yb/common/entity_ids.h +++ b/src/yb/common/entity_ids.h @@ -39,6 +39,8 @@ using TabletId = std::string; using NamespaceIdTableNamePair = std::pair; using SystemTableSet = std::set; +using FlushRequestId = std::string; + } // namespace yb #endif // YB_COMMON_ENTITY_IDS_H diff --git a/src/yb/master/CMakeLists.txt b/src/yb/master/CMakeLists.txt index b82a0c5d4a28..9936782f121d 100644 --- a/src/yb/master/CMakeLists.txt +++ b/src/yb/master/CMakeLists.txt @@ -55,8 +55,25 @@ set(MASTER_PROTO_LIBS ${MASTER_PROTO_LIBS_EXTENSIONS}) set(MASTER_SRCS + async_flush_tablets_task.cc + async_rpc_tasks.cc + call_home.cc + catalog_manager.cc + cluster_balance.cc + flush_manager.cc + master.cc + master_options.cc + master_service_base.cc + master_service.cc + master_tablet_service.cc + master_tserver.cc + master-path-handlers.cc + mini_master.cc + sys_catalog.cc system_tablet.cc system_tables_handler.cc + ts_descriptor.cc + ts_manager.cc yql_virtual_table.cc yql_vtable_iterator.cc util/yql_vtable_helpers.cc @@ -76,21 +93,6 @@ set(MASTER_SRCS yql_types_vtable.cc yql_views_vtable.cc yql_partitions_vtable.cc - async_rpc_tasks.cc - catalog_manager.cc - cluster_balance.cc - master.cc - master_options.cc - master_service_base.cc - master_service.cc - master-path-handlers.cc - mini_master.cc - sys_catalog.cc - ts_descriptor.cc - ts_manager.cc - master_tablet_service.cc - master_tserver.cc - call_home.cc ${MASTER_SRCS_EXTENSIONS}) add_library(master ${MASTER_SRCS}) diff --git a/src/yb/master/async_flush_tablets_task.cc b/src/yb/master/async_flush_tablets_task.cc new file mode 100644 index 000000000000..1d3be46e7ccb --- /dev/null +++ b/src/yb/master/async_flush_tablets_task.cc @@ -0,0 +1,109 @@ +// Copyright (c) YugaByte, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// +#include "yb/master/async_flush_tablets_task.h" + +#include "yb/common/wire_protocol.h" + +#include "yb/master/master.h" +#include "yb/master/ts_descriptor.h" +#include "yb/master/flush_manager.h" +#include "yb/master/catalog_manager.h" + +#include "yb/rpc/messenger.h" + +#include "yb/tserver/tserver_admin.proxy.h" + +#include "yb/util/flag_tags.h" +#include "yb/util/format.h" +#include "yb/util/logging.h" + +namespace yb { +namespace master { + +using std::string; +using tserver::TabletServerErrorPB; + +//////////////////////////////////////////////////////////// +// AsyncFlushTablets +//////////////////////////////////////////////////////////// +AsyncFlushTablets::AsyncFlushTablets(Master *master, + ThreadPool* callback_pool, + const TabletServerId& ts_uuid, + const scoped_refptr& table, + const vector& tablet_ids, + const FlushRequestId& flush_id) + : RetrySpecificTSRpcTask(master, callback_pool, ts_uuid, table), + tablet_ids_(tablet_ids), + flush_id_(flush_id) { +} + +string AsyncFlushTablets::description() const { + return Format("$0 Flush Tablets RPC", permanent_uuid()); +} + +TabletServerId AsyncFlushTablets::permanent_uuid() const { + return permanent_uuid_; +} + +void AsyncFlushTablets::HandleResponse(int attempt) { + server::UpdateClock(resp_, master_->clock()); + + if (resp_.has_error()) { + Status status = StatusFromPB(resp_.error().status()); + + // Do not retry on a fatal error. 
+ switch (resp_.error().code()) { + case TabletServerErrorPB::TABLET_NOT_FOUND: + LOG(WARNING) << "TS " << permanent_uuid() << ": flush tablets failed because tablet " + << resp_.failed_tablet_id() << " was not found. " + << "No further retry: " << status.ToString(); + PerformStateTransition(kStateRunning, kStateComplete); + break; + default: + LOG(WARNING) << "TS " << permanent_uuid() << ": flush tablets failed: " + << status.ToString(); + } + } else { + PerformStateTransition(kStateRunning, kStateComplete); + VLOG(1) << "TS " << permanent_uuid() << ": flush tablets complete"; + } + + if (state() == kStateComplete) { + // TODO: this class should not know CatalogManager API, + // remove circular dependency between classes. + master_->flush_manager()->HandleFlushTabletsResponse( + flush_id_, permanent_uuid_, + resp_.has_error() ? StatusFromPB(resp_.error().status()) : Status::OK()); + } else { + VLOG(1) << "FlushTablets task is not completed"; + } +} + +bool AsyncFlushTablets::SendRequest(int attempt) { + tserver::FlushTabletsRequestPB req; + req.set_dest_uuid(permanent_uuid_); + req.set_propagated_hybrid_time(master_->clock()->Now().ToUint64()); + + for (const TabletId& id : tablet_ids_) { + req.add_tablet_ids(id); + } + + ts_admin_proxy_->FlushTabletsAsync(req, &resp_, &rpc_, BindRpcCallback()); + VLOG(1) << "Send flush tablets request to " << permanent_uuid_ + << " (attempt " << attempt << "):\n" + << req.DebugString(); + return true; +} + +} // namespace master +} // namespace yb diff --git a/src/yb/master/async_flush_tablets_task.h b/src/yb/master/async_flush_tablets_task.h new file mode 100644 index 000000000000..2bf6988bd46f --- /dev/null +++ b/src/yb/master/async_flush_tablets_task.h @@ -0,0 +1,53 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// +#ifndef YB_MASTER_ASYNC_FLUSH_TABLETS_TASK_H +#define YB_MASTER_ASYNC_FLUSH_TABLETS_TASK_H + +#include "yb/master/async_rpc_tasks.h" + +namespace yb { +namespace master { + +// Send the "Flush Tablets" request to the specified Tablet Server. +// Keeps retrying until we get an "ok" response. +class AsyncFlushTablets : public RetrySpecificTSRpcTask { + public: + AsyncFlushTablets(Master* master, + ThreadPool* callback_pool, + const TabletServerId& ts_uuid, + const scoped_refptr& table, + const std::vector& tablet_ids, + const FlushRequestId& flush_id); + + Type type() const override { return ASYNC_FLUSH_TABLETS; } + + std::string type_name() const override { return "Flush Tablets"; } + + std::string description() const override; + + private: + TabletId tablet_id() const override { return TabletId(); } + TabletServerId permanent_uuid() const; + + void HandleResponse(int attempt) override; + bool SendRequest(int attempt) override; + + const std::vector tablet_ids_; + const FlushRequestId flush_id_; + tserver::FlushTabletsResponsePB resp_; +}; + +} // namespace master +} // namespace yb + +#endif // YB_MASTER_ASYNC_FLUSH_TABLETS_TASK_H diff --git a/src/yb/master/catalog_manager.cc b/src/yb/master/catalog_manager.cc index 3903f5e91db9..31b41c5eec22 100644 --- a/src/yb/master/catalog_manager.cc +++ b/src/yb/master/catalog_manager.cc @@ -1930,6 +1930,52 @@ Status CatalogManager::FindNamespace(const NamespaceIdentifierPB& ns_identifier, return Status::OK(); } +Result CatalogManager::GetTabletsOrSetupError( + const TableIdentifierPB& table_identifier, + 
MasterErrorPB::Code* error, + scoped_refptr* table, + scoped_refptr* ns) { + DCHECK_ONLY_NOTNULL(error); + // Lookup the table and verify it exists. + TRACE("Looking up table"); + scoped_refptr local_table_info; + scoped_refptr& table_obj = table ? *table : local_table_info; + RETURN_NOT_OK(FindTable(table_identifier, &table_obj)); + if (table_obj == nullptr) { + *error = MasterErrorPB::TABLE_NOT_FOUND; + return STATUS(NotFound, "Table does not exist", table_identifier.DebugString()); + } + + TRACE("Locking table"); + auto l = table_obj->LockForRead(); + + if (table_obj->metadata().state().table_type() != TableType::YQL_TABLE_TYPE) { + *error = MasterErrorPB::INVALID_TABLE_TYPE; + return STATUS(InvalidArgument, "Invalid table type", table_identifier.DebugString()); + } + + if (table_obj->IsCreateInProgress()) { + *error = MasterErrorPB::TABLE_CREATION_IS_IN_PROGRESS; + return STATUS(IllegalState, "Table creation is in progress", table_obj->ToString()); + } + + if (ns) { + TRACE("Looking up namespace"); + boost::shared_lock l(lock_); + + *ns = FindPtrOrNull(namespace_ids_map_, table_obj->namespace_id()); + if (*ns == nullptr) { + *error = MasterErrorPB::NAMESPACE_NOT_FOUND; + return STATUS( + InvalidArgument, "Could not find namespace by namespace id", table_obj->namespace_id()); + } + } + + TabletInfos tablets; + table_obj->GetAllTablets(&tablets); + return std::move(tablets); +} + // Truncate a Table Status CatalogManager::TruncateTable(const TruncateTableRequestPB* req, TruncateTableResponsePB* resp, diff --git a/src/yb/master/catalog_manager.h b/src/yb/master/catalog_manager.h index 9a4907c17b43..133da299da07 100644 --- a/src/yb/master/catalog_manager.h +++ b/src/yb/master/catalog_manager.h @@ -1037,10 +1037,22 @@ class CatalogManager : public tserver::TabletPeerLookupIf { CHECKED_STATUS FindNamespace(const NamespaceIdentifierPB& ns_identifier, scoped_refptr* ns_info) const; + CHECKED_STATUS FindTable(const TableIdentifierPB& table_identifier, + scoped_refptr* 
table_info); + + Result GetTabletsOrSetupError(const TableIdentifierPB& table_identifier, + MasterErrorPB::Code* error, + scoped_refptr* table = nullptr, + scoped_refptr* ns = nullptr); + void AssertLeaderLockAcquiredForReading() const { leader_lock_.AssertAcquiredForReading(); } + std::string GenerateId() { return oid_generator_.Next(); } + + ThreadPool* WorkerPool() { return worker_pool_.get(); } + protected: friend class TableLoader; friend class TabletLoader; @@ -1165,9 +1177,6 @@ class CatalogManager : public tserver::TabletPeerLookupIf { CHECKED_STATUS BuildLocationsForTablet(const scoped_refptr& tablet, TabletLocationsPB* locs_pb); - CHECKED_STATUS FindTable(const TableIdentifierPB& table_identifier, - scoped_refptr* table_info); - // Handle one of the tablets in a tablet reported. // Requires that the lock is already held. CHECKED_STATUS HandleReportedTablet(TSDescriptor* ts_desc, @@ -1325,8 +1334,6 @@ class CatalogManager : public tserver::TabletPeerLookupIf { TabletToTabletServerMap *remove_replica_tasks_map, TabletToTabletServerMap *stepdown_leader_tasks); - std::string GenerateId() { return oid_generator_.Next(); } - // Abort creation of 'table': abort all mutation for TabletInfo and // TableInfo objects (releasing all COW locks), abort all pending // tasks associated with the table, and erase any state related to diff --git a/src/yb/master/flush_manager.cc b/src/yb/master/flush_manager.cc new file mode 100644 index 000000000000..7cf3750c251f --- /dev/null +++ b/src/yb/master/flush_manager.cc @@ -0,0 +1,194 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// +#include "yb/master/flush_manager.h" + +#include + +#include "yb/master/async_flush_tablets_task.h" +#include "yb/master/catalog_manager.h" +#include "yb/master/catalog_manager-internal.h" + +namespace yb { +namespace master { + +using std::map; +using std::vector; + +Status FlushManager::FlushTables(const FlushTablesRequestPB* req, + FlushTablesResponsePB* resp) { + LOG(INFO) << "Servicing FlushTables request: " << req->ShortDebugString(); + + // Check request. + if (req->tables_size() == 0) { + const Status s = STATUS(IllegalState, "Empty table list in flush tables request", + req->ShortDebugString()); + return SetupError(resp->mutable_error(), MasterErrorPB::INVALID_REQUEST, s); + } + + RETURN_NOT_OK(catalog_manager_->CheckOnline()); + + // Create a new flush request UUID. + const FlushRequestId flush_id = catalog_manager_->GenerateId(); + + // Per TS tablet lists for all provided tables. + map> ts_tablet_map; + scoped_refptr table; + + for (const TableIdentifierPB& table_id_pb : req->tables()) { + MasterErrorPB::Code error = MasterErrorPB::UNKNOWN_ERROR; + + const Result res_tablets = catalog_manager_->GetTabletsOrSetupError( + table_id_pb, &error, &table); + if (!res_tablets.ok()) { + return SetupError(resp->mutable_error(), error, res_tablets.status()); + } + + // Prepare per Tablet Server tablet lists. 
+ for (const scoped_refptr tablet : *res_tablets) { + TRACE("Locking tablet"); + auto l = tablet->LockForRead(); + + TabletInfo::ReplicaMap locs; + tablet->GetReplicaLocations(&locs); + for (const TabletInfo::ReplicaMap::value_type& replica : locs) { + const TabletServerId ts_uuid = replica.second.ts_desc->permanent_uuid(); + ts_tablet_map[ts_uuid].push_back(tablet->id()); + } + } + } + + DCHECK_GT(ts_tablet_map.size(), 0); + + { + std::lock_guard l(lock_); + TRACE("Acquired flush manager lock"); + + // Init Tablet Server id lists in memory storage. + TSFlushingInfo& flush_info = flush_requests_[flush_id]; + flush_info.clear(); + + for (const auto& ts : ts_tablet_map) { + flush_info.ts_flushing_.insert(ts.first); + } + + DCHECK_EQ(flush_info.ts_flushing_.size(), ts_tablet_map.size()); + } + + // Clean-up complete flushing requests. + DeleteCompleteFlushRequests(); + + DCHECK_ONLY_NOTNULL(table.get()); + + // Send FlushTablets requests to all Tablet Servers (one TS - one request). + for (const auto& ts : ts_tablet_map) { + // Using last table async task queue. + SendFlushTabletsRequest(ts.first, table, ts.second, flush_id); + } + + resp->set_flush_request_id(flush_id); + + LOG(INFO) << "Successfully started flushing request " << flush_id; + + return Status::OK(); +} + +Status FlushManager::IsFlushTablesDone(const IsFlushTablesDoneRequestPB* req, + IsFlushTablesDoneResponsePB* resp) { + RETURN_NOT_OK(catalog_manager_->CheckOnline()); + + LOG(INFO) << "Servicing IsFlushTablesDone request: " << req->ShortDebugString(); + + std::lock_guard l(lock_); + TRACE("Acquired flush manager lock"); + + // Check flush request id. 
+ const FlushRequestMap::const_iterator it = flush_requests_.find(req->flush_request_id()); + + if (it == flush_requests_.end()) { + const Status s = STATUS(NotFound, "The flush request was not found", req->flush_request_id()); + return SetupError(resp->mutable_error(), MasterErrorPB::INVALID_REQUEST, s); + } + + const TSFlushingInfo& flush_info = it->second; + resp->set_done(flush_info.ts_flushing_.empty()); + resp->set_success(flush_info.ts_failed_.empty()); + + VLOG(1) << "IsFlushTablesDone request: " << req->flush_request_id() + << " Done: " << resp->done() << " Success " << resp->success(); + return Status::OK(); +} + +void FlushManager::SendFlushTabletsRequest(const TabletServerId& ts_uuid, + const scoped_refptr& table, + const vector& tablet_ids, + const FlushRequestId& flush_id) { + auto call = std::make_shared( + master_, catalog_manager_->WorkerPool(), ts_uuid, table, tablet_ids, flush_id); + table->AddTask(call); + WARN_NOT_OK(call->Run(), "Failed to send flush tablets request"); +} + +void FlushManager::HandleFlushTabletsResponse(const FlushRequestId& flush_id, + const TabletServerId& ts_uuid, + const Status& status) { + LOG(INFO) << "Handling Flush Tablets Response from TS " << ts_uuid + << " Status:" << status << " Flush request id: " << flush_id; + + std::lock_guard l(lock_); + TRACE("Acquired flush manager lock"); + + // Check current flush request id. + const FlushRequestMap::iterator it = flush_requests_.find(flush_id); + + if (it == flush_requests_.end()) { + LOG(WARNING) << "Old flush request id is in the flush tablets response: " << flush_id; + return; + } + + TSFlushingInfo& flush_info = it->second; + + if (flush_info.ts_flushing_.erase(ts_uuid) > 0) { + (status.IsOk() ? flush_info.ts_succeed_ : flush_info.ts_failed_).insert(ts_uuid); + + // Finish this flush request operation. 
+ if (flush_info.ts_flushing_.empty()) { + if (flush_info.ts_failed_.empty()) { + LOG(INFO) << "Successfully complete flush table request: " << flush_id; + } else { + LOG(WARNING) << "Failed flush table request: " << flush_id; + } + } + } + + VLOG(1) << "Flush table request: " << flush_id + << ". Flushing " << flush_info.ts_flushing_.size() + << "; Succeed " << flush_info.ts_succeed_.size() + << "; Failed " << flush_info.ts_failed_.size() << " TServers"; +} + +void FlushManager::DeleteCompleteFlushRequests() { + std::lock_guard l(lock_); + TRACE("Acquired flush manager lock"); + + // Clean-up complete flushing requests. + for (FlushRequestMap::iterator it = flush_requests_.begin(); it != flush_requests_.end();) { + if (it->second.ts_flushing_.empty()) { + it = flush_requests_.erase(it); + } else { + ++it; + } + } +} + +} // namespace master +} // namespace yb diff --git a/src/yb/master/flush_manager.h b/src/yb/master/flush_manager.h new file mode 100644 index 000000000000..811c0bbc5ab5 --- /dev/null +++ b/src/yb/master/flush_manager.h @@ -0,0 +1,85 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. 
+// +#ifndef YB_MASTER_FLUSH_MANAGER_H +#define YB_MASTER_FLUSH_MANAGER_H + +#include "yb/common/entity_ids.h" +#include "yb/master/master.pb.h" +#include "yb/util/locks.h" +#include "yb/util/status.h" +#include "yb/util/enums.h" + +namespace yb { +namespace master { + +class Master; +class CatalogManager; +class TableInfo; + +// Handle Flush-related operations. +class FlushManager { + public: + explicit FlushManager(Master* master, CatalogManager* catalog_manager) + : master_(DCHECK_NOTNULL(master)), + catalog_manager_(DCHECK_NOTNULL(catalog_manager)) {} + + // API to start a table flushing. + CHECKED_STATUS FlushTables(const FlushTablesRequestPB* req, + FlushTablesResponsePB* resp); + + CHECKED_STATUS IsFlushTablesDone(const IsFlushTablesDoneRequestPB* req, + IsFlushTablesDoneResponsePB* resp); + + void HandleFlushTabletsResponse(const FlushRequestId& flush_id, + const TabletServerId& ts_uuid, + const Status& status); + + private: + // Start the background task to send the FlushTablets RPC to the Tablet Server. + void SendFlushTabletsRequest(const TabletServerId& ts_uuid, + const scoped_refptr& table, + const std::vector& tablet_ids, + const FlushRequestId& flush_id); + + void DeleteCompleteFlushRequests(); + + Master* master_; + CatalogManager* catalog_manager_; + + // Lock protecting the various in memory storage structures. + typedef rw_spinlock LockType; + mutable LockType lock_; + + typedef std::unordered_set TSIdSet; + struct TSFlushingInfo { + void clear() { + ts_flushing_.clear(); + ts_succeed_.clear(); + ts_failed_.clear(); + } + + TSIdSet ts_flushing_; + TSIdSet ts_succeed_; + TSIdSet ts_failed_; + }; + + // Map of flushing requests: flush_request-id -> current per TS info. 
+ typedef std::unordered_map FlushRequestMap; + FlushRequestMap flush_requests_; + + DISALLOW_COPY_AND_ASSIGN(FlushManager); +}; + +} // namespace master +} // namespace yb +#endif // YB_MASTER_FLUSH_MANAGER_H diff --git a/src/yb/master/master.cc b/src/yb/master/master.cc index 65f95a623aba..90827b77fffe 100644 --- a/src/yb/master/master.cc +++ b/src/yb/master/master.cc @@ -43,6 +43,7 @@ #include "yb/common/wire_protocol.h" #include "yb/gutil/strings/substitute.h" #include "yb/master/catalog_manager.h" +#include "yb/master/flush_manager.h" #include "yb/master/master_rpc.h" #include "yb/master/master_util.h" #include "yb/master/master.pb.h" @@ -123,6 +124,7 @@ Master::Master(const MasterOptions& opts) ts_manager_(new TSManager()), catalog_manager_(new YB_EDITION_NS_PREFIX CatalogManager(this)), path_handlers_(new MasterPathHandlers(this)), + flush_manager_(new FlushManager(this, catalog_manager())), opts_(opts), registration_initialized_(false), maintenance_manager_(new MaintenanceManager(MaintenanceManager::DEFAULT_OPTIONS)), diff --git a/src/yb/master/master.h b/src/yb/master/master.h index 70b387b5fac2..8e19184ddd01 100644 --- a/src/yb/master/master.h +++ b/src/yb/master/master.h @@ -66,6 +66,7 @@ class CatalogManager; class TSDescriptor; class TSManager; class MasterPathHandlers; +class FlushManager; class Master : public server::RpcAndWebServerBase { public: @@ -93,6 +94,8 @@ class Master : public server::RpcAndWebServerBase { YB_EDITION_NS_PREFIX CatalogManager* catalog_manager() const { return catalog_manager_.get(); } + FlushManager* flush_manager() const { return flush_manager_.get(); } + scoped_refptr metric_entity_cluster() { return metric_entity_cluster_; } void SetMasterAddresses(std::shared_ptr> master_addresses) { @@ -169,6 +172,7 @@ class Master : public server::RpcAndWebServerBase { gscoped_ptr ts_manager_; gscoped_ptr catalog_manager_; gscoped_ptr path_handlers_; + gscoped_ptr flush_manager_; // For initializing the catalog manager. 
gscoped_ptr init_pool_; diff --git a/src/yb/master/master.proto b/src/yb/master/master.proto index 4f50140155a1..4fdcc9fd1bb8 100644 --- a/src/yb/master/master.proto +++ b/src/yb/master/master.proto @@ -1098,6 +1098,30 @@ message UDTypeInfoPB { optional NamespaceIdentifierPB namespace = 5; } +message FlushTablesRequestPB { + repeated TableIdentifierPB tables = 1; +} + +message FlushTablesResponsePB { + optional MasterErrorPB error = 1; + + optional bytes flush_request_id = 2; +} + +message IsFlushTablesDoneRequestPB { + optional bytes flush_request_id = 1; +} + +message IsFlushTablesDoneResponsePB { + optional MasterErrorPB error = 1; + + // true if the flush tables operation is completed, false otherwise + optional bool done = 2; + + // true if the flush tables operation is successful + optional bool success = 3; +} + service MasterService { // TS->Master RPCs rpc TSHeartbeat(TSHeartbeatRequestPB) returns (TSHeartbeatResponsePB); @@ -1164,4 +1188,7 @@ service MasterService { returns (GetLoadMovePercentResponsePB); rpc IsLoadBalanced(IsLoadBalancedRequestPB) returns (IsLoadBalancedResponsePB); + + rpc FlushTables(FlushTablesRequestPB) returns (FlushTablesResponsePB); + rpc IsFlushTablesDone(IsFlushTablesDoneRequestPB) returns (IsFlushTablesDoneResponsePB); } diff --git a/src/yb/master/master_service.cc b/src/yb/master/master_service.cc index 7aa05ddd3145..dac1295edad5 100644 --- a/src/yb/master/master_service.cc +++ b/src/yb/master/master_service.cc @@ -40,6 +40,7 @@ #include "yb/common/wire_protocol.h" #include "yb/master/catalog_manager-internal.h" +#include "yb/master/flush_manager.h" #include "yb/master/master_service_base-internal.h" #include "yb/master/master.h" #include "yb/master/ts_descriptor.h" @@ -535,5 +536,17 @@ void MasterServiceImpl::IsLoadBalanced( HandleIn(req, resp, &rpc, &CatalogManager::IsLoadBalanced); } +void MasterServiceImpl::FlushTables(const FlushTablesRequestPB* req, + FlushTablesResponsePB* resp, + RpcContext rpc) { + HandleIn(req, 
resp, &rpc, &FlushManager::FlushTables); +} + +void MasterServiceImpl::IsFlushTablesDone(const IsFlushTablesDoneRequestPB* req, + IsFlushTablesDoneResponsePB* resp, + RpcContext rpc) { + HandleIn(req, resp, &rpc, &FlushManager::IsFlushTablesDone); +} + } // namespace master } // namespace yb diff --git a/src/yb/master/master_service.h b/src/yb/master/master_service.h index bd7188773cc1..d1fcd71255ac 100644 --- a/src/yb/master/master_service.h +++ b/src/yb/master/master_service.h @@ -177,6 +177,14 @@ class MasterServiceImpl : public MasterServiceIf, const IsLoadBalancedRequestPB* req, IsLoadBalancedResponsePB* resp, rpc::RpcContext rpc) override; + virtual void FlushTables( + const FlushTablesRequestPB* req, FlushTablesResponsePB* resp, + rpc::RpcContext rpc) override; + + virtual void IsFlushTablesDone( + const IsFlushTablesDoneRequestPB* req, IsFlushTablesDoneResponsePB* resp, + rpc::RpcContext rpc) override; + private: }; diff --git a/src/yb/master/master_service_base.cc b/src/yb/master/master_service_base.cc index 424fb83f67c2..e9603579c7a8 100644 --- a/src/yb/master/master_service_base.cc +++ b/src/yb/master/master_service_base.cc @@ -23,5 +23,9 @@ YB_EDITION_NS_PREFIX CatalogManager* MasterServiceBase::handler(CatalogManager*) return server_->catalog_manager(); } +FlushManager* MasterServiceBase::handler(FlushManager*) { + return server_->flush_manager(); +} + } // namespace master } // namespace yb diff --git a/src/yb/master/master_service_base.h b/src/yb/master/master_service_base.h index 22379db964c6..29e4f7e1f437 100644 --- a/src/yb/master/master_service_base.h +++ b/src/yb/master/master_service_base.h @@ -27,6 +27,7 @@ namespace master { class Master; class CatalogManager; +class FlushManager; // Base class for any master service with a few helpers. 
class MasterServiceBase { @@ -56,6 +57,7 @@ class MasterServiceBase { Status (HandlerType::*f)(const ReqType*, RespType*, rpc::RpcContext*)); YB_EDITION_NS_PREFIX CatalogManager* handler(CatalogManager*); + FlushManager* handler(FlushManager*); Master* server_; diff --git a/src/yb/rocksdb/db.h b/src/yb/rocksdb/db.h index 7ef30b650c75..c0166ca682f9 100644 --- a/src/yb/rocksdb/db.h +++ b/src/yb/rocksdb/db.h @@ -710,6 +710,12 @@ class DB { return Flush(options, DefaultColumnFamily()); } + // Wait for end of mem-table data flushing. + virtual Status WaitForFlush(ColumnFamilyHandle* column_family) = 0; + virtual Status WaitForFlush() { + return WaitForFlush(DefaultColumnFamily()); + } + // Sync the wal. Note that Write() followed by SyncWAL() is not exactly the // same as Write() with sync=true: in the latter case the changes won't be // visible until the sync is done. diff --git a/src/yb/rocksdb/db/compacted_db_impl.h b/src/yb/rocksdb/db/compacted_db_impl.h index 6cd6daa7a070..b020bcce1ebd 100644 --- a/src/yb/rocksdb/db/compacted_db_impl.h +++ b/src/yb/rocksdb/db/compacted_db_impl.h @@ -17,8 +17,8 @@ // or implied. See the License for the specific language governing permissions and limitations // under the License. 
// -#ifndef ROCKSDB_DB_COMPACTED_DB_IMPL_H -#define ROCKSDB_DB_COMPACTED_DB_IMPL_H +#ifndef YB_ROCKSDB_DB_COMPACTED_DB_IMPL_H +#define YB_ROCKSDB_DB_COMPACTED_DB_IMPL_H #pragma once #ifndef ROCKSDB_LITE @@ -94,6 +94,10 @@ class CompactedDBImpl : public DBImpl { ColumnFamilyHandle* column_family) override { return STATUS(NotSupported, "Not supported in compacted db mode."); } + using DBImpl::WaitForFlush; + virtual Status WaitForFlush(ColumnFamilyHandle* column_family) override { + return STATUS(NotSupported, "Not supported in compacted db mode."); + } private: friend class DB; @@ -112,4 +116,4 @@ class CompactedDBImpl : public DBImpl { } // namespace rocksdb #endif // ROCKSDB_LITE -#endif // ROCKSDB_DB_COMPACTED_DB_IMPL_H +#endif // YB_ROCKSDB_DB_COMPACTED_DB_IMPL_H diff --git a/src/yb/rocksdb/db/db_impl.cc b/src/yb/rocksdb/db/db_impl.cc index b5a48cb01dd8..7b5875e1753f 100644 --- a/src/yb/rocksdb/db/db_impl.cc +++ b/src/yb/rocksdb/db/db_impl.cc @@ -1705,7 +1705,7 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options, return STATUS(InvalidArgument, "Invalid target path ID"); } - auto cfh = reinterpret_cast(column_family); + auto cfh = down_cast(column_family); auto cfd = cfh->cfd(); bool exclusive = options.exclusive_manual_compaction; @@ -1830,7 +1830,7 @@ Status DBImpl::CompactFiles( return STATUS(InvalidArgument, "ColumnFamilyHandle must be non-null."); } - auto cfd = reinterpret_cast(column_family)->cfd(); + auto cfd = down_cast(column_family)->cfd(); assert(cfd); Status s; @@ -2114,7 +2114,7 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family, #ifdef ROCKSDB_LITE return STATUS(NotSupported, "Not supported in ROCKSDB LITE"); #else - auto* cfd = reinterpret_cast(column_family)->cfd(); + auto* cfd = down_cast(column_family)->cfd(); if (options_map.empty()) { RLOG(InfoLogLevel::WARN_LEVEL, db_options_.info_log, "SetOptions() on column family [%s], empty input", @@ -2273,7 +2273,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, 
int target_level) { } int DBImpl::NumberLevels(ColumnFamilyHandle* column_family) { - auto cfh = reinterpret_cast(column_family); + auto cfh = down_cast(column_family); return cfh->cfd()->NumberLevels(); } @@ -2282,7 +2282,7 @@ int DBImpl::MaxMemCompactionLevel(ColumnFamilyHandle* column_family) { } int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) { - auto cfh = reinterpret_cast(column_family); + auto cfh = down_cast(column_family); InstrumentedMutexLock l(&mutex_); return cfh->cfd()->GetSuperVersion()-> mutable_cf_options.level0_stop_writes_trigger; @@ -2290,10 +2290,16 @@ int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) { Status DBImpl::Flush(const FlushOptions& flush_options, ColumnFamilyHandle* column_family) { - auto cfh = reinterpret_cast(column_family); + auto cfh = down_cast(column_family); return FlushMemTable(cfh->cfd(), flush_options); } +Status DBImpl::WaitForFlush(ColumnFamilyHandle* column_family) { + auto cfh = down_cast(column_family); + // Wait until the flush completes. 
+ return WaitForFlushMemTable(cfh->cfd()); +} + Status DBImpl::SyncWAL() { autovector logs_to_sync; bool need_log_dir_sync; @@ -2520,7 +2526,7 @@ InternalIterator* DBImpl::NewInternalIterator( if (column_family == nullptr) { cfd = default_cf_handle_->cfd(); } else { - auto cfh = reinterpret_cast(column_family); + auto cfh = down_cast(column_family); cfd = cfh->cfd(); } @@ -2588,8 +2594,7 @@ Status DBImpl::EnableAutoCompaction( Status status = this->SetOptions(cf_ptr, {{"disable_auto_compactions", "false"}}); if (status.ok()) { - ColumnFamilyData* cfd = - reinterpret_cast(cf_ptr)->cfd(); + ColumnFamilyData* cfd = down_cast(cf_ptr)->cfd(); InstrumentedMutexLock guard_lock(&mutex_); InstallSuperVersionAndScheduleWork(cfd, nullptr, *cfd->GetLatestMutableCFOptions()); } else { @@ -3494,7 +3499,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, StopWatch sw(env_, stats_, DB_GET); PERF_TIMER_GUARD(get_snapshot_time); - auto cfh = reinterpret_cast(column_family); + auto cfh = down_cast(column_family); auto cfd = cfh->cfd(); SequenceNumber snapshot; @@ -3562,7 +3567,7 @@ std::vector DBImpl::MultiGet( std::unordered_map multiget_cf_data; // fill up and allocate outside of mutex for (auto cf : column_family) { - auto cfh = reinterpret_cast(cf); + auto cfh = down_cast(cf); auto cfd = cfh->cfd(); if (multiget_cf_data.find(cfd->GetID()) == multiget_cf_data.end()) { auto mgcfd = new MultiGetColumnFamilyData(); @@ -3607,7 +3612,7 @@ std::vector DBImpl::MultiGet( std::string* value = &(*values)[i]; LookupKey lkey(keys[i], snapshot); - auto cfh = reinterpret_cast(column_family[i]); + auto cfh = down_cast(column_family[i]); auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID()); assert(mgd_iter != multiget_cf_data.end()); auto mgd = mgd_iter->second; @@ -3671,7 +3676,7 @@ std::vector DBImpl::MultiGet( Status DBImpl::AddFile(ColumnFamilyHandle* column_family, const std::string& file_path, bool move_file) { Status status; - auto cfh = reinterpret_cast(column_family); + 
auto cfh = down_cast(column_family); ColumnFamilyData* cfd = cfh->cfd(); ExternalSstFileInfo file_info; @@ -3799,7 +3804,7 @@ void DeleteFile(Env* env, const std::string& path, const shared_ptr& inf Status DBImpl::AddFile(ColumnFamilyHandle* column_family, const ExternalSstFileInfo* file_info, bool move_file) { Status status; - auto cfh = reinterpret_cast(column_family); + auto cfh = down_cast(column_family); ColumnFamilyData* cfd = cfh->cfd(); if (file_info->num_entries == 0) { @@ -4007,8 +4012,7 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options, // this is outside the mutex if (s.ok()) { - NewThreadStatusCfInfo( - reinterpret_cast(*handle)->cfd()); + NewThreadStatusCfInfo(down_cast(*handle)->cfd()); if (!persist_options_status.ok()) { if (db_options_.fail_if_options_file_error) { s = STATUS(IOError, @@ -4025,7 +4029,7 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options, } Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { - auto cfh = reinterpret_cast(column_family); + auto cfh = down_cast(column_family); auto cfd = cfh->cfd(); if (cfd->GetID() == 0) { return STATUS(InvalidArgument, "Can't drop default column family"); @@ -4127,7 +4131,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options, return NewErrorIterator(STATUS(NotSupported, "ReadTier::kPersistedData is not yet supported in iterators.")); } - auto cfh = reinterpret_cast(column_family); + auto cfh = down_cast(column_family); auto cfd = cfh->cfd(); XFUNC_TEST("", "managed_new", managed_new1, xf_manage_new, @@ -4253,7 +4257,7 @@ Status DBImpl::NewIterators( "Managed interator not supported without snapshots"); } for (auto cfh : column_families) { - auto cfd = reinterpret_cast(cfh)->cfd(); + auto cfd = down_cast(cfh)->cfd(); auto iter = new ManagedIterator(this, read_options, cfd); iterators->push_back(iter); } @@ -4264,7 +4268,7 @@ Status DBImpl::NewIterators( "Tailing interator not supported in RocksDB lite"); #else for (auto 
cfh : column_families) { - auto cfd = reinterpret_cast(cfh)->cfd(); + auto cfd = down_cast(cfh)->cfd(); SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); auto iter = new ForwardIterator(this, read_options, cfd, sv); iterators->push_back(NewDBIterator( @@ -4278,8 +4282,7 @@ Status DBImpl::NewIterators( SequenceNumber latest_snapshot = versions_->LastSequence(); for (size_t i = 0; i < column_families.size(); ++i) { - auto* cfd = reinterpret_cast( - column_families[i])->cfd(); + auto* cfd = down_cast(column_families[i])->cfd(); SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); auto snapshot = @@ -4342,7 +4345,7 @@ Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family, Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family, const Slice& key, const Slice& val) { - auto cfh = reinterpret_cast(column_family); + auto cfh = down_cast(column_family); if (!cfh->cfd()->ioptions()->merge_operator) { return STATUS(NotSupported, "Provide a merge_operator when opening DB"); } else { @@ -4950,7 +4953,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { #ifndef ROCKSDB_LITE Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family, TablePropertiesCollection* props) { - auto cfh = reinterpret_cast(column_family); + auto cfh = down_cast(column_family); auto cfd = cfh->cfd(); // Increment the ref count @@ -4972,7 +4975,7 @@ Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family, Status DBImpl::GetPropertiesOfTablesInRange(ColumnFamilyHandle* column_family, const Range* range, std::size_t n, TablePropertiesCollection* props) { - auto cfh = reinterpret_cast(column_family); + auto cfh = down_cast(column_family); auto cfd = cfh->cfd(); // Increment the ref count @@ -5002,7 +5005,7 @@ Env* DBImpl::GetEnv() const { } const Options& DBImpl::GetOptions(ColumnFamilyHandle* column_family) const { - auto cfh = reinterpret_cast(column_family); + auto cfh = 
down_cast(column_family); return *cfh->cfd()->options(); } @@ -5012,7 +5015,7 @@ bool DBImpl::GetProperty(ColumnFamilyHandle* column_family, const Slice& property, std::string* value) { const DBPropertyInfo* property_info = GetPropertyInfo(property); value->clear(); - auto cfd = reinterpret_cast(column_family)->cfd(); + auto cfd = down_cast(column_family)->cfd(); if (property_info == nullptr) { return false; } else if (property_info->handle_int) { @@ -5040,7 +5043,7 @@ bool DBImpl::GetIntProperty(ColumnFamilyHandle* column_family, if (property_info == nullptr || property_info->handle_int == nullptr) { return false; } - auto cfd = reinterpret_cast(column_family)->cfd(); + auto cfd = down_cast(column_family)->cfd(); return GetIntPropertyInternal(cfd, *property_info, false, value); } @@ -5226,7 +5229,7 @@ void DBImpl::GetApproximateSizes(ColumnFamilyHandle* column_family, const Range* range, int n, uint64_t* sizes, bool include_memtable) { Version* v; - auto cfh = reinterpret_cast(column_family); + auto cfh = down_cast(column_family); auto cfd = cfh->cfd(); SuperVersion* sv = GetAndRefSuperVersion(cfd); v = sv->current; @@ -5371,7 +5374,7 @@ Status DBImpl::DeleteFile(std::string name) { Status DBImpl::DeleteFilesInRange(ColumnFamilyHandle* column_family, const Slice* begin, const Slice* end) { Status status; - auto cfh = reinterpret_cast(column_family); + auto cfh = down_cast(column_family); ColumnFamilyData* cfd = cfh->cfd(); VersionEdit edit; std::vector deleted_files; @@ -5494,7 +5497,7 @@ void DBImpl::GetColumnFamilyMetaData( ColumnFamilyHandle* column_family, ColumnFamilyMetaData* cf_meta) { assert(column_family); - auto* cfd = reinterpret_cast(column_family)->cfd(); + auto* cfd = down_cast(column_family)->cfd(); auto* sv = GetAndRefSuperVersion(cfd); sv->current->GetColumnFamilyMetaData(cf_meta); ReturnAndCleanupSuperVersion(cfd, sv); diff --git a/src/yb/rocksdb/db/db_impl.h b/src/yb/rocksdb/db/db_impl.h index 62683e85becf..c790f3def50c 100644 --- 
a/src/yb/rocksdb/db/db_impl.h +++ b/src/yb/rocksdb/db/db_impl.h @@ -187,6 +187,8 @@ class DBImpl : public DB { using DB::Flush; virtual Status Flush(const FlushOptions& options, ColumnFamilyHandle* column_family) override; + using DB::WaitForFlush; + virtual Status WaitForFlush(ColumnFamilyHandle* column_family) override; virtual Status SyncWAL() override; virtual SequenceNumber GetLatestSequenceNumber() const override; diff --git a/src/yb/rocksdb/db/db_test.cc b/src/yb/rocksdb/db/db_test.cc index fb4e027909a3..7d591a19239b 100644 --- a/src/yb/rocksdb/db/db_test.cc +++ b/src/yb/rocksdb/db/db_test.cc @@ -4771,8 +4771,12 @@ class ModelDB: public DB { using DB::Flush; virtual Status Flush(const rocksdb::FlushOptions& options, ColumnFamilyHandle* column_family) override { - Status ret; - return ret; + return Status::OK(); + } + + using DB::WaitForFlush; + virtual Status WaitForFlush(ColumnFamilyHandle* column_family) override { + return Status::OK(); } Status SyncWAL() override { diff --git a/src/yb/rocksdb/utilities/stackable_db.h b/src/yb/rocksdb/utilities/stackable_db.h index 98ea78c894ad..aae463e188b3 100644 --- a/src/yb/rocksdb/utilities/stackable_db.h +++ b/src/yb/rocksdb/utilities/stackable_db.h @@ -247,6 +247,11 @@ class StackableDB : public DB { return db_->Flush(fopts, column_family); } + using DB::WaitForFlush; + virtual Status WaitForFlush(ColumnFamilyHandle* column_family) override { + return db_->WaitForFlush(column_family); + } + virtual Status SyncWAL() override { return db_->SyncWAL(); } diff --git a/src/yb/server/monitored_task.h b/src/yb/server/monitored_task.h index 468ece543515..df1b81077a4c 100644 --- a/src/yb/server/monitored_task.h +++ b/src/yb/server/monitored_task.h @@ -89,6 +89,7 @@ class MonitoredTask : public std::enable_shared_from_this { ASYNC_TRY_STEP_DOWN, ASYNC_SNAPSHOT_OP, ASYNC_COPARTITION_TABLE, + ASYNC_FLUSH_TABLETS, }; virtual Type type() const = 0; diff --git a/src/yb/tablet/tablet-test.cc b/src/yb/tablet/tablet-test.cc index 
d2d6d06ceb3b..533e75b5c4bc 100644 --- a/src/yb/tablet/tablet-test.cc +++ b/src/yb/tablet/tablet-test.cc @@ -186,5 +186,32 @@ TYPED_TEST(TestTablet, TestMetricsInit) { ASSERT_OK(registry->WriteAsJson(&new_writer, { "*" }, MetricJsonOptions())); } +TYPED_TEST(TestTablet, TestFlushedOpId) { + LocalTabletWriter writer(this->tablet().get()); + const int64_t N = 1000; + + // Insert & flush one row to start index counting. + ASSERT_OK(this->InsertTestRow(&writer, 0, 333)); + ASSERT_OK(this->tablet()->Flush(FlushMode::kSync)); + OpId id = ASSERT_RESULT(this->tablet()->MaxPersistentOpId()); + const int64_t start_index = id.index; + + this->InsertTestRows(1, N, 555); + id = ASSERT_RESULT(this->tablet()->MaxPersistentOpId()); + ASSERT_EQ(id.index, start_index); + + ASSERT_OK(this->tablet()->Flush(FlushMode::kSync)); + id = ASSERT_RESULT(this->tablet()->MaxPersistentOpId()); + ASSERT_EQ(id.index, start_index + N); + + this->InsertTestRows(1, N, 777); + id = ASSERT_RESULT(this->tablet()->MaxPersistentOpId()); + ASSERT_EQ(id.index, start_index + N); + + ASSERT_OK(this->tablet()->Flush(FlushMode::kSync)); + id = ASSERT_RESULT(this->tablet()->MaxPersistentOpId()); + ASSERT_EQ(id.index, start_index + 2*N); +} + } // namespace tablet } // namespace yb diff --git a/src/yb/tablet/tablet.cc b/src/yb/tablet/tablet.cc index 1f788079b066..73c15090a75a 100644 --- a/src/yb/tablet/tablet.cc +++ b/src/yb/tablet/tablet.cc @@ -815,6 +815,11 @@ Status Tablet::Flush(FlushMode mode) { return Status::OK(); } +Status Tablet::WaitForFlush() { + TRACE_EVENT0("tablet", "Tablet::WaitForFlush"); + return rocksdb_->WaitForFlush(); +} + Status Tablet::ImportData(const std::string& source_dir) { return rocksdb_->Import(source_dir); } diff --git a/src/yb/tablet/tablet.h b/src/yb/tablet/tablet.h index 04a9024058ec..3aeba87123bf 100644 --- a/src/yb/tablet/tablet.h +++ b/src/yb/tablet/tablet.h @@ -285,6 +285,8 @@ class Tablet : public AbstractTablet, public TransactionIntentApplier { // Makes RocksDB Flush. 
CHECKED_STATUS Flush(FlushMode mode); + CHECKED_STATUS WaitForFlush(); + // Prepares the transaction context for the alter schema operation. // An error will be returned if the specified schema is invalid (e.g. // key mismatch, or missing IDs) diff --git a/src/yb/tools/yb-admin_cli.cc b/src/yb/tools/yb-admin_cli.cc index 99ff64e0fa64..0c5d90782928 100644 --- a/src/yb/tools/yb-admin_cli.cc +++ b/src/yb/tools/yb-admin_cli.cc @@ -192,6 +192,23 @@ void ClusterAdminCli::RegisterCommandHandlers(ClusterAdminClientClass* client) { return Status::OK(); }); + Register( + "flush_table", " [timeout_in_seconds] (default 20)", + [client](const CLIArguments& args) -> Status { + if (args.size() != 4 && args.size() != 5) { + UsageAndExit(args[0]); + } + const YBTableName table_name(args[2], args[3]); + int timeout_secs = 20; + if (args.size() > 4) { + timeout_secs = std::stoi(args[4].c_str()); + } + + RETURN_NOT_OK_PREPEND(client->FlushTable(table_name, timeout_secs), + Substitute("Unable to flush table $0", table_name.ToString())); + return Status::OK(); + }); + Register( "list_all_tablet_servers", "", [client](const CLIArguments&) -> Status { diff --git a/src/yb/tools/yb-admin_client.cc b/src/yb/tools/yb-admin_client.cc index 1999086b1357..5baaaddf72a2 100644 --- a/src/yb/tools/yb-admin_client.cc +++ b/src/yb/tools/yb-admin_client.cc @@ -52,6 +52,11 @@ PB_ENUM_FORMATTERS(yb::tablet::TabletStatePB); namespace yb { namespace tools { +using namespace std::literals; + +using std::cout; +using std::endl; + using google::protobuf::RepeatedPtrField; using client::YBClientBuilder; @@ -68,13 +73,17 @@ using consensus::RaftPeerPB; using consensus::RunLeaderElectionRequestPB; using consensus::RunLeaderElectionResponsePB; -using master::MasterServiceProxy; +using master::FlushTablesRequestPB; +using master::FlushTablesResponsePB; +using master::IsFlushTablesDoneRequestPB; +using master::IsFlushTablesDoneResponsePB; using master::ListMastersRequestPB; using master::ListMastersResponsePB; 
using master::ListMasterRaftPeersRequestPB; using master::ListMasterRaftPeersResponsePB; using master::ListTabletServersRequestPB; using master::ListTabletServersResponsePB; +using master::MasterServiceProxy; using master::TabletLocationsPB; using master::TSInfoPB; @@ -344,7 +353,7 @@ Status ClusterAdminClient::GetLoadMoveCompletion() { if (resp.has_error()) { return StatusFromPB(resp.error().status()); } - std::cout << "Percent complete = " << resp.percent() << std::endl; + cout << "Percent complete = " << resp.percent() << endl; return Status::OK(); } @@ -390,9 +399,9 @@ Status ClusterAdminClient::ListLeaderCounts(const YBTableName& table_name) { // Standard deviation: 1.24722 // Adjusted deviation %: 10.9717% vector leader_dist, best_case, worst_case; - std::cout << RightPadToUuidWidth("Server UUID") << kColumnSep << "Leader Count" << std::endl; + cout << RightPadToUuidWidth("Server UUID") << kColumnSep << "Leader Count" << endl; for (const auto& leader_count : leader_counts) { - std::cout << leader_count.first << kColumnSep << leader_count.second << std::endl; + cout << leader_count.first << kColumnSep << leader_count.second << endl; leader_dist.push_back(leader_count.second); } @@ -410,8 +419,8 @@ Status ClusterAdminClient::ListLeaderCounts(const YBTableName& table_name) { double best_stdev = yb::standard_deviation(best_case); double worst_stdev = yb::standard_deviation(worst_case); double percent_dev = (stdev - best_stdev) / (worst_stdev - best_stdev) * 100.0; - std::cout << "Standard deviation: " << stdev << std::endl; - std::cout << "Adjusted deviation percentage: " << percent_dev << "%" << std::endl; + cout << "Standard deviation: " << stdev << endl; + cout << "Adjusted deviation percentage: " << percent_dev << "%" << endl; } return Status::OK(); @@ -623,13 +632,13 @@ Status ClusterAdminClient::ListAllTabletServers() { RETURN_NOT_OK(ListTabletServers(&servers)); if (!servers.empty()) { - std::cout << RightPadToUuidWidth("Tablet Server UUID") << kColumnSep 
- << kRpcHostPortHeading << std::endl; + cout << RightPadToUuidWidth("Tablet Server UUID") << kColumnSep + << kRpcHostPortHeading << endl; } for (const ListTabletServersResponsePB::Entry& server : servers) { - std::cout << server.instance_id().permanent_uuid() << kColumnSep - << FormatFirstHostPort(server.registration().common().rpc_addresses()) - << std::endl; + cout << server.instance_id().permanent_uuid() << kColumnSep + << FormatFirstHostPort(server.registration().common().rpc_addresses()) + << endl; } return Status::OK(); @@ -645,16 +654,16 @@ Status ClusterAdminClient::ListAllMasters() { return StatusFromPB(lresp.error().status()); } if (!lresp.masters().empty()) { - std::cout << RightPadToUuidWidth("Master UUID") << kColumnSep - << RightPadToWidth(kRpcHostPortHeading, kHostPortColWidth)<< kColumnSep - << "State" << kColumnSep - << "Role" << std::endl; + cout << RightPadToUuidWidth("Master UUID") << kColumnSep + << RightPadToWidth(kRpcHostPortHeading, kHostPortColWidth)<< kColumnSep + << "State" << kColumnSep + << "Role" << endl; } for (int i = 0; i < lresp.masters_size(); i++) { if (lresp.masters(i).role() != consensus::RaftPeerPB::UNKNOWN_ROLE) { auto master_reg = lresp.masters(i).has_registration() ? &lresp.masters(i).registration() : nullptr; - std::cout + cout << (master_reg ? lresp.masters(i).instance_id().permanent_uuid() : RightPadToUuidWidth("UNKNOWN_UUID")) << kColumnSep << RightPadToWidth( @@ -662,9 +671,9 @@ Status ClusterAdminClient::ListAllMasters() { kHostPortColWidth) << kColumnSep << (lresp.masters(i).has_error() ? PBEnumToString(lresp.masters(i).error().code()) : "ALIVE") << kColumnSep - << PBEnumToString(lresp.masters(i).role()) << std::endl; + << PBEnumToString(lresp.masters(i).role()) << endl; } else { - std::cout << "UNREACHABLE MASTER at index " << i << "." << std::endl; + cout << "UNREACHABLE MASTER at index " << i << "." 
<< endl; } } @@ -678,16 +687,16 @@ Status ClusterAdminClient::ListAllMasters() { } if (r_resp.masters_size() != lresp.masters_size()) { - std::cout << "WARNING: Mismatch in in-memory masters and raft peers info." - << "Raft peer info from master leader dumped below.\n"; + cout << "WARNING: Mismatch in in-memory masters and raft peers info." + << "Raft peer info from master leader dumped below." << endl; for (int i = 0; i < r_resp.masters_size(); i++) { if (r_resp.masters(i).member_type() != consensus::RaftPeerPB::UNKNOWN_MEMBER_TYPE) { const auto& master = r_resp.masters(i); - std::cout << master.permanent_uuid() << " " - << master.last_known_addr().host() << "/" - << master.last_known_addr().port() << std::endl; + cout << master.permanent_uuid() << " " + << master.last_known_addr().host() << "/" + << master.last_known_addr().port() << endl; } else { - std::cout << "UNREACHABLE MASTER at index " << i << "." << std::endl; + cout << "UNREACHABLE MASTER at index " << i << "." << endl; } } } @@ -700,10 +709,10 @@ Status ClusterAdminClient::ListTabletServersLogLocations() { RETURN_NOT_OK(ListTabletServers(&servers)); if (!servers.empty()) { - std::cout << RightPadToUuidWidth("TS UUID") << kColumnSep - << kRpcHostPortHeading << kColumnSep - << "LogLocation" - << std::endl; + cout << RightPadToUuidWidth("TS UUID") << kColumnSep + << kRpcHostPortHeading << kColumnSep + << "LogLocation" + << endl; } for (const ListTabletServersResponsePB::Entry& server : servers) { @@ -721,9 +730,9 @@ Status ClusterAdminClient::ListTabletServersLogLocations() { tserver::GetLogLocationResponsePB resp; ts_proxy.get()->GetLogLocation(req, &resp, &rpc); - std::cout << ts_uuid << kColumnSep - << ts_addr << kColumnSep - << resp.log_location() << std::endl; + cout << ts_uuid << kColumnSep + << ts_addr << kColumnSep + << resp.log_location() << endl; } return Status::OK(); @@ -733,7 +742,7 @@ Status ClusterAdminClient::ListTables() { vector tables; RETURN_NOT_OK(yb_client_->ListTables(&tables)); for 
(const YBTableName& table : tables) { - std::cout << table.ToString() << std::endl; + cout << table.ToString() << endl; } return Status::OK(); } @@ -743,9 +752,9 @@ Status ClusterAdminClient::ListTablets(const YBTableName& table_name, const int std::vector locations; RETURN_NOT_OK(yb_client_->GetTablets( table_name, max_tablets, &tablet_uuids, &ranges, &locations)); - std::cout << RightPadToUuidWidth("Tablet UUID") << kColumnSep - << RightPadToWidth("Range", kPartitionRangeColWidth) << kColumnSep - << "Leader" << std::endl; + cout << RightPadToUuidWidth("Tablet UUID") << kColumnSep + << RightPadToWidth("Range", kPartitionRangeColWidth) << kColumnSep + << "Leader" << endl; for (int i = 0; i < tablet_uuids.size(); i++) { string tablet_uuid = tablet_uuids[i]; string leader_host_port; @@ -760,9 +769,9 @@ Status ClusterAdminClient::ListTablets(const YBTableName& table_name, const int } } } - std::cout << tablet_uuid << kColumnSep - << RightPadToWidth(ranges[i], kPartitionRangeColWidth) << kColumnSep - << leader_host_port << std::endl; + cout << tablet_uuid << kColumnSep + << RightPadToWidth(ranges[i], kPartitionRangeColWidth) << kColumnSep + << leader_host_port << endl; } return Status::OK(); } @@ -781,14 +790,14 @@ Status ClusterAdminClient::ListPerTabletTabletServers(const TabletId& tablet_id) if (resp.tablet_locations_size() != 1) { if (resp.tablet_locations_size() > 0) { std::cerr << "List of all incorrect locations - " << resp.tablet_locations_size() - << " : " << std::endl; + << " : " << endl; for (int i = 0; i < resp.tablet_locations_size(); i++) { std::cerr << i << " : " << resp.tablet_locations(i).DebugString(); if (i >= MAX_NUM_ELEMENTS_TO_SHOW_ON_ERROR) { break; } } - std::cerr << std::endl; + std::cerr << endl; } return STATUS_FORMAT(IllegalState, "Incorrect number of locations $0 for tablet $1.", @@ -797,15 +806,15 @@ Status ClusterAdminClient::ListPerTabletTabletServers(const TabletId& tablet_id) TabletLocationsPB locs = resp.tablet_locations(0); if 
(!locs.replicas().empty()) { - std::cout << RightPadToUuidWidth("Server UUID") << kColumnSep - << RightPadToWidth(kRpcHostPortHeading, kHostPortColWidth) << kColumnSep - << "Role" << std::endl; + cout << RightPadToUuidWidth("Server UUID") << kColumnSep + << RightPadToWidth(kRpcHostPortHeading, kHostPortColWidth) << kColumnSep + << "Role" << endl; } for (const auto& replica : locs.replicas()) { - std::cout << replica.ts_info().permanent_uuid() << kColumnSep - << RightPadToWidth(FormatHostPort(replica.ts_info().rpc_addresses(0)), + cout << replica.ts_info().permanent_uuid() << kColumnSep + << RightPadToWidth(FormatHostPort(replica.ts_info().rpc_addresses(0)), kHostPortColWidth) << kColumnSep - << PBEnumToString(replica.role()) << std::endl; + << PBEnumToString(replica.role()) << endl; } return Status::OK(); @@ -813,7 +822,7 @@ Status ClusterAdminClient::ListPerTabletTabletServers(const TabletId& tablet_id) Status ClusterAdminClient::DeleteTable(const YBTableName& table_name) { RETURN_NOT_OK(yb_client_->DeleteTable(table_name)); - std::cout << "Deleted table " << table_name.ToString() << std::endl; + cout << "Deleted table " << table_name.ToString() << endl; return Status::OK(); } @@ -856,15 +865,15 @@ Status ClusterAdminClient::ListTabletsForTabletServer(const PeerId& ts_uuid) { tserver::ListTabletsForTabletServerResponsePB resp; RETURN_NOT_OK(ts_proxy.get()->ListTabletsForTabletServer(req, &resp, &rpc)); - std::cout << RightPadToWidth("Table name", kTableNameColWidth) << kColumnSep - << RightPadToUuidWidth("Tablet ID") << kColumnSep - << "Is Leader" << kColumnSep - << "State" << std::endl; + cout << RightPadToWidth("Table name", kTableNameColWidth) << kColumnSep + << RightPadToUuidWidth("Tablet ID") << kColumnSep + << "Is Leader" << kColumnSep + << "State" << endl; for (const auto& entry : resp.entries()) { - std::cout << RightPadToWidth(entry.table_name(), kTableNameColWidth) << kColumnSep - << RightPadToUuidWidth(entry.tablet_id()) << kColumnSep - << 
entry.is_leader() << kColumnSep - << PBEnumToString(entry.state()) << std::endl; + cout << RightPadToWidth(entry.table_name(), kTableNameColWidth) << kColumnSep + << RightPadToUuidWidth(entry.tablet_id()) << kColumnSep + << entry.is_leader() << kColumnSep + << PBEnumToString(entry.state()) << endl; } return Status::OK(); } @@ -908,6 +917,53 @@ Status ClusterAdminClient::SetLoadBalancerEnabled(const bool is_enabled) { return Status::OK(); } +Status ClusterAdminClient::FlushTable(const YBTableName& table_name, int timeout_secs) { + RpcController rpc; + rpc.set_timeout(timeout_); + FlushTablesRequestPB req; + FlushTablesResponsePB resp; + table_name.SetIntoTableIdentifierPB(req.add_tables()); + RETURN_NOT_OK(master_proxy_->FlushTables(req, &resp, &rpc)); + + if (resp.has_error()) { + return StatusFromPB(resp.error().status()); + } + + cout << "Started flushing table " << table_name.ToString() << endl + << "Flush request id: " << resp.flush_request_id() << endl; + + IsFlushTablesDoneRequestPB wait_req; + IsFlushTablesDoneResponsePB wait_resp; + + // Wait for table creation. + wait_req.set_flush_request_id(resp.flush_request_id()); + + for (int k = 0; k < timeout_secs; ++k) { + rpc.Reset(); + RETURN_NOT_OK(master_proxy_->IsFlushTablesDone(wait_req, &wait_resp, &rpc)); + + if (wait_resp.has_error()) { + if (wait_resp.error().status().code() == AppStatusPB::NOT_FOUND) { + cout << "Flush request was deleted: " << resp.flush_request_id() << endl; + } + + return StatusFromPB(wait_resp.error().status()); + } + + if (wait_resp.done()) { + cout << "Flushing complete: " << (wait_resp.success() ? "SUCCESS" : "FAILED") << endl; + return Status::OK(); + } + + cout << "Waiting for flushing... " << (wait_resp.success() ? 
"" : "Already FAILED") << endl; + std::this_thread::sleep_for(1s); + } + + return STATUS(TimedOut, + Substitute("Expired timeout ($0 seconds) for table $1 flushing", + timeout_secs, table_name.ToString())); +} + string RightPadToUuidWidth(const string &s) { return RightPadToWidth(s, kNumCharactersInUuid); } diff --git a/src/yb/tools/yb-admin_client.h b/src/yb/tools/yb-admin_client.h index b0a236147acb..0c8560392487 100644 --- a/src/yb/tools/yb-admin_client.h +++ b/src/yb/tools/yb-admin_client.h @@ -132,6 +132,8 @@ class ClusterAdminClient { CHECKED_STATUS DropRedisTable(); + CHECKED_STATUS FlushTable(const client::YBTableName& table_name, int timeout_secs); + protected: // Fetch the locations of the replicas for a given tablet from the Master. CHECKED_STATUS GetTabletLocations(const TabletId& tablet_id, diff --git a/src/yb/tserver/tablet_service.cc b/src/yb/tserver/tablet_service.cc index e84c8b5b5983..d9cedeb78a26 100644 --- a/src/yb/tserver/tablet_service.cc +++ b/src/yb/tserver/tablet_service.cc @@ -160,6 +160,7 @@ using strings::Substitute; using tablet::AlterSchemaOperationState; using tablet::Tablet; using tablet::TabletPeer; +using tablet::TabletPeerPtr; using tablet::TabletStatusPB; using tablet::TruncateOperationState; using tablet::OperationCompletionCallback; @@ -168,7 +169,7 @@ using tablet::WriteOperationState; namespace { template -bool GetConsensusOrRespond(const scoped_refptr& tablet_peer, +bool GetConsensusOrRespond(const TabletPeerPtr& tablet_peer, RespClass* resp, rpc::RpcContext* context, scoped_refptr* consensus) { @@ -182,7 +183,7 @@ bool GetConsensusOrRespond(const scoped_refptr& tablet_peer, return true; } -Status GetTabletRef(const scoped_refptr& tablet_peer, +Status GetTabletRef(const TabletPeerPtr& tablet_peer, shared_ptr* tablet, TabletServerErrorPB::Code* error_code) { *DCHECK_NOTNULL(tablet) = tablet_peer->shared_tablet(); @@ -370,7 +371,7 @@ void TabletServiceAdminImpl::AlterSchema(const AlterSchemaRequestPB* req, 
server::UpdateClock(*req, server_->Clock()); - scoped_refptr tablet_peer; + TabletPeerPtr tablet_peer; if (!LookupTabletPeerOrRespond(server_->tablet_manager(), req->tablet_id(), resp, &context, &tablet_peer)) { return; @@ -528,7 +529,7 @@ void TabletServiceImpl::Truncate(const TruncateRequestPB* req, UpdateClock(*req, server_->Clock()); - scoped_refptr tablet_peer; + TabletPeerPtr tablet_peer; if (!LookupTabletPeerOrRespond(server_->tablet_manager(), req->tablet_id(), resp, &context, @@ -642,6 +643,54 @@ void TabletServiceAdminImpl::CopartitionTable(const CopartitionTableRequestPB* r LOG(INFO) << "tserver doesn't support co-partitioning yet"; } +void TabletServiceAdminImpl::FlushTablets(const FlushTabletsRequestPB* req, + FlushTabletsResponsePB* resp, + rpc::RpcContext context) { + if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "FlushTablets", req, resp, &context)) { + return; + } + + if (req->tablet_ids_size() == 0) { + const Status s = STATUS(InvalidArgument, "No tablet ids"); + SetupErrorAndRespond( + resp->mutable_error(), s, TabletServerErrorPB_Code_UNKNOWN_ERROR, &context); + return; + } + + server::UpdateClock(*req, server_->Clock()); + + TRACE_EVENT1("tserver", "FlushTablets", + "TS: ", req->dest_uuid()); + + LOG(INFO) << "Processing FlushTablets from " << context.requestor_string(); + VLOG(1) << "Full request: " << req->DebugString(); + TabletPeers tablet_peers; + + for (const TabletId& id : req->tablet_ids()) { + resp->set_failed_tablet_id(id); + + TabletPeerPtr tablet_peer; + if (!LookupTabletPeerOrRespond(server_->tablet_manager(), id, resp, &context, &tablet_peer)) { + return; + } + + RETURN_UNKNOWN_ERROR_IF_NOT_OK( + tablet_peer->tablet()->Flush(tablet::FlushMode::kAsync), resp, &context); + + tablet_peers.push_back(tablet_peer); + resp->clear_failed_tablet_id(); + } + + // Wait for end of all flush operations. 
+ for (const TabletPeerPtr& tablet_peer : tablet_peers) { + resp->set_failed_tablet_id(tablet_peer->tablet()->tablet_id()); + RETURN_UNKNOWN_ERROR_IF_NOT_OK(tablet_peer->tablet()->WaitForFlush(), resp, &context); + resp->clear_failed_tablet_id(); + } + + context.RespondSuccess(); +} + void TabletServiceImpl::Write(const WriteRequestPB* req, WriteResponsePB* resp, rpc::RpcContext context) { @@ -759,7 +808,7 @@ bool TabletServiceImpl::GetTabletOrRespond(const ReadRequestPB* req, template bool TabletServiceImpl::DoGetTabletOrRespond(const Req* req, Resp* resp, rpc::RpcContext* context, std::shared_ptr* tablet) { - scoped_refptr tablet_peer; + TabletPeerPtr tablet_peer; if (!LookupTabletPeerOrRespond(server_->tablet_manager(), req->tablet_id(), resp, context, &tablet_peer)) { return false; @@ -1000,7 +1049,7 @@ void ConsensusServiceImpl::UpdateConsensus(const ConsensusRequestPB* req, if (!CheckUuidMatchOrRespond(tablet_manager_, "UpdateConsensus", req, resp, &context)) { return; } - scoped_refptr tablet_peer; + TabletPeerPtr tablet_peer; if (!LookupTabletPeerOrRespond(tablet_manager_, req->tablet_id(), resp, &context, &tablet_peer)) { return; } @@ -1034,7 +1083,7 @@ void ConsensusServiceImpl::RequestConsensusVote(const VoteRequestPB* req, if (!CheckUuidMatchOrRespond(tablet_manager_, "RequestConsensusVote", req, resp, &context)) { return; } - scoped_refptr tablet_peer; + TabletPeerPtr tablet_peer; if (!LookupTabletPeerOrRespond(tablet_manager_, req->tablet_id(), resp, &context, &tablet_peer)) { return; } @@ -1059,7 +1108,7 @@ void ConsensusServiceImpl::ChangeConfig(const ChangeConfigRequestPB* req, !CheckUuidMatchOrRespond(tablet_manager_, "ChangeConfig", req, resp, &context)) { return; } - scoped_refptr tablet_peer; + TabletPeerPtr tablet_peer; if (!LookupTabletPeerOrRespond(tablet_manager_, req->tablet_id(), resp, &context, &tablet_peer)) { return; @@ -1100,7 +1149,7 @@ class RpcScope { if (!CheckUuidMatchOrRespond(tablet_manager, method_name, req, resp, context)) { 
return; } - scoped_refptr tablet_peer; + TabletPeerPtr tablet_peer; if (!LookupTabletPeerOrRespond(tablet_manager, req->tablet_id(), resp, context, &tablet_peer)) { return; } @@ -1198,7 +1247,7 @@ void ConsensusServiceImpl::GetLastOpId(const consensus::GetLastOpIdRequestPB *re if (!CheckUuidMatchOrRespond(tablet_manager_, "GetLastOpId", req, resp, &context)) { return; } - scoped_refptr tablet_peer; + TabletPeerPtr tablet_peer; if (!LookupTabletPeerOrRespond(tablet_manager_, req->tablet_id(), resp, &context, &tablet_peer)) { return; } @@ -1263,10 +1312,10 @@ void TabletServiceImpl::NoOp(const NoOpRequestPB *req, void TabletServiceImpl::ListTablets(const ListTabletsRequestPB* req, ListTabletsResponsePB* resp, rpc::RpcContext context) { - std::vector > peers; + TabletPeers peers; server_->tablet_manager()->GetTabletPeers(&peers); RepeatedPtrField* peer_status = resp->mutable_status_and_schema(); - for (const scoped_refptr& peer : peers) { + for (const TabletPeerPtr& peer : peers) { StatusAndSchemaPB* status = peer_status->Add(); peer->GetTabletStatusPB(status->mutable_tablet_status()); CHECK_OK(SchemaToPB(peer->status_listener()->schema(), @@ -1288,9 +1337,9 @@ void TabletServiceImpl::ListTabletsForTabletServer(const ListTabletsForTabletSer ListTabletsForTabletServerResponsePB* resp, rpc::RpcContext context) { // Replicating logic from path-handlers. 
- std::vector > peers; + TabletPeers peers; server_->tablet_manager()->GetTabletPeers(&peers); - for (const scoped_refptr& peer : peers) { + for (const TabletPeerPtr& peer : peers) { TabletStatusPB status; peer->GetTabletStatusPB(&status); diff --git a/src/yb/tserver/tablet_service.h b/src/yb/tserver/tablet_service.h index eab27365d5f5..891517124578 100644 --- a/src/yb/tserver/tablet_service.h +++ b/src/yb/tserver/tablet_service.h @@ -59,6 +59,8 @@ class TabletServer; class TabletServiceImpl : public TabletServerServiceIf { public: + typedef std::vector TabletPeers; + explicit TabletServiceImpl(TabletServerIf* server); void Write(const WriteRequestPB* req, WriteResponsePB* resp, rpc::RpcContext context) override; @@ -153,6 +155,8 @@ class TabletServiceImpl : public TabletServerServiceIf { class TabletServiceAdminImpl : public TabletServerAdminServiceIf { public: + typedef std::vector TabletPeers; + explicit TabletServiceAdminImpl(TabletServer* server); virtual void CreateTablet(const CreateTabletRequestPB* req, CreateTabletResponsePB* resp, @@ -170,6 +174,10 @@ class TabletServiceAdminImpl : public TabletServerAdminServiceIf { CopartitionTableResponsePB* resp, rpc::RpcContext context) override; + virtual void FlushTablets(const FlushTabletsRequestPB* req, + FlushTabletsResponsePB* resp, + rpc::RpcContext context) override; + private: TabletServer* server_; }; diff --git a/src/yb/tserver/ts_tablet_manager.cc b/src/yb/tserver/ts_tablet_manager.cc index 9cada472f673..d11d6731e4df 100644 --- a/src/yb/tserver/ts_tablet_manager.cc +++ b/src/yb/tserver/ts_tablet_manager.cc @@ -245,6 +245,7 @@ using tablet::TabletMetadata; using tablet::TabletClass; using tablet::TabletPeer; using tablet::TabletPeerClass; +using tablet::TabletPeerPtr; using tablet::TabletStatusListener; using tablet::TabletStatusPB; @@ -253,7 +254,7 @@ void TSTabletManager::MaybeFlushTablet() { int iteration = 0; while (memory_monitor()->Exceeded() || (iteration++ == 0 && 
FLAGS_pretend_memory_exceeded_enforce_flush)) { - scoped_refptr tablet_to_flush = TabletToFlush(); + TabletPeerPtr tablet_to_flush = TabletToFlush(); // TODO(bojanserafimov): If tablet_to_flush flushes now because of other reasons, // we will schedule a second flush, which will unnecessarily stall writes for a short time. This // will not happen often, but should be fixed. @@ -266,10 +267,10 @@ void TSTabletManager::MaybeFlushTablet() { // Return the tablet with the oldest write in memstore, or nullptr if all // tablet memstores are empty or about to flush. -scoped_refptr TSTabletManager::TabletToFlush() { +TabletPeerPtr TSTabletManager::TabletToFlush() { boost::shared_lock lock(lock_); // For using the tablet map HybridTime oldest_write_in_memstores = HybridTime::kMax; - scoped_refptr tablet_to_flush; + TabletPeerPtr tablet_to_flush; for (const TabletMap::value_type& entry : tablet_map_) { const auto tablet = entry.second->shared_tablet(); if (tablet) { @@ -435,7 +436,7 @@ Status TSTabletManager::Init() { CHECK_OK(StartTabletStateTransitionUnlocked(meta->tablet_id(), "opening tablet", &deleter)); } - scoped_refptr tablet_peer = CreateAndRegisterTabletPeer(meta, NEW_PEER); + TabletPeerPtr tablet_peer = CreateAndRegisterTabletPeer(meta, NEW_PEER); RETURN_NOT_OK(open_tablet_pool_->SubmitFunc( std::bind(&TSTabletManager::OpenTablet, this, meta, deleter))); } @@ -480,7 +481,7 @@ Status TSTabletManager::CreateNewTablet( const Schema &schema, const PartitionSchema &partition_schema, RaftConfigPB config, - scoped_refptr *tablet_peer) { + TabletPeerPtr *tablet_peer) { CHECK_EQ(state(), MANAGER_RUNNING); CHECK(IsRaftConfigMember(server_->instance_pb().permanent_uuid(), config)); @@ -500,7 +501,7 @@ Status TSTabletManager::CreateNewTablet( TRACE("Acquired tablet manager lock"); // Sanity check that the tablet isn't already registered. 
- scoped_refptr junk; + TabletPeerPtr junk; if (LookupTabletUnlocked(tablet_id, &junk)) { return STATUS(AlreadyPresent, "Tablet already registered", tablet_id); } @@ -540,7 +541,7 @@ Status TSTabletManager::CreateNewTablet( RETURN_NOT_OK_PREPEND(ConsensusMetadata::Create(fs_manager_, tablet_id, fs_manager_->uuid(), config, consensus::kMinimumTerm, &cmeta), "Unable to create new ConsensusMeta for tablet " + tablet_id); - scoped_refptr new_peer = CreateAndRegisterTabletPeer(meta, NEW_PEER); + TabletPeerPtr new_peer = CreateAndRegisterTabletPeer(meta, NEW_PEER); // We can run this synchronously since there is nothing to bootstrap. RETURN_NOT_OK( @@ -574,7 +575,7 @@ Status CheckLeaderTermNotLower( Status HandleReplacingStaleTablet( scoped_refptr meta, - scoped_refptr old_tablet_peer, + TabletPeerPtr old_tablet_peer, const string& tablet_id, const string& uuid, const int64_t& leader_term) { @@ -628,7 +629,7 @@ Status TSTabletManager::StartRemoteBootstrap(const StartRemoteBootstrapRequestPB const string kLogPrefix = LogPrefix(tablet_id, fs_manager_->uuid()); - scoped_refptr old_tablet_peer; + TabletPeerPtr old_tablet_peer; scoped_refptr meta; bool replacing_tablet = false; scoped_refptr deleter; @@ -675,7 +676,7 @@ Status TSTabletManager::StartRemoteBootstrap(const StartRemoteBootstrapRequestPB // Registering a non-initialized TabletPeer offers visibility through the Web UI. RegisterTabletPeerMode mode = replacing_tablet ? REPLACEMENT_PEER : NEW_PEER; - scoped_refptr tablet_peer = CreateAndRegisterTabletPeer(meta, mode); + TabletPeerPtr tablet_peer = CreateAndRegisterTabletPeer(meta, mode); // Download all of the remote files. TOMBSTONE_NOT_OK(rb_client->FetchAll(tablet_peer->status_listener()), @@ -726,9 +727,9 @@ Status TSTabletManager::StartRemoteBootstrap(const StartRemoteBootstrapRequestPB } // Create and register a new TabletPeer, given tablet metadata. 
-scoped_refptr TSTabletManager::CreateAndRegisterTabletPeer( +TabletPeerPtr TSTabletManager::CreateAndRegisterTabletPeer( const scoped_refptr& meta, RegisterTabletPeerMode mode) { - scoped_refptr tablet_peer( + TabletPeerPtr tablet_peer( new TabletPeerClass(meta, local_peer_pb_, apply_pool_.get(), @@ -754,7 +755,7 @@ Status TSTabletManager::DeleteTablet( TRACE("Deleting tablet $0", tablet_id); - scoped_refptr tablet_peer; + TabletPeerPtr tablet_peer; scoped_refptr deleter; { // Acquire the lock in exclusive mode as we'll add a entry to the @@ -887,7 +888,7 @@ void TSTabletManager::OpenTablet(const scoped_refptr& meta, TRACE_EVENT1("tserver", "TSTabletManager::OpenTablet", "tablet_id", tablet_id); - scoped_refptr tablet_peer; + TabletPeerPtr tablet_peer; CHECK(LookupTablet(tablet_id, &tablet_peer)) << "Tablet not registered prior to OpenTabletAsync call: " << tablet_id; @@ -1001,10 +1002,10 @@ void TSTabletManager::Shutdown() { // Take a snapshot of the peers list -- that way we don't have to hold // on to the lock while shutting them down, which might cause a lock // inversion. (see KUDU-308 for example). - vector > peers_to_shutdown; + TabletPeers peers_to_shutdown; GetTabletPeers(&peers_to_shutdown); - for (const scoped_refptr& peer : peers_to_shutdown) { + for (const TabletPeerPtr& peer : peers_to_shutdown) { peer->Shutdown(); } @@ -1036,7 +1037,7 @@ void TSTabletManager::Shutdown() { } void TSTabletManager::RegisterTablet(const std::string& tablet_id, - const scoped_refptr& tablet_peer, + const TabletPeerPtr& tablet_peer, RegisterTabletPeerMode mode) { std::lock_guard lock(lock_); // If we are replacing a tablet peer, we delete the existing one first. 
@@ -1052,14 +1053,14 @@ void TSTabletManager::RegisterTablet(const std::string& tablet_id, } bool TSTabletManager::LookupTablet(const string& tablet_id, - scoped_refptr* tablet_peer) const { + TabletPeerPtr* tablet_peer) const { boost::shared_lock shared_lock(lock_); return LookupTabletUnlocked(tablet_id, tablet_peer); } bool TSTabletManager::LookupTabletUnlocked(const string& tablet_id, - scoped_refptr* tablet_peer) const { - const scoped_refptr* found = FindOrNull(tablet_map_, tablet_id); + TabletPeerPtr* tablet_peer) const { + const TabletPeerPtr* found = FindOrNull(tablet_map_, tablet_id); if (!found) { return false; } @@ -1068,7 +1069,7 @@ bool TSTabletManager::LookupTabletUnlocked(const string& tablet_id, } Status TSTabletManager::GetTabletPeer(const string& tablet_id, - scoped_refptr* tablet_peer) const { + TabletPeerPtr* tablet_peer) const { if (!LookupTablet(tablet_id, tablet_peer)) { return STATUS(NotFound, "Tablet not found", tablet_id); } @@ -1085,7 +1086,7 @@ const NodeInstancePB& TSTabletManager::NodeInstance() const { return server_->instance_pb(); } -void TSTabletManager::GetTabletPeers(vector >* tablet_peers) const { +void TSTabletManager::GetTabletPeers(TabletPeers* tablet_peers) const { boost::shared_lock shared_lock(lock_); AppendValuesFromMap(tablet_map_, tablet_peers); } @@ -1156,7 +1157,7 @@ void TSTabletManager::InitLocalRaftPeerPB() { } void TSTabletManager::CreateReportedTabletPB(const string& tablet_id, - const scoped_refptr& tablet_peer, + const TabletPeerPtr& tablet_peer, ReportedTabletPB* reported_tablet) { reported_tablet->set_tablet_id(tablet_id); reported_tablet->set_state(tablet_peer->state()); @@ -1182,7 +1183,7 @@ void TSTabletManager::GenerateIncrementalTabletReport(TabletReportPB* report) { report->set_is_incremental(true); for (const DirtyMap::value_type& dirty_entry : dirty_tablets_) { const string& tablet_id = dirty_entry.first; - scoped_refptr* tablet_peer = FindOrNull(tablet_map_, tablet_id); + TabletPeerPtr* tablet_peer = 
FindOrNull(tablet_map_, tablet_id); if (tablet_peer) { // Dirty entry, report on it. CreateReportedTabletPB(tablet_id, *tablet_peer, report->add_updated_tablets()); diff --git a/src/yb/tserver/tserver_admin.proto b/src/yb/tserver/tserver_admin.proto index 4aef429214ff..965a2b19b2ca 100644 --- a/src/yb/tserver/tserver_admin.proto +++ b/src/yb/tserver/tserver_admin.proto @@ -154,6 +154,24 @@ enum TSTabletManagerStatePB { MANAGER_SHUTDOWN = 3; } +message FlushTabletsRequestPB { + // UUID of server this request is addressed to. + optional bytes dest_uuid = 1; + + // Tablets to flush. + repeated bytes tablet_ids = 2; + + optional fixed64 propagated_hybrid_time = 3; +} + +message FlushTabletsResponsePB { + optional TabletServerErrorPB error = 1; + + optional bytes failed_tablet_id = 2; + + optional fixed64 propagated_hybrid_time = 3; +} + service TabletServerAdminService { // Create a new, empty tablet with the specified parameters. Only used for // brand-new tablets, not for "moves". @@ -167,4 +185,6 @@ service TabletServerAdminService { // Create a co-partitioned table in an existing tablet rpc CopartitionTable(CopartitionTableRequestPB) returns (CopartitionTableResponsePB); + + rpc FlushTablets(FlushTabletsRequestPB) returns (FlushTabletsResponsePB); }