Skip to content

Commit

Permalink
Fix stmt level dependency replication
Browse files Browse the repository at this point in the history
Summary:
Fixed a bunch of things:

1. Used both before and after images of update events to find both RW and WW
conflicts
2. Added a flag to indicate a worker failure and stopped other workers from
executing events if the flag is on immediatly
3. Used references in for-loops for better performance
4. Used READ-COMMITTED in dep-stmt and dep-tbl to reduce trx retries on slaves

Squash with: D6138167

Reviewed By: hermanlee

Differential Revision: D6681471

fbshipit-source-id: ffa0059
  • Loading branch information
abhinav04sharma authored and facebook-github-bot committed Jan 10, 2018
1 parent f14c6bc commit 245589f
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 11 deletions.
2 changes: 2 additions & 0 deletions mysql-test/suite/rpl_mts/combinations
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ binlog-format=mixed
binlog-format=row
slave_use_idempotent_for_recovery=yes
mts_dependency_replication=TBL
slave_tx_isolation=READ-COMMITTED

[dep-stmt]
binlog-format=row
slave_use_idempotent_for_recovery=yes
mts_dependency_replication=STMT
slave_tx_isolation=READ-COMMITTED
15 changes: 12 additions & 3 deletions sql/dependency_slave_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,18 @@ bool Dependency_slave_worker::execute_group()
the DAG.
- If a temporary error occurs during execution of an event, the grp is
rollbacked and @current_event_index is reset to 0
- If a fatal error occurs we break out of the loop immidiately
- If a fatal error occurs we break out of the loop immediately
*/
DBUG_ASSERT(current_event_index == 0);
DBUG_ASSERT(events.size() >= 1);
do
{
auto ev= events[current_event_index];
if ((err= execute_event(ev)))
{
c_rli->dependency_worker_error= true;
break;
}
// case: we are not retrying this event, i.e. this was the first time we
// executed this event, so we should remove it from the DAG
if (!trans_retries || current_event_index >= last_current_event_index)
Expand All @@ -154,7 +157,7 @@ bool Dependency_slave_worker::execute_group()
if (err == 1)
{
// Signal a rollback if commit ordering is enabled, we have to do
// this here because it's an append error, so @slave_worker_ends_group
// this here because it's not an exec error, so @slave_worker_ends_group
// is not called
if (commit_order_mngr)
commit_order_mngr->report_rollback(this);
Expand All @@ -174,6 +177,12 @@ int Dependency_slave_worker::execute_event(Log_event_wrapper *ev)
{
// wait for all dependencies to be satisfied
ev->wait();

// case: there was an error in one of the workers, so let's skip execution of
// events immediately
if (c_rli->dependency_worker_error)
return 1;

// case: append to jobs queue only if this is not a trx retry, trx retries
// resets @current_event_index, see @slave_worker_ends_group
if (current_event_index == jobs.len)
Expand Down Expand Up @@ -221,7 +230,7 @@ void Dependency_slave_worker::remove_event(Log_event_wrapper *ev)
* 2) The "value" of the key-value pair is _not_ equal to this event. In this
* case, leave it be; the event corresponds to a later transaction.
*/
for (auto key : ev->keys)
for (auto& key : ev->keys)
{
auto it= c_rli->dag_key_last_penultimate_event.find(key);
DBUG_ASSERT(it != c_rli->dag_key_last_penultimate_event.end());
Expand Down
11 changes: 3 additions & 8 deletions sql/log_event.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3385,7 +3385,7 @@ void Log_event::do_post_end_event(Relay_log_info *rli, Log_event_wrapper *ev)
// populate key->last trx penultimate event map
// NOTE: we store the end event for a single event trx
Log_event_wrapper *to_add= rli->prev_event? rli->prev_event : ev;
for (auto key : rli->keys_accessed_by_group)
for (auto& key : rli->keys_accessed_by_group)
{
rli->dag_key_last_penultimate_event[key]= to_add;
to_add->keys.insert(key);
Expand Down Expand Up @@ -11973,11 +11973,6 @@ bool Rows_log_event::parse_keys(Relay_log_info* rli, Log_event_wrapper *ev,
DBUG_RETURN(false);
}

if (get_type_code() == UPDATE_ROWS_EVENT && i % 2 == 1)
{
continue;
}

curr_key.key_length= keyinfo->key_length;
curr_key.table_id= m_table_name;

Expand Down Expand Up @@ -12148,7 +12143,7 @@ void Rows_log_event::prepare(Relay_log_info *rli, Log_event_wrapper *ev)
get_keys(rli, ev, m_keylist);
DBUG_ASSERT(!m_keylist.empty());

for (auto k : m_keylist)
for (auto& k : m_keylist)
{
rli->keys_accessed_by_group.insert(k);
}
Expand All @@ -12170,7 +12165,7 @@ void Rows_log_event::do_add_to_dag(Relay_log_info *rli, Log_event_wrapper *ev)
}

/* Handle dependencies. */
for (auto k : m_keylist)
for (auto& k : m_keylist)
{
if (rli->dag_key_last_penultimate_event.find(k) !=
rli->dag_key_last_penultimate_event.end() &&
Expand Down
5 changes: 5 additions & 0 deletions sql/rpl_rli.h
Original file line number Diff line number Diff line change
Expand Up @@ -1155,6 +1155,9 @@ class Relay_log_info : public Rpl_info
Log_event_wrapper *current_begin_event= NULL;
bool dag_sync_group= false;

// Used to signal when a dependency worker dies
std::atomic<bool> dependency_worker_error{false};

inline void dag_rdlock()
{
mysql_rwlock_rdlock(&dag_lock);
Expand Down Expand Up @@ -1209,6 +1212,8 @@ class Relay_log_info : public Rpl_info
mysql_mutex_lock(&dag_group_ready_mutex);
dag_group_ready= false;
mysql_mutex_unlock(&dag_group_ready_mutex);

dependency_worker_error= false;
}
#endif // HAVE_REPLICATION and !MYSQL_CLIENT
};
Expand Down

0 comments on commit 245589f

Please sign in to comment.