Skip to content

Commit

Permalink
Remove support for pageserver <-> compute protocol version 1 (#8774)
Browse files Browse the repository at this point in the history
Protocol version 2 has been the default for a while now, and we no
longer have any computes running in production that used protocol
version 1. This completes the migration by removing support for v1 in
both the pageserver and the compute.

See issue #6211.
  • Loading branch information
hlinnaka authored Aug 27, 2024
1 parent 9b9f90c commit 2d10306
Show file tree
Hide file tree
Showing 8 changed files with 17 additions and 128 deletions.
41 changes: 8 additions & 33 deletions libs/pageserver_api/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,7 @@ impl TryFrom<u8> for PagestreamBeMessageTag {
}
}

// In the V2 protocol version, a GetPage request contains two LSN values:
// A GetPage request contains two LSN values:
//
// request_lsn: Get the page version at this point in time. Lsn::Max is a special value that means
// "get the latest version present". It's used by the primary server, which knows that no one else
Expand All @@ -1076,23 +1076,19 @@ impl TryFrom<u8> for PagestreamBeMessageTag {
// passing an earlier LSN can speed up the request, by allowing the pageserver to process the
// request without waiting for 'request_lsn' to arrive.
//
// The legacy V1 interface contained only one LSN, and a boolean 'latest' flag. The V1 interface was
// The now-defunct V1 interface contained only one LSN, and a boolean 'latest' flag. The V1 interface was
// sufficient for the primary; the 'lsn' was equivalent to the 'not_modified_since' value, and
// 'latest' was set to true. The V2 interface was added because there was no correct way for a
// standby to request a page at a particular non-latest LSN, and also include the
// 'not_modified_since' hint. That led to an awkward choice of either using an old LSN in the
// request, if the standby knows that the page hasn't been modified since, and risk getting an error
// if that LSN has fallen behind the GC horizon, or requesting the current replay LSN, which could
// require the pageserver unnecessarily to wait for the WAL to arrive up to that point. The new V2
// interface allows sending both LSNs, and let the pageserver do the right thing. There is no
// interface allows sending both LSNs, and let the pageserver do the right thing. There was no
// difference in the responses between V1 and V2.
//
// The Request structs below reflect the V2 interface. If V1 is used, the parse function
// maps the old format requests to the new format.
//
#[derive(Clone, Copy)]
pub enum PagestreamProtocolVersion {
V1,
V2,
}

Expand Down Expand Up @@ -1231,36 +1227,17 @@ impl PagestreamFeMessage {
bytes.into()
}

pub fn parse<R: std::io::Read>(
body: &mut R,
protocol_version: PagestreamProtocolVersion,
) -> anyhow::Result<PagestreamFeMessage> {
pub fn parse<R: std::io::Read>(body: &mut R) -> anyhow::Result<PagestreamFeMessage> {
// these correspond to the NeonMessageTag enum in pagestore_client.h
//
// TODO: consider using protobuf or serde bincode for less error prone
// serialization.
let msg_tag = body.read_u8()?;

let (request_lsn, not_modified_since) = match protocol_version {
PagestreamProtocolVersion::V2 => (
Lsn::from(body.read_u64::<BigEndian>()?),
Lsn::from(body.read_u64::<BigEndian>()?),
),
PagestreamProtocolVersion::V1 => {
// In the old protocol, each message starts with a boolean 'latest' flag,
// followed by 'lsn'. Convert that to the two LSNs, 'request_lsn' and
// 'not_modified_since', used in the new protocol version.
let latest = body.read_u8()? != 0;
let request_lsn = Lsn::from(body.read_u64::<BigEndian>()?);
if latest {
(Lsn::MAX, request_lsn) // get latest version
} else {
(request_lsn, request_lsn) // get version at specified LSN
}
}
};
// these two fields are the same for every request type
let request_lsn = Lsn::from(body.read_u64::<BigEndian>()?);
let not_modified_since = Lsn::from(body.read_u64::<BigEndian>()?);

// The rest of the messages are the same between V1 and V2
match msg_tag {
0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
request_lsn,
Expand Down Expand Up @@ -1468,9 +1445,7 @@ mod tests {
];
for msg in messages {
let bytes = msg.serialize();
let reconstructed =
PagestreamFeMessage::parse(&mut bytes.reader(), PagestreamProtocolVersion::V2)
.unwrap();
let reconstructed = PagestreamFeMessage::parse(&mut bytes.reader()).unwrap();
assert!(msg == reconstructed);
}
}
Expand Down
1 change: 0 additions & 1 deletion pageserver/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1552,7 +1552,6 @@ pub(crate) static LIVE_CONNECTIONS: Lazy<IntCounterPairVec> = Lazy::new(|| {
#[derive(Clone, Copy, enum_map::Enum, IntoStaticStr)]
pub(crate) enum ComputeCommandKind {
PageStreamV2,
PageStream,
Basebackup,
Fullbackup,
LeaseLsn,
Expand Down
34 changes: 2 additions & 32 deletions pageserver/src/page_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ impl PageServerHandler {
pgb: &mut PostgresBackend<IO>,
tenant_id: TenantId,
timeline_id: TimelineId,
protocol_version: PagestreamProtocolVersion,
_protocol_version: PagestreamProtocolVersion,
ctx: RequestContext,
) -> Result<(), QueryError>
where
Expand Down Expand Up @@ -601,8 +601,7 @@ impl PageServerHandler {
fail::fail_point!("ps::handle-pagerequest-message");

// parse request
let neon_fe_msg =
PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?;
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;

// invoke handler function
let (handler_result, span) = match neon_fe_msg {
Expand Down Expand Up @@ -1275,35 +1274,6 @@ where
ctx,
)
.await?;
} else if let Some(params) = parts.strip_prefix(&["pagestream"]) {
if params.len() != 2 {
return Err(QueryError::Other(anyhow::anyhow!(
"invalid param number for pagestream command"
)));
}
let tenant_id = TenantId::from_str(params[0])
.with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
let timeline_id = TimelineId::from_str(params[1])
.with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;

tracing::Span::current()
.record("tenant_id", field::display(tenant_id))
.record("timeline_id", field::display(timeline_id));

self.check_permission(Some(tenant_id))?;

COMPUTE_COMMANDS_COUNTERS
.for_command(ComputeCommandKind::PageStream)
.inc();

self.handle_pagerequests(
pgb,
tenant_id,
timeline_id,
PagestreamProtocolVersion::V1,
ctx,
)
.await?;
} else if let Some(params) = parts.strip_prefix(&["basebackup"]) {
if params.len() < 2 {
return Err(QueryError::Other(anyhow::anyhow!(
Expand Down
5 changes: 1 addition & 4 deletions pgxn/neon/libpagestore.c
Original file line number Diff line number Diff line change
Expand Up @@ -550,9 +550,6 @@ pageserver_connect(shardno_t shard_no, int elevel)
case 2:
pagestream_query = psprintf("pagestream_v2 %s %s", neon_tenant, neon_timeline);
break;
case 1:
pagestream_query = psprintf("pagestream %s %s", neon_tenant, neon_timeline);
break;
default:
elog(ERROR, "unexpected neon_protocol_version %d", neon_protocol_version);
}
Expand Down Expand Up @@ -1063,7 +1060,7 @@ pg_init_libpagestore(void)
NULL,
&neon_protocol_version,
2, /* use protocol version 2 */
1, /* min */
2, /* min */
2, /* max */
PGC_SU_BACKEND,
0, /* no flags required */
Expand Down
5 changes: 2 additions & 3 deletions pgxn/neon/pagestore_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,8 @@ typedef enum {
* can skip traversing through recent layers which we know to not contain any
* versions for the requested page.
*
* These structs describe the V2 of these requests. The old V1 protocol contained
* just one LSN and a boolean 'latest' flag. If the neon_protocol_version GUC is
* set to 1, we will convert these to the V1 requests before sending.
* These structs describe the V2 of these requests. (The old now-defunct V1
* protocol contained just one LSN and a boolean 'latest' flag.)
*/
typedef struct
{
Expand Down
47 changes: 3 additions & 44 deletions pgxn/neon/pagestore_smgr.c
Original file line number Diff line number Diff line change
Expand Up @@ -1001,51 +1001,10 @@ nm_pack_request(NeonRequest *msg)

initStringInfo(&s);

if (neon_protocol_version >= 2)
{
pq_sendbyte(&s, msg->tag);
pq_sendint64(&s, msg->lsn);
pq_sendint64(&s, msg->not_modified_since);
}
else
{
bool latest;
XLogRecPtr lsn;

/*
* In primary, we always request the latest page version.
*/
if (!RecoveryInProgress())
{
latest = true;
lsn = msg->not_modified_since;
}
else
{
/*
* In the protocol V1, we cannot represent that we want to read
* page at LSN X, and we know that it hasn't been modified since
* Y. We can either use 'not_modified_lsn' as the request LSN, and
* risk getting an error if that LSN is too old and has already
* fallen out of the pageserver's GC horizon, or we can send
* 'request_lsn', causing the pageserver to possibly wait for the
* recent WAL to arrive unnecessarily. Or something in between. We
* choose to use the old LSN and risk GC errors, because that's
* what we've done historically.
*/
latest = false;
lsn = msg->not_modified_since;
}
pq_sendbyte(&s, msg->tag);
pq_sendint64(&s, msg->lsn);
pq_sendint64(&s, msg->not_modified_since);

pq_sendbyte(&s, msg->tag);
pq_sendbyte(&s, latest);
pq_sendint64(&s, lsn);
}

/*
* The rest of the request messages are the same between protocol V1 and
* V2
*/
switch (messageTag(msg))
{
/* pagestore_client -> pagestore */
Expand Down
2 changes: 1 addition & 1 deletion test_runner/regress/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def op():
def check_pageserver(expect_success: bool, **conn_kwargs):
check_connection(
env.pageserver,
f"pagestream {env.initial_tenant} {env.initial_timeline}",
f"pagestream_v2 {env.initial_tenant} {env.initial_timeline}",
expect_success,
**conn_kwargs,
)
Expand Down
10 changes: 0 additions & 10 deletions test_runner/regress/test_read_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,6 @@ def test_read_validation(neon_simple_env: NeonEnv):

endpoint = env.endpoints.create_start(
"test_read_validation",
# Use protocol version 2, because the code that constructs the V1 messages
# assumes that a primary always wants to read the latest version of a page,
# and therefore doesn't work with the test functions below to read an older
# page version.
config_lines=["neon.protocol_version=2"],
)

with closing(endpoint.connect()) as con:
Expand Down Expand Up @@ -142,11 +137,6 @@ def test_read_validation_neg(neon_simple_env: NeonEnv):

endpoint = env.endpoints.create_start(
"test_read_validation_neg",
# Use protocol version 2, because the code that constructs the V1 messages
# assumes that a primary always wants to read the latest version of a page,
# and therefore doesn't work with the test functions below to read an older
# page version.
config_lines=["neon.protocol_version=2"],
)

with closing(endpoint.connect()) as con:
Expand Down

1 comment on commit 2d10306

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3780 tests run: 3673 passed, 1 failed, 106 skipped (full report)


Failures on Postgres 15

# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_create_churn_during_restart[release-pg15]"
Flaky tests (1)

Postgres 14

Test coverage report is not available

The comment gets automatically updated with the latest test results
2d10306 at 2024-08-27T16:22:18.566Z :recycle:

Please sign in to comment.