Skip to content

Commit

Permalink
Adopt CheckedMutex in ClientImpl
Browse files Browse the repository at this point in the history
  • Loading branch information
tgoyne committed May 29, 2024
1 parent 702189a commit 62ca644
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 75 deletions.
67 changes: 40 additions & 27 deletions src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,8 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener

bool m_suspended = false;

// Set when the session has been abandoned, but before it's been finalized.
// Set when the session has been abandoned. After this point none of the
// public API functions should be called again.
bool m_abandoned = false;
// Has the SessionWrapper been finalized?
bool m_finalized = false;
Expand Down Expand Up @@ -439,7 +440,7 @@ bool ClientImpl::wait_for_session_terminations_or_client_stopped()
// Thread safety required

{
std::lock_guard lock{m_mutex};
util::CheckedLockGuard lock{m_mutex};
m_sessions_terminated = false;
}

Expand All @@ -466,16 +467,19 @@ bool ClientImpl::wait_for_session_terminations_or_client_stopped()
else if (!status.is_ok())
throw Exception(status);

std::lock_guard lock{m_mutex};
m_sessions_terminated = true;
{
util::CheckedLockGuard lock{m_mutex};
m_sessions_terminated = true;
}
m_wait_or_client_stopped_cond.notify_all();
}); // Throws

bool completion_condition_was_satisfied;
{
std::unique_lock lock{m_mutex};
while (!m_sessions_terminated && !m_stopped)
m_wait_or_client_stopped_cond.wait(lock);
util::CheckedUniqueLock lock{m_mutex};
m_wait_or_client_stopped_cond.wait(lock.native_handle(), [&]() REQUIRES(m_mutex) {
return m_sessions_terminated || m_stopped;
});
completion_condition_was_satisfied = !m_stopped;
}
return completion_condition_was_satisfied;
Expand Down Expand Up @@ -510,13 +514,13 @@ void ClientImpl::drain_connections_on_loop()
void ClientImpl::shutdown_and_wait()
{
shutdown();
std::unique_lock lock{m_drain_mutex};
util::CheckedUniqueLock lock{m_drain_mutex};
if (m_drained) {
return;
}

logger.debug("Waiting for %1 connections to drain", m_num_connections);
m_drain_cv.wait(lock, [&] {
m_drain_cv.wait(lock.native_handle(), [&]() REQUIRES(m_drain_mutex) {
return m_num_connections == 0 && m_outstanding_posts == 0;
});

Expand All @@ -526,12 +530,12 @@ void ClientImpl::shutdown_and_wait()
void ClientImpl::shutdown() noexcept
{
{
std::lock_guard lock{m_mutex};
util::CheckedLockGuard lock{m_mutex};
if (m_stopped)
return;
m_stopped = true;
m_wait_or_client_stopped_cond.notify_all();
}
m_wait_or_client_stopped_cond.notify_all();

drain_connections_on_loop();
}
Expand All @@ -541,7 +545,7 @@ void ClientImpl::register_unactualized_session_wrapper(SessionWrapper* wrapper,
{
// Thread safety required.
{
std::lock_guard lock{m_mutex};
util::CheckedLockGuard lock{m_mutex};
REALM_ASSERT(m_actualize_and_finalize);
wrapper->mark_initiated();
m_unactualized_session_wrappers.emplace(wrapper, std::move(endpoint)); // Throws
Expand All @@ -554,7 +558,7 @@ void ClientImpl::register_abandoned_session_wrapper(util::bind_ptr<SessionWrappe
{
// Thread safety required.
{
std::lock_guard lock{m_mutex};
util::CheckedLockGuard lock{m_mutex};
REALM_ASSERT(m_actualize_and_finalize);
wrapper->mark_abandoned();

Expand All @@ -581,7 +585,7 @@ void ClientImpl::actualize_and_finalize_session_wrappers()
SessionWrapperStack abandoned_session_wrappers;
bool stopped;
{
std::lock_guard lock{m_mutex};
util::CheckedLockGuard lock{m_mutex};
swap(m_unactualized_session_wrappers, unactualized_session_wrappers);
swap(m_abandoned_session_wrappers, abandoned_session_wrappers);
stopped = m_stopped;
Expand Down Expand Up @@ -643,7 +647,7 @@ ClientImpl::Connection& ClientImpl::get_connection(ServerEndpoint endpoint,
m_prev_connection_ident = ident;
was_created = true;
{
std::lock_guard lk(m_drain_mutex);
util::CheckedLockGuard lk(m_drain_mutex);
++m_num_connections;
}
return conn;
Expand Down Expand Up @@ -671,10 +675,13 @@ void ClientImpl::remove_connection(ClientImpl::Connection& conn) noexcept
server_slot.alt_connections.erase(j);
}

bool notify;
{
std::lock_guard lk(m_drain_mutex);
util::CheckedLockGuard lk(m_drain_mutex);
REALM_ASSERT(m_num_connections);
--m_num_connections;
notify = --m_num_connections <= 0;
}
if (notify) {
m_drain_cv.notify_all();
}
}
Expand Down Expand Up @@ -1480,7 +1487,7 @@ bool SessionWrapper::wait_for_upload_complete_or_client_stopped()

std::int_fast64_t target_mark;
{
std::lock_guard lock{m_client.m_mutex};
util::CheckedLockGuard lock{m_client.m_mutex};
target_mark = ++m_target_upload_mark;
}

Expand Down Expand Up @@ -1508,9 +1515,10 @@ bool SessionWrapper::wait_for_upload_complete_or_client_stopped()

bool completion_condition_was_satisfied;
{
std::unique_lock lock{m_client.m_mutex};
while (m_reached_upload_mark < target_mark && !m_client.m_stopped)
m_client.m_wait_or_client_stopped_cond.wait(lock);
util::CheckedUniqueLock lock{m_client.m_mutex};
m_client.m_wait_or_client_stopped_cond.wait(lock.native_handle(), [&]() REQUIRES(m_client.m_mutex) {
return m_reached_upload_mark >= target_mark || m_client.m_stopped;
});
completion_condition_was_satisfied = !m_client.m_stopped;
}
return completion_condition_was_satisfied;
Expand All @@ -1525,7 +1533,7 @@ bool SessionWrapper::wait_for_download_complete_or_client_stopped()

std::int_fast64_t target_mark;
{
std::lock_guard lock{m_client.m_mutex};
util::CheckedLockGuard lock{m_client.m_mutex};
target_mark = ++m_target_download_mark;
}

Expand Down Expand Up @@ -1553,9 +1561,10 @@ bool SessionWrapper::wait_for_download_complete_or_client_stopped()

bool completion_condition_was_satisfied;
{
std::unique_lock lock{m_client.m_mutex};
while (m_reached_download_mark < target_mark && !m_client.m_stopped)
m_client.m_wait_or_client_stopped_cond.wait(lock);
util::CheckedUniqueLock lock{m_client.m_mutex};
m_client.m_wait_or_client_stopped_cond.wait(lock.native_handle(), [&]() REQUIRES(m_client.m_mutex) {
return m_reached_download_mark >= target_mark || m_client.m_stopped;
});
completion_condition_was_satisfied = !m_client.m_stopped;
}
return completion_condition_was_satisfied;
Expand Down Expand Up @@ -1688,6 +1697,10 @@ void SessionWrapper::force_close()
}

// Must be called from event loop thread
//
// `m_client.m_mutex` is not held while this is called, but it is guaranteed to
// have been acquired at some point in between the final read or write ever made
// from a different thread and when this is called.
void SessionWrapper::finalize()
{
REALM_ASSERT(m_actualized);
Expand Down Expand Up @@ -1780,7 +1793,7 @@ void SessionWrapper::on_upload_completion()
m_download_completion_handlers.push_back(std::move(handler)); // Throws
m_sync_completion_handlers.pop_back();
}
std::lock_guard lock{m_client.m_mutex};
util::CheckedLockGuard lock{m_client.m_mutex};
if (m_staged_upload_mark > m_reached_upload_mark) {
m_reached_upload_mark = m_staged_upload_mark;
m_client.m_wait_or_client_stopped_cond.notify_all();
Expand Down Expand Up @@ -1808,7 +1821,7 @@ void SessionWrapper::on_download_completion()
m_flx_pending_mark_version = SubscriptionSet::EmptyVersion;
}

std::lock_guard lock{m_client.m_mutex};
util::CheckedLockGuard lock{m_client.m_mutex};
if (m_staged_download_mark > m_reached_download_mark) {
m_reached_download_mark = m_staged_download_mark;
m_client.m_wait_or_client_stopped_cond.notify_all();
Expand Down
42 changes: 23 additions & 19 deletions src/realm/sync/noinst/client_impl_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,21 +230,31 @@ ClientImpl::ClientImpl(ClientConfig config)
});
}

void ClientImpl::incr_outstanding_posts()
{
util::CheckedLockGuard lock(m_drain_mutex);
++m_outstanding_posts;
m_drained = false;
}

void ClientImpl::decr_outstanding_posts()
{
util::CheckedLockGuard lock(m_drain_mutex);
REALM_ASSERT(m_outstanding_posts);
if (--m_outstanding_posts <= 0) {
// Notify must happen with lock held or another thread could destroy
// ClientImpl between when we release the lock and when we call notify
m_drain_cv.notify_all();
}
}

void ClientImpl::post(SyncSocketProvider::FunctionHandler&& handler)
{
REALM_ASSERT(m_socket_provider);
{
std::lock_guard lock(m_drain_mutex);
++m_outstanding_posts;
m_drained = false;
}
incr_outstanding_posts();
m_socket_provider->post([handler = std::move(handler), this](Status status) {
auto decr_guard = util::make_scope_exit([&]() noexcept {
std::lock_guard lock(m_drain_mutex);
REALM_ASSERT(m_outstanding_posts);
--m_outstanding_posts;
m_drain_cv.notify_all();
decr_outstanding_posts();
});
handler(status);
});
Expand Down Expand Up @@ -274,18 +284,12 @@ SyncSocketProvider::SyncTimer ClientImpl::create_timer(std::chrono::milliseconds
SyncSocketProvider::FunctionHandler&& handler)
{
REALM_ASSERT(m_socket_provider);
{
std::lock_guard lock(m_drain_mutex);
++m_outstanding_posts;
m_drained = false;
}
incr_outstanding_posts();
return m_socket_provider->create_timer(delay, [handler = std::move(handler), this](Status status) {
auto decr_guard = util::make_scope_exit([&]() noexcept {
decr_outstanding_posts();
});
handler(status);

std::lock_guard lock(m_drain_mutex);
REALM_ASSERT(m_outstanding_posts);
--m_outstanding_posts;
m_drain_cv.notify_all();
});
}

Expand Down
57 changes: 28 additions & 29 deletions src/realm/sync/noinst/client_impl_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
#include <realm/sync/subscriptions.hpp>
#include <realm/sync/trigger.hpp>
#include <realm/util/buffer_stream.hpp>
#include <realm/util/checked_mutex.hpp>
#include <realm/util/logger.hpp>
#include <realm/util/optional.hpp>
#include <realm/util/span.hpp>

#include <cstdint>
Expand Down Expand Up @@ -207,19 +207,19 @@ class ClientImpl {

static constexpr int get_oldest_supported_protocol_version() noexcept;

void shutdown() noexcept;
void shutdown() noexcept REQUIRES(!m_mutex, !m_drain_mutex);

void shutdown_and_wait();
void shutdown_and_wait() REQUIRES(!m_mutex, !m_drain_mutex);

const std::string& get_user_agent_string() const noexcept;
ReconnectMode get_reconnect_mode() const noexcept;
bool is_dry_run() const noexcept;

// Functions to post onto the event loop and create an event loop timer using the
// SyncSocketProvider
void post(SyncSocketProvider::FunctionHandler&& handler);
void post(SyncSocketProvider::FunctionHandler&& handler) REQUIRES(!m_drain_mutex);
SyncSocketProvider::SyncTimer create_timer(std::chrono::milliseconds delay,
SyncSocketProvider::FunctionHandler&& handler);
SyncSocketProvider::FunctionHandler&& handler) REQUIRES(!m_drain_mutex);
using SyncTrigger = std::unique_ptr<Trigger<ClientImpl>>;
SyncTrigger create_trigger(SyncSocketProvider::FunctionHandler&& handler);

Expand All @@ -229,11 +229,11 @@ class ClientImpl {
bool decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
port_type& port, std::string& path) const;

void cancel_reconnect_delay();
bool wait_for_session_terminations_or_client_stopped();
void cancel_reconnect_delay() REQUIRES(!m_drain_mutex);
bool wait_for_session_terminations_or_client_stopped() REQUIRES(!m_mutex, !m_drain_mutex);
// Async version of wait_for_session_terminations_or_client_stopped().
util::Future<void> notify_session_terminated();
void voluntary_disconnect_all_connections();
util::Future<void> notify_session_terminated() REQUIRES(!m_drain_mutex);
void voluntary_disconnect_all_connections() REQUIRES(!m_drain_mutex);

private:
using connection_ident_type = std::int_fast64_t;
Expand Down Expand Up @@ -283,36 +283,32 @@ class ClientImpl {
// Must be accessed only by event loop thread
connection_ident_type m_prev_connection_ident = 0;

std::mutex m_drain_mutex;
util::CheckedMutex m_drain_mutex;
std::condition_variable m_drain_cv;
bool m_drained = false;
uint64_t m_outstanding_posts = 0;
uint64_t m_num_connections = 0;
bool m_drained GUARDED_BY(m_drain_mutex) = false;
uint64_t m_outstanding_posts GUARDED_BY(m_drain_mutex) = 0;
uint64_t m_num_connections GUARDED_BY(m_drain_mutex) = 0;

std::mutex m_mutex;
util::CheckedMutex m_mutex;

bool m_stopped = false; // Protected by `m_mutex`
bool m_sessions_terminated = false; // Protected by `m_mutex`
bool m_stopped GUARDED_BY(m_mutex) = false;
bool m_sessions_terminated GUARDED_BY(m_mutex) = false;

// The set of session wrappers that are not yet wrapping a session object,
// and are not yet abandoned (still referenced by the application).
//
// Protected by `m_mutex`.
std::map<SessionWrapper*, ServerEndpoint> m_unactualized_session_wrappers;
std::map<SessionWrapper*, ServerEndpoint> m_unactualized_session_wrappers GUARDED_BY(m_mutex);

// The set of session wrappers that were successfully actualized, but are
// now abandoned (no longer referenced by the application), and have not yet
// been finalized. Order in queue is immaterial.
//
// Protected by `m_mutex`.
SessionWrapperStack m_abandoned_session_wrappers;
SessionWrapperStack m_abandoned_session_wrappers GUARDED_BY(m_mutex);

// Protected by `m_mutex`.
// Used with m_mutex
std::condition_variable m_wait_or_client_stopped_cond;

void register_unactualized_session_wrapper(SessionWrapper*, ServerEndpoint);
void register_abandoned_session_wrapper(util::bind_ptr<SessionWrapper>) noexcept;
void actualize_and_finalize_session_wrappers();
void register_unactualized_session_wrapper(SessionWrapper*, ServerEndpoint) REQUIRES(!m_mutex);
void register_abandoned_session_wrapper(util::bind_ptr<SessionWrapper>) noexcept REQUIRES(!m_mutex);
void actualize_and_finalize_session_wrappers() REQUIRES(!m_mutex);

// Get or create a connection. If a connection exists for the specified
// endpoint, it will be returned, otherwise a new connection will be
Expand Down Expand Up @@ -340,13 +336,16 @@ class ClientImpl {
bool verify_servers_ssl_certificate,
util::Optional<std::string> ssl_trust_certificate_path,
std::function<SyncConfig::SSLVerifyCallback>,
util::Optional<SyncConfig::ProxyConfig>, bool& was_created);
util::Optional<SyncConfig::ProxyConfig>, bool& was_created) REQUIRES(!m_drain_mutex);

// Destroys the specified connection.
void remove_connection(ClientImpl::Connection&) noexcept;
void remove_connection(ClientImpl::Connection&) noexcept REQUIRES(!m_drain_mutex);

void drain_connections();
void drain_connections_on_loop();
void drain_connections_on_loop() REQUIRES(!m_drain_mutex);

void incr_outstanding_posts() REQUIRES(!m_drain_mutex);
void decr_outstanding_posts() REQUIRES(!m_drain_mutex);

session_ident_type get_next_session_ident() noexcept;

Expand Down

0 comments on commit 62ca644

Please sign in to comment.