diff --git a/src/cache.h b/src/cache.h index e80cf2e119f..0d676fc033d 100644 --- a/src/cache.h +++ b/src/cache.h @@ -65,9 +65,7 @@ extern TSDLLEXPORT void ts_cache_init(Cache *cache); extern TSDLLEXPORT void ts_cache_invalidate(Cache *cache); extern TSDLLEXPORT void *ts_cache_fetch(Cache *cache, CacheQuery *query); extern TSDLLEXPORT bool ts_cache_remove(Cache *cache, void *key); - -extern MemoryContext ts_cache_memory_ctx(Cache *cache); - +extern TSDLLEXPORT MemoryContext ts_cache_memory_ctx(Cache *cache); extern TSDLLEXPORT Cache *ts_cache_pin(Cache *cache); extern TSDLLEXPORT int ts_cache_release(Cache *cache); diff --git a/tsl/src/data_node.c b/tsl/src/data_node.c index f8faaba4dfe..68503125dae 100644 --- a/tsl/src/data_node.c +++ b/tsl/src/data_node.c @@ -510,17 +510,19 @@ data_node_bootstrap_extension(TSConnection *conn) quote_literal_cstr(EXTENSION_NAME)); if (PQresultStatus(res) != PGRES_TUPLES_OK) - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQresultErrorMessage(res)))); + remote_result_elog(res, ERROR); if (PQntuples(res) == 0) { + remote_result_close(res); + if (schema_oid != PG_PUBLIC_NAMESPACE) { - PGresult *res = remote_connection_execf(conn, - "CREATE SCHEMA %s AUTHORIZATION %s", - schema_name_quoted, - quote_identifier(username)); + res = remote_connection_execf(conn, + "CREATE SCHEMA %s AUTHORIZATION %s", + schema_name_quoted, + quote_identifier(username)); + if (PQresultStatus(res) != PGRES_COMMAND_OK) { const char *const sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE); @@ -528,6 +530,8 @@ data_node_bootstrap_extension(TSConnection *conn) (sqlstate && strcmp(sqlstate, ERRCODE_DUPLICATE_SCHEMA_STR) == 0); if (!schema_exists) remote_result_elog(res, ERROR); + + remote_result_close(res); /* If the schema already existed on the remote node, we got a * duplicate schema error and the schema was not created. 
In * that case, we log an error with a hint on how to fix the @@ -538,6 +542,8 @@ data_node_bootstrap_extension(TSConnection *conn) errhint("Make sure that the data node does not contain any " "existing objects prior to adding it."))); } + + remote_result_close(res); } remote_connection_cmdf_ok(conn, @@ -556,6 +562,7 @@ data_node_bootstrap_extension(TSConnection *conn) PQhost(remote_connection_get_pg_conn(conn)), PQport(remote_connection_get_pg_conn(conn)), PQgetvalue(res, 0, 1)))); + remote_result_close(res); data_node_validate_extension(conn); return false; } @@ -635,7 +642,8 @@ data_node_validate_extension_availability(TSConnection *conn) if (PQresultStatus(res) != PGRES_TUPLES_OK) ereport(ERROR, - (errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQresultErrorMessage(res)))); + (errcode(ERRCODE_CONNECTION_EXCEPTION), + errmsg("failed to validate remote extension: %s", PQresultErrorMessage(res)))); if (PQntuples(res) == 0) ereport(ERROR, @@ -1450,6 +1458,7 @@ create_alter_data_node_tuple(TupleDesc tupdesc, const char *node_name, List *opt MemSet(nulls, false, sizeof(nulls)); values[AttrNumberGetAttrOffset(Anum_alter_data_node_node_name)] = CStringGetDatum(node_name); + values[AttrNumberGetAttrOffset(Anum_alter_data_node_available)] = BoolGetDatum(true); foreach (lc, options) { diff --git a/tsl/src/remote/async.c b/tsl/src/remote/async.c index e46d14ddb35..d840cd4da30 100644 --- a/tsl/src/remote/async.c +++ b/tsl/src/remote/async.c @@ -152,6 +152,8 @@ async_request_set_state(AsyncRequest *req, AsyncRequestState new_state) static AsyncRequest * async_request_send_internal(AsyncRequest *req, int elevel) { + int ret = 0; + if (req->state != DEFERRED) elog(elevel, "can't send async request in state \"%d\"", req->state); @@ -170,39 +172,30 @@ async_request_send_internal(AsyncRequest *req, int elevel) * the prepared statements we use in this module are simple enough that * the data node will make the right choices. 
*/ - if (0 == PQsendPrepare(remote_connection_get_pg_conn(req->conn), - req->stmt_name, - req->sql, - req->prep_stmt_params, - NULL)) - { - /* - * null is fine to pass down as the res, the connection error message - * will get through - */ - remote_connection_elog(req->conn, elevel); - return NULL; - } + ret = PQsendPrepare(remote_connection_get_pg_conn(req->conn), + req->stmt_name, + req->sql, + req->prep_stmt_params, + NULL); } else { - if (0 == PQsendQueryParams(remote_connection_get_pg_conn(req->conn), - req->sql, - stmt_params_total_values(req->params), - /* param types - see note above */ NULL, - stmt_params_values(req->params), - stmt_params_lengths(req->params), - stmt_params_formats(req->params), - req->res_format)) - { - /* - * null is fine to pass down as the res, the connection error message - * will get through - */ - remote_connection_elog(req->conn, elevel); - return NULL; - } + ret = PQsendQueryParams(remote_connection_get_pg_conn(req->conn), + req->sql, + stmt_params_total_values(req->params), + /* param types - see note above */ NULL, + stmt_params_values(req->params), + stmt_params_lengths(req->params), + stmt_params_formats(req->params), + req->res_format); + } + + if (ret == 0 || !remote_connection_flush(req->conn, NULL)) + { + remote_connection_elog(req->conn, elevel); + return NULL; } + async_request_set_state(req, EXECUTING); remote_connection_set_status(req->conn, CONN_PROCESSING); return req; @@ -684,7 +677,6 @@ wait_to_consume_data(AsyncRequestSet *set, TimestampTz end_time) ListCell *lc; int rc; WaitEvent event; - uint32 wait_event_info = PG_WAIT_EXTENSION; AsyncRequest *wait_req; AsyncResponse *result; long timeout_ms = -1L; @@ -723,7 +715,7 @@ wait_to_consume_data(AsyncRequestSet *set, TimestampTz end_time) while (true) { wait_req = NULL; - rc = WaitEventSetWait(we_set, timeout_ms, &event, 1, wait_event_info); + rc = WaitEventSetWait(we_set, timeout_ms, &event, 1, PG_WAIT_EXTENSION); if (rc == 0) { @@ -731,8 +723,6 @@ 
wait_to_consume_data(AsyncRequestSet *set, TimestampTz end_time) break; } - CHECK_FOR_INTERRUPTS(); - if (event.events & ~(WL_SOCKET_READABLE | WL_LATCH_SET)) { /* @@ -748,6 +738,7 @@ wait_to_consume_data(AsyncRequestSet *set, TimestampTz end_time) if (event.events & WL_LATCH_SET) { ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); } if (event.events & WL_SOCKET_READABLE) diff --git a/tsl/src/remote/connection.c b/tsl/src/remote/connection.c index 19be6c5f220..920434d152d 100644 --- a/tsl/src/remote/connection.c +++ b/tsl/src/remote/connection.c @@ -11,6 +11,7 @@ * the PostgreSQL License. */ #include +#include #include #include #include @@ -19,15 +20,19 @@ #include #include #include +#include #include #include #include #include #include +#include #include #include #include #include +#include +#include #include #include @@ -41,6 +46,7 @@ #include "connection.h" #include "data_node.h" #include "debug_point.h" +#include "stmt_params.h" #include "utils.h" #include "ts_catalog/metadata.h" #include "config.h" @@ -51,10 +57,21 @@ * This library file contains convenience functionality around the libpq * API. The major additional functionality offered includes: * - * - libpq object lifecycles are tied to transactions (connections and - * results). This ensures that there are no memory leaks caused by libpq - * objects after a transaction completes. - * - connection configuration suitable for TimescaleDB. + * - Lifecycle management: a connection is tied to the memory context it is + * created in and result objects are tied to the connection they are created + * from. The aim is to avoid memory leaks of libpq objects that aren't + * allocated on a PostgreSQL memory context. + * + * - Connection configuration suitable for TimescaleDB, e.g., ensuring the + data nodes use the same time zone as the access node. + * + * - Integration with PostgreSQL signal handlers to ensure remote transactions + * are properly interrupted by signals (like ctrl-C or statement_timeout). 
+ * + * - Non-blocking operation. Sockets operate in non-blocking mode by default + * and connection functions try to never make a blocking libpq call without + * checking/waiting for read- or write-readiness. Again, this is to + * integrate with PostgreSQL's interrupt/signal handling. * * NOTE that it is strongly adviced that connection-related functions do not * throw exceptions with, e.g., elog(ERROR). While exceptions can be caught @@ -68,8 +85,8 @@ * able to proceed even if the node is no longer available to respond to a * connection. Another example is performing a liveness check for node status. * - * Therefore, it is best that defer throwing exceptions to high-level - * functions that know when it is appropriate. + * Therefore, it is best to defer throwing exceptions to high-level functions + * that know when it is appropriate. */ /* for assigning cursor numbers and prepared statement numbers */ @@ -144,29 +161,29 @@ list_insert_after(ListNode *entry, ListNode *prev) */ typedef struct ResultEntry { - struct ListNode ln; /* Must be first entry */ - TSConnection *conn; /* The connection the result was created on */ - SubTransactionId subtxid; /* The subtransaction ID that created this result, if any. 
*/ + struct ListNode ln; /* Must be first entry */ + TSConnection *conn; /* The connection the result was created on */ PGresult *result; } ResultEntry; typedef struct TSConnection { - ListNode ln; /* Must be first entry */ - PGconn *pg_conn; /* PostgreSQL connection */ - bool closing_guard; /* Guard against calling PQfinish() directly on PGconn */ + ListNode ln; /* Must be first entry */ + PGconn *pg_conn; /* PostgreSQL connection */ TSConnectionStatus status; - NameData node_name; /* Associated data node name */ - char *tz_name; /* Timezone name last sent over connection */ - bool autoclose; /* Set if this connection should automatically - * close at the end of the (sub-)transaction */ - SubTransactionId subtxid; /* The subtransaction ID that created this connection, if any. */ - int xact_depth; /* 0 => no transaction, 1 => main transaction, > 1 => - * levels of subtransactions */ - bool xact_transitioning; /* TRUE if connection is transitioning to - * another transaction state */ - ListNode results; /* Head of PGresult list */ + NameData node_name; /* Associated data node name */ + char tz_name[TZ_STRLEN_MAX + 1]; /* Timezone name last sent over connection */ + int xact_depth; /* 0 => no transaction, 1 => main transaction, > 1 => + * levels of subtransactions */ + bool xact_transitioning; /* TRUE if connection is transitioning to + * another transaction state */ + ListNode results; /* Head of PGresult list */ bool binary_copy; + MemoryContext mcxt; + MemoryContextCallback mcxt_cb; + bool mcxt_cb_invoked; + WaitEventSet *wes; + int sockeventpos; } TSConnection; /* @@ -345,15 +362,6 @@ fill_result_error(TSConnectionError *err, int errcode, const char *errmsg, const #define EVENTPROC_FAILURE 0 #define EVENTPROC_SUCCESS 1 -static void -remote_connection_free(TSConnection *conn) -{ - if (NULL != conn->tz_name) - free(conn->tz_name); - - free(conn); -} - /* * Invoked on PQfinish(conn). 
Frees all PGresult objects created on the * connection, apart from those already freed with PQclear(). @@ -366,7 +374,6 @@ handle_conn_destroy(PGEventConnDestroy *event) ListNode *curr; Assert(NULL != conn); - Assert(conn->closing_guard); curr = conn->results.next; @@ -382,20 +389,20 @@ handle_conn_destroy(PGEventConnDestroy *event) results_count++; } - conn->pg_conn = NULL; - list_detach(&conn->ln); - if (results_count > 0) elog(DEBUG3, "cleared %u result objects on connection %p", results_count, conn); connstats.connections_closed++; - if (!conn->closing_guard) - { - ereport(WARNING, - (errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("invalid closing of connection"))); - remote_connection_free(conn); - } + conn->pg_conn = NULL; + list_detach(&conn->ln); + + FreeWaitEventSet(conn->wes); + + /* No need to delete the memory context here if handler was invoked by the + * MemoryContextDelete callback */ + if (!conn->mcxt_cb_invoked) + MemoryContextDelete(conn->mcxt); return EVENTPROC_SUCCESS; } @@ -411,29 +418,19 @@ handle_result_create(PGEventResultCreate *event) ResultEntry *entry; Assert(NULL != conn); - - /* We malloc this (instead of palloc) since bound PGresult, which also - * lives outside PostgreSQL's memory management. 
*/ - entry = malloc(sizeof(ResultEntry)); + entry = MemoryContextAllocZero(conn->mcxt, sizeof(ResultEntry)); if (NULL == entry) return EVENTPROC_FAILURE; - MemSet(entry, 0, sizeof(ResultEntry)); entry->ln.next = entry->ln.prev = NULL; entry->conn = conn; entry->result = event->result; - entry->subtxid = GetCurrentSubTransactionId(); - /* Add entry as new head and set instance data */ list_insert_after(&entry->ln, &conn->results); PQresultSetInstanceData(event->result, eventproc, entry); - elog(DEBUG3, - "created result %p on connection %p subtxid %u", - event->result, - conn, - entry->subtxid); + elog(DEBUG3, "created result %p on connection %p", event->result, conn); connstats.results_created++; @@ -454,9 +451,9 @@ handle_result_destroy(PGEventResultDestroy *event) /* Detach entry */ list_detach(&entry->ln); - elog(DEBUG3, "destroyed result %p for subtxnid %u", entry->result, entry->subtxid); + elog(DEBUG3, "destroyed result %p", entry->result); - free(entry); + pfree(entry); connstats.results_cleared++; @@ -486,6 +483,10 @@ eventproc(PGEventId eventid, void *eventinfo, void *data) case PGEVT_RESULTDESTROY: res = handle_result_destroy((PGEventResultDestroy *) eventinfo); break; + case PGEVT_RESULTCOPY: + /* Not used in the code, so not handled */ + Assert(false); + break; default: /* Not of interest, so return success */ break; @@ -623,6 +624,35 @@ extract_connection_options(List *defelems, const char **keywords, const char **v return option_pos; } +static bool +get_update_conn_cmd(TSConnection *conn, StringInfo cmdbuf) +{ + const char *local_tz_name = pg_get_timezone_name(session_timezone); + + initStringInfo(cmdbuf); + + /* + * We need to enforce the same timezone setting across nodes. Otherwise, + * we might get the wrong result when we push down things like + * date_trunc(text, timestamptz). To safely do that, we also need the + * timezone databases to be the same on all data nodes. 
+ * + * We save away the timezone name so that we know what we last sent over + * the connection. If the time zone changed since last time we sent a + * command, we will send a SET TIMEZONE command with the new timezone + * first. + */ + if (conn->tz_name[0] == '\0' || + (local_tz_name && pg_strcasecmp(conn->tz_name, local_tz_name) != 0)) + { + strncpy(conn->tz_name, local_tz_name, TZ_STRLEN_MAX); + appendStringInfo(cmdbuf, "SET TIMEZONE = '%s'", local_tz_name); + return true; + } + + return false; +} + /* * Internal connection configure. * @@ -630,18 +660,17 @@ extract_connection_options(List *defelems, const char **keywords, const char **v * changed. It is used to pass on configuration settings before executing a * command requested by module users. * - * ATTENTION! This function should *not* use - * `remote_connection_exec_ok_command` since this function is called - * indirectly whenever a remote command is executed, which would lead to - * infinite recursion. Stick to `PQ*` functions. - * * Returns true if the current configuration is OK (no change) or was * successfully applied, otherwise false. */ bool remote_connection_configure_if_changed(TSConnection *conn) { - const char *local_tz_name = pg_get_timezone_name(session_timezone); + StringInfoData cmd = { + .data = NULL, + .len = 0, + .maxlen = 0, + }; bool success = true; /* @@ -655,17 +684,11 @@ remote_connection_configure_if_changed(TSConnection *conn) * command, we will send a SET TIMEZONE command with the new timezone * first. 
*/ - if (conn->tz_name == NULL || - (local_tz_name && pg_strcasecmp(conn->tz_name, local_tz_name) != 0)) + if (get_update_conn_cmd(conn, &cmd)) { - char *set_timezone_cmd = psprintf("SET TIMEZONE = '%s'", local_tz_name); - PGresult *result = PQexec(conn->pg_conn, set_timezone_cmd); - - success = PQresultStatus(result) == PGRES_COMMAND_OK; + PGresult *result = remote_connection_exec(conn, cmd.data); + success = (PQresultStatus(result) == PGRES_COMMAND_OK); PQclear(result); - pfree(set_timezone_cmd); - free(conn->tz_name); - conn->tz_name = strdup(local_tz_name); } return success; @@ -722,30 +745,47 @@ remote_connection_configure(TSConnection *conn) i++; } - result = PQexec(conn->pg_conn, sql.data); + result = remote_connection_exec(conn, sql.data); success = PQresultStatus(result) == PGRES_COMMAND_OK; PQclear(result); return success; } +static void +connection_memcxt_reset_cb(void *arg) +{ + TSConnection *conn = arg; + + conn->mcxt_cb_invoked = true; + + /* Close the connection and free all attached resources, unless already + * closed explicitly before being freed. */ + if (conn->pg_conn != NULL) + PQfinish(conn->pg_conn); +} + +/* + * Create a new connection. + * + * The connection object is allocated on a dedicated memory context in + * order to match the lifetime and memory management of the libpq connection + * it wraps. 
+ */ static TSConnection * remote_connection_create(PGconn *pg_conn, bool processing, const char *node_name) { - TSConnection *conn = malloc(sizeof(TSConnection)); + MemoryContext mcxt = + AllocSetContextCreate(CurrentMemoryContext, "TSConnection", ALLOCSET_SMALL_SIZES); + TSConnection *conn = MemoryContextAllocZero(mcxt, sizeof(TSConnection)); int ret; - if (NULL == conn) - return NULL; - - MemSet(conn, 0, sizeof(TSConnection)); - /* Must register the event procedure before attaching any instance data */ ret = PQregisterEventProc(pg_conn, eventproc, "remote connection", conn); if (ret == 0) { - free(conn); + MemoryContextDelete(mcxt); return NULL; } @@ -754,45 +794,34 @@ remote_connection_create(PGconn *pg_conn, bool processing, const char *node_name conn->ln.next = conn->ln.prev = NULL; conn->pg_conn = pg_conn; - conn->closing_guard = false; remote_connection_set_status(conn, processing ? CONN_PROCESSING : CONN_IDLE); namestrcpy(&conn->node_name, node_name); - conn->tz_name = NULL; - conn->autoclose = true; - conn->subtxid = GetCurrentSubTransactionId(); + conn->tz_name[0] = '\0'; conn->xact_depth = 0; conn->xact_transitioning = false; /* Initialize results head */ conn->results.next = &conn->results; conn->results.prev = &conn->results; conn->binary_copy = false; + conn->mcxt = mcxt; + conn->wes = CreateWaitEventSet(mcxt, 3); + AddWaitEventToSet(conn->wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL); + AddWaitEventToSet(conn->wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, NULL, NULL); + conn->sockeventpos = + AddWaitEventToSet(conn->wes, WL_SOCKET_WRITEABLE, PQsocket(conn->pg_conn), NULL, NULL); + + /* Register a memory context callback that will ensure the connection is + * always closed and the resources are freed */ + conn->mcxt_cb.func = connection_memcxt_reset_cb; + conn->mcxt_cb.arg = conn; + MemoryContextRegisterResetCallback(mcxt, &conn->mcxt_cb); list_insert_after(&conn->ln, &connections); - elog(DEBUG3, "created connection %p", conn); 
connstats.connections_created++; return conn; } -/* - * Set the auto-close behavior. - * - * If set, the connection will be closed at the end of the (sub-)transaction - * it was created on. - * - * The default value is on (true). - * - * Returns the previous setting. - */ -bool -remote_connection_set_autoclose(TSConnection *conn, bool autoclose) -{ - bool old = conn->autoclose; - - conn->autoclose = autoclose; - return old; -} - int remote_connection_xact_depth_get(const TSConnection *conn) { @@ -853,9 +882,6 @@ remote_connection_set_status(TSConnection *conn, TSConnectionStatus status) { Assert(conn != NULL); conn->status = status; - - /* Should be blocking except when doing COPY. */ - Assert(PQisnonblocking(conn->pg_conn) == (conn->status == CONN_COPY_IN)); } TSConnectionStatus @@ -890,30 +916,224 @@ remote_connection_get_result_error(const PGresult *res, TSConnectionError *err) fill_result_error(err, ERRCODE_CONNECTION_EXCEPTION, "", res); } +static bool +wait_and_consume_input(const TSConnection *conn) +{ + WaitEvent event; + + ModifyWaitEvent(conn->wes, conn->sockeventpos, WL_SOCKET_READABLE, NULL); + WaitEventSetWait(conn->wes, -1, &event, 1, PG_WAIT_EXTENSION); + + if (event.events & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + if (event.events & WL_SOCKET_READABLE) + { + /* Consume input and try again.*/ + int ret = PQconsumeInput(conn->pg_conn); + + if (ret == 0) + { + return false; + } + else + { + Assert(ret == 1); + } + } + + return true; +} + +PGresult * +remote_connection_get_result(const TSConnection *conn) +{ + PGresult *pgres = NULL; + int ret = 1; + + do + { + ret = PQisBusy(conn->pg_conn); + + switch (ret) + { + case 1: /* PQgetResult would block */ + if (!wait_and_consume_input(conn)) + { + remote_connection_elog(conn, WARNING); + } + break; + case 0: /* PQgetResult would not block */ + pgres = PQgetResult(conn->pg_conn); + break; + default: + pg_unreachable(); + break; + } + } while (ret != 0); + + return pgres; +} + 
+bool +remote_connection_flush(const TSConnection *conn, TSConnectionError *err) +{ + int ret = 1; + + do + { + ret = PQflush(conn->pg_conn); + + if (ret == 1) + { + WaitEvent event; + ModifyWaitEvent(conn->wes, + conn->sockeventpos, + WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE, + NULL); + WaitEventSetWait(conn->wes, -1, &event, 1, PG_WAIT_EXTENSION); + + if (event.events & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + if (event.events & WL_SOCKET_READABLE) + { + int consumed = PQconsumeInput(conn->pg_conn); + + /* PQconsumeInput() normally returns 1, but 0 on error. */ + if (consumed == 0) + { + fill_connection_error(err, + ERRCODE_CONNECTION_EXCEPTION, + "could not consume data on connection", + conn); + return false; + } + } + + if (event.events & WL_SOCKET_WRITEABLE) + { + /* Try PQflush() again */ + } + } + else if (ret == -1) + { + fill_connection_error(err, + ERRCODE_CONNECTION_EXCEPTION, + "could not flush data on connection", + conn); + } + } while (ret == 1); + + return ret == 0; +} + /* * Execute a remote command. * - * Like PQexec, which this functions uses internally, the PGresult returned - * describes only the last command executed in a multi-command string. + * The execution blocks until a result is received or a failure occurs. Unlike + * PQexec(), however, this function handles PostgreSQL interrupts (e.g., the + * user cancels the query). Like PQexec(), the PGresult returned describes + * only the last command executed in a multi-command string. 
*/ PGresult * -remote_connection_exec(TSConnection *conn, const char *cmd) +remote_connection_exec_params(TSConnection *conn, const char *cmd, StmtParams *params, bool binary, + bool single_row_mode) { - PGresult *res; + WaitEvent event; + PGresult *res = NULL; + int ret = 0; + size_t cmdlen = strlen(cmd); + StringInfoData cmd_buf = { + .data = (char *) cmd, + .len = cmdlen, + .maxlen = cmdlen + 1, + }; - if (!remote_connection_configure_if_changed(conn)) + if (get_update_conn_cmd(conn, &cmd_buf)) { - res = PQmakeEmptyPGresult(conn->pg_conn, PGRES_FATAL_ERROR); - PQfireResultCreateEvents(conn->pg_conn, res); - return res; + appendStringInfo(&cmd_buf, ";%s", cmd); } - res = PQexec(conn->pg_conn, cmd); + ModifyWaitEvent(conn->wes, conn->sockeventpos, WL_SOCKET_WRITEABLE, NULL); + bool done = false; + + do + { + /* Wait for writable socket in outer loop */ + WaitEventSetWait(conn->wes, 0, &event, 1, PG_WAIT_EXTENSION); + + if (event.events & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + if (event.events & WL_SOCKET_WRITEABLE) + { + PGresult *curr_res; + + if (params) + { + ret = PQsendQueryParams(conn->pg_conn, + cmd, + stmt_params_total_values(params), + /* param types */ NULL, + stmt_params_values(params), + stmt_params_lengths(params), + stmt_params_formats(params), + binary ? 
1 : 0); + } + else + { + ret = PQsendQuery(conn->pg_conn, cmd); + } + + if (ret == 0 || !remote_connection_flush(conn, NULL)) + { + res = PQmakeEmptyPGresult(conn->pg_conn, PGRES_FATAL_ERROR); + PQfireResultCreateEvents(conn->pg_conn, res); + return res; + } + + if (single_row_mode && !remote_connection_set_single_row_mode(conn)) + { + res = PQmakeEmptyPGresult(conn->pg_conn, PGRES_FATAL_ERROR); + PQfireResultCreateEvents(conn->pg_conn, res); + return res; + } + + /* Command sent, so now wait for readable result in inner loop */ + do + { + curr_res = remote_connection_get_result(conn); + + if (curr_res == NULL) + done = true; + else + { + if (PQresultStatus(curr_res) == PGRES_COPY_IN || + PQresultStatus(curr_res) == PGRES_COPY_OUT || + PQresultStatus(curr_res) == PGRES_COPY_BOTH || + PQstatus(conn->pg_conn) == CONNECTION_BAD) + done = true; + else if (res != NULL) + PQclear(res); + + res = curr_res; + } + } while (!done); + } + } while (!done); /* * Workaround for the libpq disconnect case. * - * libpq disconnect will create an result object without creating + * libpq disconnect will create an empty result object without generating * events, which is usually done for a regular errors. * * In order to be compatible with our error handling code, force @@ -922,18 +1142,21 @@ remote_connection_exec(TSConnection *conn, const char *cmd) */ if (res) { - ExecStatusType status = PQresultStatus(res); ResultEntry *entry = PQresultInstanceData(res, eventproc); - if (status == PGRES_FATAL_ERROR && entry == NULL) - { - res = PQmakeEmptyPGresult(conn->pg_conn, PGRES_FATAL_ERROR); + if (entry == NULL) PQfireResultCreateEvents(conn->pg_conn, res); - } } + return res; } +PGresult * +remote_connection_exec(TSConnection *conn, const char *cmd) +{ + return remote_connection_exec_params(conn, cmd, NULL, false, false); +} + /* * Must be a macro since va_start() must be called in the function that takes * a variable number of arguments. 
@@ -1062,9 +1285,6 @@ remote_connection_check_extension(TSConnection *conn) "SELECT extversion FROM pg_extension WHERE extname = %s", quote_literal_cstr(EXTENSION_NAME)); - /* Just to capture any bugs in the SELECT above */ - Assert(PQnfields(res) == 1); - switch (PQntuples(res)) { case 0: /* extension does not exists */ @@ -1365,17 +1585,20 @@ TSConnection * remote_connection_open_with_options_nothrow(const char *node_name, List *connection_options, char **errmsg) { - PGconn *volatile pg_conn = NULL; - TSConnection *ts_conn; + PGconn *pg_conn = NULL; + TSConnection *ts_conn = NULL; const char **keywords; const char **values; + WaitEventSet *wes; + WaitEvent event; + int sockeventpos; + PostgresPollingStatusType status; if (NULL != errmsg) *errmsg = NULL; setup_full_connection_options(connection_options, &keywords, &values); - - pg_conn = PQconnectdbParams(keywords, values, 0 /* Do not expand dbname param */); + pg_conn = PQconnectStartParams(keywords, values, 0 /* Do not expand dbname param */); /* Cast to (char **) to silence warning with MSVC compiler */ pfree((char **) keywords); @@ -1384,16 +1607,75 @@ remote_connection_open_with_options_nothrow(const char *node_name, List *connect if (NULL == pg_conn) return NULL; - if (PQstatus(pg_conn) != CONNECTION_OK) + if (PQstatus(pg_conn) == CONNECTION_BAD) { finish_connection(pg_conn, errmsg); return NULL; } - ts_conn = remote_connection_create(pg_conn, false, node_name); + wes = CreateWaitEventSet(CurrentMemoryContext, 3); + AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL); + AddWaitEventToSet(wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, NULL, NULL); + sockeventpos = AddWaitEventToSet(wes, WL_SOCKET_WRITEABLE, PQsocket(pg_conn), NULL, NULL); - if (NULL == ts_conn) + status = PGRES_POLLING_WRITING; + + do + { + int io_flag; + + if (status == PGRES_POLLING_READING) + io_flag = WL_SOCKET_READABLE; +#ifdef WIN32 + /* Windows needs a different test while waiting for connection-made */ + else if 
(PQstatus(pg_conn) == CONNECTION_STARTED) + io_flag = WL_SOCKET_CONNECTED; +#endif + else + io_flag = WL_SOCKET_WRITEABLE; + + ModifyWaitEvent(wes, sockeventpos, io_flag, NULL); + WaitEventSetWait(wes, -1, &event, 1, PG_WAIT_EXTENSION); + + if (event.events & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + if (event.events & io_flag) + { + /* + * PQconnectPoll() is supposed to be non-blocking, but it + * isn't. PQconnectPoll() will internally try to send a startup + * packet and it can block when reading the response. So, if there + * is a network issue (e.g., black hole routing) the connection + * attempt will hang on PQconnectPoll(). There's nothing that can + * be done about it, unless libpq is fixed. */ + status = PQconnectPoll(pg_conn); + } + } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED); + + FreeWaitEventSet(wes); + + if (PQstatus(pg_conn) != CONNECTION_OK) + { finish_connection(pg_conn, errmsg); + return NULL; + } + + /* Switch the connection into nonblocking mode */ + if (PQsetnonblocking(pg_conn, 1) != 0) + { + finish_connection(pg_conn, errmsg); + } + else + { + ts_conn = remote_connection_create(pg_conn, false, node_name); + + if (NULL == ts_conn) + finish_connection(pg_conn, errmsg); + } return ts_conn; } @@ -1710,13 +1992,8 @@ remote_connection_ping(const char *node_name) if (PQstatus(conn->pg_conn) == CONNECTION_OK) { - if (1 == PQsendQuery(conn->pg_conn, PING_QUERY)) - { - PGresult *res = PQgetResult(conn->pg_conn); - - success = (PQresultStatus(res) == PGRES_TUPLES_OK); - PQclear(res); - } + PGresult *res = remote_connection_exec(conn, PING_QUERY); + success = (PQresultStatus(res) == PGRES_TUPLES_OK); } remote_connection_close(conn); @@ -1727,18 +2004,9 @@ remote_connection_ping(const char *node_name) void remote_connection_close(TSConnection *conn) { - Assert(conn != NULL); - - conn->closing_guard = true; - - if (NULL != conn->pg_conn) - PQfinish(conn->pg_conn); - - /* Assert that PQfinish detached 
this connection from the global list of - * connections */ - Assert(IS_DETACHED_ENTRY(&conn->ln)); - - remote_connection_free(conn); + /* The PQfinish callback handler will take care of freeing the resources, + * including the TSConnection object. */ + PQfinish(conn->pg_conn); } /* @@ -1809,17 +2077,19 @@ remote_connection_drain(TSConnection *conn, TimestampTz endtime, PGresult **resu /* In what follows, do not leak any PGresults on an error. */ PG_TRY(); { + ModifyWaitEvent(conn->wes, conn->sockeventpos, WL_SOCKET_READABLE, NULL); + for (;;) { PGresult *res; while (PQisBusy(pg_conn)) { - int wc; TimestampTz now = GetCurrentTimestamp(); long remaining_secs; int remaining_usecs; long cur_timeout_ms; + WaitEvent event; /* If timeout has expired, give up, else get sleep time. */ if (now >= endtime) @@ -1835,25 +2105,23 @@ remote_connection_drain(TSConnection *conn, TimestampTz endtime, PGresult **resu Min(MAX_CONN_WAIT_TIMEOUT_MS, remaining_secs * USECS_PER_SEC + remaining_usecs); /* Sleep until there's something to do */ - wc = WaitLatchOrSocket(MyLatch, - WL_LATCH_SET | WL_SOCKET_READABLE | WL_EXIT_ON_PM_DEATH | - WL_TIMEOUT, - PQsocket(pg_conn), - cur_timeout_ms, - PG_WAIT_EXTENSION); - ResetLatch(MyLatch); + WaitEventSetWait(conn->wes, cur_timeout_ms, &event, 1, PG_WAIT_EXTENSION); - CHECK_FOR_INTERRUPTS(); + if (event.events & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } /* Data available in socket? 
*/ - if ((wc & WL_SOCKET_READABLE) && (0 == PQconsumeInput(pg_conn))) + if ((event.events & WL_SOCKET_READABLE) && (0 == PQconsumeInput(pg_conn))) { connresult = CONN_DISCONNECT; goto exit; } } - res = PQgetResult(pg_conn); + res = PQgetResult(conn->pg_conn); if (res == NULL) { @@ -1865,7 +2133,7 @@ remote_connection_drain(TSConnection *conn, TimestampTz endtime, PGresult **resu else if (PQresultStatus(res) == PGRES_COPY_OUT) { /* - * We are inside the COPY subprotocol, need to sychronize with + * We are inside the COPY subprotocol, need to synchronize with * the server. */ int end_res = PQendcopy(pg_conn); @@ -1876,7 +2144,6 @@ remote_connection_drain(TSConnection *conn, TimestampTz endtime, PGresult **resu remote_connection_error_elog(&err, WARNING); } } - PQclear(last_res); last_res = res; } @@ -1911,18 +2178,42 @@ remote_connection_drain(TSConnection *conn, TimestampTz endtime, PGresult **resu return connresult; } +static bool +send_cancel(TSConnection *conn) +{ + PGcancel *cancel; + char errbuf[256]; + bool success = true; + + cancel = PQgetCancel(conn->pg_conn); + + if (cancel == NULL) + return false; + + if (PQcancel(cancel, errbuf, sizeof(errbuf)) == 0) + { + ereport(WARNING, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not send cancel request: %s", errbuf))); + + success = false; + } + + PQfreeCancel(cancel); + + return success; +} + /* * Cancel the currently-in-progress query and ignore the result. Returns true if we successfully * cancel the query and discard any pending result, and false if not. 
*/ bool -remote_connection_cancel_query(TSConnection *conn) +remote_connection_cancel_query(TSConnection *conn, const char **errmsg) { - PGcancel *cancel; - char errbuf[256]; TimestampTz endtime; TSConnectionError err; - bool success; + bool volatile success = false; if (!conn) return true; @@ -1937,43 +2228,47 @@ remote_connection_cancel_query(TSConnection *conn) */ PG_TRY(); { - if (conn->status == CONN_COPY_IN && !remote_connection_end_copy(conn, &err)) + if (conn->status == CONN_COPY_IN && !remote_connection_end_copy_in(conn, &err)) remote_connection_error_elog(&err, WARNING); - /* - * If it takes too long to cancel the query and discard the result, assume - * the connection is dead. - */ - endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000); - /* * Issue cancel request. Unfortunately, there's no good way to limit the * amount of time that we might block inside PQcancel(). */ - if ((cancel = PQgetCancel(conn->pg_conn))) + if (!send_cancel(conn)) { - if (!PQcancel(cancel, errbuf, sizeof(errbuf))) - { - ereport(WARNING, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("could not send cancel request: %s", errbuf))); - PQfreeCancel(cancel); - remote_connection_set_status(conn, CONN_IDLE); - return false; - } - PQfreeCancel(cancel); + remote_connection_set_status(conn, CONN_IDLE); + + if (errmsg != NULL) + *errmsg = "cancelation failed"; + return false; } + /* + * If it takes too long to cancel the query and discard the result, assume + * the connection is dead. 
+ */ + endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000); + switch (remote_connection_drain(conn, endtime, NULL)) { case CONN_OK: /* Successfully, drained */ + success = true; + break; case CONN_NO_RESPONSE: /* No response, likely beceause there was nothing to cancel */ success = true; break; - default: + case CONN_DISCONNECT: + success = false; + if (errmsg != NULL) + *errmsg = "connection drain failed due to disconnect"; + break; + case CONN_TIMEOUT: success = false; + if (errmsg != NULL) + *errmsg = "connection drain failed due to timeout"; break; } } @@ -1995,145 +2290,65 @@ remote_result_close(PGresult *res) PQclear(res); } -/* - * Cleanup connections and results at the end of a (sub-)transaction. - * - * This function is called at the end of transactions and sub-transactions to - * auto-cleanup connections and result objects. - */ -static void -remote_connections_cleanup(SubTransactionId subtxid, bool isabort) +bool +remote_connection_set_single_row_mode(TSConnection *conn) { - ListNode *curr = connections.next; - unsigned int num_connections = 0; - unsigned int num_results = 0; + return PQsetSingleRowMode(conn->pg_conn) == 1; +} - while (curr != &connections) - { - TSConnection *conn = (TSConnection *) curr; +bool +remote_connection_put_copy_data(const TSConnection *conn, const char *buffer, size_t nbytes, + TSConnectionError *err) +{ + int res = 0; - /* Move to next connection since closing the current one might - * otherwise make the curr pointer invalid. 
*/ - curr = curr->next; + Assert(PQisnonblocking(conn->pg_conn)); + Assert(conn->status == CONN_COPY_IN); - if (conn->autoclose && (subtxid == InvalidSubTransactionId || subtxid == conn->subtxid)) - { - /* Closes the connection and frees all its PGresult objects */ - remote_connection_close(conn); - num_connections++; - } - else - { - /* We're not closing the connection, but we should clean up any - * lingering results */ - ListNode *curr_result = conn->results.next; + do + { + res = PQputCopyData(conn->pg_conn, buffer, nbytes); - while (curr_result != &conn->results) + switch (res) + { + case 0: { - ResultEntry *entry = (ResultEntry *) curr_result; + /* Full buffers, wait... */ + WaitEvent event; - curr_result = curr_result->next; + ModifyWaitEvent(conn->wes, conn->sockeventpos, WL_SOCKET_WRITEABLE, NULL); + WaitEventSetWait(conn->wes, -1, &event, 1, PG_WAIT_EXTENSION); - if (subtxid == InvalidSubTransactionId || subtxid == entry->subtxid) + if (event.events & WL_LATCH_SET) { - PQclear(entry->result); - num_results++; + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); } + if (event.events & WL_SOCKET_WRITEABLE) + { + /* Just fall through and try again */ + } + break; } + case 1: + /* Success */ + break; + case -1: + fill_connection_error(err, + ERRCODE_CONNECTION_FAILURE, + "failed sending COPY data", + conn); + break; + default: + pg_unreachable(); + break; } - } - - if (subtxid == InvalidSubTransactionId) - elog(DEBUG3, - "cleaned up %u connections and %u results at %s of transaction", - num_connections, - num_results, - isabort ? "abort" : "commit"); - else - elog(DEBUG3, - "cleaned up %u connections and %u results at %s of sub-transaction %u", - num_connections, - num_results, - isabort ? "abort" : "commit", - subtxid); -} - -static void -remote_connection_xact_end(XactEvent event, void *unused_arg) -{ - /* - * We are deep down in CommitTransaction code path. 
We do not want our - * emit_log_hook_callback to interfere since it uses its own transaction - */ - emit_log_hook_type prev_emit_log_hook = emit_log_hook; - emit_log_hook = NULL; - - switch (event) - { - case XACT_EVENT_ABORT: - case XACT_EVENT_PARALLEL_ABORT: - /* - * We expect that the waitpoint will be retried and then we - * will return due to the process receiving a SIGTERM if - * the advisory lock is exclusively held by a user call - */ - DEBUG_RETRY_WAITPOINT("remote_conn_xact_end"); - remote_connections_cleanup(InvalidSubTransactionId, true); - break; - case XACT_EVENT_COMMIT: - case XACT_EVENT_PARALLEL_COMMIT: - /* Same retry behavior as above */ - DEBUG_RETRY_WAITPOINT("remote_conn_xact_end"); - remote_connections_cleanup(InvalidSubTransactionId, false); - break; - case XACT_EVENT_PREPARE: - /* - * We expect that the waitpoint will be retried and then we - * will return with a warning on crossing the retry count if - * the advisory lock is exclusively held by a user call - */ - DEBUG_RETRY_WAITPOINT("remote_conn_xact_end"); - break; - default: - /* other events are too early to use DEBUG_WAITPOINT.. */ - break; - } - - /* re-enable the emit_log_hook */ - emit_log_hook = prev_emit_log_hook; -} - -static void -remote_connection_subxact_end(SubXactEvent event, SubTransactionId subtxid, - SubTransactionId parent_subtxid, void *unused_arg) -{ - /* - * We are deep down in CommitTransaction code path. 
We do not want our - * emit_log_hook_callback to interfere since it uses its own transaction - */ - emit_log_hook_type prev_emit_log_hook = emit_log_hook; - emit_log_hook = NULL; + } while (res == 0); - switch (event) - { - case SUBXACT_EVENT_ABORT_SUB: - remote_connections_cleanup(subtxid, true); - break; - case SUBXACT_EVENT_COMMIT_SUB: - remote_connections_cleanup(subtxid, false); - break; - default: - break; - } + if (res != 1) + return false; - /* re-enable the emit_log_hook */ - emit_log_hook = prev_emit_log_hook; -} - -bool -remote_connection_set_single_row_mode(TSConnection *conn) -{ - return PQsetSingleRowMode(conn->pg_conn); + return remote_connection_flush(conn, err); } static bool @@ -2146,29 +2361,53 @@ send_binary_copy_header(const TSConnection *conn, TSConnectionError *err) 0, 0, 0, 0 /* 4 bytes header extension length (unused) */ }; - int res = PQputCopyData(conn->pg_conn, file_header, sizeof(file_header)); + return remote_connection_put_copy_data(conn, file_header, sizeof(file_header), err); +} - if (res != 1) - return fill_connection_error(err, - ERRCODE_CONNECTION_FAILURE, - "could not set binary COPY mode", - conn); - return true; +static bool +end_copy_in(TSConnection *conn, const char *errmsg) +{ + int ret = 0; + + do + { + ret = PQputCopyEnd(conn->pg_conn, errmsg); + + if (ret == 1) + { + if (!remote_connection_flush(conn, NULL)) + ret = -1; + } + else if (ret == 0) + { + WaitEvent event; + + Assert(ret == 0); + ModifyWaitEvent(conn->wes, conn->sockeventpos, WL_SOCKET_WRITEABLE, NULL); + WaitEventSetWait(conn->wes, -1, &event, 1, PG_WAIT_EXTENSION); + + if (event.events & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + if (event.events & WL_SOCKET_WRITEABLE) + { + /* Try again */ + } + } + } while (ret == 0); + + return ret == 1; } bool -remote_connection_begin_copy(TSConnection *conn, const char *copycmd, bool binary, - TSConnectionError *err) +remote_connection_begin_copy_in(TSConnection *conn, const char 
*copycmd, bool binary, + TSConnectionError *err) { - PGconn *pg_conn = remote_connection_get_pg_conn(conn); PGresult *volatile res = NULL; - if (PQisnonblocking(pg_conn)) - return fill_simple_error(err, - ERRCODE_FEATURE_NOT_SUPPORTED, - "distributed copy doesn't support non-blocking connections", - conn); - if (conn->status != CONN_IDLE) return fill_simple_error(err, ERRCODE_INTERNAL_ERROR, @@ -2181,9 +2420,10 @@ remote_connection_begin_copy(TSConnection *conn, const char *copycmd, bool binar GetConfigOption("timescaledb.debug_broken_sendrecv_throw_after", true, false); if (throw_after_option) { - res = PQexec(pg_conn, - psprintf("set timescaledb.debug_broken_sendrecv_throw_after = '%s';", - throw_after_option)); + res = remote_connection_exec(conn, + psprintf("set timescaledb.debug_broken_sendrecv_throw_after = " + "'%s';", + throw_after_option)); if (PQresultStatus(res) != PGRES_COMMAND_OK) { remote_connection_get_result_error(res, err); @@ -2195,7 +2435,7 @@ remote_connection_begin_copy(TSConnection *conn, const char *copycmd, bool binar #endif /* Run the COPY query. */ - res = PQexec(pg_conn, copycmd); + res = remote_connection_exec(conn, copycmd); if (PQresultStatus(res) != PGRES_COPY_IN) { @@ -2208,43 +2448,15 @@ remote_connection_begin_copy(TSConnection *conn, const char *copycmd, bool binar } PQclear(res); + remote_connection_set_status(conn, CONN_COPY_IN); if (binary && !send_binary_copy_header(conn, err)) - goto err_end_copy; - - /* Switch the connection into nonblocking mode for the duration of COPY. 
*/ - if (PQsetnonblocking(pg_conn, 1) != 0) { - (void) fill_simple_error(err, - ERRCODE_CONNECTION_EXCEPTION, - "failed to set the connection into nonblocking mode", - conn); - goto err_end_copy; + end_copy_in(conn, err->msg); + return false; } conn->binary_copy = binary; - remote_connection_set_status(conn, CONN_COPY_IN); - - return true; -err_end_copy: - PQputCopyEnd(pg_conn, err->msg); - - return false; -} - -bool -remote_connection_put_copy_data(TSConnection *conn, const char *buffer, size_t len, - TSConnectionError *err) -{ - int res; - - res = PQputCopyData(remote_connection_get_pg_conn(conn), buffer, len); - - if (res != 1) - return fill_connection_error(err, - ERRCODE_CONNECTION_EXCEPTION, - "could not send COPY data", - conn); return true; } @@ -2254,10 +2466,7 @@ send_end_binary_copy_data(const TSConnection *conn, TSConnectionError *err) { const uint16 buf = pg_hton16((uint16) -1); - if (PQputCopyData(conn->pg_conn, (char *) &buf, sizeof(buf)) != 1) - return fill_simple_error(err, ERRCODE_INTERNAL_ERROR, "could not end binary COPY", conn); - - return true; + return remote_connection_put_copy_data(conn, (const char *) &buf, sizeof(buf), err); } /* @@ -2268,81 +2477,10 @@ send_end_binary_copy_data(const TSConnection *conn, TSConnectionError *err) * actual and expected state. */ bool -remote_connection_end_copy(TSConnection *conn, TSConnectionError *err) +remote_connection_end_copy_in(TSConnection *conn, TSConnectionError *err) { PGresult *res = NULL; - /* - * In any case, try to switch the connection into the blocking mode, because - * that's what the non-COPY code expects. - */ - if (PQisnonblocking(conn->pg_conn)) - { - /* - * We have to flush the connection before we can switch it into blocking - * mode. - */ - for (;;) - { - CHECK_FOR_INTERRUPTS(); - - int flush_result = PQflush(conn->pg_conn); - - if (flush_result == 1) - { - /* - * In some rare cases, flush might report that it's busy, but - * actually there was an error and the socket became invalid. 
- * Check for it. This is something we have observed in COPY - * queries used for performance testing with tsbench, but not - * sure how it happens exactly, must be in the depths of - * pqReadData called by pqFlush. - */ - int socket = PQsocket(conn->pg_conn); - if (socket == PGINVALID_SOCKET) - { - return fill_connection_error(err, - ERRCODE_CONNECTION_EXCEPTION, - "failed to flush the COPY connection", - conn); - } - - /* - * The socket is busy, wait. We don't care about the wait result - * here, because whether it is a timeout or the socket became - * writeable, we just retry. - */ - (void) WaitLatchOrSocket(MyLatch, - WL_TIMEOUT | WL_SOCKET_WRITEABLE | WL_EXIT_ON_PM_DEATH, - socket, - /* timeout = */ 1000, - /* wait_event_info = */ 0); - } - else if (flush_result == 0) - { - /* Flushed all. */ - break; - } - else - { - /* Error. */ - return fill_connection_error(err, - ERRCODE_CONNECTION_EXCEPTION, - "failed to flush the COPY connection", - conn); - } - } - - /* Switch the connection into blocking mode. */ - if (PQsetnonblocking(conn->pg_conn, 0) != 0) - { - return fill_connection_error(err, - ERRCODE_CONNECTION_EXCEPTION, - "failed to set the connection into blocking mode", - conn); - } - } - /* * Shouldn't have been called for a connection we know is not in COPY mode. */ @@ -2352,6 +2490,12 @@ remote_connection_end_copy(TSConnection *conn, TSConnectionError *err) "connection not in COPY_IN state when ending COPY", conn); + if (!remote_connection_flush(conn, err)) + { + elog(WARNING, "Failed to flush connection"); + return false; + } + /* * Check whether it's still in COPY mode. The dist_copy manages COPY * protocol itself because it needs to work with multiple connections @@ -2359,7 +2503,8 @@ remote_connection_end_copy(TSConnection *conn, TSConnectionError *err) * reasons, as well. If we discover this, update our info with the actual * status, but still report the error. 
*/ - res = PQgetResult(conn->pg_conn); + res = remote_connection_get_result(conn); + if (res == NULL || PQresultStatus(res) != PGRES_COPY_IN) { remote_connection_set_status(conn, res == NULL ? CONN_IDLE : CONN_PROCESSING); @@ -2373,7 +2518,7 @@ remote_connection_end_copy(TSConnection *conn, TSConnectionError *err) if (conn->binary_copy && !send_end_binary_copy_data(conn, err)) return false; - if (PQputCopyEnd(conn->pg_conn, NULL) != 1) + if (!end_copy_in(conn, NULL)) return fill_connection_error(err, ERRCODE_CONNECTION_EXCEPTION, "could not end remote COPY", @@ -2383,7 +2528,7 @@ remote_connection_end_copy(TSConnection *conn, TSConnectionError *err) } bool success = true; - while ((res = PQgetResult(conn->pg_conn))) + while ((res = remote_connection_get_result(conn))) { ExecStatusType status = PQresultStatus(res); if (status != PGRES_COMMAND_OK) @@ -2399,6 +2544,123 @@ remote_connection_end_copy(TSConnection *conn, TSConnectionError *err) return success; } +bool +remote_connection_begin_copy_out(TSConnection *conn, const char *stmt, StmtParams *params, + bool binary, TSConnectionError *err) +{ + char *copy_stmt = psprintf("copy (%s) to stdout with (format binary)", stmt); + PGresult *res = remote_connection_exec_params(conn, copy_stmt, params, binary, true); + + pfree(copy_stmt); + + if (PQresultStatus(res) != PGRES_COPY_OUT) + { + remote_connection_get_error(conn, err); + remote_result_close(res); + return false; + } + + remote_result_close(res); + + conn->binary_copy = binary; + remote_connection_set_status(conn, CONN_COPY_OUT); + + return true; +} + +bool +remote_connection_end_copy_out(TSConnection *conn, bool cancel, TSConnectionError *err) +{ + PGresult *final_pgres = NULL; + TimestampTz endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000); + ExecStatusType received_status; + + Assert(conn->status == CONN_COPY_OUT); + + if (cancel) + send_cancel(conn); + + remote_connection_drain(conn, endtime, &final_pgres); + received_status = 
PQresultStatus(final_pgres); + remote_result_close(final_pgres); + + if (cancel) + { + /* If the query was canceled during query execution by the access node + * (e.g., due to reaching a LIMIT), expect either PGRES_COMMAND_OK + * (query completed before cancel happened) or PGRES_FATAL_ERROR + * (query abandoned before completion) */ + if (received_status != PGRES_COMMAND_OK && received_status != PGRES_COPY_OUT && + received_status != PGRES_FATAL_ERROR) + { + remote_connection_get_error(conn, err); + return false; + } + } + else if (received_status != PGRES_COMMAND_OK) + { + Assert(received_status == PGRES_FATAL_ERROR || received_status == PGRES_NONFATAL_ERROR); + remote_connection_get_error(conn, err); + return false; + } + + remote_connection_set_status(conn, CONN_IDLE); + return true; +} + +/* + * Read COPY data, but in a non-blocking way that integrates with PostgreSQL + * signals. + * + * Like PQgetCopyData(), returns > 0 for data, -1 for COPY done and -2 for + * error. This function will never return 0 (COPY in progress), since it will + * wait for the other conditions in that case. + */ +int +remote_connection_get_copy_data(const TSConnection *conn, char **buffer) +{ + int datalen = -2; + + Assert(conn->status == CONN_COPY_OUT); + + do + { + /* Get copy data in async mode. If the call would block, use + an event set to wait for interrupt or a readable socket */ + datalen = PQgetCopyData(conn->pg_conn, buffer, /* async = */ true); + + switch (datalen) + { + case 0: + /* COPY still in progress --> wait */ + if (!wait_and_consume_input(conn)) + { + /* Translate to do-while-loop error to break out */ + datalen = -2; + } + break; + case -1: + /* COPY is done */ + break; + case -2: + /* Error occurred */ + break; + default: + /* Data received */ + Assert(datalen > 0); + break; + } + } while (datalen == 0); + + return datalen; +} + +void +remote_connection_free_mem(char *buffer) +{ + PQfreemem(buffer); +} + #ifdef TS_DEBUG /* * Reset the current connection stats. 
@@ -2422,15 +2684,10 @@ remote_connection_stats_get(void) void _remote_connection_init(void) { - RegisterXactCallback(remote_connection_xact_end, NULL); - RegisterSubXactCallback(remote_connection_subxact_end, NULL); - unset_libpq_envvar(); } void _remote_connection_fini(void) { - UnregisterXactCallback(remote_connection_xact_end, NULL); - UnregisterSubXactCallback(remote_connection_subxact_end, NULL); } diff --git a/tsl/src/remote/connection.h b/tsl/src/remote/connection.h index f10906c2bd5..272700f099d 100644 --- a/tsl/src/remote/connection.h +++ b/tsl/src/remote/connection.h @@ -67,7 +67,6 @@ extern TSConnection *remote_connection_open_by_id(TSConnectionId id); extern TSConnection *remote_connection_open(Oid server_id, Oid user_id); extern TSConnection *remote_connection_open_nothrow(Oid server_id, Oid user_id, char **errmsg); extern List *remote_connection_prepare_auth_options(const ForeignServer *server, Oid user_id); -extern bool remote_connection_set_autoclose(TSConnection *conn, bool autoclose); extern int remote_connection_xact_depth_get(const TSConnection *conn); extern int remote_connection_xact_depth_inc(TSConnection *conn); extern int remote_connection_xact_depth_dec(TSConnection *conn); @@ -76,6 +75,9 @@ extern void remote_connection_xact_transition_end(TSConnection *conn); extern bool remote_connection_xact_is_transitioning(const TSConnection *conn); extern bool remote_connection_ping(const char *node_name); extern void remote_connection_close(TSConnection *conn); +extern PGresult *remote_connection_exec_params(TSConnection *conn, const char *cmd, + StmtParams *params, bool binary, + bool single_row_mode); extern PGresult *remote_connection_exec(TSConnection *conn, const char *cmd); extern PGresult *remote_connection_execf(TSConnection *conn, const char *fmt, ...) 
pg_attribute_printf(2, 3); @@ -85,6 +87,8 @@ extern PGresult *remote_connection_queryf_ok(TSConnection *conn, const char *fmt extern void remote_connection_cmd_ok(TSConnection *conn, const char *cmd); extern void remote_connection_cmdf_ok(TSConnection *conn, const char *fmt, ...) pg_attribute_printf(2, 3); +extern bool remote_connection_flush(const TSConnection *conn, TSConnectionError *err); +extern PGresult *remote_connection_get_result(const TSConnection *conn); extern ConnOptionType remote_connection_option_type(const char *keyword); extern bool remote_connection_valid_user_option(const char *keyword); extern bool remote_connection_valid_node_option(const char *keyword); @@ -109,11 +113,12 @@ typedef enum TSConnectionStatus CONN_IDLE, /* No command being processed */ CONN_PROCESSING, /* Command/query is being processed */ CONN_COPY_IN, /* Connection is in COPY_IN mode */ + CONN_COPY_OUT, /* Connection is in COPY_OUT mode */ } TSConnectionStatus; TSConnectionResult remote_connection_drain(TSConnection *conn, TimestampTz endtime, PGresult **result); -extern bool remote_connection_cancel_query(TSConnection *conn); +extern bool remote_connection_cancel_query(TSConnection *conn, const char **errmsg); extern PGconn *remote_connection_get_pg_conn(const TSConnection *conn); extern bool remote_connection_is_processing(const TSConnection *conn); extern void remote_connection_set_status(TSConnection *conn, TSConnectionStatus status); @@ -149,11 +154,17 @@ extern RemoteConnectionStats *remote_connection_stats_get(void); /* * Connection functions for COPY mode. 
*/ -extern bool remote_connection_begin_copy(TSConnection *conn, const char *copycmd, bool binary, - TSConnectionError *err); -extern bool remote_connection_end_copy(TSConnection *conn, TSConnectionError *err); -extern bool remote_connection_put_copy_data(TSConnection *conn, const char *buffer, size_t len, +extern bool remote_connection_begin_copy_in(TSConnection *conn, const char *copycmd, bool binary, TSConnectionError *err); +extern bool remote_connection_end_copy_in(TSConnection *conn, TSConnectionError *err); +extern bool remote_connection_put_copy_data(const TSConnection *conn, const char *buffer, + size_t nbytes, TSConnectionError *err); +extern bool remote_connection_begin_copy_out(TSConnection *conn, const char *stmt, + StmtParams *params, bool binary, + TSConnectionError *err); +extern int remote_connection_get_copy_data(const TSConnection *conn, char **buffer); +extern bool remote_connection_end_copy_out(TSConnection *conn, bool cancel, TSConnectionError *err); +extern void remote_connection_free_mem(char *buffer); /* Error handling functions for connections */ extern void remote_connection_get_error(const TSConnection *conn, TSConnectionError *err); diff --git a/tsl/src/remote/connection_cache.c b/tsl/src/remote/connection_cache.c index c3889c7cf4a..c148a9b075a 100644 --- a/tsl/src/remote/connection_cache.c +++ b/tsl/src/remote/connection_cache.c @@ -150,6 +150,7 @@ connection_cache_create_entry(Cache *cache, CacheQuery *query) { TSConnectionId *id = (TSConnectionId *) query->data; ConnectionCacheEntry *entry = query->result; + MemoryContext old; /* * Protects against errors in remote_connection_open, necessary since this @@ -162,11 +163,9 @@ connection_cache_create_entry(Cache *cache, CacheQuery *query) * because PGconn allocation happens using malloc. Which is why calling * remote_connection_close at cleanup is critical. 
*/ + old = MemoryContextSwitchTo(ts_cache_memory_ctx(cache)); entry->conn = remote_connection_open_by_id(*id); - - /* Since this connection is managed by the cache, it should not auto-close - * at the end of the transaction */ - remote_connection_set_autoclose(entry->conn, false); + MemoryContextSwitchTo(old); /* Set the hash values of the foreign server and role for cache * invalidation purposes */ diff --git a/tsl/src/remote/copy_fetcher.c b/tsl/src/remote/copy_fetcher.c index 11ced7d22a9..cddcca7d831 100644 --- a/tsl/src/remote/copy_fetcher.c +++ b/tsl/src/remote/copy_fetcher.c @@ -5,8 +5,11 @@ */ #include +#include #include #include +#include +#include #include "copy_fetcher.h" #include "tuplefactory.h" @@ -67,9 +70,9 @@ copy_fetcher_reset(CopyFetcher *fetcher) static void copy_fetcher_send_fetch_request(DataFetcher *df) { - AsyncRequest *volatile req = NULL; MemoryContext oldcontext; CopyFetcher *fetcher = cast_fetcher(CopyFetcher, df); + TSConnectionError err; if (fetcher->state.open) { @@ -79,68 +82,14 @@ copy_fetcher_send_fetch_request(DataFetcher *df) /* make sure to have a clean state */ copy_fetcher_reset(fetcher); + oldcontext = MemoryContextSwitchTo(fetcher->state.req_mctx); - StringInfoData copy_query; - initStringInfo(©_query); - appendStringInfo(©_query, "copy (%s) to stdout with (format binary)", fetcher->state.stmt); + Assert(tuplefactory_is_binary(fetcher->state.tf)); - PG_TRY(); - { - oldcontext = MemoryContextSwitchTo(fetcher->state.req_mctx); - - Assert(tuplefactory_is_binary(fetcher->state.tf)); - req = async_request_send_with_stmt_params_elevel_res_format(fetcher->state.conn, - copy_query.data, - fetcher->state.stmt_params, - ERROR, - FORMAT_BINARY); - Assert(NULL != req); - - /* - * Single-row mode doesn't really influence the COPY queries, but setting - * it here is a convenient way to prevent concurrent COPY requests on the - * same connection. 
This can happen if we have multiple tables on the same - * data node and still use the row-by-row fetcher. - */ - if (!async_request_set_single_row_mode(req)) - { - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("could not set single-row mode on connection to \"%s\"", - remote_connection_node_name(fetcher->state.conn)), - errdetail("The aborted statement is: %s.", fetcher->state.stmt), - errhint("Copy fetcher is not supported together with sub-queries." - " Use cursor fetcher instead."))); - } - - PGresult *res = PQgetResult(remote_connection_get_pg_conn(fetcher->state.conn)); - if (res == NULL) - { - /* Shouldn't really happen but technically possible. */ - TSConnectionError err; - remote_connection_get_error(fetcher->state.conn, &err); - remote_connection_error_elog(&err, ERROR); - } - if (PQresultStatus(res) != PGRES_COPY_OUT) - { - TSConnectionError err; - remote_connection_get_result_error(res, &err); - remote_connection_error_elog(&err, ERROR); - } - - fetcher->state.open = true; - PQclear(res); - pfree(req); - } - PG_CATCH(); - { - if (NULL != req) - pfree(req); - - PG_RE_THROW(); - } - PG_END_TRY(); + if (!remote_connection_begin_copy_out(df->conn, df->stmt, df->stmt_params, true, &err)) + remote_connection_error_elog(&err, ERROR); + fetcher->state.open = true; MemoryContextSwitchTo(oldcontext); } @@ -241,54 +190,6 @@ copy_data_check_header(StringInfo copy_data) } } -/* - * End the COPY after receiving EOF or canceling a query (e.g., due to a LIMIT - * being reached). - * - * This should be called after canceling a query, or, after reading all data, - * the file trailer, and getting an EOF return value. 
- */ -static void -end_copy(CopyFetcher *fetcher, bool canceled) -{ - PGconn *conn = remote_connection_get_pg_conn(fetcher->state.conn); - PGresult *final_pgres = NULL; - PGresult *pgres = NULL; - ExecStatusType received_status; - - Assert(fetcher->state.open); - - /* Read results until NULL */ - while ((pgres = PQgetResult(conn))) - { - if (final_pgres == NULL) - final_pgres = pgres; - else - remote_result_close(pgres); - } - - received_status = PQresultStatus(final_pgres); - remote_result_close(final_pgres); - - if (canceled) - { - /* If the query was canceled during query execution by the access node - * (e.g., due to reaching a LIMIT), expect either PGRES_COMMAND_OK - * (query completed before cancel happened) or PGRES_FATAL_ERROR - * (query abandoned before completion) */ - if (received_status != PGRES_COMMAND_OK && received_status != PGRES_FATAL_ERROR) - remote_connection_elog(fetcher->state.conn, ERROR); - } - else if (received_status != PGRES_COMMAND_OK) - { - Assert(received_status == PGRES_FATAL_ERROR || received_status == PGRES_NONFATAL_ERROR); - remote_connection_elog(fetcher->state.conn, ERROR); - } - - fetcher->state.open = false; - remote_connection_set_status(fetcher->state.conn, CONN_IDLE); -} - /* * Prematurely end the COPY before EOF is received. * @@ -298,6 +199,8 @@ end_copy(CopyFetcher *fetcher, bool canceled) static void end_copy_before_eof(CopyFetcher *fetcher) { + TSConnectionError err; + /* * The fetcher state might not be open if the fetcher got initialized but * never executed due to executor constraints. 
@@ -306,8 +209,11 @@ end_copy_before_eof(CopyFetcher *fetcher) return; Assert(!fetcher->state.eof); - remote_connection_cancel_query(fetcher->state.conn); - end_copy(fetcher, true); + + if (!remote_connection_end_copy_out(fetcher->state.conn, true, &err)) + remote_connection_error_elog(&err, ERROR); + + fetcher->state.open = false; } /* @@ -316,11 +222,8 @@ end_copy_before_eof(CopyFetcher *fetcher) static int copy_fetcher_complete(CopyFetcher *fetcher) { - /* Marked as volatile since it's modified in PG_TRY used in PG_CATCH */ - AsyncResponseResult *volatile response = NULL; char *volatile dataptr = NULL; MemoryContext oldcontext; - PGconn *conn = remote_connection_get_pg_conn(fetcher->state.conn); Assert(fetcher->state.open); data_fetcher_validate(&fetcher->state); @@ -349,13 +252,10 @@ copy_fetcher_complete(CopyFetcher *fetcher) for (row = 0; row < fetcher->state.fetch_size; row++) { - MemoryContextSwitchTo(fetcher->state.req_mctx); - StringInfoData copy_data = { 0 }; - copy_data.len = PQgetCopyData(conn, - &copy_data.data, - /* async = */ false); + MemoryContextSwitchTo(fetcher->state.req_mctx); + copy_data.len = remote_connection_get_copy_data(fetcher->state.conn, &copy_data.data); /* Set dataptr to ensure data is freed with PQfreemem() in * PG_CATCH() clause in case error is thrown. */ @@ -366,8 +266,8 @@ copy_fetcher_complete(CopyFetcher *fetcher) /* Note: it is possible to get EOF without having received the * file trailer in case there's e.g., a remote error. */ fetcher->state.eof = true; - /* Should read final result with PQgetResult() until it - * returns NULL. This happens below. */ + /* Should read final result until it returns NULL. This + * happens below. */ break; } else if (copy_data.len == -2) @@ -403,10 +303,10 @@ copy_fetcher_complete(CopyFetcher *fetcher) */ fetcher->file_trailer_received = true; - /* Next PQgetCopyData() should return -1, indicating EOF and - * that the remote side ended the copy. 
The final result - * (PGRES_COMMAND_OK) should then be read with - * PQgetResult(). */ + /* Next call to remote_connection_get_copy_data() should + * return -1, indicating EOF and that the remote side ended + * the copy. The final result (PGRES_COMMAND_OK) should then + * be read. */ } else { @@ -480,7 +380,7 @@ copy_fetcher_complete(CopyFetcher *fetcher) } } MemoryContextSwitchTo(fetcher->state.batch_mctx); - PQfreemem(copy_data.data); + remote_connection_free_mem(copy_data.data); dataptr = NULL; } @@ -502,15 +402,19 @@ copy_fetcher_complete(CopyFetcher *fetcher) * where tuples are first fetched in COPY mode, then a remote explain * is performed on the same connection within the same scan. */ if (fetcher->state.eof) - end_copy(fetcher, false); + { + TSConnectionError err; + + if (!remote_connection_end_copy_out(fetcher->state.conn, false, &err)) + remote_connection_error_elog(&err, ERROR); + + fetcher->state.open = false; + } } PG_CATCH(); { - if (NULL != response) - async_response_result_close(response); - if (NULL != dataptr) - PQfreemem(dataptr); + remote_connection_free_mem(dataptr); PG_RE_THROW(); } diff --git a/tsl/src/remote/dist_copy.c b/tsl/src/remote/dist_copy.c index 40c3c94596c..126131f3060 100644 --- a/tsl/src/remote/dist_copy.c +++ b/tsl/src/remote/dist_copy.c @@ -313,13 +313,13 @@ get_copy_connection_to_data_node(RemoteCopyContext *context, TSConnectionId requ if (status == CONN_IDLE) { TSConnectionError err; - if (!remote_connection_begin_copy(connection, - psprintf("%s /* batch %d conn %p */", - state->outgoing_copy_cmd, - context->batch_ordinal, - remote_connection_get_pg_conn(connection)), - state->using_binary, - &err)) + if (!remote_connection_begin_copy_in(connection, + psprintf("%s /* batch %d conn %p */", + state->outgoing_copy_cmd, + context->batch_ordinal, + remote_connection_get_pg_conn(connection)), + state->using_binary, + &err)) { remote_connection_error_elog(&err, ERROR); } @@ -524,7 +524,8 @@ end_copy_on_success(CopyConnectionState 
*state) */ Assert(PQisnonblocking(pg_conn)); - PGresult *res = PQgetResult(pg_conn); + PGresult *res = remote_connection_get_result(conn); + if (res == NULL) { /* @@ -574,23 +575,6 @@ end_copy_on_success(CopyConnectionState *state) flush_active_connections(state); - /* - * Switch the connections back into blocking mode because that's what the - * non-COPY code expects. - */ - foreach (lc, to_end_copy) - { - TSConnection *conn = lfirst(lc); - PGconn *pg_conn = remote_connection_get_pg_conn(conn); - - if (PQsetnonblocking(pg_conn, 0)) - { - ereport(ERROR, - (errmsg("failed to switch the connection into blocking mode"), - errdetail("%s", PQerrorMessage(pg_conn)))); - } - } - /* * Verify that the copy has successfully finished on each connection. */ @@ -598,7 +582,7 @@ end_copy_on_success(CopyConnectionState *state) { TSConnection *conn = lfirst(lc); PGconn *pg_conn = remote_connection_get_pg_conn(conn); - PGresult *res = PQgetResult(pg_conn); + PGresult *res = remote_connection_get_result(conn); if (res == NULL) { ereport(ERROR, (errmsg("unexpected NULL result when ending remote COPY"))); @@ -611,7 +595,8 @@ end_copy_on_success(CopyConnectionState *state) remote_connection_error_elog(&err, ERROR); } - res = PQgetResult(pg_conn); + res = remote_connection_get_result(conn); + if (res != NULL) { ereport(ERROR, @@ -649,7 +634,7 @@ end_copy_on_failure(CopyConnectionState *state) TSConnection *conn = lfirst(lc); if (remote_connection_get_status(conn) == CONN_COPY_IN && - !remote_connection_end_copy(conn, &err)) + !remote_connection_end_copy_in(conn, &err)) { failure = true; } diff --git a/tsl/src/remote/dist_txn.c b/tsl/src/remote/dist_txn.c index 24974cbf047..bc30b10b6a8 100644 --- a/tsl/src/remote/dist_txn.c +++ b/tsl/src/remote/dist_txn.c @@ -178,10 +178,13 @@ dist_txn_xact_callback_abort() remote_txn_store_foreach(store, remote_txn) { - if (remote_txn_is_ongoing(remote_txn) && !remote_txn_abort(remote_txn)) + const char *errmsg = NULL; + + if 
(remote_txn_is_ongoing(remote_txn) && !remote_txn_abort(remote_txn, &errmsg)) elog(WARNING, - "transaction rollback on data node \"%s\" failed", - remote_connection_node_name(remote_txn_get_connection(remote_txn))); + "transaction rollback on data node \"%s\" failed: %s", + remote_connection_node_name(remote_txn_get_connection(remote_txn)), + errmsg); } } diff --git a/tsl/src/remote/txn.c b/tsl/src/remote/txn.c index 6772ce9579b..37dfefc676f 100644 --- a/tsl/src/remote/txn.c +++ b/tsl/src/remote/txn.c @@ -135,7 +135,7 @@ remote_txn_begin(RemoteTxn *entry, int curlevel) { TSConnectionError err; - if (!remote_connection_end_copy(entry->conn, &err)) + if (!remote_connection_end_copy_in(entry->conn, &err)) remote_connection_error_elog(&err, ERROR); } @@ -364,7 +364,7 @@ remote_txn_check_for_leaked_prepared_statements(RemoteTxn *entry) #endif bool -remote_txn_abort(RemoteTxn *entry) +remote_txn_abort(RemoteTxn *entry, const char **errmsg) { const char *abort_sql; bool success = true; @@ -386,13 +386,16 @@ remote_txn_abort(RemoteTxn *entry) Assert(entry->conn != NULL); Assert(remote_connection_xact_depth_get(entry->conn) > 0); - elog(DEBUG3, "aborting remote transaction on connection %p", entry->conn); + elog(WARNING, " [%s] aborting remote transaction", remote_connection_node_name(entry->conn)); /* Already in bad state */ if (remote_connection_xact_is_transitioning(entry->conn)) + { + if (errmsg != NULL) + *errmsg = "transaction is transitioning"; return false; - else if (in_error_recursion_trouble() || - PQstatus(remote_connection_get_pg_conn(entry->conn)) == CONNECTION_BAD) + } + else if (in_error_recursion_trouble()) { /* * Don't try to recover the connection if we're already in error @@ -402,6 +405,15 @@ remote_txn_abort(RemoteTxn *entry) * this ongoing connection and so no cleanup is necessary. 
*/ remote_connection_xact_transition_begin(entry->conn); + if (errmsg != NULL) + *errmsg = "in error recursion"; + return false; + } + else if (PQstatus(remote_connection_get_pg_conn(entry->conn)) == CONNECTION_BAD) + { + remote_connection_xact_transition_begin(entry->conn); + if (errmsg != NULL) + *errmsg = "bad connection"; return false; } @@ -414,13 +426,21 @@ remote_txn_abort(RemoteTxn *entry) * If so, request cancellation of the command. */ if (PQtransactionStatus(remote_connection_get_pg_conn(entry->conn)) == PQTRANS_ACTIVE) - success = remote_connection_cancel_query(entry->conn); + success = remote_connection_cancel_query(entry->conn, errmsg); - if (success) + if (!success) + return false; + + /* At this point any on going queries should have completed */ + remote_connection_set_status(entry->conn, CONN_IDLE); + success = exec_cleanup_command(entry->conn, abort_sql); + + if (!success) { - /* At this point any on going queries should have completed */ - remote_connection_set_status(entry->conn, CONN_IDLE); - success = exec_cleanup_command(entry->conn, abort_sql); + if (errmsg != NULL) + *errmsg = "ROLLBACK TRANSACTION failed"; + + return false; } /* @@ -430,7 +450,7 @@ remote_txn_abort(RemoteTxn *entry) * prepared stmts are per session not per transaction. But we don't want * prepared_stmts to survive transactions in our use case. */ - if (success && entry->have_prep_stmt) + if (entry->have_prep_stmt) success = exec_cleanup_command(entry->conn, "DEALLOCATE ALL"); if (success) @@ -441,6 +461,8 @@ remote_txn_abort(RemoteTxn *entry) /* Everything succeeded, so we have finished transitioning */ remote_connection_xact_transition_end(entry->conn); } + else if (errmsg != NULL) + *errmsg = "DEALLOCATE ALL failed"; return success; } @@ -633,7 +655,7 @@ remote_txn_sub_txn_abort(RemoteTxn *entry, int curlevel) * data node, and if so, request cancellation of the command. 
*/ if (PQtransactionStatus(pg_conn) == PQTRANS_ACTIVE && - !remote_connection_cancel_query(entry->conn)) + !remote_connection_cancel_query(entry->conn, NULL)) success = false; else { diff --git a/tsl/src/remote/txn.h b/tsl/src/remote/txn.h index 4e03b436c64..0deaf849eb6 100644 --- a/tsl/src/remote/txn.h +++ b/tsl/src/remote/txn.h @@ -25,7 +25,7 @@ typedef enum extern void remote_txn_init(RemoteTxn *entry, TSConnection *conn); extern RemoteTxn *remote_txn_begin_on_connection(TSConnection *conn); extern void remote_txn_begin(RemoteTxn *entry, int curlevel); -extern bool remote_txn_abort(RemoteTxn *entry); +extern bool remote_txn_abort(RemoteTxn *entry, const char **errmsg); extern void remote_txn_write_persistent_record(RemoteTxn *entry); extern void remote_txn_deallocate_prepared_stmts_if_needed(RemoteTxn *entry); extern bool remote_txn_sub_txn_abort(RemoteTxn *entry, int curlevel); diff --git a/tsl/test/expected/remote_connection.out b/tsl/test/expected/remote_connection.out index 45828658d1d..e1285f7a549 100644 --- a/tsl/test/expected/remote_connection.out +++ b/tsl/test/expected/remote_connection.out @@ -55,12 +55,12 @@ SELECT * FROM test.get_connection_stats(); \set ON_ERROR_STOP 0 SELECT test.send_remote_query_that_generates_exception(); ERROR: XX000: bad query error thrown from test -LOCATION: ts_test_bad_remote_query, connection.c:184 +LOCATION: ts_test_bad_remote_query, connection.c:207 \set ON_ERROR_STOP 1 SELECT * FROM test.get_connection_stats(); connections_created | connections_closed | results_created | results_cleared ---------------------+--------------------+-----------------+----------------- - 1 | 1 | 7 | 7 + 1 | 1 | 6 | 6 (1 row) SELECT test.remote_connection_tests(); diff --git a/tsl/test/shared/expected/dist_queries.out b/tsl/test/shared/expected/dist_queries.out index 45b24a4863b..22b62652054 100644 --- a/tsl/test/shared/expected/dist_queries.out +++ b/tsl/test/shared/expected/dist_queries.out @@ -77,3 +77,21 @@ SELECT * FROM matches_reference 
ORDER BY 1,2,3,4; (3 rows) ROLLBACK; +-- Test query/command cancelation with statement_timeout +SET statement_timeout=200; +-- Execute long-running query on data nodes +\set ON_ERROR_STOP 0 +CALL distributed_exec('SELECT generate_series(1, 1000000000000)'); +ERROR: canceling statement due to statement timeout +RESET statement_timeout; +\set ON_ERROR_STOP 1 +-- Data node connections should be IDLE +SELECT node_name, database, connection_status, transaction_status, processing +FROM _timescaledb_internal.show_connection_cache() ORDER BY 1; + node_name | database | connection_status | transaction_status | processing +-------------+-------------+-------------------+--------------------+------------ + data_node_1 | data_node_1 | OK | IDLE | f + data_node_2 | data_node_2 | OK | IDLE | f + data_node_3 | data_node_3 | OK | IDLE | f +(3 rows) + diff --git a/tsl/test/shared/sql/dist_queries.sql b/tsl/test/shared/sql/dist_queries.sql index e94ec51cac1..888720cca42 100644 --- a/tsl/test/shared/sql/dist_queries.sql +++ b/tsl/test/shared/sql/dist_queries.sql @@ -56,3 +56,15 @@ SELECT 9, 'old trafford', 'MNU', 'MNC' WHERE NOT EXISTS (SELECT 1 FROM upsert); SELECT * FROM matches_reference ORDER BY 1,2,3,4; ROLLBACK; + +-- Test query/command cancelation with statement_timeout +SET statement_timeout=200; +-- Execute long-running query on data nodes +\set ON_ERROR_STOP 0 +\timing +CALL distributed_exec('SELECT generate_series(1, 1000000000000)'); +RESET statement_timeout; +\set ON_ERROR_STOP 1 +-- Data node connections should be IDLE +SELECT node_name, database, connection_status, transaction_status, processing +FROM _timescaledb_internal.show_connection_cache() ORDER BY 1; diff --git a/tsl/test/src/remote/async.c b/tsl/test/src/remote/async.c index ee771ef4612..cb907eafb23 100644 --- a/tsl/test/src/remote/async.c +++ b/tsl/test/src/remote/async.c @@ -245,7 +245,7 @@ test_node_death() async_request_set_add_sql(set, conn, "SELECT 1"); remote_node_killer_kill(&rnk); - 
TestAssertTrue(false == remote_connection_cancel_query(conn)); + TestAssertTrue(false == remote_connection_cancel_query(conn, NULL)); /* do cancel query after seeing error */ conn = get_connection(); @@ -256,7 +256,7 @@ test_node_death() /* first we get error result */ TestEnsureError(async_request_set_wait_ok_result(set)); - TestAssertTrue(false == remote_connection_cancel_query(conn)); + TestAssertTrue(false == remote_connection_cancel_query(conn, NULL)); remote_connection_close(conn); } @@ -284,7 +284,7 @@ test_timeout() TestAssertTrue(async_response_get_type(response) == RESPONSE_TIMEOUT); /* cancel the locked query and do another query */ - TestAssertTrue(remote_connection_cancel_query(conn)); + TestAssertTrue(remote_connection_cancel_query(conn, NULL)); /* the txn is aborted waiting for abort */ TestEnsureError(async_request_wait_ok_result(async_request_send(conn, "SELECT 1;"))); async_request_wait_ok_command(async_request_send(conn, "ABORT;")); diff --git a/tsl/test/src/remote/connection.c b/tsl/test/src/remote/connection.c index 57dd452822a..464b02df3f9 100644 --- a/tsl/test/src/remote/connection.c +++ b/tsl/test/src/remote/connection.c @@ -24,6 +24,7 @@ #include "export.h" #include "test_utils.h" #include "connection.h" +#include TSConnection * get_connection() @@ -107,14 +108,21 @@ test_simple_queries() static void test_connection_and_result_leaks() { - TSConnection *conn, *subconn; + TSConnection *conn, *subconn, *subconn2; PGresult *res; RemoteConnectionStats *stats; + MemoryContext old; + MemoryContext mcxt; + mcxt = AllocSetContextCreate(CurrentMemoryContext, "test", ALLOCSET_SMALL_SIZES); + + old = MemoryContextSwitchTo(mcxt); stats = remote_connection_stats_get(); remote_connection_stats_reset(); conn = get_connection(); + ASSERT_NUM_OPEN_CONNECTIONS(stats, 1); + res = remote_connection_exec(conn, "SELECT 1"); remote_connection_close(conn); @@ -127,6 +135,8 @@ test_connection_and_result_leaks() BeginInternalSubTransaction("conn leak test"); + /* 
This connection is created on the subtransaction memory context */ + Assert(CurrentMemoryContext == CurTransactionContext); subconn = get_connection(); ASSERT_NUM_OPEN_CONNECTIONS(stats, 2); @@ -135,7 +145,9 @@ test_connection_and_result_leaks() BeginInternalSubTransaction("conn leak test 2"); - res = remote_connection_exec(subconn, "SELECT 1"); + subconn2 = get_connection(); + ASSERT_NUM_OPEN_CONNECTIONS(stats, 3); + res = remote_connection_exec(subconn2, "SELECT 1"); ASSERT_NUM_OPEN_RESULTS(stats, 2); /* Explicitly close one result */ @@ -149,20 +161,31 @@ test_connection_and_result_leaks() ASSERT_NUM_OPEN_RESULTS(stats, 3); RollbackAndReleaseCurrentSubTransaction(); - - /* Rollback should have cleared the two results created in the - * sub-transaction, but not the one created before the sub-transaction */ - ASSERT_NUM_OPEN_RESULTS(stats, 1); + /* The connection created in the rolled back transaction should be + * destroyed */ + ASSERT_NUM_OPEN_CONNECTIONS(stats, 2); remote_connection_exec(subconn, "SELECT 1"); - ASSERT_NUM_OPEN_RESULTS(stats, 2); + ASSERT_NUM_OPEN_RESULTS(stats, 4); + /* Note that releasing/committing the subtransaction does not delete the memory */ ReleaseCurrentSubTransaction(); - /* Should only leave the original connection created before the first - * sub-transaction, but no results */ + /* The original and subconn still exists */ + ASSERT_NUM_OPEN_CONNECTIONS(stats, 2); + ASSERT_NUM_OPEN_RESULTS(stats, 4); + + remote_connection_exec(conn, "SELECT 1"); + ASSERT_NUM_OPEN_RESULTS(stats, 5); + + MemoryContextSwitchTo(old); + MemoryContextDelete(mcxt); + + /* Original connection should be cleaned up along with its 3 results. The + * subconn object was created on a subtransaction memory context that will + * be cleared when the main transaction ends. 
*/ ASSERT_NUM_OPEN_CONNECTIONS(stats, 1); - ASSERT_NUM_OPEN_RESULTS(stats, 0); + ASSERT_NUM_OPEN_RESULTS(stats, 2); remote_connection_stats_reset(); } diff --git a/tsl/test/src/remote/txn_resolve.c b/tsl/test/src/remote/txn_resolve.c index f28e3e19000..498d802341d 100644 --- a/tsl/test/src/remote/txn_resolve.c +++ b/tsl/test/src/remote/txn_resolve.c @@ -47,9 +47,15 @@ create_prepared_txn(TSConnectionId *id) static void create_rollback_prepared_txn(TSConnectionId *id) { + const char *errmsg = NULL; + RemoteTxn *tx = prepared_txn(id, "INSERT INTO public.table_modified_by_txns VALUES ('rollback prepared');"); - remote_txn_abort(tx); + + if (!remote_txn_abort(tx, &errmsg)) + { + elog(WARNING, "remote transcation abort failed: %s", errmsg); + } } Datum