Skip to content

Commit

Permalink
extend test_change_pageserver for failure case, rework changing pageserver (#5693)
Browse files Browse the repository at this point in the history

Reproducer for #5692

The test change in this PR intentionally fails, to demonstrate the
issue.

---------

Co-authored-by: Sasha Krassovsky <krassovskysasha@gmail.com>
  • Loading branch information
jcsp and save-buffer authored Nov 8, 2023
1 parent acef742 commit b989ad1
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 28 deletions.
10 changes: 7 additions & 3 deletions compute_tools/src/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -710,8 +710,12 @@ impl ComputeNode {
// `pg_ctl` for start / stop, so this just seems much easier to do as we already
// have opened connection to Postgres and superuser access.
#[instrument(skip_all)]
fn pg_reload_conf(&self, client: &mut Client) -> Result<()> {
client.simple_query("SELECT pg_reload_conf()")?;
fn pg_reload_conf(&self) -> Result<()> {
let pgctl_bin = Path::new(&self.pgbin).parent().unwrap().join("pg_ctl");
Command::new(pgctl_bin)
.args(["reload", "-D", &self.pgdata])
.output()
.expect("cannot run pg_ctl process");
Ok(())
}

Expand All @@ -724,9 +728,9 @@ impl ComputeNode {
// Write new config
let pgdata_path = Path::new(&self.pgdata);
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec, None)?;
self.pg_reload_conf()?;

let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
self.pg_reload_conf(&mut client)?;

// Proceed with post-startup configuration. Note, that order of operations is important.
// Disable DDL forwarding because control plane already knows about these roles/databases.
Expand Down
140 changes: 127 additions & 13 deletions pgxn/neon/libpagestore.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
#include "access/xlog.h"
#include "access/xlogutils.h"
#include "storage/buf_internals.h"
#include "storage/lwlock.h"
#include "storage/ipc.h"
#include "c.h"
#include "postmaster/interrupt.h"

#include "libpq-fe.h"
#include "libpq/pqformat.h"
Expand Down Expand Up @@ -61,23 +64,63 @@ int flush_every_n_requests = 8;
int n_reconnect_attempts = 0;
int max_reconnect_attempts = 60;

#define MAX_PAGESERVER_CONNSTRING_SIZE 256

typedef struct
{
LWLockId lock;
pg_atomic_uint64 update_counter;
char pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE];
} PagestoreShmemState;

#if PG_VERSION_NUM >= 150000
static shmem_request_hook_type prev_shmem_request_hook = NULL;
static void walproposer_shmem_request(void);
#endif
static shmem_startup_hook_type prev_shmem_startup_hook;
static PagestoreShmemState *pagestore_shared;
static uint64 pagestore_local_counter = 0;
static char local_pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE];

bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL;

static bool pageserver_flush(void);
static void pageserver_disconnect(void);

/*
 * GUC check hook: reject connection strings that would not fit in the
 * fixed-size shared-memory buffer (including the NUL terminator).
 */
static bool
CheckPageserverConnstring(char **newval, void **extra, GucSource source)
{
	size_t		len = strlen(*newval);

	return len < MAX_PAGESERVER_CONNSTRING_SIZE;
}

static pqsigfunc prev_signal_handler;
static void
AssignPageserverConnstring(const char *newval, void *extra)
{
if(!pagestore_shared)
return;
LWLockAcquire(pagestore_shared->lock, LW_EXCLUSIVE);
strlcpy(pagestore_shared->pageserver_connstring, newval, MAX_PAGESERVER_CONNSTRING_SIZE);
pg_atomic_fetch_add_u64(&pagestore_shared->update_counter, 1);
LWLockRelease(pagestore_shared->lock);
}

static bool
CheckConnstringUpdated()
{
if(!pagestore_shared)
return false;
return pagestore_local_counter < pg_atomic_read_u64(&pagestore_shared->update_counter);
}

static void
pageserver_sighup_handler(SIGNAL_ARGS)
ReloadConnstring()
{
if (prev_signal_handler)
{
prev_signal_handler(postgres_signal_arg);
}
neon_log(LOG, "Received SIGHUP, disconnecting pageserver. New pageserver connstring is %s", page_server_connstring);
pageserver_disconnect();
if(!pagestore_shared)
return;
LWLockAcquire(pagestore_shared->lock, LW_SHARED);
strlcpy(local_pageserver_connstring, pagestore_shared->pageserver_connstring, sizeof(local_pageserver_connstring));
pagestore_local_counter = pg_atomic_read_u64(&pagestore_shared->update_counter);
LWLockRelease(pagestore_shared->lock);
}

static bool
Expand All @@ -91,6 +134,11 @@ pageserver_connect(int elevel)

Assert(!connected);

if(CheckConnstringUpdated())
{
ReloadConnstring();
}

/*
* Connect using the connection string we got from the
* neon.pageserver_connstring GUC. If the NEON_AUTH_TOKEN environment
Expand All @@ -110,7 +158,7 @@ pageserver_connect(int elevel)
n++;
}
keywords[n] = "dbname";
values[n] = page_server_connstring;
values[n] = local_pageserver_connstring;
n++;
keywords[n] = NULL;
values[n] = NULL;
Expand Down Expand Up @@ -254,6 +302,12 @@ pageserver_send(NeonRequest * request)
{
StringInfoData req_buff;

if(CheckConnstringUpdated())
{
pageserver_disconnect();
ReloadConnstring();
}

/* If the connection was lost for some reason, reconnect */
if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD)
{
Expand All @@ -274,6 +328,7 @@ pageserver_send(NeonRequest * request)
{
while (!pageserver_connect(n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
{
HandleMainLoopInterrupts();
n_reconnect_attempts += 1;
pg_usleep(RECONNECT_INTERVAL_USEC);
}
Expand Down Expand Up @@ -391,7 +446,8 @@ pageserver_flush(void)
return true;
}

page_server_api api = {
page_server_api api =
{
.send = pageserver_send,
.flush = pageserver_flush,
.receive = pageserver_receive
Expand All @@ -405,20 +461,80 @@ check_neon_id(char **newval, void **extra, GucSource source)
return **newval == '\0' || HexDecodeString(id, *newval, 16);
}

/* Size of the shared-memory state requested and initialized by this module. */
static Size
PagestoreShmemSize(void)
{
return sizeof(PagestoreShmemState);
}

/*
 * Attach to (or create) this module's shared state.  Returns true when the
 * segment already existed, false when this call initialized it.
 */
static bool
PagestoreShmemInit(void)
{
	bool		already_exists;

	LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
	pagestore_shared = ShmemInitStruct("libpagestore shared state",
									   PagestoreShmemSize(),
									   &already_exists);
	if (!already_exists)
	{
		/* First attach: set up the lock, the counter, and seed the connstring. */
		pagestore_shared->lock = &(GetNamedLWLockTranche("neon_libpagestore")->lock);
		pg_atomic_init_u64(&pagestore_shared->update_counter, 0);
		AssignPageserverConnstring(page_server_connstring, NULL);
	}
	LWLockRelease(AddinShmemInitLock);

	return already_exists;
}

static void
pagestore_shmem_startup_hook(void)
{
if(prev_shmem_startup_hook)
prev_shmem_startup_hook();

PagestoreShmemInit();
}

/*
 * Request shared memory and a named LWLock tranche for this module.
 * On PG >= 15 this runs as the shmem_request_hook; on older versions it is
 * called directly from pagestore_prepare_shmem().
 */
static void
pagestore_shmem_request(void)
{
#if PG_VERSION_NUM >= 150000
/* Chain to any hook installed before ours. */
if(prev_shmem_request_hook)
prev_shmem_request_hook();
#endif

RequestAddinShmemSpace(PagestoreShmemSize());
RequestNamedLWLockTranche("neon_libpagestore", 1);
}

/*
 * Install the shmem request/startup hooks, saving previously installed hooks
 * so they can be chained.  PG < 15 has no shmem_request_hook, so on older
 * versions the request is issued immediately instead.
 */
static void
pagestore_prepare_shmem(void)
{
#if PG_VERSION_NUM >= 150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = pagestore_shmem_request;
#else
pagestore_shmem_request();
#endif
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = pagestore_shmem_startup_hook;
}

/*
* Module initialization function
*/
void
pg_init_libpagestore(void)
{
pagestore_prepare_shmem();

DefineCustomStringVariable("neon.pageserver_connstring",
"connection string to the page server",
NULL,
&page_server_connstring,
"",
PGC_SIGHUP,
0, /* no flags required */
NULL, NULL, NULL);
CheckPageserverConnstring, AssignPageserverConnstring, NULL);

DefineCustomStringVariable("neon.timeline_id",
"Neon timeline_id the server is running on",
Expand Down Expand Up @@ -499,7 +615,5 @@ pg_init_libpagestore(void)
redo_read_buffer_filter = neon_redo_read_buffer_filter;
}

prev_signal_handler = pqsignal(SIGHUP, pageserver_sighup_handler);

lfc_init();
}
77 changes: 65 additions & 12 deletions test_runner/regress/test_change_pageserver.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import asyncio

from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.remote_storage import RemoteStorageKind


def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
num_connections = 3

neon_env_builder.num_pageservers = 2
neon_env_builder.enable_pageserver_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3,
Expand All @@ -16,15 +20,24 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
alt_pageserver_id = env.pageservers[1].id
env.pageservers[1].tenant_attach(env.initial_tenant)

pg_conn = endpoint.connect()
cur = pg_conn.cursor()
pg_conns = [endpoint.connect() for i in range(num_connections)]
curs = [pg_conn.cursor() for pg_conn in pg_conns]

def execute(statement: str):
    # Run the same statement on every open connection so they stay in sync.
    for c in curs:
        c.execute(statement)

def fetchone():
    # Every connection must agree on the result; return the common row.
    rows = [c.fetchone() for c in curs]
    assert all(row == rows[0] for row in rows)
    return rows[0]

# Create table, and insert some rows. Make it big enough that it doesn't fit in
# shared_buffers, otherwise the SELECT after restart will just return answer
# from shared_buffers without hitting the page server, which defeats the point
# of this test.
cur.execute("CREATE TABLE foo (t text)")
cur.execute(
curs[0].execute("CREATE TABLE foo (t text)")
curs[0].execute(
"""
INSERT INTO foo
SELECT 'long string to consume some space' || g
Expand All @@ -33,25 +46,25 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
)

# Verify that the table is larger than shared_buffers
cur.execute(
curs[0].execute(
"""
select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('foo') as tbl_size
from pg_settings where name = 'shared_buffers'
"""
)
row = cur.fetchone()
row = curs[0].fetchone()
assert row is not None
log.info(f"shared_buffers is {row[0]}, table size {row[1]}")
assert int(row[0]) < int(row[1])

cur.execute("SELECT count(*) FROM foo")
assert cur.fetchone() == (100000,)
execute("SELECT count(*) FROM foo")
assert fetchone() == (100000,)

endpoint.reconfigure(pageserver_id=alt_pageserver_id)

# Verify that the neon.pageserver_connstring GUC is set to the correct thing
cur.execute("SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'")
connstring = cur.fetchone()
execute("SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'")
connstring = fetchone()
assert connstring is not None
expected_connstring = f"postgresql://no_user:@localhost:{env.pageservers[1].service_port.pg}"
# Bug fix: the original asserted `expected_connstring == expected_connstring`,
# which is vacuously true.  Compare the value actually read from Postgres
# (fetchone() returns a one-column row tuple).
assert connstring == (expected_connstring,)
Expand All @@ -60,5 +73,45 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
0
].stop() # Stop the old pageserver just to make sure we're reading from the new one

cur.execute("SELECT count(*) FROM foo")
assert cur.fetchone() == (100000,)
execute("SELECT count(*) FROM foo")
assert fetchone() == (100000,)

# Try failing back, and this time we will stop the current pageserver before reconfiguring
# the endpoint. Whereas the previous reconfiguration was like a healthy migration, this
# is more like what happens in an unexpected pageserver failure.
env.pageservers[0].start()
env.pageservers[1].stop()

endpoint.reconfigure(pageserver_id=env.pageservers[0].id)

execute("SELECT count(*) FROM foo")
assert fetchone() == (100000,)

env.pageservers[0].stop()
env.pageservers[1].start()

# Test a (former) bug where a child process spins without updating its connection string
# by executing a query separately. This query will hang until we issue the reconfigure.
async def reconfigure_async():
    # Give the count(*) query a one-second head start so it is already in
    # flight when the endpoint is pointed at the other pageserver.
    await asyncio.sleep(1)
    endpoint.reconfigure(pageserver_id=env.pageservers[1].id)

def execute_count():
    execute("SELECT count(*) FROM FOO")

async def execute_and_reconfigure():
    # Run the blocking query on a worker thread while the reconfigure task
    # races it on the event loop.
    query = asyncio.to_thread(execute_count)
    reconfig = asyncio.create_task(reconfigure_async())
    await asyncio.gather(query, reconfig)

asyncio.run(execute_and_reconfigure())
assert fetchone() == (100000,)

# One final check that nothing hangs
execute("SELECT count(*) FROM foo")
assert fetchone() == (100000,)

1 comment on commit b989ad1

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2438 tests run: 2316 passed, 1 failed, 121 skipped (full report)


Failures on Postgres 15

  • test_metrics_while_ignoring_broken_tenant_and_reloading: debug
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_metrics_while_ignoring_broken_tenant_and_reloading[debug-pg15]"
Flaky tests (1)

Postgres 16

  • test_emergency_mode: debug

Test coverage report is not available

The comment gets automatically updated with the latest test results
b989ad1 at 2023-11-08T12:09:28.756Z :recycle:

Please sign in to comment.