Skip to content

Commit

Permalink
#2394: Fixed invalid read-time for RF==1 case.
Browse files Browse the repository at this point in the history
Summary:
Fixed: #2394: [YCQL] TTL enforcement delayed when non-default (weaker) read-consistency is used

The issue happens in the situation (as in the Golang test at https://gist.githubusercontent.com/kmuthukk/f182837117f7af553a0d500c54e89ee3/raw):
 - CREATE TABLE with DEFAULT_TTL=5
 - INSERT a few records
 - Sleep( 6 or more seconds )
 - SELECT records - the returned result set must be empty
In fact, the set is NOT empty, but this happens only with ConsistencyLevel=ONE. With ConsistencyLevel=QUORUM the test passes, because in that case the request must be processed by the LEADER, and the LEADER always knows the correct safe time. With ConsistencyLevel=ONE the system can route the read request to any FOLLOWER. With RF=1 the LEADER peer is treated as a FOLLOWER, and this is the corner case that is processed incorrectly.

Ultimately, the TTL does not work correctly because the read time is not valid (it is in the past).
The invalid read time is returned by the MvccManager::SafeTimeForFollower() method. It returns an invalid value when RF=1 (when RF=3 the value is correct, because the FOLLOWERS are correctly updated by the LEADER and the internal MVCC value propagated_safe_time_ is updated).
When RF=1 the LEADER does not update itself, so the MVCC variable propagated_safe_time_ remains in the past.

Solution: in case of RF=1 the MvccManager::SafeTimeForFollower() must use MvccManager::SafeTime(), which correctly calculates the up-to-date HybridTime value on the LEADER.

Test Plan:
ybd --java-test org.yb.loadtester.TestRF1Cluster#testDefaultTTLWithFixedRF
ybd --java-test org.yb.loadtester.TestRF1Cluster#testDefaultTTLWithFixedRF -n 100

ybd --java-test org.yb.loadtester.TestRF1Cluster#testDefaultTTLWithChangedRF
ybd --java-test org.yb.loadtester.TestRF1Cluster#testDefaultTTLWithChangedRF -n 100

GO script attached to the github issue.

Reviewers: kannan, mikhail, sergei

Reviewed By: sergei

Subscribers: mikhail, bogdan, kannan, yql

Differential Revision: https://phabricator.dev.yugabyte.com/D7330
  • Loading branch information
OlegLoginov committed Oct 22, 2019
1 parent 634fc17 commit 320759a
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,21 @@
//
package org.yb.loadtester;

import java.util.List;

import com.datastax.driver.core.*;
import com.datastax.driver.core.querybuilder.QueryBuilder;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yb.AssertionWrappers;
import org.yb.YBTestRunner;
import org.yb.minicluster.MiniYBCluster;

import static junit.framework.TestCase.assertTrue;
import static junit.framework.TestCase.assertEquals;

/**
* This is an integration test that is specific to RF=1 case, so that we can default to
Expand Down Expand Up @@ -61,4 +68,94 @@ public void testRF1LoadBalance() throws Exception {
Thread.sleep(2 * MiniYBCluster.CQL_NODE_LIST_REFRESH_SECS * 1000);
LOG.info("Load Balance Done.");
}

/**
 * Runs an unfiltered SELECT against the given table in the default test keyspace at the
 * requested consistency level and compares the number of returned rows with the expectation.
 *
 * @param consistencyLevel consistency level to attach to the SELECT statement
 * @param tableName name of the table to read
 * @param numRows expected number of rows
 * @return true if the query returned exactly {@code numRows} rows
 */
private boolean verifyNumRowsInTable(
    ConsistencyLevel consistencyLevel, String tableName, int numRows) {
  Statement selectAll = QueryBuilder.select().from(DEFAULT_TEST_KEYSPACE, tableName);
  selectAll.setConsistencyLevel(consistencyLevel);
  List<Row> fetched = session.execute(selectAll).all();
  return fetched.size() == numRows;
}

/**
 * Inserts a few rows into the {@code employee} table, verifies they are visible, waits for the
 * table's default TTL to elapse and then verifies that reads at every consistency level return
 * no rows.
 *
 * <p>The wait of 5 seconds assumes the table was created with a default TTL shorter than that;
 * the callers in this class use {@code default_time_to_live = 4}.
 *
 * @throws Exception if a statement fails or the sleep is interrupted
 */
private void internalTestDefaultTTL() throws Exception {
  // A set of write/read ops to stabilize the table.
  for (int i = 0; i < 10; ++i) {
    session.execute(String.format("INSERT INTO employee(id, name, age, lang) " +
        "VALUES(%d, 'John+%d', %d, 'Go')", 100 + i, i, i));
    session.execute("SELECT * FROM employee");
    session.execute(String.format("DELETE FROM employee WHERE id = %d", 100 + i));
  }

  final long startTimeMs = System.currentTimeMillis();
  final int numRows = 3;
  for (int i = 0; i < numRows; ++i) {
    session.execute(String.format(
        "INSERT INTO employee(id, name, age, lang) VALUES(%d, 'John+%d', %d, 'Go')", i, i, i));
  }
  // The TTL of the newest row starts counting at its write, so the expiry wait below is
  // measured from here rather than from before the inserts. Otherwise slow inserts could
  // leave the last row still alive when the "expired" checks run.
  final long lastInsertTimeMs = System.currentTimeMillis();

  LOG.info("Select from table and verify inserted rows. Time after start (ms): {}",
      System.currentTimeMillis() - startTimeMs);
  ResultSet rs = session.execute("SELECT * FROM employee");
  List<Row> rows = rs.all();
  LOG.info("Row count: {}", rows.size());
  assertEquals(numRows, rows.size());

  // Must exceed the table's default TTL (4 seconds in the callers of this helper).
  final long expiryWaitMs = 5000;
  final long sleepTimeMs = expiryWaitMs + lastInsertTimeMs - System.currentTimeMillis();
  if (sleepTimeMs > 0) {
    LOG.info("Sleep time (ms): {}", sleepTimeMs);
    Thread.sleep(sleepTimeMs);
  }

  LOG.info("Select from table and verify TTL has expired - read by default.");
  rs = session.execute("SELECT * FROM employee");
  rows = rs.all();
  LOG.info("Row count: {}", rows.size());
  assertEquals(0, rows.size());

  // Test all available Consistency Levels.
  for (ConsistencyLevel level : ConsistencyLevel.values()) {
    LOG.info("Test expired TTL with Consistency: {}", level);
    assertTrue(verifyNumRowsInTable(level, "employee", 0));
  }
}

/**
 * Verifies default-TTL expiry on one table while the replication factor is changed
 * from 1 to 3 and back to 1, running the TTL scenario after every step.
 */
@Test
public void testDefaultTTLWithChangedRF() throws Exception {
  final String createEmployeeTable =
      "CREATE TABLE employee (id int PRIMARY KEY, name varchar, age int, lang varchar) " +
      "WITH default_time_to_live = 4";

  // Phase 1: initial RF = 1.
  LOG.info("Create table and insert values with default TTL for RF = 1");
  session.execute(createEmployeeTable);
  internalTestDefaultTTL();

  // Phase 2: grow to RF = 3.
  LOG.info("Change replication factor from 1 to 3");
  performRFChange(1, 3);
  LOG.info("Insert values with default TTL for RF = 3");
  internalTestDefaultTTL();

  // Phase 3: shrink back to RF = 1.
  LOG.info("Change replication factor from 3 to 1");
  performRFChange(3, 1);
  LOG.info("Insert values with default TTL for reduced RF = 1");
  internalTestDefaultTTL();

  session.execute("DROP TABLE employee");
}

/**
 * Verifies default-TTL expiry on freshly created clusters with replication factors 1, 3 and 5.
 * Each iteration tears down the current mini cluster and builds a new one of the given size.
 */
@Test
public void testDefaultTTLWithFixedRF() throws Exception {
  final int[] replicationFactors = {1, 3, 5};
  for (int replicationFactor : replicationFactors) {
    // Destroy existing cluster and recreate it.
    destroyMiniCluster();
    createMiniCluster(replicationFactor, replicationFactor);
    updateConfigReplicationFactor(replicationFactor);
    setUpCqlClient();

    LOG.info("Create table and insert values with default TTL for RF = " + replicationFactor);
    final String createEmployeeTable =
        "CREATE TABLE employee (id int PRIMARY KEY, name varchar, age int, lang varchar) " +
        "WITH default_time_to_live = 4";
    session.execute(createEmployeeTable);

    internalTestDefaultTTL();
  }
}
}
8 changes: 8 additions & 0 deletions src/yb/consensus/raft_consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2828,6 +2828,9 @@ void RaftConsensus::NonTxRoundReplicationFinished(ConsensusRound* round,
LOG(WARNING) << "Could not clear pending state : " << s.ToString();
}
}
} else if (IsChangeConfigOperation(op_type) && change_config_replicated_listener_) {
// Notify the TabletPeer owner object.
change_config_replicated_listener_(state_->GetCommittedConfigUnlocked());
}

client_cb(status);
Expand Down Expand Up @@ -2949,6 +2952,11 @@ void RaftConsensus::SetMajorityReplicatedListener(std::function<void()> updater)
majority_replicated_listener_ = std::move(updater);
}

// Stores the callback that NonTxRoundReplicationFinished can call with the committed Raft
// config when a change-config operation finishes. The call site checks the listener before
// invoking it, so an empty std::function simply disables the notification.
// NOTE(review): no locking is visible around this assignment — presumably it must be called
// before change-config rounds can complete; confirm against the caller.
void RaftConsensus::SetChangeConfigReplicatedListener(
std::function<void(const RaftConfigPB&)> listener) {
change_config_replicated_listener_ = std::move(listener);
}

// Returns the minimum retryable-request op id; simply forwards to state_.
yb::OpId RaftConsensus::MinRetryableRequestOpId() {
return state_->MinRetryableRequestOpId();
}
Expand Down
8 changes: 8 additions & 0 deletions src/yb/consensus/raft_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,

void SetMajorityReplicatedListener(std::function<void()> updater);

void SetChangeConfigReplicatedListener(std::function<void(const RaftConfigPB&)> listener);

yb::OpId MinRetryableRequestOpId();

CHECKED_STATUS StartElection(const LeaderElectionData& data) override {
Expand Down Expand Up @@ -667,6 +669,12 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
// waiting for it to advance.
std::function<void()> majority_replicated_listener_;

// Called every time a Raft config change has been replicated.
// It notifies the higher layer about the config change. Currently it is
// needed to update the internal flag in the MvccManager so that it returns a correct
// safe time value for a read/write operation in RF==1 mode.
std::function<void(const RaftConfigPB&)> change_config_replicated_listener_;

scoped_refptr<Histogram> update_raft_config_dns_latency_;

// Used only when follower_reject_update_consensus_requests_seconds is greater than 0.
Expand Down
12 changes: 12 additions & 0 deletions src/yb/tablet/mvcc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,21 @@ void MvccManager::UpdatePropagatedSafeTimeOnLeader(HybridTime ht_lease) {
cond_.notify_all();
}

// Turns the leader-only (RF == 1) mode flag on or off. The flag is written under mutex_,
// the same mutex SafeTimeForFollower() holds when it reads the flag.
void MvccManager::SetLeaderOnlyMode(bool leader_only) {
  std::lock_guard<std::mutex> guard(mutex_);
  leader_only_mode_ = leader_only;
}

HybridTime MvccManager::SafeTimeForFollower(
HybridTime min_allowed, CoarseTimePoint deadline) const {
std::unique_lock<std::mutex> lock(mutex_);

if (leader_only_mode_) {
// If there are no followers (RF == 1), use SafeTime(),
// because propagated_safe_time_ may not have been updated.
return DoGetSafeTime(min_allowed, deadline, HybridTime::kMax, &lock);
}

SafeTimeWithSource result;
auto predicate = [this, &result, min_allowed] {
// last_replicated_ is updated earlier than propagated_safe_time_, so because of concurrency it
Expand Down
6 changes: 6 additions & 0 deletions src/yb/tablet/mvcc.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ class MvccManager {
// `prefix` is used for logging.
explicit MvccManager(std::string prefix, server::ClockPtr clock);

// Sets the special RF==1 mode flag so that safe time requests are handled correctly when
// there are no heartbeats to keep the internal propagated_safe_time_ up to date.
void SetLeaderOnlyMode(bool leader_only);

// Sets time of last replicated operation, used after bootstrap.
void SetLastReplicated(HybridTime ht);

Expand Down Expand Up @@ -153,6 +157,8 @@ class MvccManager {
// leader, this is a safe time that gets updated every time the majority-replicated watermarks
// change.
HybridTime propagated_safe_time_ = HybridTime::kMin;
// Special flag for RF==1 mode, in which propagated_safe_time_ may not be up to date.
bool leader_only_mode_ = false;

// Because different calls that have current hybrid time leader lease as an argument can come to
// us out of order, we might see an older value of hybrid time leader lease expiration after a
Expand Down
1 change: 0 additions & 1 deletion src/yb/tablet/tablet_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ using base::subtle::Barrier_AtomicIncrement;
using strings::Substitute;

using yb::consensus::MinimumOpId;
using yb::consensus::RaftConfigPB;

namespace yb {
namespace tablet {
Expand Down
9 changes: 8 additions & 1 deletion src/yb/tablet/tablet_peer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,13 @@ Status TabletPeer::InitTabletPeer(const shared_ptr<TabletClass> &tablet,
mvcc_manager->UpdatePropagatedSafeTimeOnLeader(ht_lease);
}
});

auto mvcc_leader_only_mode_updater = [this](const RaftConfigPB& config) {
tablet_->mvcc_manager()->SetLeaderOnlyMode(config.peers_size() == 1);
};

mvcc_leader_only_mode_updater(RaftConfig()); // Set initial flag value.
consensus_->SetChangeConfigReplicatedListener(mvcc_leader_only_mode_updater);
}

RETURN_NOT_OK(prepare_thread_->Start());
Expand Down Expand Up @@ -1004,7 +1011,7 @@ HybridTime TabletPeer::HtLeaseExpiration() const {

// Returns the table type of the tablet owned by this peer.
// The tablet pointer is passed through DCHECK_NOTNULL so that a missing tablet is flagged
// by the check (presumably in debug builds only, per the usual DCHECK semantics) instead of
// being dereferenced silently.
TableType TabletPeer::table_type() {
  // TODO: what if tablet is not set?
  return DCHECK_NOTNULL(tablet())->table_type();
}

void TabletPeer::SetFailed(const Status& error) {
Expand Down

0 comments on commit 320759a

Please sign in to comment.