Skip to content

Commit

Permalink
Make data node calls non-blocking and interruptible
Browse files Browse the repository at this point in the history
Refactor the use of libpq calls to data nodes so that they honor
PostgreSQL interrupt handling (e.g., ctrl-c or `statement_timeout`)
and don't block unnecessarily.

To implement this behavior, data node connections are made
non-blocking by default and all `libpq` functions are wrapped to
integrate with PostgreSQL's signal handling (via `WaitEventSets`) when
waiting for read or write readiness.

A change is also made to the life-cycle management of `libpq` objects,
including connections and remote query results. Instead of being tied
to transactions, they are now tied to the life-cycle of memory
contexts using a callback. In most cases, the memory context a
connection is allocated in has the same lifetime as a transaction, but
not always. For example, the connection cache lives across transactions
and uses a longer-lived memory context. Previously, that was
handled as a special case where connections were marked not to
auto-close on transaction end.

Closes timescale#4958, timescale#2757
  • Loading branch information
erimatnor committed Dec 8, 2022
1 parent c76dfa0 commit c8bc867
Show file tree
Hide file tree
Showing 16 changed files with 892 additions and 667 deletions.
4 changes: 1 addition & 3 deletions src/cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@ extern TSDLLEXPORT void ts_cache_init(Cache *cache);
extern TSDLLEXPORT void ts_cache_invalidate(Cache *cache);
extern TSDLLEXPORT void *ts_cache_fetch(Cache *cache, CacheQuery *query);
extern TSDLLEXPORT bool ts_cache_remove(Cache *cache, void *key);

extern MemoryContext ts_cache_memory_ctx(Cache *cache);

extern TSDLLEXPORT MemoryContext ts_cache_memory_ctx(Cache *cache);
extern TSDLLEXPORT Cache *ts_cache_pin(Cache *cache);
extern TSDLLEXPORT int ts_cache_release(Cache *cache);

Expand Down
23 changes: 16 additions & 7 deletions tsl/src/data_node.c
Original file line number Diff line number Diff line change
Expand Up @@ -510,24 +510,28 @@ data_node_bootstrap_extension(TSConnection *conn)
quote_literal_cstr(EXTENSION_NAME));

if (PQresultStatus(res) != PGRES_TUPLES_OK)
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQresultErrorMessage(res))));
remote_result_elog(res, ERROR);

if (PQntuples(res) == 0)
{
remote_result_close(res);

if (schema_oid != PG_PUBLIC_NAMESPACE)
{
PGresult *res = remote_connection_execf(conn,
"CREATE SCHEMA %s AUTHORIZATION %s",
schema_name_quoted,
quote_identifier(username));
res = remote_connection_execf(conn,
"CREATE SCHEMA %s AUTHORIZATION %s",
schema_name_quoted,
quote_identifier(username));

if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
const char *const sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
bool schema_exists =
(sqlstate && strcmp(sqlstate, ERRCODE_DUPLICATE_SCHEMA_STR) == 0);
if (!schema_exists)
remote_result_elog(res, ERROR);

remote_result_close(res);
/* If the schema already existed on the remote node, we got a
* duplicate schema error and the schema was not created. In
* that case, we log an error with a hint on how to fix the
Expand All @@ -538,6 +542,8 @@ data_node_bootstrap_extension(TSConnection *conn)
errhint("Make sure that the data node does not contain any "
"existing objects prior to adding it.")));
}

remote_result_close(res);
}

remote_connection_cmdf_ok(conn,
Expand All @@ -556,6 +562,7 @@ data_node_bootstrap_extension(TSConnection *conn)
PQhost(remote_connection_get_pg_conn(conn)),
PQport(remote_connection_get_pg_conn(conn)),
PQgetvalue(res, 0, 1))));
remote_result_close(res);
data_node_validate_extension(conn);
return false;
}
Expand Down Expand Up @@ -635,7 +642,8 @@ data_node_validate_extension_availability(TSConnection *conn)

if (PQresultStatus(res) != PGRES_TUPLES_OK)
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQresultErrorMessage(res))));
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("failed to validate remote extension: %s", PQresultErrorMessage(res))));

if (PQntuples(res) == 0)
ereport(ERROR,
Expand Down Expand Up @@ -1450,6 +1458,7 @@ create_alter_data_node_tuple(TupleDesc tupdesc, const char *node_name, List *opt
MemSet(nulls, false, sizeof(nulls));

values[AttrNumberGetAttrOffset(Anum_alter_data_node_node_name)] = CStringGetDatum(node_name);
values[AttrNumberGetAttrOffset(Anum_alter_data_node_available)] = BoolGetDatum(true);

foreach (lc, options)
{
Expand Down
57 changes: 24 additions & 33 deletions tsl/src/remote/async.c
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ async_request_set_state(AsyncRequest *req, AsyncRequestState new_state)
static AsyncRequest *
async_request_send_internal(AsyncRequest *req, int elevel)
{
int ret = 0;

if (req->state != DEFERRED)
elog(elevel, "can't send async request in state \"%d\"", req->state);

Expand All @@ -170,39 +172,30 @@ async_request_send_internal(AsyncRequest *req, int elevel)
* the prepared statements we use in this module are simple enough that
* the data node will make the right choices.
*/
if (0 == PQsendPrepare(remote_connection_get_pg_conn(req->conn),
req->stmt_name,
req->sql,
req->prep_stmt_params,
NULL))
{
/*
* null is fine to pass down as the res, the connection error message
* will get through
*/
remote_connection_elog(req->conn, elevel);
return NULL;
}
ret = PQsendPrepare(remote_connection_get_pg_conn(req->conn),
req->stmt_name,
req->sql,
req->prep_stmt_params,
NULL);
}
else
{
if (0 == PQsendQueryParams(remote_connection_get_pg_conn(req->conn),
req->sql,
stmt_params_total_values(req->params),
/* param types - see note above */ NULL,
stmt_params_values(req->params),
stmt_params_lengths(req->params),
stmt_params_formats(req->params),
req->res_format))
{
/*
* null is fine to pass down as the res, the connection error message
* will get through
*/
remote_connection_elog(req->conn, elevel);
return NULL;
}
ret = PQsendQueryParams(remote_connection_get_pg_conn(req->conn),
req->sql,
stmt_params_total_values(req->params),
/* param types - see note above */ NULL,
stmt_params_values(req->params),
stmt_params_lengths(req->params),
stmt_params_formats(req->params),
req->res_format);
}

if (ret == 0 || !remote_connection_flush(req->conn, NULL))
{
remote_connection_elog(req->conn, elevel);
return NULL;
}

async_request_set_state(req, EXECUTING);
remote_connection_set_status(req->conn, CONN_PROCESSING);
return req;
Expand Down Expand Up @@ -684,7 +677,6 @@ wait_to_consume_data(AsyncRequestSet *set, TimestampTz end_time)
ListCell *lc;
int rc;
WaitEvent event;
uint32 wait_event_info = PG_WAIT_EXTENSION;
AsyncRequest *wait_req;
AsyncResponse *result;
long timeout_ms = -1L;
Expand Down Expand Up @@ -723,16 +715,14 @@ wait_to_consume_data(AsyncRequestSet *set, TimestampTz end_time)
while (true)
{
wait_req = NULL;
rc = WaitEventSetWait(we_set, timeout_ms, &event, 1, wait_event_info);
rc = WaitEventSetWait(we_set, timeout_ms, &event, 1, PG_WAIT_EXTENSION);

if (rc == 0)
{
result = async_response_timeout_create();
break;
}

CHECK_FOR_INTERRUPTS();

if (event.events & ~(WL_SOCKET_READABLE | WL_LATCH_SET))
{
/*
Expand All @@ -748,6 +738,7 @@ wait_to_consume_data(AsyncRequestSet *set, TimestampTz end_time)
if (event.events & WL_LATCH_SET)
{
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
}

if (event.events & WL_SOCKET_READABLE)
Expand Down
Loading

0 comments on commit c8bc867

Please sign in to comment.