diff --git a/CHANGELOG.md b/CHANGELOG.md index b589fd5d768..b57b964dab3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ accidentally triggering the load of a previous DB version.** * #5241 Allow RETURNING clause when inserting into compressed chunks * #5245 Mange life-cycle of connections via memory contexts * #5246 Make connection establishment interruptible +* #5253 Make data node command execution interruptible **Bugfixes** * #4804 Skip bucketing when start or end of refresh job is null diff --git a/tsl/src/data_node.c b/tsl/src/data_node.c index 7c5dcaee915..a13c465c989 100644 --- a/tsl/src/data_node.c +++ b/tsl/src/data_node.c @@ -257,7 +257,6 @@ data_node_get_connection(const char *const data_node, RemoteTxnPrepStmtOption co const ForeignServer *server; TSConnectionId id; - Assert(data_node != NULL); server = data_node_get_foreign_server(data_node, ACL_NO_CHECK, false, false); id = remote_connection_id(server->serverid, GetUserId()); diff --git a/tsl/src/remote/connection.c b/tsl/src/remote/connection.c index 0f31c6f0157..0e47845ab52 100644 --- a/tsl/src/remote/connection.c +++ b/tsl/src/remote/connection.c @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -169,6 +170,8 @@ typedef struct TSConnection MemoryContext mcxt; MemoryContextCallback mcxt_cb; bool mcxt_cb_invoked; + WaitEventSet *wes; + int sockeventpos; } TSConnection; /* @@ -608,12 +611,10 @@ extract_connection_options(List *defelems, const char **keywords, const char **v } static bool -get_update_conn_cmd(TSConnection *conn, StringInfo cmdbuf) +prepend_enforced_conn_settings(TSConnection *conn, StringInfo cmdbuf) { const char *local_tz_name = pg_get_timezone_name(session_timezone); - initStringInfo(cmdbuf); - /* * We need to enforce the same timezone setting across nodes. Otherwise, * we might get the wrong result when we push down things like @@ -628,8 +629,16 @@ get_update_conn_cmd(TSConnection *conn, StringInfo cmdbuf) if (conn->tz_name[0] == '\0' || (local_tz_name && pg_strcasecmp(conn->tz_name, local_tz_name) != 0)) { + StringInfo newcmd = makeStringInfo(); + strncpy(conn->tz_name, local_tz_name, TZ_STRLEN_MAX); - appendStringInfo(cmdbuf, "SET TIMEZONE = '%s'", local_tz_name); + appendStringInfo(newcmd, "SET TIMEZONE = '%s'", local_tz_name); + + if (cmdbuf->len > 0) + appendStringInfo(newcmd, ";%s", cmdbuf->data); + + *cmdbuf = *newcmd; + return true; } @@ -667,7 +676,7 @@ remote_connection_configure_if_changed(TSConnection *conn) * command, we will send a SET TIMEZONE command with the new timezone * first. */ - if (get_update_conn_cmd(conn, &cmd)) + if (prepend_enforced_conn_settings(conn, &cmd)) { PGresult *result = remote_connection_exec(conn, cmd.data); success = (PQresultStatus(result) == PGRES_COMMAND_OK); @@ -729,7 +738,7 @@ remote_connection_configure(TSConnection *conn) i++; } - result = PQexec(conn->pg_conn, sql.data); + result = remote_connection_exec(conn, sql.data); success = PQresultStatus(result) == PGRES_COMMAND_OK; PQclear(result); pfree(sql.data); @@ -790,6 +799,14 @@ remote_connection_create(PGconn *pg_conn, bool processing, const char *node_name conn->results.prev = &conn->results; conn->binary_copy = false; conn->mcxt = mcxt; + conn->wes = CreateWaitEventSet(mcxt, 3); + AddWaitEventToSet(conn->wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL); + AddWaitEventToSet(conn->wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, NULL, NULL); + /* Register the socket to get the position in the events array. The actual + * events used here does not matter, since it will be modified as + * appropriate when needed. */ + conn->sockeventpos = + AddWaitEventToSet(conn->wes, WL_SOCKET_READABLE, PQsocket(conn->pg_conn), NULL, NULL); /* Register a memory context callback that will ensure the connection is * always closed and the resources are freed */ @@ -900,30 +917,145 @@ remote_connection_get_result_error(const PGresult *res, TSConnectionError *err) fill_result_error(err, ERRCODE_CONNECTION_EXCEPTION, "", res); } +PGresult * +remote_connection_get_result(const TSConnection *conn) +{ + PGresult *pgres = NULL; + int ret = 1; + + do + { + ret = PQisBusy(conn->pg_conn); + + if (ret == 1) + { + /* Busy, wait for readable */ + WaitEvent event; + + ModifyWaitEvent(conn->wes, conn->sockeventpos, WL_SOCKET_READABLE, NULL); + WaitEventSetWait(conn->wes, -1, &event, 1, PG_WAIT_EXTENSION); + + if (event.events & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + if (event.events & WL_SOCKET_READABLE) + { + Assert(event.pos == conn->sockeventpos); + Assert(event.fd == PQsocket(conn->pg_conn)); + + if (PQconsumeInput(conn->pg_conn) == 0) + { + pgres = PQmakeEmptyPGresult(conn->pg_conn, PGRES_FATAL_ERROR); + PQfireResultCreateEvents(conn->pg_conn, pgres); + return pgres; + } + } + } + else if (ret == 0) + { + /* PQgetResult would not block */ + pgres = PQgetResult(conn->pg_conn); + } + else + { + pg_unreachable(); + Assert(false); + } + } while (ret != 0); + + return pgres; +} + /* * Execute a remote command. * - * Like PQexec, which this functions uses internally, the PGresult returned - * describes only the last command executed in a multi-command string. + * The execution blocks until a result is received or a failure occurs. Unlike + * PQexec() and PQexecParams(), however, this function observes PostgreSQL + * interrupts (e.g., a query is canceled). Like PQexecParams(), the PGresult + * returned describes only the last command executed in a multi-command + * string. */ PGresult * remote_connection_exec(TSConnection *conn, const char *cmd) { - PGresult *res; + WaitEvent event; + PGresult *res = NULL; + int ret = 0; + size_t cmdlen = strlen(cmd); + StringInfoData cmd_buf = { + .data = (char *) cmd, + .len = cmdlen, + .maxlen = cmdlen + 1, + }; + + prepend_enforced_conn_settings(conn, &cmd_buf); - if (!remote_connection_configure_if_changed(conn)) + do { - res = PQmakeEmptyPGresult(conn->pg_conn, PGRES_FATAL_ERROR); - PQfireResultCreateEvents(conn->pg_conn, res); - return res; - } + /* Wait for writable socket in outer loop */ + ModifyWaitEvent(conn->wes, conn->sockeventpos, WL_SOCKET_WRITEABLE, NULL); + WaitEventSetWait(conn->wes, -1, &event, 1, PG_WAIT_EXTENSION); - res = PQexec(conn->pg_conn, cmd); + if (event.events & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + if (event.events & WL_SOCKET_WRITEABLE) + { + PGresult *result, *last_result; + + ret = PQsendQuery(conn->pg_conn, cmd_buf.data); + + if (ret == 0) + { + res = PQmakeEmptyPGresult(conn->pg_conn, PGRES_FATAL_ERROR); + PQfireResultCreateEvents(conn->pg_conn, res); + return res; + } + + /* Command sent, so now wait for readable result in inner loop */ + last_result = NULL; + + while ((result = remote_connection_get_result(conn)) != NULL) + { + if (last_result) + { +#if PG12 + /* PostgreSQL 12 need to keep the first result instead for + * same behvaior */ + if (PQresultStatus(last_result) == PGRES_FATAL_ERROR && + PQresultStatus(result) == PGRES_FATAL_ERROR) + { + PQclear(result); + result = last_result; + } + else + PQclear(last_result); +#else + PQclear(last_result); +#endif + } + + last_result = result; + + if (PQresultStatus(result) == PGRES_COPY_IN || + PQresultStatus(result) == PGRES_COPY_OUT || + PQresultStatus(result) == PGRES_COPY_BOTH || + PQstatus(conn->pg_conn) == CONNECTION_BAD) + break; + } + + res = last_result; + } + } while (res == NULL); /* * Workaround for the libpq disconnect case. * - * libpq disconnect will create an result object without creating + * libpq disconnect will create an empty result object without generating * events, which is usually done for a regular errors. * * In order to be compatible with our error handling code, force @@ -932,15 +1064,12 @@ remote_connection_exec(TSConnection *conn, const char *cmd) */ if (res) { - ExecStatusType status = PQresultStatus(res); ResultEntry *entry = PQresultInstanceData(res, eventproc); - if (status == PGRES_FATAL_ERROR && entry == NULL) - { - res = PQmakeEmptyPGresult(conn->pg_conn, PGRES_FATAL_ERROR); + if (entry == NULL) PQfireResultCreateEvents(conn->pg_conn, res); - } } + return res; } @@ -1825,17 +1954,20 @@ remote_connection_drain(TSConnection *conn, TimestampTz endtime, PGresult **resu /* In what follows, do not leak any PGresults on an error. */ PG_TRY(); { + ModifyWaitEvent(conn->wes, conn->sockeventpos, WL_SOCKET_READABLE, NULL); + for (;;) { PGresult *res; while (PQisBusy(pg_conn)) { - int wc; TimestampTz now = GetCurrentTimestamp(); long remaining_secs; int remaining_usecs; long cur_timeout_ms; + WaitEvent event; + int ret; /* If timeout has expired, give up, else get sleep time. */ if (now >= endtime) @@ -1850,26 +1982,36 @@ remote_connection_drain(TSConnection *conn, TimestampTz endtime, PGresult **resu cur_timeout_ms = Min(MAX_CONN_WAIT_TIMEOUT_MS, remaining_secs * USECS_PER_SEC + remaining_usecs); - /* Sleep until there's something to do */ - wc = WaitLatchOrSocket(MyLatch, - WL_LATCH_SET | WL_SOCKET_READABLE | WL_EXIT_ON_PM_DEATH | - WL_TIMEOUT, - PQsocket(pg_conn), - cur_timeout_ms, - PG_WAIT_EXTENSION); - ResetLatch(MyLatch); + /* Wait until there's something to do, or timeout */ + ret = WaitEventSetWait(conn->wes, cur_timeout_ms, &event, 1, PG_WAIT_EXTENSION); - CHECK_FOR_INTERRUPTS(); - - /* Data available in socket? */ - if ((wc & WL_SOCKET_READABLE) && (0 == PQconsumeInput(pg_conn))) + if (ret == 0) { - connresult = CONN_DISCONNECT; + /* Timeout */ + connresult = CONN_TIMEOUT; goto exit; } + + if (event.events & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + if (event.events & WL_SOCKET_READABLE) + { + Assert(event.pos == conn->sockeventpos); + Assert(event.fd == PQsocket(conn->pg_conn)); + + if (PQconsumeInput(conn->pg_conn) == 0) + { + connresult = CONN_DISCONNECT; + goto exit; + } + } } - res = PQgetResult(pg_conn); + res = PQgetResult(conn->pg_conn); if (res == NULL) { @@ -1927,18 +2069,42 @@ remote_connection_drain(TSConnection *conn, TimestampTz endtime, PGresult **resu return connresult; } +static bool +send_cancel(TSConnection *conn) +{ + PGcancel *cancel; + char errbuf[256]; + bool success = true; + + cancel = PQgetCancel(conn->pg_conn); + + if (cancel == NULL) + return false; + + if (PQcancel(cancel, errbuf, sizeof(errbuf)) == 0) + { + ereport(WARNING, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not send cancel request: %s", errbuf))); + + success = false; + } + + PQfreeCancel(cancel); + + return success; +} + /* * Cancel the currently-in-progress query and ignore the result. Returns true if we successfully * cancel the query and discard any pending result, and false if not. */ bool -remote_connection_cancel_query(TSConnection *conn) +remote_connection_cancel_query(TSConnection *conn, const char **errmsg) { - PGcancel *cancel; - char errbuf[256]; TimestampTz endtime; TSConnectionError err; - bool success; + bool volatile success = false; if (!conn) return true; @@ -1956,40 +2122,44 @@ remote_connection_cancel_query(TSConnection *conn) if (conn->status == CONN_COPY_IN && !remote_connection_end_copy(conn, &err)) remote_connection_error_elog(&err, WARNING); - /* - * If it takes too long to cancel the query and discard the result, assume - * the connection is dead. - */ - endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000); - /* * Issue cancel request. Unfortunately, there's no good way to limit the * amount of time that we might block inside PQcancel(). */ - if ((cancel = PQgetCancel(conn->pg_conn))) + if (!send_cancel(conn)) { - if (!PQcancel(cancel, errbuf, sizeof(errbuf))) - { - ereport(WARNING, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("could not send cancel request: %s", errbuf))); - PQfreeCancel(cancel); - remote_connection_set_status(conn, CONN_IDLE); - return false; - } - PQfreeCancel(cancel); + remote_connection_set_status(conn, CONN_IDLE); + + if (errmsg != NULL) + *errmsg = "cancelation failed"; + return false; } + /* + * If it takes too long to cancel the query and discard the result, assume + * the connection is dead. + */ + endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000); + switch (remote_connection_drain(conn, endtime, NULL)) { case CONN_OK: /* Successfully, drained */ + success = true; + break; case CONN_NO_RESPONSE: /* No response, likely beceause there was nothing to cancel */ success = true; break; - default: + case CONN_DISCONNECT: success = false; + if (errmsg != NULL) + *errmsg = "connection drain failed due to disconnect"; + break; + case CONN_TIMEOUT: + success = false; + if (errmsg != NULL) + *errmsg = "connection drain failed due to timeout"; break; } } @@ -2062,9 +2232,10 @@ remote_connection_begin_copy(TSConnection *conn, const char *copycmd, bool binar GetConfigOption("timescaledb.debug_broken_sendrecv_throw_after", true, false); if (throw_after_option) { - res = PQexec(pg_conn, - psprintf("set timescaledb.debug_broken_sendrecv_throw_after = '%s';", - throw_after_option)); + res = remote_connection_exec(conn, + psprintf("set timescaledb.debug_broken_sendrecv_throw_after = " + "'%s';", + throw_after_option)); if (PQresultStatus(res) != PGRES_COMMAND_OK) { remote_connection_get_result_error(res, err); @@ -2076,7 +2247,7 @@ remote_connection_begin_copy(TSConnection *conn, const char *copycmd, bool binar #endif /* Run the COPY query. */ - res = PQexec(pg_conn, copycmd); + res = remote_connection_exec(conn, copycmd); if (PQresultStatus(res) != PGRES_COPY_IN) { @@ -2240,7 +2411,8 @@ remote_connection_end_copy(TSConnection *conn, TSConnectionError *err) * reasons, as well. If we discover this, update our info with the actual * status, but still report the error. */ - res = PQgetResult(conn->pg_conn); + res = remote_connection_get_result(conn); + if (res == NULL || PQresultStatus(res) != PGRES_COPY_IN) { remote_connection_set_status(conn, res == NULL ? CONN_IDLE : CONN_PROCESSING); diff --git a/tsl/src/remote/connection.h b/tsl/src/remote/connection.h index a31f5f5b16e..c3f28a81116 100644 --- a/tsl/src/remote/connection.h +++ b/tsl/src/remote/connection.h @@ -71,6 +71,7 @@ extern void remote_connection_xact_transition_end(TSConnection *conn); extern bool remote_connection_xact_is_transitioning(const TSConnection *conn); extern bool remote_connection_ping(const char *node_name); extern void remote_connection_close(TSConnection *conn); +extern PGresult *remote_connection_get_result(const TSConnection *conn); extern PGresult *remote_connection_exec(TSConnection *conn, const char *cmd); extern PGresult *remote_connection_execf(TSConnection *conn, const char *fmt, ...) pg_attribute_printf(2, 3); @@ -108,7 +109,7 @@ typedef enum TSConnectionStatus TSConnectionResult remote_connection_drain(TSConnection *conn, TimestampTz endtime, PGresult **result); -extern bool remote_connection_cancel_query(TSConnection *conn); +extern bool remote_connection_cancel_query(TSConnection *conn, const char **errmsg); extern PGconn *remote_connection_get_pg_conn(const TSConnection *conn); extern bool remote_connection_is_processing(const TSConnection *conn); extern void remote_connection_set_status(TSConnection *conn, TSConnectionStatus status); diff --git a/tsl/src/remote/copy_fetcher.c b/tsl/src/remote/copy_fetcher.c index 11ced7d22a9..89ef77ed667 100644 --- a/tsl/src/remote/copy_fetcher.c +++ b/tsl/src/remote/copy_fetcher.c @@ -113,7 +113,8 @@ copy_fetcher_send_fetch_request(DataFetcher *df) " Use cursor fetcher instead."))); } - PGresult *res = PQgetResult(remote_connection_get_pg_conn(fetcher->state.conn)); + PGresult *res = remote_connection_get_result(fetcher->state.conn); + if (res == NULL) { /* Shouldn't really happen but technically possible. */ @@ -251,7 +252,6 @@ copy_data_check_header(StringInfo copy_data) static void end_copy(CopyFetcher *fetcher, bool canceled) { - PGconn *conn = remote_connection_get_pg_conn(fetcher->state.conn); PGresult *final_pgres = NULL; PGresult *pgres = NULL; ExecStatusType received_status; @@ -259,7 +259,7 @@ end_copy(CopyFetcher *fetcher, bool canceled) Assert(fetcher->state.open); /* Read results until NULL */ - while ((pgres = PQgetResult(conn))) + while ((pgres = remote_connection_get_result(fetcher->state.conn))) { if (final_pgres == NULL) final_pgres = pgres; @@ -298,6 +298,7 @@ end_copy(CopyFetcher *fetcher, bool canceled) static void end_copy_before_eof(CopyFetcher *fetcher) { + const char *errormsg; /* * The fetcher state might not be open if the fetcher got initialized but * never executed due to executor constraints. @@ -306,7 +307,15 @@ end_copy_before_eof(CopyFetcher *fetcher) return; Assert(!fetcher->state.eof); - remote_connection_cancel_query(fetcher->state.conn); + + if (!remote_connection_cancel_query(fetcher->state.conn, &errormsg)) + { + ereport(WARNING, + (errcode(ERRCODE_CONNECTION_EXCEPTION), + errmsg("could not cancel query"), + errdetail("%s", errormsg))); + } + end_copy(fetcher, true); } @@ -366,8 +375,9 @@ copy_fetcher_complete(CopyFetcher *fetcher) /* Note: it is possible to get EOF without having received the * file trailer in case there's e.g., a remote error. */ fetcher->state.eof = true; - /* Should read final result with PQgetResult() until it - * returns NULL. This happens below. */ + /* Should read final result with + * remote_connection_get_result() until it returns NULL. This + * happens below. */ break; } else if (copy_data.len == -2) @@ -406,7 +416,7 @@ copy_fetcher_complete(CopyFetcher *fetcher) /* Next PQgetCopyData() should return -1, indicating EOF and * that the remote side ended the copy. The final result * (PGRES_COMMAND_OK) should then be read with - * PQgetResult(). */ + * remote_connection_get_result(). */ } else { diff --git a/tsl/src/remote/dist_copy.c b/tsl/src/remote/dist_copy.c index 8fca21ce15b..7d2e05a6614 100644 --- a/tsl/src/remote/dist_copy.c +++ b/tsl/src/remote/dist_copy.c @@ -5,6 +5,7 @@ */ #include "dist_copy.h" +#include "remote/connection.h" #include #include #include @@ -524,7 +525,8 @@ end_copy_on_success(CopyConnectionState *state) */ Assert(PQisnonblocking(pg_conn)); - PGresult *res = PQgetResult(pg_conn); + PGresult *res = remote_connection_get_result(conn); + if (res == NULL) { /* diff --git a/tsl/src/remote/txn.c b/tsl/src/remote/txn.c index 6772ce9579b..3442cba9452 100644 --- a/tsl/src/remote/txn.c +++ b/tsl/src/remote/txn.c @@ -414,7 +414,17 @@ remote_txn_abort(RemoteTxn *entry) * If so, request cancellation of the command. */ if (PQtransactionStatus(remote_connection_get_pg_conn(entry->conn)) == PQTRANS_ACTIVE) - success = remote_connection_cancel_query(entry->conn); + { + const char *errormsg; + + success = remote_connection_cancel_query(entry->conn, &errormsg); + + if (!success) + ereport(WARNING, + (errcode(ERRCODE_CONNECTION_EXCEPTION), + errmsg("could not cancel query"), + errdetail("%s", errormsg))); + } if (success) { @@ -632,9 +642,18 @@ remote_txn_sub_txn_abort(RemoteTxn *entry, int curlevel) * completed. Check to see if a command is still being processed by the * data node, and if so, request cancellation of the command. */ - if (PQtransactionStatus(pg_conn) == PQTRANS_ACTIVE && - !remote_connection_cancel_query(entry->conn)) - success = false; + if (PQtransactionStatus(pg_conn) == PQTRANS_ACTIVE) + { + const char *errormsg; + + success = remote_connection_cancel_query(entry->conn, &errormsg); + + if (!success) + ereport(WARNING, + (errcode(ERRCODE_CONNECTION_EXCEPTION), + errmsg("could not cancel query"), + errdetail("%s", errormsg))); + } else { /* Rollback all remote subtransactions during abort */ diff --git a/tsl/test/expected/remote_connection.out b/tsl/test/expected/remote_connection.out index ac86e3d8812..7ce71f1909e 100644 --- a/tsl/test/expected/remote_connection.out +++ b/tsl/test/expected/remote_connection.out @@ -60,7 +60,7 @@ LOCATION: ts_test_bad_remote_query, connection.c:216 SELECT * FROM test.get_connection_stats(); connections_created | connections_closed | results_created | results_cleared ---------------------+--------------------+-----------------+----------------- - 1 | 1 | 8 | 8 + 1 | 1 | 7 | 7 (1 row) SELECT test.remote_connection_tests(); diff --git a/tsl/test/shared/expected/dist_queries.out b/tsl/test/shared/expected/dist_queries.out index 45b24a4863b..5eae1fd2519 100644 --- a/tsl/test/shared/expected/dist_queries.out +++ b/tsl/test/shared/expected/dist_queries.out @@ -1,6 +1,11 @@ -- This file and its contents are licensed under the Timescale License. -- Please see the included NOTICE for copyright information and -- LICENSE-TIMESCALE for a copy of the license. +\c :TEST_DBNAME :ROLE_SUPERUSER +-- Function for testing command execution on data nodes +CREATE OR REPLACE PROCEDURE test.data_node_exec(node_name NAME, command TEXT) +AS :TSL_MODULE_PATHNAME, 'ts_data_node_exec' LANGUAGE C; +SET ROLE :ROLE_DEFAULT_PERM_USER; -- Test DataNodeScan with subquery with one-time filter SELECT id @@ -77,3 +82,32 @@ SELECT * FROM matches_reference ORDER BY 1,2,3,4; (3 rows) ROLLBACK; +-- Test query/command cancelation with statement_timeout +SET statement_timeout=200; +-- Execute long-running query on data nodes +\set ON_ERROR_STOP 0 +-- distribute_exec() uses async functions +CALL distributed_exec('SELECT pg_sleep(200)'); +ERROR: canceling statement due to statement timeout +-- data_node_exec() directly calls PQexec-style functions +CALL test.data_node_exec('data_node_1', 'SELECT pg_sleep(200)'); +ERROR: canceling statement due to statement timeout +-- test weird parameters +CALL test.data_node_exec(NULL, 'SELECT pg_sleep(200)'); +ERROR: data node name cannot be NULL +CALL test.data_node_exec('-', 'SELECT pg_sleep(200)'); +ERROR: server "-" does not exist +CALL test.data_node_exec('data_node_1', NULL); +ERROR: command string cannot be NULL +RESET statement_timeout; +\set ON_ERROR_STOP 1 +-- Data node connections should be IDLE +SELECT node_name, database, connection_status, transaction_status, processing +FROM _timescaledb_internal.show_connection_cache() ORDER BY 1; + node_name | database | connection_status | transaction_status | processing +-------------+-------------+-------------------+--------------------+------------ + data_node_1 | data_node_1 | OK | IDLE | f + data_node_2 | data_node_2 | OK | IDLE | f + data_node_3 | data_node_3 | OK | IDLE | f +(3 rows) + diff --git a/tsl/test/shared/sql/CMakeLists.txt b/tsl/test/shared/sql/CMakeLists.txt index 9be3f894d01..daa20ebbb1a 100644 --- a/tsl/test/shared/sql/CMakeLists.txt +++ b/tsl/test/shared/sql/CMakeLists.txt @@ -8,7 +8,6 @@ set(TEST_FILES_SHARED dist_distinct_pushdown.sql dist_gapfill.sql dist_insert.sql - dist_queries.sql extension.sql subtract_integer_from_now.sql) @@ -27,8 +26,8 @@ if((${PG_VERSION_MAJOR} GREATER_EQUAL "14")) endif() if(CMAKE_BUILD_TYPE MATCHES Debug) - list(APPEND TEST_FILES_SHARED dist_parallel_agg.sql timestamp_limits.sql - with_clause_parser.sql) + list(APPEND TEST_FILES_SHARED dist_parallel_agg.sql dist_queries.sql + timestamp_limits.sql with_clause_parser.sql) list(APPEND TEST_TEMPLATES_SHARED constify_now.sql.in space_constraint.sql.in dist_remote_error.sql.in) endif(CMAKE_BUILD_TYPE MATCHES Debug) diff --git a/tsl/test/shared/sql/dist_queries.sql b/tsl/test/shared/sql/dist_queries.sql index e94ec51cac1..de2b06062ed 100644 --- a/tsl/test/shared/sql/dist_queries.sql +++ b/tsl/test/shared/sql/dist_queries.sql @@ -2,6 +2,12 @@ -- Please see the included NOTICE for copyright information and -- LICENSE-TIMESCALE for a copy of the license. +\c :TEST_DBNAME :ROLE_SUPERUSER +-- Function for testing command execution on data nodes +CREATE OR REPLACE PROCEDURE test.data_node_exec(node_name NAME, command TEXT) +AS :TSL_MODULE_PATHNAME, 'ts_data_node_exec' LANGUAGE C; +SET ROLE :ROLE_DEFAULT_PERM_USER; + -- Test DataNodeScan with subquery with one-time filter SELECT id @@ -56,3 +62,21 @@ SELECT 9, 'old trafford', 'MNU', 'MNC' WHERE NOT EXISTS (SELECT 1 FROM upsert); SELECT * FROM matches_reference ORDER BY 1,2,3,4; ROLLBACK; + +-- Test query/command cancelation with statement_timeout +SET statement_timeout=200; +-- Execute long-running query on data nodes +\set ON_ERROR_STOP 0 +-- distribute_exec() uses async functions +CALL distributed_exec('SELECT pg_sleep(200)'); +-- data_node_exec() directly calls PQexec-style functions +CALL test.data_node_exec('data_node_1', 'SELECT pg_sleep(200)'); +-- test weird parameters +CALL test.data_node_exec(NULL, 'SELECT pg_sleep(200)'); +CALL test.data_node_exec('-', 'SELECT pg_sleep(200)'); +CALL test.data_node_exec('data_node_1', NULL); +RESET statement_timeout; +\set ON_ERROR_STOP 1 +-- Data node connections should be IDLE +SELECT node_name, database, connection_status, transaction_status, processing +FROM _timescaledb_internal.show_connection_cache() ORDER BY 1; diff --git a/tsl/test/shared/sql/include/shared_setup.sql b/tsl/test/shared/sql/include/shared_setup.sql index 1bdc989f75d..700689287fb 100644 --- a/tsl/test/shared/sql/include/shared_setup.sql +++ b/tsl/test/shared/sql/include/shared_setup.sql @@ -91,6 +91,7 @@ GRANT USAGE ON FOREIGN SERVER data_node_1, data_node_2, data_node_3 TO PUBLIC; GRANT CREATE ON SCHEMA public TO :ROLE_DEFAULT_PERM_USER; -- set artificially low fetch_size to test fetcher behavior ALTER FOREIGN DATA WRAPPER timescaledb_fdw OPTIONS (ADD fetch_size '10'); + -- Import testsupport.sql file to data nodes \unset ECHO \o /dev/null @@ -312,4 +313,3 @@ VALUES -- Create a reference table (non hypertable) to compare results CREATE TABLE matches_reference (LIKE matches); INSERT INTO matches_reference SELECT * FROM matches; - diff --git a/tsl/test/src/data_node.c b/tsl/test/src/data_node.c index 3d5805315df..0247927980a 100644 --- a/tsl/test/src/data_node.c +++ b/tsl/test/src/data_node.c @@ -15,6 +15,7 @@ TS_FUNCTION_INFO_V1(ts_test_data_node_show); TS_FUNCTION_INFO_V1(ts_unchecked_add_data_node); +TS_FUNCTION_INFO_V1(ts_data_node_exec); /* * Tests the ts_data_node_get_node_name_list() function. @@ -96,3 +97,33 @@ ts_unchecked_add_data_node(PG_FUNCTION_ARGS) { return data_node_add_without_dist_id(fcinfo); } + +/* + * Execute a command on a data node. + * + * Mostly for debugging connection execution functions. + */ +Datum +ts_data_node_exec(PG_FUNCTION_ARGS) +{ + const char *nodename = PG_GETARG_CSTRING(0); + TSConnection *conn = data_node_get_connection(nodename, REMOTE_TXN_NO_PREP_STMT, true); + const text *cmdstr; + PGresult *res; + + if (PG_ARGISNULL(1)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("command string cannot be NULL"))); + + cmdstr = PG_GETARG_TEXT_P(1); + res = remote_connection_exec(conn, text_to_cstring(cmdstr)); + + if (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK) + { + PQclear(res); + remote_connection_elog(conn, ERROR); + } + + PG_RETURN_VOID(); +} diff --git a/tsl/test/src/remote/async.c b/tsl/test/src/remote/async.c index ee771ef4612..cb907eafb23 100644 --- a/tsl/test/src/remote/async.c +++ b/tsl/test/src/remote/async.c @@ -245,7 +245,7 @@ test_node_death() async_request_set_add_sql(set, conn, "SELECT 1"); remote_node_killer_kill(&rnk); - TestAssertTrue(false == remote_connection_cancel_query(conn)); + TestAssertTrue(false == remote_connection_cancel_query(conn, NULL)); /* do cancel query after seeing error */ conn = get_connection(); @@ -256,7 +256,7 @@ test_node_death() /* first we get error result */ TestEnsureError(async_request_set_wait_ok_result(set)); - TestAssertTrue(false == remote_connection_cancel_query(conn)); + TestAssertTrue(false == remote_connection_cancel_query(conn, NULL)); remote_connection_close(conn); } @@ -284,7 +284,7 @@ test_timeout() TestAssertTrue(async_response_get_type(response) == RESPONSE_TIMEOUT); /* cancel the locked query and do another query */ - TestAssertTrue(remote_connection_cancel_query(conn)); + TestAssertTrue(remote_connection_cancel_query(conn, NULL)); /* the txn is aborted waiting for abort */ TestEnsureError(async_request_wait_ok_result(async_request_send(conn, "SELECT 1;"))); async_request_wait_ok_command(async_request_send(conn, "ABORT;"));