Skip to content

Commit

Permalink
ENG-2964: Allow flushing all tablets of a given table
Browse files Browse the repository at this point in the history
Summary: Implementing ability to flush tablets mem-stores.

Test Plan: yb-admin-test

Reviewers: sergei, timur, mikhail, pritam.damania

Reviewed By: mikhail, pritam.damania

Subscribers: bogdan, ybase, bharat

Differential Revision: https://phabricator.dev.yugabyte.com/D4191
  • Loading branch information
OlegLoginov committed Mar 8, 2018
1 parent a4498b4 commit 4902d06
Show file tree
Hide file tree
Showing 32 changed files with 919 additions and 149 deletions.
2 changes: 2 additions & 0 deletions src/yb/common/entity_ids.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ using TabletId = std::string;
using NamespaceIdTableNamePair = std::pair<NamespaceId, TableName>;
using SystemTableSet = std::set<NamespaceIdTableNamePair>;

using FlushRequestId = std::string;

} // namespace yb

#endif // YB_COMMON_ENTITY_IDS_H
32 changes: 17 additions & 15 deletions src/yb/master/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,25 @@ set(MASTER_PROTO_LIBS
${MASTER_PROTO_LIBS_EXTENSIONS})

set(MASTER_SRCS
async_flush_tablets_task.cc
async_rpc_tasks.cc
call_home.cc
catalog_manager.cc
cluster_balance.cc
flush_manager.cc
master.cc
master_options.cc
master_service_base.cc
master_service.cc
master_tablet_service.cc
master_tserver.cc
master-path-handlers.cc
mini_master.cc
sys_catalog.cc
system_tablet.cc
system_tables_handler.cc
ts_descriptor.cc
ts_manager.cc
yql_virtual_table.cc
yql_vtable_iterator.cc
util/yql_vtable_helpers.cc
Expand All @@ -76,21 +93,6 @@ set(MASTER_SRCS
yql_types_vtable.cc
yql_views_vtable.cc
yql_partitions_vtable.cc
async_rpc_tasks.cc
catalog_manager.cc
cluster_balance.cc
master.cc
master_options.cc
master_service_base.cc
master_service.cc
master-path-handlers.cc
mini_master.cc
sys_catalog.cc
ts_descriptor.cc
ts_manager.cc
master_tablet_service.cc
master_tserver.cc
call_home.cc
${MASTER_SRCS_EXTENSIONS})

add_library(master ${MASTER_SRCS})
Expand Down
109 changes: 109 additions & 0 deletions src/yb/master/async_flush_tablets_task.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//
#include "yb/master/async_flush_tablets_task.h"

#include "yb/common/wire_protocol.h"

#include "yb/master/master.h"
#include "yb/master/ts_descriptor.h"
#include "yb/master/flush_manager.h"
#include "yb/master/catalog_manager.h"

#include "yb/rpc/messenger.h"

#include "yb/tserver/tserver_admin.proxy.h"

#include "yb/util/flag_tags.h"
#include "yb/util/format.h"
#include "yb/util/logging.h"

namespace yb {
namespace master {

using std::string;
using tserver::TabletServerErrorPB;

////////////////////////////////////////////////////////////
// AsyncFlushTablets
////////////////////////////////////////////////////////////
AsyncFlushTablets::AsyncFlushTablets(Master *master,
ThreadPool* callback_pool,
const TabletServerId& ts_uuid,
const scoped_refptr<TableInfo>& table,
const vector<TabletId>& tablet_ids,
const FlushRequestId& flush_id)
: RetrySpecificTSRpcTask(master, callback_pool, ts_uuid, table),
tablet_ids_(tablet_ids),
flush_id_(flush_id) {
}

string AsyncFlushTablets::description() const {
return Format("$0 Flush Tablets RPC", permanent_uuid());
}

TabletServerId AsyncFlushTablets::permanent_uuid() const {
return permanent_uuid_;
}

void AsyncFlushTablets::HandleResponse(int attempt) {
server::UpdateClock(resp_, master_->clock());

if (resp_.has_error()) {
Status status = StatusFromPB(resp_.error().status());

// Do not retry on a fatal error.
switch (resp_.error().code()) {
case TabletServerErrorPB::TABLET_NOT_FOUND:
LOG(WARNING) << "TS " << permanent_uuid() << ": flush tablets failed because tablet "
<< resp_.failed_tablet_id() << " was not found. "
<< "No further retry: " << status.ToString();
PerformStateTransition(kStateRunning, kStateComplete);
break;
default:
LOG(WARNING) << "TS " << permanent_uuid() << ": flush tablets failed: "
<< status.ToString();
}
} else {
PerformStateTransition(kStateRunning, kStateComplete);
VLOG(1) << "TS " << permanent_uuid() << ": flush tablets complete";
}

if (state() == kStateComplete) {
// TODO: this class should not know CatalogManager API,
// remove circular dependency between classes.
master_->flush_manager()->HandleFlushTabletsResponse(
flush_id_, permanent_uuid_,
resp_.has_error() ? StatusFromPB(resp_.error().status()) : Status::OK());
} else {
VLOG(1) << "FlushTablets task is not completed";
}
}

bool AsyncFlushTablets::SendRequest(int attempt) {
tserver::FlushTabletsRequestPB req;
req.set_dest_uuid(permanent_uuid_);
req.set_propagated_hybrid_time(master_->clock()->Now().ToUint64());

for (const TabletId& id : tablet_ids_) {
req.add_tablet_ids(id);
}

ts_admin_proxy_->FlushTabletsAsync(req, &resp_, &rpc_, BindRpcCallback());
VLOG(1) << "Send flush tablets request to " << permanent_uuid_
<< " (attempt " << attempt << "):\n"
<< req.DebugString();
return true;
}

} // namespace master
} // namespace yb
53 changes: 53 additions & 0 deletions src/yb/master/async_flush_tablets_task.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//
#ifndef YB_MASTER_ASYNC_FLUSH_TABLETS_TASK_H
#define YB_MASTER_ASYNC_FLUSH_TABLETS_TASK_H

#include "yb/master/async_rpc_tasks.h"

namespace yb {
namespace master {

// Send the "Flush Tablets" request to the specified Tablet Server.
// Keeps retrying until we get an "ok" response.
class AsyncFlushTablets : public RetrySpecificTSRpcTask {
public:
AsyncFlushTablets(Master* master,
ThreadPool* callback_pool,
const TabletServerId& ts_uuid,
const scoped_refptr<TableInfo>& table,
const std::vector<TabletId>& tablet_ids,
const FlushRequestId& flush_id);

Type type() const override { return ASYNC_FLUSH_TABLETS; }

std::string type_name() const override { return "Flush Tablets"; }

std::string description() const override;

private:
TabletId tablet_id() const override { return TabletId(); }
TabletServerId permanent_uuid() const;

void HandleResponse(int attempt) override;
bool SendRequest(int attempt) override;

const std::vector<TabletId> tablet_ids_;
const FlushRequestId flush_id_;
tserver::FlushTabletsResponsePB resp_;
};

} // namespace master
} // namespace yb

#endif // YB_MASTER_ASYNC_FLUSH_TABLETS_TASK_H
46 changes: 46 additions & 0 deletions src/yb/master/catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1930,6 +1930,52 @@ Status CatalogManager::FindNamespace(const NamespaceIdentifierPB& ns_identifier,
return Status::OK();
}

Result<TabletInfos> CatalogManager::GetTabletsOrSetupError(
const TableIdentifierPB& table_identifier,
MasterErrorPB::Code* error,
scoped_refptr<TableInfo>* table,
scoped_refptr<NamespaceInfo>* ns) {
DCHECK_ONLY_NOTNULL(error);
// Lookup the table and verify it exists.
TRACE("Looking up table");
scoped_refptr<TableInfo> local_table_info;
scoped_refptr<TableInfo>& table_obj = table ? *table : local_table_info;
RETURN_NOT_OK(FindTable(table_identifier, &table_obj));
if (table_obj == nullptr) {
*error = MasterErrorPB::TABLE_NOT_FOUND;
return STATUS(NotFound, "Table does not exist", table_identifier.DebugString());
}

TRACE("Locking table");
auto l = table_obj->LockForRead();

if (table_obj->metadata().state().table_type() != TableType::YQL_TABLE_TYPE) {
*error = MasterErrorPB::INVALID_TABLE_TYPE;
return STATUS(InvalidArgument, "Invalid table type", table_identifier.DebugString());
}

if (table_obj->IsCreateInProgress()) {
*error = MasterErrorPB::TABLE_CREATION_IS_IN_PROGRESS;
return STATUS(IllegalState, "Table creation is in progress", table_obj->ToString());
}

if (ns) {
TRACE("Looking up namespace");
boost::shared_lock<LockType> l(lock_);

*ns = FindPtrOrNull(namespace_ids_map_, table_obj->namespace_id());
if (*ns == nullptr) {
*error = MasterErrorPB::NAMESPACE_NOT_FOUND;
return STATUS(
InvalidArgument, "Could not find namespace by namespace id", table_obj->namespace_id());
}
}

TabletInfos tablets;
table_obj->GetAllTablets(&tablets);
return std::move(tablets);
}

// Truncate a Table
Status CatalogManager::TruncateTable(const TruncateTableRequestPB* req,
TruncateTableResponsePB* resp,
Expand Down
17 changes: 12 additions & 5 deletions src/yb/master/catalog_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -1037,10 +1037,22 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
CHECKED_STATUS FindNamespace(const NamespaceIdentifierPB& ns_identifier,
scoped_refptr<NamespaceInfo>* ns_info) const;

CHECKED_STATUS FindTable(const TableIdentifierPB& table_identifier,
scoped_refptr<TableInfo>* table_info);

Result<TabletInfos> GetTabletsOrSetupError(const TableIdentifierPB& table_identifier,
MasterErrorPB::Code* error,
scoped_refptr<TableInfo>* table = nullptr,
scoped_refptr<NamespaceInfo>* ns = nullptr);

void AssertLeaderLockAcquiredForReading() const {
leader_lock_.AssertAcquiredForReading();
}

std::string GenerateId() { return oid_generator_.Next(); }

ThreadPool* WorkerPool() { return worker_pool_.get(); }

protected:
friend class TableLoader;
friend class TabletLoader;
Expand Down Expand Up @@ -1165,9 +1177,6 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
CHECKED_STATUS BuildLocationsForTablet(const scoped_refptr<TabletInfo>& tablet,
TabletLocationsPB* locs_pb);

CHECKED_STATUS FindTable(const TableIdentifierPB& table_identifier,
scoped_refptr<TableInfo>* table_info);

// Handle one of the tablets in a tablet reported.
// Requires that the lock is already held.
CHECKED_STATUS HandleReportedTablet(TSDescriptor* ts_desc,
Expand Down Expand Up @@ -1325,8 +1334,6 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
TabletToTabletServerMap *remove_replica_tasks_map,
TabletToTabletServerMap *stepdown_leader_tasks);

std::string GenerateId() { return oid_generator_.Next(); }

// Abort creation of 'table': abort all mutation for TabletInfo and
// TableInfo objects (releasing all COW locks), abort all pending
// tasks associated with the table, and erase any state related to
Expand Down
Loading

0 comments on commit 4902d06

Please sign in to comment.