diff --git a/CHANGELOG.md b/CHANGELOG.md index 0a60c3f9047..3f30363599d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ accidentally triggering the load of a previous DB version.** **Features** * #5241 Allow RETURNING clause when inserting into compressed chunks * #5245 Mange life-cycle of connections via memory contexts +* #5246 Make connection establishment interruptible **Bugfixes** * #4804 Skip bucketing when start or end of refresh job is null diff --git a/tsl/src/data_node.c b/tsl/src/data_node.c index be03d46baab..7c5dcaee915 100644 --- a/tsl/src/data_node.c +++ b/tsl/src/data_node.c @@ -510,17 +510,19 @@ data_node_bootstrap_extension(TSConnection *conn) quote_literal_cstr(EXTENSION_NAME)); if (PQresultStatus(res) != PGRES_TUPLES_OK) - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQresultErrorMessage(res)))); + remote_result_elog(res, ERROR); if (PQntuples(res) == 0) { + remote_result_close(res); + if (schema_oid != PG_PUBLIC_NAMESPACE) { - PGresult *res = remote_connection_execf(conn, - "CREATE SCHEMA %s AUTHORIZATION %s", - schema_name_quoted, - quote_identifier(username)); + res = remote_connection_execf(conn, + "CREATE SCHEMA %s AUTHORIZATION %s", + schema_name_quoted, + quote_identifier(username)); + if (PQresultStatus(res) != PGRES_COMMAND_OK) { const char *const sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE); @@ -528,6 +530,8 @@ data_node_bootstrap_extension(TSConnection *conn) (sqlstate && strcmp(sqlstate, ERRCODE_DUPLICATE_SCHEMA_STR) == 0); if (!schema_exists) remote_result_elog(res, ERROR); + + remote_result_close(res); /* If the schema already existed on the remote node, we got a * duplicate schema error and the schema was not created. In * that case, we log an error with a hint on how to fix the @@ -538,6 +542,8 @@ data_node_bootstrap_extension(TSConnection *conn) errhint("Make sure that the data node does not contain any " "existing objects prior to adding it."))); } + + remote_result_close(res); } remote_connection_cmdf_ok(conn, @@ -556,6 +562,7 @@ data_node_bootstrap_extension(TSConnection *conn) PQhost(remote_connection_get_pg_conn(conn)), PQport(remote_connection_get_pg_conn(conn)), PQgetvalue(res, 0, 1)))); + remote_result_close(res); data_node_validate_extension(conn); return false; } @@ -592,7 +599,7 @@ connect_for_bootstrapping(const char *node_name, const char *const host, int32 p { List *node_options = create_data_node_options(host, port, bootstrap_databases[i], username, password); - conn = remote_connection_open_with_options_nothrow(node_name, node_options, &err); + conn = remote_connection_open(node_name, node_options, &err); if (conn) return conn; @@ -635,7 +642,8 @@ data_node_validate_extension_availability(TSConnection *conn) if (PQresultStatus(res) != PGRES_TUPLES_OK) ereport(ERROR, - (errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQresultErrorMessage(res)))); + (errcode(ERRCODE_CONNECTION_EXCEPTION), + errmsg("failed to validate remote extension: %s", PQresultErrorMessage(res)))); if (PQntuples(res) == 0) ereport(ERROR, @@ -788,7 +796,7 @@ data_node_add_internal(PG_FUNCTION_ARGS, bool set_distid) * necessary. Instead using a more straightforward approach here since * we do not need 2PC support. */ node_options = create_data_node_options(host, port, dbname, username, password); - conn = remote_connection_open_with_options(node_name, node_options, false); + conn = remote_connection_open_session(node_name, node_options, false); Assert(NULL != conn); remote_connection_cmd_ok(conn, "BEGIN"); @@ -1771,7 +1779,7 @@ drop_data_node_database(const ForeignServer *server) server = data_node_get_foreign_server(nodename, ACL_USAGE, true, false); /* Open a connection to the bootstrap database using the new server options */ conn_options = remote_connection_prepare_auth_options(server, userid); - conn = remote_connection_open_with_options_nothrow(nodename, conn_options, &err); + conn = remote_connection_open(nodename, conn_options, &err); if (NULL != conn) break; diff --git a/tsl/src/remote/connection.c b/tsl/src/remote/connection.c index 9896b98c6e1..0f31c6f0157 100644 --- a/tsl/src/remote/connection.c +++ b/tsl/src/remote/connection.c @@ -1365,27 +1365,35 @@ setup_full_connection_options(List *connection_options, const char ***all_keywor } /* - * This will only open a connection to a specific node, but not do anything - * else. In particular, it will not perform any validation nor configure the - * connection since it cannot know that it connects to a data node database or - * not. For that, please use the `remote_connection_open_with_options` - * function. + * Open a connection and assign it the given node name. + * + * This will only open a connection to a specific node, but not do any other + * session initialization. In particular, it will not perform any validation + * nor configure the connection since it cannot know that it connects to a + * data node database or not. For that, please use the + * `remote_connection_open_session` function. + * + * The connection's life-cycle is tied to the current memory context via its + * delete callback. As a result, the connection will be automatically closed + * and freed when the memory context is deleted. + * + * This function does not (and should not) throw (PostgreSQL) errors. Instead, + * an error message is optionally returned via the "errmsg" parameter. */ TSConnection * -remote_connection_open_with_options_nothrow(const char *node_name, List *connection_options, - char **errmsg) +remote_connection_open(const char *node_name, List *connection_options, char **errmsg) { - PGconn *volatile pg_conn = NULL; - TSConnection *ts_conn; + PGconn *pg_conn = NULL; + TSConnection *ts_conn = NULL; const char **keywords; const char **values; + PostgresPollingStatusType status; if (NULL != errmsg) *errmsg = NULL; setup_full_connection_options(connection_options, &keywords, &values); - - pg_conn = PQconnectdbParams(keywords, values, 0 /* Do not expand dbname param */); + pg_conn = PQconnectStartParams(keywords, values, 0 /* Do not expand dbname param */); /* Cast to (char **) to silence warning with MSVC compiler */ pfree((char **) keywords); @@ -1394,6 +1402,63 @@ remote_connection_open_with_options_nothrow(const char *node_name, List *connect if (NULL == pg_conn) return NULL; + if (PQstatus(pg_conn) == CONNECTION_BAD) + { + finish_connection(pg_conn, errmsg); + return NULL; + } + + status = PGRES_POLLING_WRITING; + + do + { + int io_flag; + int rc; + + if (status == PGRES_POLLING_READING) + io_flag = WL_SOCKET_READABLE; +#ifdef WIN32 + /* Windows needs a different test while waiting for connection-made */ + else if (PQstatus(pg_conn) == CONNECTION_STARTED) + io_flag = WL_SOCKET_CONNECTED; +#endif + else + io_flag = WL_SOCKET_WRITEABLE; + /* + * Wait for latch or socket event. Note that it is not possible to + * reuse a WaitEventSet using the same socket file descriptor in each + * iteration of the loop since PQconnectPoll() might change the file + * descriptor across calls. Therefore, it is important to create a new + * WaitEventSet in every iteration of the loop and retreiving the + * correct file descriptor (socket) with PQsocket(). + */ + rc = WaitLatchOrSocket(MyLatch, + WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag, + PQsocket(pg_conn), + 0, + PG_WAIT_EXTENSION); + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + if (rc & io_flag) + { + /* + * PQconnectPoll() is supposed to be non-blocking, but it + * isn't. PQconnectPoll() will internally try to send a startup + * packet and do DNS lookups (if necessary) and can therefore + * block. So, if there is a network issue (e.g., black hole + * routing) the connection attempt will hang on + * PQconnectPoll(). There's nothing that can be done about it, + * unless the blocking operations are moved out of PQconnectPoll() + * and integrated with the wait loop. + */ + status = PQconnectPoll(pg_conn); + } + } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED); + if (PQstatus(pg_conn) != CONNECTION_OK) { finish_connection(pg_conn, errmsg); @@ -1409,22 +1474,15 @@ remote_connection_open_with_options_nothrow(const char *node_name, List *connect } /* - * Opens a connection. + * Open a connection to a data node and perform basic session initialization. * - * Raw connections are not part of the transaction and do not have transactions - * auto-started. They must be explicitly closed by - * remote_connection_close. Note that connections are allocated using malloc - * and so if you do not call remote_connection_close, you'll have a memory - * leak. Note that the connection cache handles all of this for you so use - * that if you can. + * This function will raise errors on failures. */ TSConnection * -remote_connection_open_with_options(const char *node_name, List *connection_options, - bool set_dist_id) +remote_connection_open_session(const char *node_name, List *connection_options, bool set_dist_id) { char *err = NULL; - TSConnection *conn = - remote_connection_open_with_options_nothrow(node_name, connection_options, &err); + TSConnection *conn = remote_connection_open(node_name, connection_options, &err); if (NULL == conn) ereport(ERROR, @@ -1477,6 +1535,15 @@ remote_connection_open_with_options(const char *node_name, List *connection_opti return conn; } +TSConnection * +remote_connection_open_session_by_id(TSConnectionId id) +{ + ForeignServer *server = GetForeignServer(id.server_id); + List *connection_options = remote_connection_prepare_auth_options(server, id.user_id); + + return remote_connection_open_session(server->servername, connection_options, true); +} + /* * Based on PG's GetUserMapping, but this version does not fail when a user * mapping is not found. @@ -1647,86 +1714,34 @@ remote_connection_get_connstr(const char *node_name) return connstr_escape.data; } -TSConnection * -remote_connection_open_by_id(TSConnectionId id) -{ - ForeignServer *server = GetForeignServer(id.server_id); - List *connection_options = remote_connection_prepare_auth_options(server, id.user_id); - - return remote_connection_open_with_options(server->servername, connection_options, true); -} - -TSConnection * -remote_connection_open(Oid server_id, Oid user_id) -{ - TSConnectionId id = remote_connection_id(server_id, user_id); - - return remote_connection_open_by_id(id); -} +#define PING_QUERY "SELECT 1" -/* - * Open a connection without throwing and error. - * - * Returns the connection pointer on success. On failure NULL is returned and - * the errmsg (if given) is used to return an error message. - */ -TSConnection * -remote_connection_open_nothrow(Oid server_id, Oid user_id, char **errmsg) +bool +remote_connection_ping(const char *node_name) { + Oid server_id = get_foreign_server_oid(node_name, false); ForeignServer *server = GetForeignServer(server_id); Oid fdwid = get_foreign_data_wrapper_oid(EXTENSION_FDW_NAME, false); List *connection_options; TSConnection *conn; + bool success = false; if (server->fdwid != fdwid) { elog(WARNING, "invalid node type for \"%s\"", server->servername); - return NULL; - } - - connection_options = remote_connection_prepare_auth_options(server, user_id); - conn = - remote_connection_open_with_options_nothrow(server->servername, connection_options, errmsg); - - if (NULL == conn) - { - if (NULL != errmsg && NULL == *errmsg) - *errmsg = "internal connection error"; - return NULL; - } - - if (PQstatus(conn->pg_conn) != CONNECTION_OK || !remote_connection_set_peer_dist_id(conn)) - { - if (NULL != errmsg) - *errmsg = pchomp(PQerrorMessage(conn->pg_conn)); - remote_connection_close(conn); - return NULL; + return false; } - return conn; -} - -#define PING_QUERY "SELECT 1" - -bool -remote_connection_ping(const char *node_name) -{ - Oid server_id = get_foreign_server_oid(node_name, false); - TSConnection *conn = remote_connection_open_nothrow(server_id, GetUserId(), NULL); - bool success = false; + connection_options = remote_connection_prepare_auth_options(server, GetUserId()); + conn = remote_connection_open(server->servername, connection_options, NULL); if (NULL == conn) return false; if (PQstatus(conn->pg_conn) == CONNECTION_OK) { - if (1 == PQsendQuery(conn->pg_conn, PING_QUERY)) - { - PGresult *res = PQgetResult(conn->pg_conn); - - success = (PQresultStatus(res) == PGRES_TUPLES_OK); - PQclear(res); - } + PGresult *res = remote_connection_exec(conn, PING_QUERY); + success = (PQresultStatus(res) == PGRES_TUPLES_OK); } remote_connection_close(conn); diff --git a/tsl/src/remote/connection.h b/tsl/src/remote/connection.h index 884406c5e68..a31f5f5b16e 100644 --- a/tsl/src/remote/connection.h +++ b/tsl/src/remote/connection.h @@ -57,15 +57,11 @@ typedef struct TSConnectionError * `remote_connection_cache_get_connection` instead. Must be closed with * `remote_connection_close` */ -extern TSConnection *remote_connection_open_with_options(const char *node_name, - List *connection_options, - bool set_dist_id); -extern TSConnection *remote_connection_open_with_options_nothrow(const char *node_name, - List *connection_options, - char **errmsg); -extern TSConnection *remote_connection_open_by_id(TSConnectionId id); -extern TSConnection *remote_connection_open(Oid server_id, Oid user_id); -extern TSConnection *remote_connection_open_nothrow(Oid server_id, Oid user_id, char **errmsg); +extern TSConnection *remote_connection_open(const char *node_name, List *connection_options, + char **errmsg); +extern TSConnection *remote_connection_open_session(const char *node_name, List *connection_options, + bool set_dist_id); +extern TSConnection *remote_connection_open_session_by_id(TSConnectionId id); extern List *remote_connection_prepare_auth_options(const ForeignServer *server, Oid user_id); extern int remote_connection_xact_depth_get(const TSConnection *conn); extern int remote_connection_xact_depth_inc(TSConnection *conn); diff --git a/tsl/src/remote/connection_cache.c b/tsl/src/remote/connection_cache.c index c148a9b075a..cbdf2b3711f 100644 --- a/tsl/src/remote/connection_cache.c +++ b/tsl/src/remote/connection_cache.c @@ -164,7 +164,7 @@ connection_cache_create_entry(Cache *cache, CacheQuery *query) * remote_connection_close at cleanup is critical. */ old = MemoryContextSwitchTo(ts_cache_memory_ctx(cache)); - entry->conn = remote_connection_open_by_id(*id); + entry->conn = remote_connection_open_session_by_id(*id); MemoryContextSwitchTo(old); /* Set the hash values of the foreign server and role for cache diff --git a/tsl/src/remote/txn_resolve.c b/tsl/src/remote/txn_resolve.c index 7b1f69534b1..4e1ee4bf342 100644 --- a/tsl/src/remote/txn_resolve.c +++ b/tsl/src/remote/txn_resolve.c @@ -61,7 +61,8 @@ Datum remote_txn_heal_data_node(PG_FUNCTION_ARGS) { Oid foreign_server_oid = PG_GETARG_OID(0); - TSConnection *conn = remote_connection_open(foreign_server_oid, GetUserId()); + TSConnectionId id = remote_connection_id(foreign_server_oid, GetUserId()); + TSConnection *conn = remote_connection_open_session_by_id(id); int resolved = 0; /* diff --git a/tsl/test/expected/remote_connection.out b/tsl/test/expected/remote_connection.out index ddc03afd7c4..ac86e3d8812 100644 --- a/tsl/test/expected/remote_connection.out +++ b/tsl/test/expected/remote_connection.out @@ -55,7 +55,7 @@ SELECT * FROM test.get_connection_stats(); \set ON_ERROR_STOP 0 SELECT test.send_remote_query_that_generates_exception(); ERROR: XX000: bad query error thrown from test -LOCATION: ts_test_bad_remote_query, connection.c:206 +LOCATION: ts_test_bad_remote_query, connection.c:216 \set ON_ERROR_STOP 1 SELECT * FROM test.get_connection_stats(); connections_created | connections_closed | results_created | results_cleared diff --git a/tsl/test/src/remote/connection.c b/tsl/test/src/remote/connection.c index 28443efc182..a5e6ae862be 100644 --- a/tsl/test/src/remote/connection.c +++ b/tsl/test/src/remote/connection.c @@ -28,17 +28,27 @@ TSConnection * get_connection() { - return remote_connection_open_with_options( - "testdb", - list_make4(makeDefElem("user", - (Node *) makeString(GetUserNameFromId(GetUserId(), false)), - -1), - makeDefElem("host", (Node *) makeString("localhost"), -1), - makeDefElem("dbname", (Node *) makeString(get_database_name(MyDatabaseId)), -1), - makeDefElem("port", - (Node *) makeString(pstrdup(GetConfigOption("port", false, false))), - -1)), - false); + return remote_connection_open_session("testdb", + list_make4(makeDefElem("user", + (Node *) makeString( + GetUserNameFromId(GetUserId(), + false)), + -1), + makeDefElem("host", + (Node *) makeString("localhost"), + -1), + makeDefElem("dbname", + (Node *) makeString( + get_database_name( + MyDatabaseId)), + -1), + makeDefElem("port", + (Node *) makeString(pstrdup( + GetConfigOption("port", + false, + false))), + -1)), + false); } static void