diff --git a/CHANGELOG.md b/CHANGELOG.md index 2b1e0dca69c..0a60c3f9047 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ accidentally triggering the load of a previous DB version.** **Features** * #5241 Allow RETURNING clause when inserting into compressed chunks +* #5245 Mange life-cycle of connections via memory contexts **Bugfixes** * #4804 Skip bucketing when start or end of refresh job is null diff --git a/src/cache.h b/src/cache.h index e80cf2e119f..0d676fc033d 100644 --- a/src/cache.h +++ b/src/cache.h @@ -65,9 +65,7 @@ extern TSDLLEXPORT void ts_cache_init(Cache *cache); extern TSDLLEXPORT void ts_cache_invalidate(Cache *cache); extern TSDLLEXPORT void *ts_cache_fetch(Cache *cache, CacheQuery *query); extern TSDLLEXPORT bool ts_cache_remove(Cache *cache, void *key); - -extern MemoryContext ts_cache_memory_ctx(Cache *cache); - +extern TSDLLEXPORT MemoryContext ts_cache_memory_ctx(Cache *cache); extern TSDLLEXPORT Cache *ts_cache_pin(Cache *cache); extern TSDLLEXPORT int ts_cache_release(Cache *cache); diff --git a/tsl/src/remote/connection.c b/tsl/src/remote/connection.c index 19be6c5f220..9896b98c6e1 100644 --- a/tsl/src/remote/connection.c +++ b/tsl/src/remote/connection.c @@ -51,10 +51,14 @@ * This library file contains convenience functionality around the libpq * API. The major additional functionality offered includes: * - * - libpq object lifecycles are tied to transactions (connections and - * results). This ensures that there are no memory leaks caused by libpq - * objects after a transaction completes. - * - connection configuration suitable for TimescaleDB. + * - Lifecycle management: a connection is tied to the memory context it is + * created in and result objects are tied to the connection they are created + * from. The aim is to avoid memory leaks of libpq objects that aren't + * allocated on a PostgreSQL memory context. + * + * - Connection configuration suitable for TimescaleDB, ensuring the data + * nodes use the same relevant configurations as the access node (e.g., time + * zone). * * NOTE that it is strongly adviced that connection-related functions do not * throw exceptions with, e.g., elog(ERROR). While exceptions can be caught @@ -68,8 +72,8 @@ * able to proceed even if the node is no longer available to respond to a * connection. Another example is performing a liveness check for node status. * - * Therefore, it is best that defer throwing exceptions to high-level - * functions that know when it is appropriate. + * Therefore, it is best to defer throwing exceptions to high-level functions + * that know when it is appropriate. */ /* for assigning cursor numbers and prepared statement numbers */ @@ -144,29 +148,27 @@ list_insert_after(ListNode *entry, ListNode *prev) */ typedef struct ResultEntry { - struct ListNode ln; /* Must be first entry */ - TSConnection *conn; /* The connection the result was created on */ - SubTransactionId subtxid; /* The subtransaction ID that created this result, if any. */ + struct ListNode ln; /* Must be first entry */ + TSConnection *conn; /* The connection the result was created on */ PGresult *result; } ResultEntry; typedef struct TSConnection { - ListNode ln; /* Must be first entry */ - PGconn *pg_conn; /* PostgreSQL connection */ - bool closing_guard; /* Guard against calling PQfinish() directly on PGconn */ + ListNode ln; /* Must be first entry */ + PGconn *pg_conn; /* PostgreSQL connection */ TSConnectionStatus status; - NameData node_name; /* Associated data node name */ - char *tz_name; /* Timezone name last sent over connection */ - bool autoclose; /* Set if this connection should automatically - * close at the end of the (sub-)transaction */ - SubTransactionId subtxid; /* The subtransaction ID that created this connection, if any. */ - int xact_depth; /* 0 => no transaction, 1 => main transaction, > 1 => - * levels of subtransactions */ - bool xact_transitioning; /* TRUE if connection is transitioning to - * another transaction state */ - ListNode results; /* Head of PGresult list */ + NameData node_name; /* Associated data node name */ + char tz_name[TZ_STRLEN_MAX + 1]; /* Timezone name last sent over connection */ + int xact_depth; /* 0 => no transaction, 1 => main transaction, > 1 => + * levels of subtransactions */ + bool xact_transitioning; /* TRUE if connection is transitioning to + * another transaction state */ + ListNode results; /* Head of PGresult list */ bool binary_copy; + MemoryContext mcxt; + MemoryContextCallback mcxt_cb; + bool mcxt_cb_invoked; } TSConnection; /* @@ -345,15 +347,6 @@ fill_result_error(TSConnectionError *err, int errcode, const char *errmsg, const #define EVENTPROC_FAILURE 0 #define EVENTPROC_SUCCESS 1 -static void -remote_connection_free(TSConnection *conn) -{ - if (NULL != conn->tz_name) - free(conn->tz_name); - - free(conn); -} - /* * Invoked on PQfinish(conn). Frees all PGresult objects created on the * connection, apart from those already freed with PQclear(). @@ -366,7 +359,6 @@ handle_conn_destroy(PGEventConnDestroy *event) ListNode *curr; Assert(NULL != conn); - Assert(conn->closing_guard); curr = conn->results.next; @@ -382,20 +374,18 @@ handle_conn_destroy(PGEventConnDestroy *event) results_count++; } - conn->pg_conn = NULL; - list_detach(&conn->ln); - if (results_count > 0) elog(DEBUG3, "cleared %u result objects on connection %p", results_count, conn); connstats.connections_closed++; - if (!conn->closing_guard) - { - ereport(WARNING, - (errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("invalid closing of connection"))); - remote_connection_free(conn); - } + conn->pg_conn = NULL; + list_detach(&conn->ln); + + /* No need to delete the memory context here if handler was invoked by the + * MemoryContextDelete callback */ + if (!conn->mcxt_cb_invoked) + MemoryContextDelete(conn->mcxt); return EVENTPROC_SUCCESS; } @@ -411,29 +401,19 @@ handle_result_create(PGEventResultCreate *event) ResultEntry *entry; Assert(NULL != conn); - - /* We malloc this (instead of palloc) since bound PGresult, which also - * lives outside PostgreSQL's memory management. */ - entry = malloc(sizeof(ResultEntry)); + entry = MemoryContextAllocZero(conn->mcxt, sizeof(ResultEntry)); if (NULL == entry) return EVENTPROC_FAILURE; - MemSet(entry, 0, sizeof(ResultEntry)); entry->ln.next = entry->ln.prev = NULL; entry->conn = conn; entry->result = event->result; - entry->subtxid = GetCurrentSubTransactionId(); - /* Add entry as new head and set instance data */ list_insert_after(&entry->ln, &conn->results); PQresultSetInstanceData(event->result, eventproc, entry); - elog(DEBUG3, - "created result %p on connection %p subtxid %u", - event->result, - conn, - entry->subtxid); + elog(DEBUG3, "created result %p on connection %p", event->result, conn); connstats.results_created++; @@ -454,9 +434,9 @@ handle_result_destroy(PGEventResultDestroy *event) /* Detach entry */ list_detach(&entry->ln); - elog(DEBUG3, "destroyed result %p for subtxnid %u", entry->result, entry->subtxid); + elog(DEBUG3, "destroyed result %p", entry->result); - free(entry); + pfree(entry); connstats.results_cleared++; @@ -486,6 +466,10 @@ eventproc(PGEventId eventid, void *eventinfo, void *data) case PGEVT_RESULTDESTROY: res = handle_result_destroy((PGEventResultDestroy *) eventinfo); break; + case PGEVT_RESULTCOPY: + /* Not used in the code, so not handled */ + Assert(false); + break; default: /* Not of interest, so return success */ break; @@ -623,6 +607,35 @@ extract_connection_options(List *defelems, const char **keywords, const char **v return option_pos; } +static bool +get_update_conn_cmd(TSConnection *conn, StringInfo cmdbuf) +{ + const char *local_tz_name = pg_get_timezone_name(session_timezone); + + initStringInfo(cmdbuf); + + /* + * We need to enforce the same timezone setting across nodes. Otherwise, + * we might get the wrong result when we push down things like + * date_trunc(text, timestamptz). To safely do that, we also need the + * timezone databases to be the same on all data nodes. + * + * We save away the timezone name so that we know what we last sent over + * the connection. If the time zone changed since last time we sent a + * command, we will send a SET TIMEZONE command with the new timezone + * first. + */ + if (conn->tz_name[0] == '\0' || + (local_tz_name && pg_strcasecmp(conn->tz_name, local_tz_name) != 0)) + { + strncpy(conn->tz_name, local_tz_name, TZ_STRLEN_MAX); + appendStringInfo(cmdbuf, "SET TIMEZONE = '%s'", local_tz_name); + return true; + } + + return false; +} + /* * Internal connection configure. * @@ -630,18 +643,17 @@ extract_connection_options(List *defelems, const char **keywords, const char **v * changed. It is used to pass on configuration settings before executing a * command requested by module users. * - * ATTENTION! This function should *not* use - * `remote_connection_exec_ok_command` since this function is called - * indirectly whenever a remote command is executed, which would lead to - * infinite recursion. Stick to `PQ*` functions. - * * Returns true if the current configuration is OK (no change) or was * successfully applied, otherwise false. */ bool remote_connection_configure_if_changed(TSConnection *conn) { - const char *local_tz_name = pg_get_timezone_name(session_timezone); + StringInfoData cmd = { + .data = NULL, + .len = 0, + .maxlen = 0, + }; bool success = true; /* @@ -655,17 +667,11 @@ remote_connection_configure_if_changed(TSConnection *conn) * command, we will send a SET TIMEZONE command with the new timezone * first. */ - if (conn->tz_name == NULL || - (local_tz_name && pg_strcasecmp(conn->tz_name, local_tz_name) != 0)) + if (get_update_conn_cmd(conn, &cmd)) { - char *set_timezone_cmd = psprintf("SET TIMEZONE = '%s'", local_tz_name); - PGresult *result = PQexec(conn->pg_conn, set_timezone_cmd); - - success = PQresultStatus(result) == PGRES_COMMAND_OK; + PGresult *result = remote_connection_exec(conn, cmd.data); + success = (PQresultStatus(result) == PGRES_COMMAND_OK); PQclear(result); - pfree(set_timezone_cmd); - free(conn->tz_name); - conn->tz_name = strdup(local_tz_name); } return success; @@ -691,6 +697,7 @@ static const char *default_connection_options[] = { "SET datestyle = ISO", "SET intervalstyle = postgres", "SET extra_float_digits = 3", + "SET statement_timeout = 0", NULL, }; @@ -725,27 +732,46 @@ remote_connection_configure(TSConnection *conn) result = PQexec(conn->pg_conn, sql.data); success = PQresultStatus(result) == PGRES_COMMAND_OK; PQclear(result); + pfree(sql.data); return success; } +static void +connection_memcxt_reset_cb(void *arg) +{ + TSConnection *conn = arg; + + conn->mcxt_cb_invoked = true; + + /* Close the connection and free all attached resources, unless already + * closed explicitly before being freed. */ + if (conn->pg_conn != NULL) + PQfinish(conn->pg_conn); +} + +/* + * Create a new connection. + * + * The returned connection object is allocated on the current memory context + * and is tied to its life-cycle. The connection object includes natively + * allocated memory from libpq (via malloc) which will be freed via callbacks + * when the main memory context is freed. + */ static TSConnection * remote_connection_create(PGconn *pg_conn, bool processing, const char *node_name) { - TSConnection *conn = malloc(sizeof(TSConnection)); + MemoryContext mcxt = + AllocSetContextCreate(CurrentMemoryContext, "TSConnection", ALLOCSET_SMALL_SIZES); + TSConnection *conn = MemoryContextAllocZero(mcxt, sizeof(TSConnection)); int ret; - if (NULL == conn) - return NULL; - - MemSet(conn, 0, sizeof(TSConnection)); - /* Must register the event procedure before attaching any instance data */ ret = PQregisterEventProc(pg_conn, eventproc, "remote connection", conn); if (ret == 0) { - free(conn); + MemoryContextDelete(mcxt); return NULL; } @@ -754,45 +780,29 @@ remote_connection_create(PGconn *pg_conn, bool processing, const char *node_name conn->ln.next = conn->ln.prev = NULL; conn->pg_conn = pg_conn; - conn->closing_guard = false; remote_connection_set_status(conn, processing ? CONN_PROCESSING : CONN_IDLE); namestrcpy(&conn->node_name, node_name); - conn->tz_name = NULL; - conn->autoclose = true; - conn->subtxid = GetCurrentSubTransactionId(); + conn->tz_name[0] = '\0'; conn->xact_depth = 0; conn->xact_transitioning = false; /* Initialize results head */ conn->results.next = &conn->results; conn->results.prev = &conn->results; conn->binary_copy = false; - list_insert_after(&conn->ln, &connections); + conn->mcxt = mcxt; + /* Register a memory context callback that will ensure the connection is + * always closed and the resources are freed */ + conn->mcxt_cb.func = connection_memcxt_reset_cb; + conn->mcxt_cb.arg = conn; + MemoryContextRegisterResetCallback(mcxt, &conn->mcxt_cb); + list_insert_after(&conn->ln, &connections); elog(DEBUG3, "created connection %p", conn); connstats.connections_created++; return conn; } -/* - * Set the auto-close behavior. - * - * If set, the connection will be closed at the end of the (sub-)transaction - * it was created on. - * - * The default value is on (true). - * - * Returns the previous setting. - */ -bool -remote_connection_set_autoclose(TSConnection *conn, bool autoclose) -{ - bool old = conn->autoclose; - - conn->autoclose = autoclose; - return old; -} - int remote_connection_xact_depth_get(const TSConnection *conn) { @@ -1727,18 +1737,9 @@ remote_connection_ping(const char *node_name) void remote_connection_close(TSConnection *conn) { - Assert(conn != NULL); - - conn->closing_guard = true; - - if (NULL != conn->pg_conn) - PQfinish(conn->pg_conn); - - /* Assert that PQfinish detached this connection from the global list of - * connections */ - Assert(IS_DETACHED_ENTRY(&conn->ln)); - - remote_connection_free(conn); + /* The PQfinish callback handler will take care of freeing the resources, + * including the TSConnection object. */ + PQfinish(conn->pg_conn); } /* @@ -1995,141 +1996,6 @@ remote_result_close(PGresult *res) PQclear(res); } -/* - * Cleanup connections and results at the end of a (sub-)transaction. - * - * This function is called at the end of transactions and sub-transactions to - * auto-cleanup connections and result objects. - */ -static void -remote_connections_cleanup(SubTransactionId subtxid, bool isabort) -{ - ListNode *curr = connections.next; - unsigned int num_connections = 0; - unsigned int num_results = 0; - - while (curr != &connections) - { - TSConnection *conn = (TSConnection *) curr; - - /* Move to next connection since closing the current one might - * otherwise make the curr pointer invalid. */ - curr = curr->next; - - if (conn->autoclose && (subtxid == InvalidSubTransactionId || subtxid == conn->subtxid)) - { - /* Closes the connection and frees all its PGresult objects */ - remote_connection_close(conn); - num_connections++; - } - else - { - /* We're not closing the connection, but we should clean up any - * lingering results */ - ListNode *curr_result = conn->results.next; - - while (curr_result != &conn->results) - { - ResultEntry *entry = (ResultEntry *) curr_result; - - curr_result = curr_result->next; - - if (subtxid == InvalidSubTransactionId || subtxid == entry->subtxid) - { - PQclear(entry->result); - num_results++; - } - } - } - } - - if (subtxid == InvalidSubTransactionId) - elog(DEBUG3, - "cleaned up %u connections and %u results at %s of transaction", - num_connections, - num_results, - isabort ? "abort" : "commit"); - else - elog(DEBUG3, - "cleaned up %u connections and %u results at %s of sub-transaction %u", - num_connections, - num_results, - isabort ? "abort" : "commit", - subtxid); -} - -static void -remote_connection_xact_end(XactEvent event, void *unused_arg) -{ - /* - * We are deep down in CommitTransaction code path. We do not want our - * emit_log_hook_callback to interfere since it uses its own transaction - */ - emit_log_hook_type prev_emit_log_hook = emit_log_hook; - emit_log_hook = NULL; - - switch (event) - { - case XACT_EVENT_ABORT: - case XACT_EVENT_PARALLEL_ABORT: - /* - * We expect that the waitpoint will be retried and then we - * will return due to the process receiving a SIGTERM if - * the advisory lock is exclusively held by a user call - */ - DEBUG_RETRY_WAITPOINT("remote_conn_xact_end"); - remote_connections_cleanup(InvalidSubTransactionId, true); - break; - case XACT_EVENT_COMMIT: - case XACT_EVENT_PARALLEL_COMMIT: - /* Same retry behavior as above */ - DEBUG_RETRY_WAITPOINT("remote_conn_xact_end"); - remote_connections_cleanup(InvalidSubTransactionId, false); - break; - case XACT_EVENT_PREPARE: - /* - * We expect that the waitpoint will be retried and then we - * will return with a warning on crossing the retry count if - * the advisory lock is exclusively held by a user call - */ - DEBUG_RETRY_WAITPOINT("remote_conn_xact_end"); - break; - default: - /* other events are too early to use DEBUG_WAITPOINT.. */ - break; - } - - /* re-enable the emit_log_hook */ - emit_log_hook = prev_emit_log_hook; -} - -static void -remote_connection_subxact_end(SubXactEvent event, SubTransactionId subtxid, - SubTransactionId parent_subtxid, void *unused_arg) -{ - /* - * We are deep down in CommitTransaction code path. We do not want our - * emit_log_hook_callback to interfere since it uses its own transaction - */ - emit_log_hook_type prev_emit_log_hook = emit_log_hook; - emit_log_hook = NULL; - - switch (event) - { - case SUBXACT_EVENT_ABORT_SUB: - remote_connections_cleanup(subtxid, true); - break; - case SUBXACT_EVENT_COMMIT_SUB: - remote_connections_cleanup(subtxid, false); - break; - default: - break; - } - - /* re-enable the emit_log_hook */ - emit_log_hook = prev_emit_log_hook; -} - bool remote_connection_set_single_row_mode(TSConnection *conn) { @@ -2422,15 +2288,10 @@ remote_connection_stats_get(void) void _remote_connection_init(void) { - RegisterXactCallback(remote_connection_xact_end, NULL); - RegisterSubXactCallback(remote_connection_subxact_end, NULL); - unset_libpq_envvar(); } void _remote_connection_fini(void) { - UnregisterXactCallback(remote_connection_xact_end, NULL); - UnregisterSubXactCallback(remote_connection_subxact_end, NULL); } diff --git a/tsl/src/remote/connection.h b/tsl/src/remote/connection.h index f10906c2bd5..884406c5e68 100644 --- a/tsl/src/remote/connection.h +++ b/tsl/src/remote/connection.h @@ -67,7 +67,6 @@ extern TSConnection *remote_connection_open_by_id(TSConnectionId id); extern TSConnection *remote_connection_open(Oid server_id, Oid user_id); extern TSConnection *remote_connection_open_nothrow(Oid server_id, Oid user_id, char **errmsg); extern List *remote_connection_prepare_auth_options(const ForeignServer *server, Oid user_id); -extern bool remote_connection_set_autoclose(TSConnection *conn, bool autoclose); extern int remote_connection_xact_depth_get(const TSConnection *conn); extern int remote_connection_xact_depth_inc(TSConnection *conn); extern int remote_connection_xact_depth_dec(TSConnection *conn); diff --git a/tsl/src/remote/connection_cache.c b/tsl/src/remote/connection_cache.c index c3889c7cf4a..c148a9b075a 100644 --- a/tsl/src/remote/connection_cache.c +++ b/tsl/src/remote/connection_cache.c @@ -150,6 +150,7 @@ connection_cache_create_entry(Cache *cache, CacheQuery *query) { TSConnectionId *id = (TSConnectionId *) query->data; ConnectionCacheEntry *entry = query->result; + MemoryContext old; /* * Protects against errors in remote_connection_open, necessary since this @@ -162,11 +163,9 @@ connection_cache_create_entry(Cache *cache, CacheQuery *query) * because PGconn allocation happens using malloc. Which is why calling * remote_connection_close at cleanup is critical. */ + old = MemoryContextSwitchTo(ts_cache_memory_ctx(cache)); entry->conn = remote_connection_open_by_id(*id); - - /* Since this connection is managed by the cache, it should not auto-close - * at the end of the transaction */ - remote_connection_set_autoclose(entry->conn, false); + MemoryContextSwitchTo(old); /* Set the hash values of the foreign server and role for cache * invalidation purposes */ diff --git a/tsl/test/expected/remote_connection.out b/tsl/test/expected/remote_connection.out index 45828658d1d..ddc03afd7c4 100644 --- a/tsl/test/expected/remote_connection.out +++ b/tsl/test/expected/remote_connection.out @@ -55,12 +55,12 @@ SELECT * FROM test.get_connection_stats(); \set ON_ERROR_STOP 0 SELECT test.send_remote_query_that_generates_exception(); ERROR: XX000: bad query error thrown from test -LOCATION: ts_test_bad_remote_query, connection.c:184 +LOCATION: ts_test_bad_remote_query, connection.c:206 \set ON_ERROR_STOP 1 SELECT * FROM test.get_connection_stats(); connections_created | connections_closed | results_created | results_cleared ---------------------+--------------------+-----------------+----------------- - 1 | 1 | 7 | 7 + 1 | 1 | 8 | 8 (1 row) SELECT test.remote_connection_tests(); diff --git a/tsl/test/src/remote/connection.c b/tsl/test/src/remote/connection.c index 57dd452822a..28443efc182 100644 --- a/tsl/test/src/remote/connection.c +++ b/tsl/test/src/remote/connection.c @@ -107,14 +107,21 @@ test_simple_queries() static void test_connection_and_result_leaks() { - TSConnection *conn, *subconn; + TSConnection *conn, *subconn, *subconn2; PGresult *res; RemoteConnectionStats *stats; + MemoryContext old; + MemoryContext mcxt; + mcxt = AllocSetContextCreate(CurrentMemoryContext, "test", ALLOCSET_SMALL_SIZES); + + old = MemoryContextSwitchTo(mcxt); stats = remote_connection_stats_get(); remote_connection_stats_reset(); conn = get_connection(); + ASSERT_NUM_OPEN_CONNECTIONS(stats, 1); + res = remote_connection_exec(conn, "SELECT 1"); remote_connection_close(conn); @@ -127,6 +134,8 @@ test_connection_and_result_leaks() BeginInternalSubTransaction("conn leak test"); + /* This connection is created on the subtransaction memory context */ + Assert(CurrentMemoryContext == CurTransactionContext); subconn = get_connection(); ASSERT_NUM_OPEN_CONNECTIONS(stats, 2); @@ -135,7 +144,9 @@ test_connection_and_result_leaks() BeginInternalSubTransaction("conn leak test 2"); - res = remote_connection_exec(subconn, "SELECT 1"); + subconn2 = get_connection(); + ASSERT_NUM_OPEN_CONNECTIONS(stats, 3); + res = remote_connection_exec(subconn2, "SELECT 1"); ASSERT_NUM_OPEN_RESULTS(stats, 2); /* Explicitly close one result */ @@ -149,20 +160,31 @@ test_connection_and_result_leaks() ASSERT_NUM_OPEN_RESULTS(stats, 3); RollbackAndReleaseCurrentSubTransaction(); - - /* Rollback should have cleared the two results created in the - * sub-transaction, but not the one created before the sub-transaction */ - ASSERT_NUM_OPEN_RESULTS(stats, 1); + /* The connection created in the rolled back transaction should be + * destroyed */ + ASSERT_NUM_OPEN_CONNECTIONS(stats, 2); remote_connection_exec(subconn, "SELECT 1"); - ASSERT_NUM_OPEN_RESULTS(stats, 2); + ASSERT_NUM_OPEN_RESULTS(stats, 4); + /* Note that releasing/committing the subtransaction does not delete the memory */ ReleaseCurrentSubTransaction(); - /* Should only leave the original connection created before the first - * sub-transaction, but no results */ + /* The original and subconn still exists */ + ASSERT_NUM_OPEN_CONNECTIONS(stats, 2); + ASSERT_NUM_OPEN_RESULTS(stats, 4); + + remote_connection_exec(conn, "SELECT 1"); + ASSERT_NUM_OPEN_RESULTS(stats, 5); + + MemoryContextSwitchTo(old); + MemoryContextDelete(mcxt); + + /* Original connection should be cleaned up along with its 3 results. The + * subconn object was created on a subtransaction memory context that will + * be cleared when the main transaction ends. */ ASSERT_NUM_OPEN_CONNECTIONS(stats, 1); - ASSERT_NUM_OPEN_RESULTS(stats, 0); + ASSERT_NUM_OPEN_RESULTS(stats, 2); remote_connection_stats_reset(); }