Skip to content

Commit

Permalink
Revert "extend test_change_pageserver for failure case, rework changing pageserver (#5693)"
Browse files Browse the repository at this point in the history

This reverts commit b989ad1.
  • Loading branch information
jcsp committed Nov 9, 2023
1 parent 733877a commit 27e9cb9
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 199 deletions.
10 changes: 3 additions & 7 deletions compute_tools/src/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -710,12 +710,8 @@ impl ComputeNode {
// `pg_ctl` for start / stop, so this just seems much easier to do as we already
// have opened connection to Postgres and superuser access.
#[instrument(skip_all)]
fn pg_reload_conf(&self) -> Result<()> {
let pgctl_bin = Path::new(&self.pgbin).parent().unwrap().join("pg_ctl");
Command::new(pgctl_bin)
.args(["reload", "-D", &self.pgdata])
.output()
.expect("cannot run pg_ctl process");
// Ask Postgres to re-read its configuration files over the already-open SQL
// connection (equivalent to `pg_ctl reload`, but avoids spawning a process).
// NOTE(review): pg_reload_conf() requires superuser/pg_reload_conf privilege —
// the surrounding code implies this connection has it; confirm at the caller.
fn pg_reload_conf(&self, client: &mut Client) -> Result<()> {
client.simple_query("SELECT pg_reload_conf()")?;
Ok(())
}

Expand All @@ -728,9 +724,9 @@ impl ComputeNode {
// Write new config
let pgdata_path = Path::new(&self.pgdata);
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec, None)?;
self.pg_reload_conf()?;

let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
self.pg_reload_conf(&mut client)?;

// Proceed with post-startup configuration. Note, that order of operations is important.
// Disable DDL forwarding because control plane already knows about these roles/databases.
Expand Down
140 changes: 13 additions & 127 deletions pgxn/neon/libpagestore.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@
#include "access/xlog.h"
#include "access/xlogutils.h"
#include "storage/buf_internals.h"
#include "storage/lwlock.h"
#include "storage/ipc.h"
#include "c.h"
#include "postmaster/interrupt.h"

#include "libpq-fe.h"
#include "libpq/pqformat.h"
Expand Down Expand Up @@ -64,63 +61,23 @@ int flush_every_n_requests = 8;
int n_reconnect_attempts = 0;
int max_reconnect_attempts = 60;

#define MAX_PAGESERVER_CONNSTRING_SIZE 256

typedef struct
{
LWLockId lock;
pg_atomic_uint64 update_counter;
char pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE];
} PagestoreShmemState;

#if PG_VERSION_NUM >= 150000
static shmem_request_hook_type prev_shmem_request_hook = NULL;
static void walproposer_shmem_request(void);
#endif
static shmem_startup_hook_type prev_shmem_startup_hook;
static PagestoreShmemState *pagestore_shared;
static uint64 pagestore_local_counter = 0;
static char local_pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE];

bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL;

static bool pageserver_flush(void);
static void pageserver_disconnect(void);

static bool
CheckPageserverConnstring(char **newval, void **extra, GucSource source)
{
return strlen(*newval) < MAX_PAGESERVER_CONNSTRING_SIZE;
}

static void
AssignPageserverConnstring(const char *newval, void *extra)
{
if(!pagestore_shared)
return;
LWLockAcquire(pagestore_shared->lock, LW_EXCLUSIVE);
strlcpy(pagestore_shared->pageserver_connstring, newval, MAX_PAGESERVER_CONNSTRING_SIZE);
pg_atomic_fetch_add_u64(&pagestore_shared->update_counter, 1);
LWLockRelease(pagestore_shared->lock);
}

static bool
CheckConnstringUpdated()
{
if(!pagestore_shared)
return false;
return pagestore_local_counter < pg_atomic_read_u64(&pagestore_shared->update_counter);
}
static pqsigfunc prev_signal_handler;

static void
ReloadConnstring()
pageserver_sighup_handler(SIGNAL_ARGS)
{
if(!pagestore_shared)
return;
LWLockAcquire(pagestore_shared->lock, LW_SHARED);
strlcpy(local_pageserver_connstring, pagestore_shared->pageserver_connstring, sizeof(local_pageserver_connstring));
pagestore_local_counter = pg_atomic_read_u64(&pagestore_shared->update_counter);
LWLockRelease(pagestore_shared->lock);
if (prev_signal_handler)
{
prev_signal_handler(postgres_signal_arg);
}
neon_log(LOG, "Received SIGHUP, disconnecting pageserver. New pageserver connstring is %s", page_server_connstring);
pageserver_disconnect();
}

static bool
Expand All @@ -134,11 +91,6 @@ pageserver_connect(int elevel)

Assert(!connected);

if(CheckConnstringUpdated())
{
ReloadConnstring();
}

/*
* Connect using the connection string we got from the
* neon.pageserver_connstring GUC. If the NEON_AUTH_TOKEN environment
Expand All @@ -158,7 +110,7 @@ pageserver_connect(int elevel)
n++;
}
keywords[n] = "dbname";
values[n] = local_pageserver_connstring;
values[n] = page_server_connstring;
n++;
keywords[n] = NULL;
values[n] = NULL;
Expand Down Expand Up @@ -302,12 +254,6 @@ pageserver_send(NeonRequest * request)
{
StringInfoData req_buff;

if(CheckConnstringUpdated())
{
pageserver_disconnect();
ReloadConnstring();
}

/* If the connection was lost for some reason, reconnect */
if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD)
{
Expand All @@ -328,7 +274,6 @@ pageserver_send(NeonRequest * request)
{
while (!pageserver_connect(n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
{
HandleMainLoopInterrupts();
n_reconnect_attempts += 1;
pg_usleep(RECONNECT_INTERVAL_USEC);
}
Expand Down Expand Up @@ -446,8 +391,7 @@ pageserver_flush(void)
return true;
}

page_server_api api =
{
page_server_api api = {
.send = pageserver_send,
.flush = pageserver_flush,
.receive = pageserver_receive
Expand All @@ -461,80 +405,20 @@ check_neon_id(char **newval, void **extra, GucSource source)
return **newval == '\0' || HexDecodeString(id, *newval, 16);
}

static Size
PagestoreShmemSize(void)
{
return sizeof(PagestoreShmemState);
}

static bool
PagestoreShmemInit(void)
{
bool found;
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
pagestore_shared = ShmemInitStruct("libpagestore shared state",
PagestoreShmemSize(),
&found);
if(!found)
{
pagestore_shared->lock = &(GetNamedLWLockTranche("neon_libpagestore")->lock);
pg_atomic_init_u64(&pagestore_shared->update_counter, 0);
AssignPageserverConnstring(page_server_connstring, NULL);
}
LWLockRelease(AddinShmemInitLock);
return found;
}

static void
pagestore_shmem_startup_hook(void)
{
if(prev_shmem_startup_hook)
prev_shmem_startup_hook();

PagestoreShmemInit();
}

static void
pagestore_shmem_request(void)
{
#if PG_VERSION_NUM >= 150000
if(prev_shmem_request_hook)
prev_shmem_request_hook();
#endif

RequestAddinShmemSpace(PagestoreShmemSize());
RequestNamedLWLockTranche("neon_libpagestore", 1);
}

static void
pagestore_prepare_shmem(void)
{
#if PG_VERSION_NUM >= 150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = pagestore_shmem_request;
#else
pagestore_shmem_request();
#endif
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = pagestore_shmem_startup_hook;
}

/*
* Module initialization function
*/
void
pg_init_libpagestore(void)
{
pagestore_prepare_shmem();

DefineCustomStringVariable("neon.pageserver_connstring",
"connection string to the page server",
NULL,
&page_server_connstring,
"",
PGC_SIGHUP,
0, /* no flags required */
CheckPageserverConnstring, AssignPageserverConnstring, NULL);
NULL, NULL, NULL);

DefineCustomStringVariable("neon.timeline_id",
"Neon timeline_id the server is running on",
Expand Down Expand Up @@ -615,5 +499,7 @@ pg_init_libpagestore(void)
redo_read_buffer_filter = neon_redo_read_buffer_filter;
}

prev_signal_handler = pqsignal(SIGHUP, pageserver_sighup_handler);

lfc_init();
}
77 changes: 12 additions & 65 deletions test_runner/regress/test_change_pageserver.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
import asyncio

from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.remote_storage import RemoteStorageKind


def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
num_connections = 3

neon_env_builder.num_pageservers = 2
neon_env_builder.enable_pageserver_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3,
Expand All @@ -20,24 +16,15 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
alt_pageserver_id = env.pageservers[1].id
env.pageservers[1].tenant_attach(env.initial_tenant)

pg_conns = [endpoint.connect() for i in range(num_connections)]
curs = [pg_conn.cursor() for pg_conn in pg_conns]

def execute(statement: str):
for cur in curs:
cur.execute(statement)

def fetchone():
results = [cur.fetchone() for cur in curs]
assert all(result == results[0] for result in results)
return results[0]
pg_conn = endpoint.connect()
cur = pg_conn.cursor()

# Create table, and insert some rows. Make it big enough that it doesn't fit in
# shared_buffers, otherwise the SELECT after restart will just return answer
# from shared_buffers without hitting the page server, which defeats the point
# of this test.
curs[0].execute("CREATE TABLE foo (t text)")
curs[0].execute(
cur.execute("CREATE TABLE foo (t text)")
cur.execute(
"""
INSERT INTO foo
SELECT 'long string to consume some space' || g
Expand All @@ -46,25 +33,25 @@ def fetchone():
)

# Verify that the table is larger than shared_buffers
curs[0].execute(
cur.execute(
"""
select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('foo') as tbl_size
from pg_settings where name = 'shared_buffers'
"""
)
row = curs[0].fetchone()
row = cur.fetchone()
assert row is not None
log.info(f"shared_buffers is {row[0]}, table size {row[1]}")
assert int(row[0]) < int(row[1])

execute("SELECT count(*) FROM foo")
assert fetchone() == (100000,)
cur.execute("SELECT count(*) FROM foo")
assert cur.fetchone() == (100000,)

endpoint.reconfigure(pageserver_id=alt_pageserver_id)

# Verify that the neon.pageserver_connstring GUC is set to the correct thing
execute("SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'")
connstring = fetchone()
cur.execute("SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'")
connstring = cur.fetchone()
assert connstring is not None
expected_connstring = f"postgresql://no_user:@localhost:{env.pageservers[1].service_port.pg}"
assert expected_connstring == expected_connstring
Expand All @@ -73,45 +60,5 @@ def fetchone():
0
].stop() # Stop the old pageserver just to make sure we're reading from the new one

execute("SELECT count(*) FROM foo")
assert fetchone() == (100000,)

# Try failing back, and this time we will stop the current pageserver before reconfiguring
# the endpoint. Whereas the previous reconfiguration was like a healthy migration, this
# is more like what happens in an unexpected pageserver failure.
env.pageservers[0].start()
env.pageservers[1].stop()

endpoint.reconfigure(pageserver_id=env.pageservers[0].id)

execute("SELECT count(*) FROM foo")
assert fetchone() == (100000,)

env.pageservers[0].stop()
env.pageservers[1].start()

# Test a (former) bug where a child process spins without updating its connection string
# by executing a query separately. This query will hang until we issue the reconfigure.
async def reconfigure_async():
await asyncio.sleep(
1
) # Sleep for 1 second just to make sure we actually started our count(*) query
endpoint.reconfigure(pageserver_id=env.pageservers[1].id)

def execute_count():
execute("SELECT count(*) FROM FOO")

async def execute_and_reconfigure():
task_exec = asyncio.to_thread(execute_count)
task_reconfig = asyncio.create_task(reconfigure_async())
await asyncio.gather(
task_exec,
task_reconfig,
)

asyncio.run(execute_and_reconfigure())
assert fetchone() == (100000,)

# One final check that nothing hangs
execute("SELECT count(*) FROM foo")
assert fetchone() == (100000,)
cur.execute("SELECT count(*) FROM foo")
assert cur.fetchone() == (100000,)

0 comments on commit 27e9cb9

Please sign in to comment.