Skip to content

Commit

Permalink
Add support for multi-process subscription state change notifications (
Browse files Browse the repository at this point in the history
…#7862)

As with the other multi-process notifications, the core idea here is to
eliminate the in-memory state and produce notifications based entirely on the
current state of the Realm file.

SubscriptionStore::update_state() has been replaced with separate functions for
the specific legal state transitions, which also take a write transaction as a
parameter. These functions are called by PendingBootstrapStore inside the same
write transaction as the bootstrap updates which changed the subscription
state. This is both a minor performance optimization (due to fewer writes) and
eliminates a brief window between the two writes where the Realm file was in an
inconsistent state.

There's a minor functional change here: previously old subscription sets were
superseded when the new one reached the Completed state, and now they are
superseded on AwaitingMark. This aligns it with when the new subscription set
becomes the one which is returned by get_active().
  • Loading branch information
tgoyne authored Jul 30, 2024
1 parent f93e758 commit 819bb98
Show file tree
Hide file tree
Showing 20 changed files with 630 additions and 505 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

### Internals
* Fix emscripten build and add emscripten debug/release compile tasks to evergreen. ([PR #7916](https://github.com/realm/realm-core/pull/7916))
* Subscription set state change notifications now work in a multiprocess-compatible manner ([PR #7862](https://github.com/realm/realm-core/pull/7862)).

----------------------------------------------

Expand Down
182 changes: 37 additions & 145 deletions src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,6 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener

void handle_pending_client_reset_acknowledgement();

void update_subscription_version_info();

// Can be called from any thread.
std::string get_appservices_connection_id();

Expand Down Expand Up @@ -149,7 +147,7 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener
uint64_t uploadable;
uint64_t downloaded;
uint64_t downloadable;
int64_t query_version;
int64_t query_version = 0;
double download_estimate;

// Does not check snapshot
Expand Down Expand Up @@ -179,11 +177,7 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener
const uint64_t m_schema_version;

std::shared_ptr<SubscriptionStore> m_flx_subscription_store;
int64_t m_flx_active_version = 0;
int64_t m_flx_last_seen_version = 0;
int64_t m_flx_pending_mark_version = 0;
std::unique_ptr<PendingBootstrapStore> m_flx_pending_bootstrap_store;

std::shared_ptr<MigrationStore> m_migration_store;

// Set to true when this session wrapper is actualized (i.e. the wrapped
Expand Down Expand Up @@ -242,8 +236,6 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener
void on_resumed();
void on_connection_state_changed(ConnectionState, const std::optional<SessionErrorInfo>&);
void on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state);
void on_flx_sync_error(int64_t version, std::string_view err_msg);
void on_flx_sync_version_complete(int64_t version);

void init_progress_handler();
void check_progress();
Expand Down Expand Up @@ -795,14 +787,6 @@ void SessionImpl::handle_pending_client_reset_acknowledgement()
}
}

void SessionImpl::update_subscription_version_info()
{
// Ignore the call if the session is not active
if (m_state == State::Active) {
m_wrapper.update_subscription_version_info();
}
}

bool SessionImpl::process_flx_bootstrap_message(const DownloadMessage& message)
{
// Ignore the message if the session is not active or a steady state message
Expand All @@ -818,10 +802,8 @@ bool SessionImpl::process_flx_bootstrap_message(const DownloadMessage& message)
maybe_progress = message.progress;
}

bool new_batch = false;
try {
bootstrap_store->add_batch(*message.query_version, std::move(maybe_progress), message.downloadable,
message.changesets, &new_batch);
bootstrap_store->add_batch(*message.query_version, maybe_progress, message.downloadable, message.changesets);
}
catch (const LogicError& ex) {
if (ex.code() == ErrorCodes::LimitExceeded) {
Expand All @@ -834,12 +816,6 @@ bool SessionImpl::process_flx_bootstrap_message(const DownloadMessage& message)
throw;
}

// If we've started a new batch and there is more to come, call on_flx_sync_progress to mark the subscription as
// bootstrapping.
if (new_batch && message.batch_state == DownloadBatchState::MoreToCome) {
on_flx_sync_progress(*message.query_version, DownloadBatchState::MoreToCome);
}

auto hook_action = call_debug_hook(SyncClientHookEvent::BootstrapMessageProcessed, message.progress,
*message.query_version, message.batch_state, message.changesets.size());
if (hook_action == SyncClientHookAction::EarlyReturn) {
Expand Down Expand Up @@ -891,13 +867,11 @@ void SessionImpl::process_pending_flx_bootstrap()
TransactionRef transact = get_db()->start_write();
while (bootstrap_store->has_pending()) {
auto start_time = std::chrono::steady_clock::now();
auto pending_batch = bootstrap_store->peek_pending(m_wrapper.m_flx_bootstrap_batch_size_bytes);
auto pending_batch = bootstrap_store->peek_pending(*transact, m_wrapper.m_flx_bootstrap_batch_size_bytes);
if (!pending_batch.progress) {
logger.info("Incomplete pending bootstrap found for query version %1", pending_batch.query_version);
// Close the write transaction before clearing the bootstrap store to avoid a deadlock because the
// bootstrap store requires a write transaction itself.
transact->close();
bootstrap_store->clear();
bootstrap_store->clear(*transact, pending_batch.query_version);
transact->commit();
return;
}

Expand All @@ -915,7 +889,7 @@ void SessionImpl::process_pending_flx_bootstrap()

history.integrate_server_changesets(
*pending_batch.progress, 1.0, pending_batch.changesets, new_version, batch_state, logger, transact,
[&](const TransactionRef& tr, util::Span<Changeset> changesets_applied) {
[&](const Transaction& tr, util::Span<Changeset> changesets_applied) {
REALM_ASSERT_3(changesets_applied.size(), <=, pending_batch.changesets.size());
bootstrap_store->pop_front_pending(tr, changesets_applied.size());
});
Expand All @@ -935,7 +909,6 @@ void SessionImpl::process_pending_flx_bootstrap()
}

REALM_ASSERT_3(query_version, !=, -1);
on_flx_sync_progress(query_version, DownloadBatchState::LastInBatch);

on_changesets_integrated(new_version.realm_version, progress);
auto action = call_debug_hook(SyncClientHookEvent::BootstrapProcessed, progress, query_version,
Expand All @@ -948,15 +921,7 @@ void SessionImpl::on_flx_sync_error(int64_t version, std::string_view err_msg)
{
// Ignore the call if the session is not active
if (m_state == State::Active) {
m_wrapper.on_flx_sync_error(version, err_msg);
}
}

void SessionImpl::on_flx_sync_progress(int64_t version, DownloadBatchState batch_state)
{
// Ignore the call if the session is not active
if (m_state == State::Active) {
m_wrapper.on_flx_sync_progress(version, batch_state);
get_flx_subscription_store()->set_error(version, err_msg);
}
}

Expand All @@ -974,14 +939,6 @@ MigrationStore* SessionImpl::get_migration_store()
return m_wrapper.get_migration_store();
}

void SessionImpl::on_flx_sync_version_complete(int64_t version)
{
// Ignore the call if the session is not active
if (m_state == State::Active) {
m_wrapper.on_flx_sync_version_complete(version);
}
}

SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data)
{
// Should never be called if session is not active
Expand Down Expand Up @@ -1182,75 +1139,6 @@ bool SessionWrapper::has_flx_subscription_store() const
return static_cast<bool>(m_flx_subscription_store);
}

void SessionWrapper::on_flx_sync_error(int64_t version, std::string_view err_msg)
{
REALM_ASSERT(!m_finalized);
get_flx_subscription_store()->update_state(version, SubscriptionSet::State::Error, err_msg);
}

void SessionWrapper::on_flx_sync_version_complete(int64_t version)
{
REALM_ASSERT(!m_finalized);
m_flx_last_seen_version = version;
m_flx_active_version = version;
}

void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state)
{
if (!has_flx_subscription_store()) {
return;
}

REALM_ASSERT(!m_finalized);
if (batch_state == DownloadBatchState::SteadyState) {
throw IntegrationException(ErrorCodes::SyncProtocolInvariantFailed,
"Unexpected batch state of SteadyState while downloading bootstrap");
}
// Is this a server-initiated bootstrap? Skip notifying the subscription store
if (new_version == m_flx_active_version) {
return;
}
if (new_version < m_flx_active_version) {
throw IntegrationException(ErrorCodes::SyncProtocolInvariantFailed,
util::format("Bootstrap query version %1 is less than active version %2",
new_version, m_flx_active_version));
}
if (new_version < m_flx_last_seen_version) {
throw IntegrationException(
ErrorCodes::SyncProtocolInvariantFailed,
util::format("Download message query version %1 is less than current bootstrap version %2", new_version,
m_flx_last_seen_version));
}

SubscriptionSet::State new_state = SubscriptionSet::State::Uncommitted; // Initialize to make compiler happy

switch (batch_state) {
case DownloadBatchState::SteadyState:
// Cannot be called with this value.
REALM_UNREACHABLE();
case DownloadBatchState::LastInBatch:
on_flx_sync_version_complete(new_version);
if (new_version == 0) {
new_state = SubscriptionSet::State::Complete;
}
else {
new_state = SubscriptionSet::State::AwaitingMark;
m_flx_pending_mark_version = new_version;
}
break;
case DownloadBatchState::MoreToCome:
if (m_flx_last_seen_version == new_version) {
return;
}

m_flx_last_seen_version = new_version;
new_state = SubscriptionSet::State::Bootstrapping;
break;
}

get_flx_subscription_store()->update_state(new_version, new_state);
}

SubscriptionStore* SessionWrapper::get_flx_subscription_store()
{
REALM_ASSERT(!m_finalized);
Expand Down Expand Up @@ -1443,7 +1331,8 @@ void SessionWrapper::actualize()
conn.update_connect_info(m_http_request_path_prefix, m_signed_access_token); // Throws
std::unique_ptr<SessionImpl> sess = std::make_unique<SessionImpl>(*this, conn); // Throws
if (m_sync_mode == SyncServerMode::FLX) {
m_flx_pending_bootstrap_store = std::make_unique<PendingBootstrapStore>(m_db, sess->logger);
m_flx_pending_bootstrap_store =
std::make_unique<PendingBootstrapStore>(m_db, sess->logger, m_flx_subscription_store);
}

sess->logger.info("Binding '%1' to '%2'", m_db->get_path(), m_virt_path); // Throws
Expand All @@ -1453,10 +1342,6 @@ void SessionWrapper::actualize()
});
conn.activate_session(std::move(sess)); // Throws

// Initialize the variables relying on the bootstrap store from the event loop to guarantee that a previous
// session cannot change the state of the bootstrap store at the same time.
update_subscription_version_info();

if (was_created)
conn.activate(); // Throws

Expand Down Expand Up @@ -1563,6 +1448,10 @@ void SessionWrapper::on_download_completion()
// completion without this.
check_progress();

if (m_flx_subscription_store) {
m_flx_subscription_store->download_complete();
}

while (!m_download_completion_handlers.empty()) {
auto handler = std::move(m_download_completion_handlers.back());
m_download_completion_handlers.pop_back();
Expand All @@ -1573,13 +1462,6 @@ void SessionWrapper::on_download_completion()
m_upload_completion_handlers.push_back(std::move(handler)); // Throws
m_sync_completion_handlers.pop_back();
}

if (m_flx_subscription_store && m_flx_pending_mark_version != SubscriptionSet::EmptyVersion) {
m_sess->logger.debug("Marking query version %1 as complete after receiving MARK message",
m_flx_pending_mark_version);
m_flx_subscription_store->update_state(m_flx_pending_mark_version, SubscriptionSet::State::Complete);
m_flx_pending_mark_version = SubscriptionSet::EmptyVersion;
}
}


Expand Down Expand Up @@ -1626,15 +1508,34 @@ void SessionWrapper::check_progress()
REALM_ASSERT(!m_finalized);
REALM_ASSERT(m_sess);

if (!m_progress_handler && m_upload_completion_handlers.empty() && m_sync_completion_handlers.empty())
// Check if there's anything which even wants progress or completion information
bool has_progress_handler = m_progress_handler && m_reliable_download_progress;
bool has_completion_handler = !m_upload_completion_handlers.empty() || !m_sync_completion_handlers.empty();
if (!m_flx_subscription_store && !has_progress_handler && !has_completion_handler)
return;

version_type uploaded_version;
// The order in which we report each type of completion or progress is important,
// and changing it needs to be avoided as it'd be a breaking change to the APIs

TransactionRef tr;
ReportedProgress p;
if (m_flx_subscription_store) {
m_flx_subscription_store->report_progress(tr);
}

if (!has_progress_handler && !has_completion_handler)
return;
// The subscription store may have started a read transaction that we'll
// reuse, but it may not have needed to or may not exist
if (!tr)
tr = m_db->start_read();

version_type uploaded_version;
DownloadableProgress downloadable;
ClientHistory::get_upload_download_state(*m_db, p.downloaded, downloadable, p.uploaded, p.uploadable, p.snapshot,
uploaded_version);
p.query_version = m_flx_last_seen_version;
ClientHistory::get_upload_download_state(*tr, m_db->get_alloc(), p.downloaded, downloadable, p.uploaded,
p.uploadable, p.snapshot, uploaded_version);
if (m_flx_subscription_store && has_progress_handler)
p.query_version = m_flx_subscription_store->get_downloading_query_version(*tr);

report_progress(p, downloadable);
report_upload_completion(uploaded_version);
Expand Down Expand Up @@ -1793,15 +1694,6 @@ void SessionWrapper::handle_pending_client_reset_acknowledgement()
});
}

void SessionWrapper::update_subscription_version_info()
{
if (!m_flx_subscription_store)
return;
auto versions_info = m_flx_subscription_store->get_version_info();
m_flx_active_version = versions_info.active;
m_flx_pending_mark_version = versions_info.pending_mark;
}

std::string SessionWrapper::get_appservices_connection_id()
{
auto pf = util::make_promise_future<std::string>();
Expand Down
17 changes: 8 additions & 9 deletions src/realm/sync/noinst/client_history_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ void ClientHistory::integrate_server_changesets(
const SyncProgress& progress, DownloadableProgress downloadable_bytes,
util::Span<const RemoteChangeset> incoming_changesets, VersionInfo& version_info, DownloadBatchState batch_state,
util::Logger& logger, const TransactionRef& transact,
util::UniqueFunction<void(const TransactionRef&, util::Span<Changeset>)> run_in_write_tr)
util::UniqueFunction<void(const Transaction&, util::Span<Changeset>)> run_in_write_tr)
{
REALM_ASSERT(incoming_changesets.size() != 0);
REALM_ASSERT(
Expand Down Expand Up @@ -502,7 +502,7 @@ void ClientHistory::integrate_server_changesets(
update_sync_progress(progress, downloadable_bytes); // Throws
}
if (run_in_write_tr) {
run_in_write_tr(transact, changesets_for_cb);
run_in_write_tr(*transact, changesets_for_cb);
}

// The reason we can use the `origin_timestamp`, and the `origin_file_ident`
Expand Down Expand Up @@ -610,14 +610,13 @@ size_t ClientHistory::transform_and_apply_server_changesets(util::Span<Changeset
}


void ClientHistory::get_upload_download_state(DB& db, std::uint_fast64_t& downloaded_bytes,
void ClientHistory::get_upload_download_state(Transaction& rt, Allocator& alloc, std::uint_fast64_t& downloaded_bytes,
DownloadableProgress& downloadable_bytes,
std::uint_fast64_t& uploaded_bytes,
std::uint_fast64_t& uploadable_bytes,
std::uint_fast64_t& snapshot_version, version_type& uploaded_version)
{
TransactionRef rt = db.start_read(); // Throws
version_type current_client_version = rt->get_version();
version_type current_client_version = rt.get_version();

downloaded_bytes = 0;
downloadable_bytes = uint64_t(0);
Expand All @@ -627,11 +626,11 @@ void ClientHistory::get_upload_download_state(DB& db, std::uint_fast64_t& downlo
uploaded_version = 0;

using gf = _impl::GroupFriend;
ref_type ref = gf::get_history_ref(*rt);
ref_type ref = gf::get_history_ref(rt);
if (!ref)
return;

Array root(db.get_alloc());
Array root(alloc);
root.init_from_ref(ref);
downloaded_bytes = root.get_as_ref_or_tagged(s_progress_downloaded_bytes_iip).get_as_int();
downloadable_bytes = root.get_as_ref_or_tagged(s_progress_downloadable_bytes_iip).get_as_int();
Expand All @@ -642,9 +641,9 @@ void ClientHistory::get_upload_download_state(DB& db, std::uint_fast64_t& downlo
if (uploaded_version == current_client_version)
return;

BinaryColumn changesets(db.get_alloc());
BinaryColumn changesets(alloc);
changesets.init_from_ref(root.get_as_ref(s_changesets_iip));
IntegerBpTree origin_file_idents(db.get_alloc());
IntegerBpTree origin_file_idents(alloc);
origin_file_idents.init_from_ref(root.get_as_ref(s_origin_file_idents_iip));

// `base_version` is the oldest version we have history for. If this is
Expand Down
Loading

0 comments on commit 819bb98

Please sign in to comment.