Skip to content

Commit

Permalink
Make pageserver_connect poll for changes in config
Browse files Browse the repository at this point in the history
  • Loading branch information
save-buffer committed Nov 3, 2023
1 parent 5ef888a commit e0f9c8d
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 30 deletions.
29 changes: 13 additions & 16 deletions pgxn/neon/libpagestore.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "access/xlogutils.h"
#include "storage/buf_internals.h"
#include "c.h"
#include "postmaster/interrupt.h"

#include "libpq-fe.h"
#include "libpq/pqformat.h"
Expand Down Expand Up @@ -66,20 +67,6 @@ bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) =
static bool pageserver_flush(void);
static void pageserver_disconnect(void);


static pqsigfunc prev_signal_handler;

static void
pageserver_sighup_handler(SIGNAL_ARGS)
{
if (prev_signal_handler)
{
prev_signal_handler(postgres_signal_arg);
}
neon_log(LOG, "Received SIGHUP, disconnecting pageserver. New pageserver connstring is %s", page_server_connstring);
pageserver_disconnect();
}

static bool
pageserver_connect(int elevel)
{
Expand All @@ -91,6 +78,11 @@ pageserver_connect(int elevel)

Assert(!connected);

if (ConfigReloadPending)
{
HandleMainLoopInterrupts();
}

/*
* Connect using the connection string we got from the
* neon.pageserver_connstring GUC. If the NEON_AUTH_TOKEN environment
Expand Down Expand Up @@ -254,6 +246,12 @@ pageserver_send(NeonRequest * request)
{
StringInfoData req_buff;

if (ConfigReloadPending)
{
pageserver_disconnect();
HandleMainLoopInterrupts();
}

/* If the connection was lost for some reason, reconnect */
if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD)
{
Expand All @@ -274,6 +272,7 @@ pageserver_send(NeonRequest * request)
{
while (!pageserver_connect(n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
{
HandleMainLoopInterrupts();
n_reconnect_attempts += 1;
pg_usleep(RECONNECT_INTERVAL_USEC);
}
Expand Down Expand Up @@ -499,7 +498,5 @@ pg_init_libpagestore(void)
redo_read_buffer_filter = neon_redo_read_buffer_filter;
}

prev_signal_handler = pqsignal(SIGHUP, pageserver_sighup_handler);

lfc_init();
}
71 changes: 57 additions & 14 deletions test_runner/regress/test_change_pageserver.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import asyncio

from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.remote_storage import RemoteStorageKind


def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
num_connections = 3

neon_env_builder.num_pageservers = 2
neon_env_builder.enable_pageserver_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3,
Expand All @@ -16,15 +20,24 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
alt_pageserver_id = env.pageservers[1].id
env.pageservers[1].tenant_attach(env.initial_tenant)

pg_conn = endpoint.connect()
cur = pg_conn.cursor()
pg_conns = [endpoint.connect() for i in range(num_connections)]
curs = [pg_conn.cursor() for pg_conn in pg_conns]

def execute(statement: str):
    """Run *statement* on every open cursor, one connection after another."""
    for cursor in curs:
        cursor.execute(statement)

def fetchone():
    """Fetch one row from every cursor and return it.

    Asserts that all connections returned the same row, so callers can
    treat the set of connections as a single logical result.
    """
    rows = [cursor.fetchone() for cursor in curs]
    reference = rows[0]
    assert all(row == reference for row in rows)
    return reference

# Create table, and insert some rows. Make it big enough that it doesn't fit in
# shared_buffers; otherwise the SELECT after restart will just return the answer
# from shared_buffers without hitting the pageserver, which defeats the point
# of this test.
cur.execute("CREATE TABLE foo (t text)")
cur.execute(
curs[0].execute("CREATE TABLE foo (t text)")
curs[0].execute(
"""
INSERT INTO foo
SELECT 'long string to consume some space' || g
Expand All @@ -33,25 +46,25 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
)

# Verify that the table is larger than shared_buffers
cur.execute(
curs[0].execute(
"""
select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('foo') as tbl_size
from pg_settings where name = 'shared_buffers'
"""
)
row = cur.fetchone()
row = curs[0].fetchone()
assert row is not None
log.info(f"shared_buffers is {row[0]}, table size {row[1]}")
assert int(row[0]) < int(row[1])

cur.execute("SELECT count(*) FROM foo")
assert cur.fetchone() == (100000,)
execute("SELECT count(*) FROM foo")
assert fetchone() == (100000,)

endpoint.reconfigure(pageserver_id=alt_pageserver_id)

# Verify that the neon.pageserver_connstring GUC is set to the correct thing
cur.execute("SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'")
connstring = cur.fetchone()
execute("SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'")
connstring = fetchone()
assert connstring is not None
expected_connstring = f"postgresql://no_user:@localhost:{env.pageservers[1].service_port.pg}"
# Compare the value actually read back from the endpoint. The previous
# assertion compared expected_connstring with itself and could never fail.
# NOTE(review): the expected format above was never exercised before; if this
# assertion fails, confirm the connstring format the endpoint really reports.
assert connstring[0] == expected_connstring
Expand All @@ -60,15 +73,45 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
0
].stop() # Stop the old pageserver just to make sure we're reading from the new one

cur.execute("SELECT count(*) FROM foo")
assert cur.fetchone() == (100000,)
execute("SELECT count(*) FROM foo")
assert fetchone() == (100000,)

# Try failing back, and this time we will stop the current pageserver before reconfiguring
# the endpoint. Whereas the previous reconfiguration was like a healthy migration, this
# is more like what happens in an unexpected pageserver failure.
env.pageservers[0].start()
env.pageservers[1].stop()

endpoint.reconfigure(pageserver_id=env.pageservers[0].id)

cur.execute("SELECT count(*) FROM foo")
assert cur.fetchone() == (100000,)
execute("SELECT count(*) FROM foo")
assert fetchone() == (100000,)

env.pageservers[0].stop()
env.pageservers[1].start()

# Test a (former) bug where a child process spins without updating its connection string
# by executing a query separately. This query will hang until we issue the reconfigure.
async def reconfigure_async():
    """Wait briefly, then point the endpoint at pageserver 1."""
    # Sleep for 1 second just to make sure the count(*) query has actually
    # started before the reconfigure is issued.
    await asyncio.sleep(1)
    endpoint.reconfigure(pageserver_id=env.pageservers[1].id)

def execute_count():
    """Issue the row-count query on every connection (blocking call)."""
    count_query = "SELECT count(*) FROM FOO"
    execute(count_query)

async def execute_and_reconfigure():
    """Run the count query in a worker thread while the reconfigure runs concurrently.

    Returns once both the query and the reconfigure have finished.
    """
    # to_thread() only builds a coroutine; gather() is what schedules it.
    query_in_thread = asyncio.to_thread(execute_count)
    reconfigure_task = asyncio.create_task(reconfigure_async())
    await asyncio.gather(query_in_thread, reconfigure_task)

asyncio.run(execute_and_reconfigure())
assert fetchone() == (100000,)

# One final check that nothing hangs
execute("SELECT count(*) FROM foo")
assert fetchone() == (100000,)

0 comments on commit e0f9c8d

Please sign in to comment.