diff --git a/libs/pageserver_api/src/key.rs b/libs/pageserver_api/src/key.rs index 2511de00d59c..1a3e8ea35fc7 100644 --- a/libs/pageserver_api/src/key.rs +++ b/libs/pageserver_api/src/key.rs @@ -324,6 +324,9 @@ impl Key { // AuxFiles: // 03 00000000 00000000 00000000 00 00000002 // +// ReplOrigin: +// 04 00000000 00000000 00000000 00 ORIGIN_ID +// //-- Section 01: relation data and metadata @@ -569,6 +572,20 @@ pub const AUX_FILES_KEY: Key = Key { field6: 2, }; +//-- Section 04: Replication origin + +#[inline(always)] +pub fn replorigin_key(origin_id: u16) -> Key { + Key { + field1: 0x04, + field2: 0, + field3: 0, + field4: 0, + field5: 0, + field6: origin_id as u32, + } +} + // Reverse mappings for a few Keys. // These are needed by WAL redo manager. diff --git a/libs/postgres_ffi/src/pg_constants.rs b/libs/postgres_ffi/src/pg_constants.rs index 2701ddf5e062..e77a0e96d953 100644 --- a/libs/postgres_ffi/src/pg_constants.rs +++ b/libs/postgres_ffi/src/pg_constants.rs @@ -102,7 +102,7 @@ pub const XACT_XINFO_HAS_SUBXACTS: u32 = 1u32 << 1; pub const XACT_XINFO_HAS_RELFILENODES: u32 = 1u32 << 2; pub const XACT_XINFO_HAS_INVALS: u32 = 1u32 << 3; pub const XACT_XINFO_HAS_TWOPHASE: u32 = 1u32 << 4; -// pub const XACT_XINFO_HAS_ORIGIN: u32 = 1u32 << 5; +pub const XACT_XINFO_HAS_ORIGIN: u32 = 1u32 << 5; // pub const XACT_XINFO_HAS_AE_LOCKS: u32 = 1u32 << 6; // pub const XACT_XINFO_HAS_GID: u32 = 1u32 << 7; diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 58b18dae7d97..1819702ede86 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -13,7 +13,7 @@ use anyhow::{anyhow, Context}; use bytes::{BufMut, Bytes, BytesMut}; use fail::fail_point; -use pageserver_api::key::{key_to_slru_block, Key}; +use pageserver_api::key::{key_to_slru_block, replorigin_key, Key}; use postgres_ffi::pg_constants; use std::fmt::Write as FmtWrite; use std::time::SystemTime; @@ -355,6 +355,7 @@ where .await .map_err(|e| 
BasebackupError::Server(e.into()))? { + let header = new_tar_header(&path, content.len() as u64)?; if path.starts_with("pg_replslot") { let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN; let restart_lsn = Lsn(u64::from_le_bytes( @@ -362,8 +363,37 @@ where )); info!("Replication slot {} restart LSN={}", path, restart_lsn); min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn); + } else if path.starts_with("pg_logical/replorigin_checkpoint") { + let n_origins = (content.len() - 4 /* magic */ - 4 /* crc32 */) / 16 /* sizeof(ReplicationStateOnDisk) */; + let mut new_content = Vec::with_capacity(content.len()); + new_content.extend_from_slice(&content[0..4]); // magic + for i in 0..n_origins { + let offs = 4 + i * 16; + let origin_id = + u16::from_le_bytes(content[offs..offs + 2].try_into().unwrap()); + let origin_lsn = + u64::from_le_bytes(content[offs + 8..offs + 16].try_into().unwrap()); + new_content.extend_from_slice(&content[offs..offs + 8]); // aligned origin id + + // Try to get origin_lsn for this origin_id at the moment of basebackup + let key = replorigin_key(origin_id); + if let Ok(buf) = self.timeline.get(key, self.lsn, self.ctx).await { + let tx_origin_lsn = u64::from_le_bytes(buf[0..8].try_into().unwrap()); + if tx_origin_lsn > origin_lsn { + new_content.extend_from_slice(&buf[0..8]); + } else { + new_content.extend_from_slice(&content[offs + 8..offs + 16]); + } + } + } + let crc32 = crc32c::crc32c(&new_content); + new_content.extend_from_slice(&crc32.to_le_bytes()); + self.ar + .append(&header, &*new_content) + .await + .context("could not add aux file to basebackup tarball")?; + continue; } - let header = new_tar_header(&path, content.len() as u64)?; self.ar .append(&header, &*content) .await diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index a4215ee107b2..e0c39d311171 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -19,9 +19,9 @@ use enum_map::Enum; 
use itertools::Itertools; use pageserver_api::key::{ dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key, - rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key, - slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range, - AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY, + rel_key_range, rel_size_to_key, relmap_file_key, replorigin_key, slru_block_to_key, + slru_dir_to_key, slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, + twophase_key_range, AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY, }; use pageserver_api::keyspace::SparseKeySpace; use pageserver_api::models::AuxFilePolicy; @@ -1140,6 +1140,12 @@ impl<'a> DatadirModification<'a> { Ok(()) } + pub fn put_replorigin(&mut self, origin_id: u16, origin_lsn: Lsn) -> anyhow::Result<()> { + let key = replorigin_key(origin_id); + self.put(key, Value::Image(Bytes::from(origin_lsn.ser().unwrap()))); + Ok(()) + } + pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> { self.put(CONTROLFILE_KEY, Value::Image(img)); Ok(()) diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 79f075b877a4..2681311fbfc6 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -234,6 +234,7 @@ impl WalIngest { modification, &parsed_xact, info == pg_constants::XLOG_XACT_COMMIT, + decoded.origin_id, ctx, ) .await?; @@ -246,6 +247,7 @@ impl WalIngest { modification, &parsed_xact, info == pg_constants::XLOG_XACT_COMMIT_PREPARED, + decoded.origin_id, ctx, ) .await?; @@ -1178,6 +1180,7 @@ impl WalIngest { modification: &mut DatadirModification<'_>, parsed: &XlXactParsedRecord, is_commit: bool, + origin_id: u16, ctx: &RequestContext, ) -> anyhow::Result<()> { // Record update of CLOG pages @@ -1243,6 +1246,9 @@ impl WalIngest { } } } + if origin_id != 0 { + modification.put_replorigin(origin_id, parsed.origin_lsn)?; + } 
Ok(()) } diff --git a/pageserver/src/walrecord.rs b/pageserver/src/walrecord.rs index 02f6f4969457..91750a5b4188 100644 --- a/pageserver/src/walrecord.rs +++ b/pageserver/src/walrecord.rs @@ -12,7 +12,7 @@ use postgres_ffi::{MultiXactId, MultiXactOffset, MultiXactStatus, Oid, Transacti use postgres_ffi::{XLogRecord, XLOG_SIZE_OF_XLOG_RECORD}; use serde::{Deserialize, Serialize}; use tracing::*; -use utils::bin_ser::DeserializeError; +use utils::{bin_ser::DeserializeError, lsn::Lsn}; /// Each update to a page is represented by a NeonWalRecord. It can be a wrapper /// around a PostgreSQL WAL record, or a custom neon-specific "record". @@ -116,6 +116,7 @@ pub struct DecodedWALRecord { pub blocks: Vec, pub main_data_offset: usize, + pub origin_id: u16, } #[repr(C)] @@ -573,6 +574,7 @@ pub struct XlXactParsedRecord { pub subxacts: Vec, pub xnodes: Vec, + pub origin_lsn: Lsn, } impl XlXactParsedRecord { @@ -651,6 +653,11 @@ impl XlXactParsedRecord { debug!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE xid {}", xid); } + let origin_lsn = if xinfo & pg_constants::XACT_XINFO_HAS_ORIGIN != 0 { + Lsn(buf.get_u64_le()) + } else { + Lsn::INVALID + }; XlXactParsedRecord { xid, info, @@ -660,6 +667,7 @@ impl XlXactParsedRecord { ts_id, subxacts, xnodes, + origin_lsn, } } } @@ -844,6 +852,7 @@ pub fn decode_wal_record( let mut rnode_dbnode: u32 = 0; let mut rnode_relnode: u32 = 0; let mut got_rnode = false; + let mut origin_id: u16 = 0; let mut buf = record.clone(); @@ -891,7 +900,7 @@ pub fn decode_wal_record( pg_constants::XLR_BLOCK_ID_ORIGIN => { // RepOriginId is uint16 - buf.advance(2); + origin_id = buf.get_u16_le(); } pg_constants::XLR_BLOCK_ID_TOPLEVEL_XID => { @@ -1088,6 +1097,7 @@ pub fn decode_wal_record( decoded.xl_info = xlogrec.xl_info; decoded.xl_rmid = xlogrec.xl_rmid; decoded.record = record; + decoded.origin_id = origin_id; decoded.main_data_offset = main_data_offset; Ok(())