extend test_change_pageserver for failure case, rework changing pageserver #5693

Merged · 6 commits · Nov 8, 2023
10 changes: 7 additions & 3 deletions compute_tools/src/compute.rs
@@ -710,8 +710,12 @@ impl ComputeNode {
// `pg_ctl` for start / stop, so this just seems much easier to do as we already
// have opened connection to Postgres and superuser access.
#[instrument(skip_all)]
-    fn pg_reload_conf(&self, client: &mut Client) -> Result<()> {
-        client.simple_query("SELECT pg_reload_conf()")?;
+    fn pg_reload_conf(&self) -> Result<()> {
+        let pgctl_bin = Path::new(&self.pgbin).parent().unwrap().join("pg_ctl");
+        Command::new(pgctl_bin)
+            .args(["reload", "-D", &self.pgdata])
+            .output()
+            .expect("cannot run pg_ctl process");
Ok(())
}

@@ -724,9 +728,9 @@ impl ComputeNode {
// Write new config
let pgdata_path = Path::new(&self.pgdata);
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec, None)?;
+        self.pg_reload_conf()?;

let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
-        self.pg_reload_conf(&mut client)?;

// Proceed with post-startup configuration. Note, that order of operations is important.
// Disable DDL forwarding because control plane already knows about these roles/databases.
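The change above replaces the SQL-level `SELECT pg_reload_conf()` with an out-of-band `pg_ctl reload`. A plausible motivation (not stated in the diff itself) is that the reload should no longer depend on a usable SQL connection, since queries may be hanging at exactly the moment the pageserver connstring has to change. A minimal Python sketch of the equivalent operation; the `pgbin`/`pgdata` parameters are illustrative, not part of the compute_tools API:

```python
import subprocess
from pathlib import Path

def pg_reload_conf(pgbin: str, pgdata: str) -> None:
    # As in the Rust code above, assume pg_ctl sits next to the postgres binary.
    pg_ctl = Path(pgbin).parent / "pg_ctl"
    # "pg_ctl reload" sends SIGHUP to the postmaster, which re-reads
    # postgresql.conf; no SQL connection is required.
    subprocess.run([str(pg_ctl), "reload", "-D", pgdata], check=True)
```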
140 changes: 127 additions & 13 deletions pgxn/neon/libpagestore.c
@@ -19,7 +19,10 @@
#include "access/xlog.h"
#include "access/xlogutils.h"
#include "storage/buf_internals.h"
#include "storage/lwlock.h"
#include "storage/ipc.h"
#include "c.h"
#include "postmaster/interrupt.h"

#include "libpq-fe.h"
#include "libpq/pqformat.h"
@@ -61,23 +64,63 @@ int flush_every_n_requests = 8;
int n_reconnect_attempts = 0;
int max_reconnect_attempts = 60;

+#define MAX_PAGESERVER_CONNSTRING_SIZE 256
+
+typedef struct
+{
+	LWLockId	lock;
+	pg_atomic_uint64 update_counter;
+	char		pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE];
+} PagestoreShmemState;
+
+#if PG_VERSION_NUM >= 150000
+static shmem_request_hook_type prev_shmem_request_hook = NULL;
+static void walproposer_shmem_request(void);
+#endif
+static shmem_startup_hook_type prev_shmem_startup_hook;
+static PagestoreShmemState *pagestore_shared;
+static uint64 pagestore_local_counter = 0;
+static char local_pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE];

bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL;

static bool pageserver_flush(void);
static void pageserver_disconnect(void);

+static bool
+CheckPageserverConnstring(char **newval, void **extra, GucSource source)
+{
+	return strlen(*newval) < MAX_PAGESERVER_CONNSTRING_SIZE;
+}

-static pqsigfunc prev_signal_handler;
+static void
+AssignPageserverConnstring(const char *newval, void *extra)
+{
+	if (!pagestore_shared)
+		return;
+	LWLockAcquire(pagestore_shared->lock, LW_EXCLUSIVE);
+	strlcpy(pagestore_shared->pageserver_connstring, newval, MAX_PAGESERVER_CONNSTRING_SIZE);
+	pg_atomic_fetch_add_u64(&pagestore_shared->update_counter, 1);
+	LWLockRelease(pagestore_shared->lock);
+}

+static bool
+CheckConnstringUpdated(void)
+{
+	if (!pagestore_shared)
+		return false;
+	return pagestore_local_counter < pg_atomic_read_u64(&pagestore_shared->update_counter);
+}

static void
-pageserver_sighup_handler(SIGNAL_ARGS)
+ReloadConnstring(void)
{
-	if (prev_signal_handler)
-	{
-		prev_signal_handler(postgres_signal_arg);
-	}
-	neon_log(LOG, "Received SIGHUP, disconnecting pageserver. New pageserver connstring is %s", page_server_connstring);
-	pageserver_disconnect();
+	if (!pagestore_shared)
+		return;
+	LWLockAcquire(pagestore_shared->lock, LW_SHARED);
+	strlcpy(local_pageserver_connstring, pagestore_shared->pageserver_connstring, sizeof(local_pageserver_connstring));
+	pagestore_local_counter = pg_atomic_read_u64(&pagestore_shared->update_counter);
+	LWLockRelease(pagestore_shared->lock);
}
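Taken together, the pieces above implement a small versioned-config pattern: `AssignPageserverConnstring` (the GUC assign hook) writes the new string into shared memory and bumps an atomic counter under an exclusive lock, while each backend keeps a private copy plus a private counter and re-copies only when `CheckConnstringUpdated` sees the shared counter move ahead. A rough Python model of the same idea, using a `threading.Lock` in place of the LWLock (illustration only, not the extension's API):

```python
import threading

class SharedConnstring:
    """Writer side: publish a new value and bump a version counter."""

    def __init__(self) -> None:
        self.lock = threading.Lock()  # stands in for the LWLock
        self.version = 0              # stands in for pg_atomic_uint64
        self.connstring = ""

    def assign(self, newval: str) -> None:  # ~ AssignPageserverConnstring
        with self.lock:
            self.connstring = newval
            self.version += 1


class Backend:
    """Reader side: cache the value; re-copy only when out of date."""

    def __init__(self, shared: SharedConnstring) -> None:
        self.shared = shared
        self.local_version = 0
        self.local_connstring = ""

    def updated(self) -> bool:  # ~ CheckConnstringUpdated
        return self.local_version < self.shared.version

    def reload(self) -> None:  # ~ ReloadConnstring
        with self.shared.lock:
            self.local_connstring = self.shared.connstring
            self.local_version = self.shared.version
```

The counter keeps the common path cheap: a backend only takes the lock when something actually changed.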

static bool
@@ -91,6 +134,11 @@ pageserver_connect(int elevel)

Assert(!connected);

+	if (CheckConnstringUpdated())
+	{
+		ReloadConnstring();
+	}
+
/*
* Connect using the connection string we got from the
* neon.pageserver_connstring GUC. If the NEON_AUTH_TOKEN environment
@@ -110,7 +158,7 @@
n++;
}
keywords[n] = "dbname";
-	values[n] = page_server_connstring;
+	values[n] = local_pageserver_connstring;
n++;
keywords[n] = NULL;
values[n] = NULL;
@@ -254,6 +302,12 @@ pageserver_send(NeonRequest * request)
{
StringInfoData req_buff;

+	if (CheckConnstringUpdated())
+	{
+		pageserver_disconnect();
+		ReloadConnstring();
+	}
+
/* If the connection was lost for some reason, reconnect */
if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD)
{
@@ -274,6 +328,7 @@
{
while (!pageserver_connect(n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
{
+			HandleMainLoopInterrupts();
n_reconnect_attempts += 1;
pg_usleep(RECONNECT_INTERVAL_USEC);
}
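The single added call to `HandleMainLoopInterrupts()` is load-bearing: the reconnect loop is exactly where a backend sits when its pageserver has gone away, and servicing interrupts between attempts lets it react to a reconfiguration or shutdown instead of retrying a dead address forever. A sketch of the loop's shape with stand-in callables for the PostgreSQL primitives (in the C code, exceeding `max_reconnect_attempts` escalates the log level to ERROR, which aborts the loop through PostgreSQL's error machinery):

```python
import time

RECONNECT_INTERVAL_SEC = 1.0  # the C code uses RECONNECT_INTERVAL_USEC

def connect_with_retry(connect, handle_interrupts, max_attempts: int = 60) -> None:
    # Keep retrying, but service pending interrupts between attempts so a
    # config reload or shutdown request can break the loop.
    attempts = 0
    while not connect("LOG" if attempts < max_attempts else "ERROR"):
        handle_interrupts()  # analogous to HandleMainLoopInterrupts()
        attempts += 1
        time.sleep(RECONNECT_INTERVAL_SEC)
```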
@@ -391,7 +446,8 @@ pageserver_flush(void)
return true;
}

-page_server_api api = {
+page_server_api api =
+{
.send = pageserver_send,
.flush = pageserver_flush,
.receive = pageserver_receive
@@ -405,20 +461,80 @@
return **newval == '\0' || HexDecodeString(id, *newval, 16);
}

+static Size
+PagestoreShmemSize(void)
+{
+	return sizeof(PagestoreShmemState);
+}
+
+static bool
+PagestoreShmemInit(void)
+{
+	bool		found;
+	LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
+	pagestore_shared = ShmemInitStruct("libpagestore shared state",
+									   PagestoreShmemSize(),
+									   &found);
+	if (!found)
+	{
+		pagestore_shared->lock = &(GetNamedLWLockTranche("neon_libpagestore")->lock);
+		pg_atomic_init_u64(&pagestore_shared->update_counter, 0);
+		AssignPageserverConnstring(page_server_connstring, NULL);
+	}
+	LWLockRelease(AddinShmemInitLock);
+	return found;
+}
+
+static void
+pagestore_shmem_startup_hook(void)
+{
+	if (prev_shmem_startup_hook)
+		prev_shmem_startup_hook();
+
+	PagestoreShmemInit();
+}
+
+static void
+pagestore_shmem_request(void)
+{
+#if PG_VERSION_NUM >= 150000
+	if (prev_shmem_request_hook)
+		prev_shmem_request_hook();
+#endif
+
+	RequestAddinShmemSpace(PagestoreShmemSize());
+	RequestNamedLWLockTranche("neon_libpagestore", 1);
+}
+
+static void
+pagestore_prepare_shmem(void)
+{
+#if PG_VERSION_NUM >= 150000
+	prev_shmem_request_hook = shmem_request_hook;
+	shmem_request_hook = pagestore_shmem_request;
+#else
+	pagestore_shmem_request();
+#endif
+	prev_shmem_startup_hook = shmem_startup_hook;
+	shmem_startup_hook = pagestore_shmem_startup_hook;
+}
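`pagestore_prepare_shmem()` follows the standard two-phase extension idiom: reserve shared memory in a request hook (mandatory on PostgreSQL 15+, where `RequestAddinShmemSpace` may no longer be called directly from `_PG_init`), then initialize the struct exactly once in the startup hook, chaining to any previously installed hooks so multiple extensions compose. A toy Python registry illustrating just the chaining part (all names here are invented for the illustration):

```python
# Toy model of PostgreSQL's chained hooks: each extension saves the previous
# hook and calls it first, so every extension's request is honored.
shmem_request_hook = None
reserved_bytes = 0

def request_addin_shmem_space(nbytes: int) -> None:
    global reserved_bytes
    reserved_bytes += nbytes

def install_pagestore_hook() -> None:
    global shmem_request_hook
    prev_hook = shmem_request_hook

    def pagestore_shmem_request() -> None:
        if prev_hook:
            prev_hook()                  # let earlier extensions request first
        request_addin_shmem_space(272)   # size of the shared struct; illustrative

    shmem_request_hook = pagestore_shmem_request
```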

/*
* Module initialization function
*/
void
pg_init_libpagestore(void)
{
+	pagestore_prepare_shmem();

DefineCustomStringVariable("neon.pageserver_connstring",
"connection string to the page server",
NULL,
&page_server_connstring,
"",
PGC_SIGHUP,
0, /* no flags required */
-						   NULL, NULL, NULL);
+						   CheckPageserverConnstring, AssignPageserverConnstring, NULL);

DefineCustomStringVariable("neon.timeline_id",
"Neon timeline_id the server is running on",
@@ -499,7 +615,5 @@
redo_read_buffer_filter = neon_redo_read_buffer_filter;
}

-	prev_signal_handler = pqsignal(SIGHUP, pageserver_sighup_handler);
-
lfc_init();
}
77 changes: 65 additions & 12 deletions test_runner/regress/test_change_pageserver.py
@@ -1,9 +1,13 @@
+import asyncio
+
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.remote_storage import RemoteStorageKind


def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
+    num_connections = 3
+
neon_env_builder.num_pageservers = 2
neon_env_builder.enable_pageserver_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3,
@@ -16,15 +20,24 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
alt_pageserver_id = env.pageservers[1].id
env.pageservers[1].tenant_attach(env.initial_tenant)

-    pg_conn = endpoint.connect()
-    cur = pg_conn.cursor()
+    pg_conns = [endpoint.connect() for i in range(num_connections)]
+    curs = [pg_conn.cursor() for pg_conn in pg_conns]

+    def execute(statement: str):
+        for cur in curs:
+            cur.execute(statement)
+
+    def fetchone():
+        results = [cur.fetchone() for cur in curs]
+        assert all(result == results[0] for result in results)
+        return results[0]

# Create table, and insert some rows. Make it big enough that it doesn't fit in
# shared_buffers, otherwise the SELECT after restart will just return answer
# from shared_buffers without hitting the page server, which defeats the point
# of this test.
cur.execute("CREATE TABLE foo (t text)")
cur.execute(
curs[0].execute("CREATE TABLE foo (t text)")
curs[0].execute(
"""
INSERT INTO foo
SELECT 'long string to consume some space' || g
@@ -33,25 +46,25 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
)

# Verify that the table is larger than shared_buffers
-    cur.execute(
+    curs[0].execute(
"""
select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('foo') as tbl_size
from pg_settings where name = 'shared_buffers'
"""
)
-    row = cur.fetchone()
+    row = curs[0].fetchone()
assert row is not None
log.info(f"shared_buffers is {row[0]}, table size {row[1]}")
assert int(row[0]) < int(row[1])

cur.execute("SELECT count(*) FROM foo")
assert cur.fetchone() == (100000,)
execute("SELECT count(*) FROM foo")
assert fetchone() == (100000,)

endpoint.reconfigure(pageserver_id=alt_pageserver_id)

# Verify that the neon.pageserver_connstring GUC is set to the correct thing
cur.execute("SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'")
connstring = cur.fetchone()
execute("SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'")
connstring = fetchone()
assert connstring is not None
expected_connstring = f"postgresql://no_user:@localhost:{env.pageservers[1].service_port.pg}"
    assert connstring[0] == expected_connstring
@@ -60,5 +73,45 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
0
].stop() # Stop the old pageserver just to make sure we're reading from the new one

cur.execute("SELECT count(*) FROM foo")
assert cur.fetchone() == (100000,)
execute("SELECT count(*) FROM foo")
assert fetchone() == (100000,)

+    # Try failing back, and this time we will stop the current pageserver before reconfiguring
+    # the endpoint. Whereas the previous reconfiguration was like a healthy migration, this
+    # is more like what happens in an unexpected pageserver failure.
+    env.pageservers[0].start()
+    env.pageservers[1].stop()
+
+    endpoint.reconfigure(pageserver_id=env.pageservers[0].id)
+
+    execute("SELECT count(*) FROM foo")
+    assert fetchone() == (100000,)
+
+    env.pageservers[0].stop()
+    env.pageservers[1].start()
+
+    # Test a (former) bug where a child process would spin without ever updating its
+    # connection string. We exercise it by running a query on the open connections; the
+    # query hangs until the concurrent task below issues the reconfigure.
+    async def reconfigure_async():
+        # Sleep for 1 second just to make sure our count(*) query has actually started
+        await asyncio.sleep(1)
+        endpoint.reconfigure(pageserver_id=env.pageservers[1].id)
+
+    def execute_count():
+        execute("SELECT count(*) FROM foo")

+    async def execute_and_reconfigure():
+        task_exec = asyncio.to_thread(execute_count)
+        task_reconfig = asyncio.create_task(reconfigure_async())
+        await asyncio.gather(
+            task_exec,
+            task_reconfig,
+        )
+
+    asyncio.run(execute_and_reconfigure())
+    assert fetchone() == (100000,)
+
+    # One final check that nothing hangs
+    execute("SELECT count(*) FROM foo")
+    assert fetchone() == (100000,)