Skip to content

Commit

Permalink
Make connection establishment interruptible
Browse files Browse the repository at this point in the history
Refactor the data node connection establishment so that it is
interruptible, e.g., by ctrl-c or `statement_timeout`.

Previously, the connection establishment used blocking libpq calls. By
instead using asynchronous connection APIs and integrating with
PostgreSQL interrupt handling, the connection establishment can be
canceled by an interrupt caused by a statement timeout or a user.

Fixes #2757
  • Loading branch information
erimatnor committed Jan 30, 2023
1 parent 01ea255 commit 5d12a38
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 117 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ accidentally triggering the load of a previous DB version.**
**Features**
* #5241 Allow RETURNING clause when inserting into compressed chunks
* #5245 Mange life-cycle of connections via memory contexts
* #5246 Make connection establishment interruptible

**Bugfixes**
* #4804 Skip bucketing when start or end of refresh job is null
Expand Down
28 changes: 18 additions & 10 deletions tsl/src/data_node.c
Original file line number Diff line number Diff line change
Expand Up @@ -510,24 +510,28 @@ data_node_bootstrap_extension(TSConnection *conn)
quote_literal_cstr(EXTENSION_NAME));

if (PQresultStatus(res) != PGRES_TUPLES_OK)
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQresultErrorMessage(res))));
remote_result_elog(res, ERROR);

if (PQntuples(res) == 0)
{
remote_result_close(res);

if (schema_oid != PG_PUBLIC_NAMESPACE)
{
PGresult *res = remote_connection_execf(conn,
"CREATE SCHEMA %s AUTHORIZATION %s",
schema_name_quoted,
quote_identifier(username));
res = remote_connection_execf(conn,
"CREATE SCHEMA %s AUTHORIZATION %s",
schema_name_quoted,
quote_identifier(username));

if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
const char *const sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
bool schema_exists =
(sqlstate && strcmp(sqlstate, ERRCODE_DUPLICATE_SCHEMA_STR) == 0);
if (!schema_exists)
remote_result_elog(res, ERROR);

remote_result_close(res);
/* If the schema already existed on the remote node, we got a
* duplicate schema error and the schema was not created. In
* that case, we log an error with a hint on how to fix the
Expand All @@ -538,6 +542,8 @@ data_node_bootstrap_extension(TSConnection *conn)
errhint("Make sure that the data node does not contain any "
"existing objects prior to adding it.")));
}

remote_result_close(res);
}

remote_connection_cmdf_ok(conn,
Expand All @@ -556,6 +562,7 @@ data_node_bootstrap_extension(TSConnection *conn)
PQhost(remote_connection_get_pg_conn(conn)),
PQport(remote_connection_get_pg_conn(conn)),
PQgetvalue(res, 0, 1))));
remote_result_close(res);
data_node_validate_extension(conn);
return false;
}
Expand Down Expand Up @@ -592,7 +599,7 @@ connect_for_bootstrapping(const char *node_name, const char *const host, int32 p
{
List *node_options =
create_data_node_options(host, port, bootstrap_databases[i], username, password);
conn = remote_connection_open_with_options_nothrow(node_name, node_options, &err);
conn = remote_connection_open(node_name, node_options, &err);

if (conn)
return conn;
Expand Down Expand Up @@ -635,7 +642,8 @@ data_node_validate_extension_availability(TSConnection *conn)

if (PQresultStatus(res) != PGRES_TUPLES_OK)
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQresultErrorMessage(res))));
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("failed to validate remote extension: %s", PQresultErrorMessage(res))));

if (PQntuples(res) == 0)
ereport(ERROR,
Expand Down Expand Up @@ -788,7 +796,7 @@ data_node_add_internal(PG_FUNCTION_ARGS, bool set_distid)
* necessary. Instead using a more straightforward approach here since
* we do not need 2PC support. */
node_options = create_data_node_options(host, port, dbname, username, password);
conn = remote_connection_open_with_options(node_name, node_options, false);
conn = remote_connection_open_session(node_name, node_options, false);
Assert(NULL != conn);
remote_connection_cmd_ok(conn, "BEGIN");

Expand Down Expand Up @@ -1771,7 +1779,7 @@ drop_data_node_database(const ForeignServer *server)
server = data_node_get_foreign_server(nodename, ACL_USAGE, true, false);
/* Open a connection to the bootstrap database using the new server options */
conn_options = remote_connection_prepare_auth_options(server, userid);
conn = remote_connection_open_with_options_nothrow(nodename, conn_options, &err);
conn = remote_connection_open(nodename, conn_options, &err);

if (NULL != conn)
break;
Expand Down
183 changes: 99 additions & 84 deletions tsl/src/remote/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -1365,27 +1365,35 @@ setup_full_connection_options(List *connection_options, const char ***all_keywor
}

/*
* This will only open a connection to a specific node, but not do anything
* else. In particular, it will not perform any validation nor configure the
* connection since it cannot know that it connects to a data node database or
* not. For that, please use the `remote_connection_open_with_options`
* function.
* Open a connection and assign it the given node name.
*
* This will only open a connection to a specific node, but not do any other
* session initialization. In particular, it will not perform any validation
* nor configure the connection since it cannot know that it connects to a
* data node database or not. For that, please use the
* `remote_connection_open_session` function.
*
* The connection's life-cycle is tied to the current memory context via its
* delete callback. As a result, the connection will be automatically closed
* and freed when the memory context is deleted.
*
* This function does not (and should not) throw (PostgreSQL) errors. Instead,
* an error message is optionally returned via the "errmsg" parameter.
*/
TSConnection *
remote_connection_open_with_options_nothrow(const char *node_name, List *connection_options,
char **errmsg)
remote_connection_open(const char *node_name, List *connection_options, char **errmsg)
{
PGconn *volatile pg_conn = NULL;
TSConnection *ts_conn;
PGconn *pg_conn = NULL;
TSConnection *ts_conn = NULL;
const char **keywords;
const char **values;
PostgresPollingStatusType status;

if (NULL != errmsg)
*errmsg = NULL;

setup_full_connection_options(connection_options, &keywords, &values);

pg_conn = PQconnectdbParams(keywords, values, 0 /* Do not expand dbname param */);
pg_conn = PQconnectStartParams(keywords, values, 0 /* Do not expand dbname param */);

/* Cast to (char **) to silence warning with MSVC compiler */
pfree((char **) keywords);
Expand All @@ -1394,6 +1402,63 @@ remote_connection_open_with_options_nothrow(const char *node_name, List *connect
if (NULL == pg_conn)
return NULL;

if (PQstatus(pg_conn) == CONNECTION_BAD)
{
finish_connection(pg_conn, errmsg);
return NULL;
}

status = PGRES_POLLING_WRITING;

do
{
int io_flag;
int rc;

if (status == PGRES_POLLING_READING)
io_flag = WL_SOCKET_READABLE;
#ifdef WIN32
/* Windows needs a different test while waiting for connection-made */
else if (PQstatus(pg_conn) == CONNECTION_STARTED)
io_flag = WL_SOCKET_CONNECTED;
#endif
else
io_flag = WL_SOCKET_WRITEABLE;
/*
* Wait for latch or socket event. Note that it is not possible to
* reuse a WaitEventSet using the same socket file descriptor in each
* iteration of the loop since PQconnectPoll() might change the file
* descriptor across calls. Therefore, it is important to create a new
* WaitEventSet in every iteration of the loop and retreiving the
* correct file descriptor (socket) with PQsocket().
*/
rc = WaitLatchOrSocket(MyLatch,
WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
PQsocket(pg_conn),
0,
PG_WAIT_EXTENSION);
if (rc & WL_LATCH_SET)
{
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
}

if (rc & io_flag)
{
/*
* PQconnectPoll() is supposed to be non-blocking, but it
* isn't. PQconnectPoll() will internally try to send a startup
* packet and do DNS lookups (if necessary) and can therefore
* block. So, if there is a network issue (e.g., black hole
* routing) the connection attempt will hang on
* PQconnectPoll(). There's nothing that can be done about it,
* unless the blocking operations are moved out of PQconnectPoll()
* and integrated with the wait loop.
*/
status = PQconnectPoll(pg_conn);
}
} while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);

if (PQstatus(pg_conn) != CONNECTION_OK)
{
finish_connection(pg_conn, errmsg);
Expand All @@ -1409,22 +1474,15 @@ remote_connection_open_with_options_nothrow(const char *node_name, List *connect
}

/*
* Opens a connection.
* Open a connection to a data node and perform basic session initialization.
*
* Raw connections are not part of the transaction and do not have transactions
* auto-started. They must be explicitly closed by
* remote_connection_close. Note that connections are allocated using malloc
* and so if you do not call remote_connection_close, you'll have a memory
* leak. Note that the connection cache handles all of this for you so use
* that if you can.
* This function will raise errors on failures.
*/
TSConnection *
remote_connection_open_with_options(const char *node_name, List *connection_options,
bool set_dist_id)
remote_connection_open_session(const char *node_name, List *connection_options, bool set_dist_id)
{
char *err = NULL;
TSConnection *conn =
remote_connection_open_with_options_nothrow(node_name, connection_options, &err);
TSConnection *conn = remote_connection_open(node_name, connection_options, &err);

if (NULL == conn)
ereport(ERROR,
Expand Down Expand Up @@ -1477,6 +1535,15 @@ remote_connection_open_with_options(const char *node_name, List *connection_opti
return conn;
}

TSConnection *
remote_connection_open_session_by_id(TSConnectionId id)
{
ForeignServer *server = GetForeignServer(id.server_id);
List *connection_options = remote_connection_prepare_auth_options(server, id.user_id);

return remote_connection_open_session(server->servername, connection_options, true);
}

/*
* Based on PG's GetUserMapping, but this version does not fail when a user
* mapping is not found.
Expand Down Expand Up @@ -1647,86 +1714,34 @@ remote_connection_get_connstr(const char *node_name)
return connstr_escape.data;
}

TSConnection *
remote_connection_open_by_id(TSConnectionId id)
{
ForeignServer *server = GetForeignServer(id.server_id);
List *connection_options = remote_connection_prepare_auth_options(server, id.user_id);

return remote_connection_open_with_options(server->servername, connection_options, true);
}

TSConnection *
remote_connection_open(Oid server_id, Oid user_id)
{
TSConnectionId id = remote_connection_id(server_id, user_id);

return remote_connection_open_by_id(id);
}
#define PING_QUERY "SELECT 1"

/*
* Open a connection without throwing and error.
*
* Returns the connection pointer on success. On failure NULL is returned and
* the errmsg (if given) is used to return an error message.
*/
TSConnection *
remote_connection_open_nothrow(Oid server_id, Oid user_id, char **errmsg)
bool
remote_connection_ping(const char *node_name)
{
Oid server_id = get_foreign_server_oid(node_name, false);
ForeignServer *server = GetForeignServer(server_id);
Oid fdwid = get_foreign_data_wrapper_oid(EXTENSION_FDW_NAME, false);
List *connection_options;
TSConnection *conn;
bool success = false;

if (server->fdwid != fdwid)
{
elog(WARNING, "invalid node type for \"%s\"", server->servername);
return NULL;
}

connection_options = remote_connection_prepare_auth_options(server, user_id);
conn =
remote_connection_open_with_options_nothrow(server->servername, connection_options, errmsg);

if (NULL == conn)
{
if (NULL != errmsg && NULL == *errmsg)
*errmsg = "internal connection error";
return NULL;
}

if (PQstatus(conn->pg_conn) != CONNECTION_OK || !remote_connection_set_peer_dist_id(conn))
{
if (NULL != errmsg)
*errmsg = pchomp(PQerrorMessage(conn->pg_conn));
remote_connection_close(conn);
return NULL;
return false;
}

return conn;
}

#define PING_QUERY "SELECT 1"

bool
remote_connection_ping(const char *node_name)
{
Oid server_id = get_foreign_server_oid(node_name, false);
TSConnection *conn = remote_connection_open_nothrow(server_id, GetUserId(), NULL);
bool success = false;
connection_options = remote_connection_prepare_auth_options(server, GetUserId());
conn = remote_connection_open(server->servername, connection_options, NULL);

if (NULL == conn)
return false;

if (PQstatus(conn->pg_conn) == CONNECTION_OK)
{
if (1 == PQsendQuery(conn->pg_conn, PING_QUERY))
{
PGresult *res = PQgetResult(conn->pg_conn);

success = (PQresultStatus(res) == PGRES_TUPLES_OK);
PQclear(res);
}
PGresult *res = remote_connection_exec(conn, PING_QUERY);
success = (PQresultStatus(res) == PGRES_TUPLES_OK);
}

remote_connection_close(conn);
Expand Down
14 changes: 5 additions & 9 deletions tsl/src/remote/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,11 @@ typedef struct TSConnectionError
* `remote_connection_cache_get_connection` instead. Must be closed with
* `remote_connection_close`
*/
extern TSConnection *remote_connection_open_with_options(const char *node_name,
List *connection_options,
bool set_dist_id);
extern TSConnection *remote_connection_open_with_options_nothrow(const char *node_name,
List *connection_options,
char **errmsg);
extern TSConnection *remote_connection_open_by_id(TSConnectionId id);
extern TSConnection *remote_connection_open(Oid server_id, Oid user_id);
extern TSConnection *remote_connection_open_nothrow(Oid server_id, Oid user_id, char **errmsg);
extern TSConnection *remote_connection_open(const char *node_name, List *connection_options,
char **errmsg);
extern TSConnection *remote_connection_open_session(const char *node_name, List *connection_options,
bool set_dist_id);
extern TSConnection *remote_connection_open_session_by_id(TSConnectionId id);
extern List *remote_connection_prepare_auth_options(const ForeignServer *server, Oid user_id);
extern int remote_connection_xact_depth_get(const TSConnection *conn);
extern int remote_connection_xact_depth_inc(TSConnection *conn);
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/remote/connection_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ connection_cache_create_entry(Cache *cache, CacheQuery *query)
* remote_connection_close at cleanup is critical.
*/
old = MemoryContextSwitchTo(ts_cache_memory_ctx(cache));
entry->conn = remote_connection_open_by_id(*id);
entry->conn = remote_connection_open_session_by_id(*id);
MemoryContextSwitchTo(old);

/* Set the hash values of the foreign server and role for cache
Expand Down
3 changes: 2 additions & 1 deletion tsl/src/remote/txn_resolve.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ Datum
remote_txn_heal_data_node(PG_FUNCTION_ARGS)
{
Oid foreign_server_oid = PG_GETARG_OID(0);
TSConnection *conn = remote_connection_open(foreign_server_oid, GetUserId());
TSConnectionId id = remote_connection_id(foreign_server_oid, GetUserId());
TSConnection *conn = remote_connection_open_session_by_id(id);
int resolved = 0;

/*
Expand Down
2 changes: 1 addition & 1 deletion tsl/test/expected/remote_connection.out
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ SELECT * FROM test.get_connection_stats();
\set ON_ERROR_STOP 0
SELECT test.send_remote_query_that_generates_exception();
ERROR: XX000: bad query error thrown from test
LOCATION: ts_test_bad_remote_query, connection.c:206
LOCATION: ts_test_bad_remote_query, connection.c:216
\set ON_ERROR_STOP 1
SELECT * FROM test.get_connection_stats();
connections_created | connections_closed | results_created | results_cleared
Expand Down
Loading

0 comments on commit 5d12a38

Please sign in to comment.