diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 373f05ab2f64..9fd88e581836 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -710,12 +710,8 @@ impl ComputeNode { // `pg_ctl` for start / stop, so this just seems much easier to do as we already // have opened connection to Postgres and superuser access. #[instrument(skip_all)] - fn pg_reload_conf(&self) -> Result<()> { - let pgctl_bin = Path::new(&self.pgbin).parent().unwrap().join("pg_ctl"); - Command::new(pgctl_bin) - .args(["reload", "-D", &self.pgdata]) - .output() - .expect("cannot run pg_ctl process"); + fn pg_reload_conf(&self, client: &mut Client) -> Result<()> { + client.simple_query("SELECT pg_reload_conf()")?; Ok(()) } @@ -728,9 +724,9 @@ impl ComputeNode { // Write new config let pgdata_path = Path::new(&self.pgdata); config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec, None)?; - self.pg_reload_conf()?; let mut client = Client::connect(self.connstr.as_str(), NoTls)?; + self.pg_reload_conf(&mut client)?; // Proceed with post-startup configuration. Note, that order of operations is important. // Disable DDL forwarding because control plane already knows about these roles/databases. diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index cc09fb849d59..0a944a6bf008 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -19,10 +19,7 @@ #include "access/xlog.h" #include "access/xlogutils.h" #include "storage/buf_internals.h" -#include "storage/lwlock.h" -#include "storage/ipc.h" #include "c.h" -#include "postmaster/interrupt.h" #include "libpq-fe.h" #include "libpq/pqformat.h" @@ -64,63 +61,23 @@ int flush_every_n_requests = 8; int n_reconnect_attempts = 0; int max_reconnect_attempts = 60; -#define MAX_PAGESERVER_CONNSTRING_SIZE 256 - -typedef struct -{ - LWLockId lock; - pg_atomic_uint64 update_counter; - char pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE]; -} PagestoreShmemState; - -#if PG_VERSION_NUM >= 150000 -static shmem_request_hook_type prev_shmem_request_hook = NULL; -static void walproposer_shmem_request(void); -#endif -static shmem_startup_hook_type prev_shmem_startup_hook; -static PagestoreShmemState *pagestore_shared; -static uint64 pagestore_local_counter = 0; -static char local_pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE]; - bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL; static bool pageserver_flush(void); static void pageserver_disconnect(void); -static bool -CheckPageserverConnstring(char **newval, void **extra, GucSource source) -{ - return strlen(*newval) < MAX_PAGESERVER_CONNSTRING_SIZE; -} -static void -AssignPageserverConnstring(const char *newval, void *extra) -{ - if(!pagestore_shared) - return; - LWLockAcquire(pagestore_shared->lock, LW_EXCLUSIVE); - strlcpy(pagestore_shared->pageserver_connstring, newval, MAX_PAGESERVER_CONNSTRING_SIZE); - pg_atomic_fetch_add_u64(&pagestore_shared->update_counter, 1); - LWLockRelease(pagestore_shared->lock); -} - -static bool -CheckConnstringUpdated() -{ - if(!pagestore_shared) - return false; - return pagestore_local_counter < pg_atomic_read_u64(&pagestore_shared->update_counter); -} +static pqsigfunc prev_signal_handler; static void -ReloadConnstring() +pageserver_sighup_handler(SIGNAL_ARGS) { - if(!pagestore_shared) - return; - LWLockAcquire(pagestore_shared->lock, LW_SHARED); - strlcpy(local_pageserver_connstring, pagestore_shared->pageserver_connstring, sizeof(local_pageserver_connstring)); - pagestore_local_counter = pg_atomic_read_u64(&pagestore_shared->update_counter); - LWLockRelease(pagestore_shared->lock); + if (prev_signal_handler) + { + prev_signal_handler(postgres_signal_arg); + } + neon_log(LOG, "Received SIGHUP, disconnecting pageserver. New pageserver connstring is %s", page_server_connstring); + pageserver_disconnect(); } static bool @@ -134,11 +91,6 @@ pageserver_connect(int elevel) Assert(!connected); - if(CheckConnstringUpdated()) - { - ReloadConnstring(); - } - /* * Connect using the connection string we got from the * neon.pageserver_connstring GUC. If the NEON_AUTH_TOKEN environment @@ -158,7 +110,7 @@ pageserver_connect(int elevel) n++; } keywords[n] = "dbname"; - values[n] = local_pageserver_connstring; + values[n] = page_server_connstring; n++; keywords[n] = NULL; values[n] = NULL; @@ -302,12 +254,6 @@ pageserver_send(NeonRequest * request) { StringInfoData req_buff; - if(CheckConnstringUpdated()) - { - pageserver_disconnect(); - ReloadConnstring(); - } - /* If the connection was lost for some reason, reconnect */ if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD) { @@ -328,7 +274,6 @@ pageserver_send(NeonRequest * request) { while (!pageserver_connect(n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR)) { - HandleMainLoopInterrupts(); n_reconnect_attempts += 1; pg_usleep(RECONNECT_INTERVAL_USEC); } @@ -446,8 +391,7 @@ pageserver_flush(void) return true; } -page_server_api api = -{ +page_server_api api = { .send = pageserver_send, .flush = pageserver_flush, .receive = pageserver_receive @@ -461,72 +405,12 @@ check_neon_id(char **newval, void **extra, GucSource source) return **newval == '\0' || HexDecodeString(id, *newval, 16); } -static Size -PagestoreShmemSize(void) -{ - return sizeof(PagestoreShmemState); -} - -static bool -PagestoreShmemInit(void) -{ - bool found; - LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); - pagestore_shared = ShmemInitStruct("libpagestore shared state", - PagestoreShmemSize(), - &found); - if(!found) - { - pagestore_shared->lock = &(GetNamedLWLockTranche("neon_libpagestore")->lock); - pg_atomic_init_u64(&pagestore_shared->update_counter, 0); - AssignPageserverConnstring(page_server_connstring, NULL); - } - LWLockRelease(AddinShmemInitLock); - return found; -} - -static void -pagestore_shmem_startup_hook(void) -{ - if(prev_shmem_startup_hook) - prev_shmem_startup_hook(); - - PagestoreShmemInit(); -} - -static void -pagestore_shmem_request(void) -{ -#if PG_VERSION_NUM >= 150000 - if(prev_shmem_request_hook) - prev_shmem_request_hook(); -#endif - - RequestAddinShmemSpace(PagestoreShmemSize()); - RequestNamedLWLockTranche("neon_libpagestore", 1); -} - -static void -pagestore_prepare_shmem(void) -{ -#if PG_VERSION_NUM >= 150000 - prev_shmem_request_hook = shmem_request_hook; - shmem_request_hook = pagestore_shmem_request; -#else - pagestore_shmem_request(); -#endif - prev_shmem_startup_hook = shmem_startup_hook; - shmem_startup_hook = pagestore_shmem_startup_hook; -} - /* * Module initialization function */ void pg_init_libpagestore(void) { - pagestore_prepare_shmem(); - DefineCustomStringVariable("neon.pageserver_connstring", "connection string to the page server", NULL, @@ -534,7 +418,7 @@ pg_init_libpagestore(void) "", PGC_SIGHUP, 0, /* no flags required */ - CheckPageserverConnstring, AssignPageserverConnstring, NULL); + NULL, NULL, NULL); DefineCustomStringVariable("neon.timeline_id", "Neon timeline_id the server is running on", @@ -615,5 +499,7 @@ pg_init_libpagestore(void) redo_read_buffer_filter = neon_redo_read_buffer_filter; } + prev_signal_handler = pqsignal(SIGHUP, pageserver_sighup_handler); + lfc_init(); } diff --git a/test_runner/regress/test_change_pageserver.py b/test_runner/regress/test_change_pageserver.py index 410bf03c2bc4..31092b70b96d 100644 --- a/test_runner/regress/test_change_pageserver.py +++ b/test_runner/regress/test_change_pageserver.py @@ -1,13 +1,9 @@ -import asyncio - from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnvBuilder from fixtures.remote_storage import RemoteStorageKind def test_change_pageserver(neon_env_builder: NeonEnvBuilder): - num_connections = 3 - neon_env_builder.num_pageservers = 2 neon_env_builder.enable_pageserver_remote_storage( remote_storage_kind=RemoteStorageKind.MOCK_S3, @@ -20,24 +16,15 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder): alt_pageserver_id = env.pageservers[1].id env.pageservers[1].tenant_attach(env.initial_tenant) - pg_conns = [endpoint.connect() for i in range(num_connections)] - curs = [pg_conn.cursor() for pg_conn in pg_conns] - - def execute(statement: str): - for cur in curs: - cur.execute(statement) - - def fetchone(): - results = [cur.fetchone() for cur in curs] - assert all(result == results[0] for result in results) - return results[0] + pg_conn = endpoint.connect() + cur = pg_conn.cursor() # Create table, and insert some rows. Make it big enough that it doesn't fit in # shared_buffers, otherwise the SELECT after restart will just return answer # from shared_buffers without hitting the page server, which defeats the point # of this test. - curs[0].execute("CREATE TABLE foo (t text)") - curs[0].execute( + cur.execute("CREATE TABLE foo (t text)") + cur.execute( """ INSERT INTO foo SELECT 'long string to consume some space' || g @@ -46,25 +33,25 @@ def fetchone(): ) # Verify that the table is larger than shared_buffers - curs[0].execute( + cur.execute( """ select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('foo') as tbl_size from pg_settings where name = 'shared_buffers' """ ) - row = curs[0].fetchone() + row = cur.fetchone() assert row is not None log.info(f"shared_buffers is {row[0]}, table size {row[1]}") assert int(row[0]) < int(row[1]) - execute("SELECT count(*) FROM foo") - assert fetchone() == (100000,) + cur.execute("SELECT count(*) FROM foo") + assert cur.fetchone() == (100000,) endpoint.reconfigure(pageserver_id=alt_pageserver_id) # Verify that the neon.pageserver_connstring GUC is set to the correct thing - execute("SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'") - connstring = fetchone() + cur.execute("SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'") + connstring = cur.fetchone() assert connstring is not None expected_connstring = f"postgresql://no_user:@localhost:{env.pageservers[1].service_port.pg}" assert expected_connstring == expected_connstring @@ -73,45 +60,5 @@ def fetchone(): 0 ].stop() # Stop the old pageserver just to make sure we're reading from the new one - execute("SELECT count(*) FROM foo") - assert fetchone() == (100000,) - - # Try failing back, and this time we will stop the current pageserver before reconfiguring - # the endpoint. Whereas the previous reconfiguration was like a healthy migration, this - # is more like what happens in an unexpected pageserver failure. - env.pageservers[0].start() - env.pageservers[1].stop() - - endpoint.reconfigure(pageserver_id=env.pageservers[0].id) - - execute("SELECT count(*) FROM foo") - assert fetchone() == (100000,) - - env.pageservers[0].stop() - env.pageservers[1].start() - - # Test a (former) bug where a child process spins without updating its connection string - # by executing a query separately. This query will hang until we issue the reconfigure. - async def reconfigure_async(): - await asyncio.sleep( - 1 - ) # Sleep for 1 second just to make sure we actually started our count(*) query - endpoint.reconfigure(pageserver_id=env.pageservers[1].id) - - def execute_count(): - execute("SELECT count(*) FROM FOO") - - async def execute_and_reconfigure(): - task_exec = asyncio.to_thread(execute_count) - task_reconfig = asyncio.create_task(reconfigure_async()) - await asyncio.gather( - task_exec, - task_reconfig, - ) - - asyncio.run(execute_and_reconfigure()) - assert fetchone() == (100000,) - - # One final check that nothing hangs - execute("SELECT count(*) FROM foo") - assert fetchone() == (100000,) + cur.execute("SELECT count(*) FROM foo") + assert cur.fetchone() == (100000,)