Skip to content

Commit

Permalink
refer #6977
Browse files Browse the repository at this point in the history
Store logical replication origin in KV storage
  • Loading branch information
Konstantin Knizhnik committed May 9, 2024
1 parent d5399b7 commit 8f47edc
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 8 deletions.
17 changes: 17 additions & 0 deletions libs/pageserver_api/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,9 @@ impl Key {
// AuxFiles:
// 03 00000000 00000000 00000000 00 00000002
//
// ReplOrigin:
// 04 00000000 00000000 00000000 00 ORIGIN_ID
//

//-- Section 01: relation data and metadata

Expand Down Expand Up @@ -569,6 +572,20 @@ pub const AUX_FILES_KEY: Key = Key {
field6: 2,
};

//-- Section 04: Replication origin

/// Builds the storage key for a logical replication origin.
///
/// The key lives in section `0x04`: `field2` through `field5` are zero and
/// the 16-bit origin id is widened into `field6`.
#[inline(always)]
pub fn replorigin_key(origin_id: u16) -> Key {
    // Lossless u16 -> u32 widening.
    let field6 = u32::from(origin_id);
    Key {
        field1: 0x04,
        field2: 0,
        field3: 0,
        field4: 0,
        field5: 0,
        field6,
    }
}

// Reverse mappings for a few Keys.
// These are needed by WAL redo manager.

Expand Down
2 changes: 1 addition & 1 deletion libs/postgres_ffi/src/pg_constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ pub const XACT_XINFO_HAS_SUBXACTS: u32 = 1u32 << 1;
pub const XACT_XINFO_HAS_RELFILENODES: u32 = 1u32 << 2;
pub const XACT_XINFO_HAS_INVALS: u32 = 1u32 << 3;
pub const XACT_XINFO_HAS_TWOPHASE: u32 = 1u32 << 4;
// pub const XACT_XINFO_HAS_ORIGIN: u32 = 1u32 << 5;
pub const XACT_XINFO_HAS_ORIGIN: u32 = 1u32 << 5;
// pub const XACT_XINFO_HAS_AE_LOCKS: u32 = 1u32 << 6;
// pub const XACT_XINFO_HAS_GID: u32 = 1u32 << 7;

Expand Down
34 changes: 32 additions & 2 deletions pageserver/src/basebackup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
use anyhow::{anyhow, Context};
use bytes::{BufMut, Bytes, BytesMut};
use fail::fail_point;
use pageserver_api::key::{key_to_slru_block, Key};
use pageserver_api::key::{key_to_slru_block, replorigin_key, Key};
use postgres_ffi::pg_constants;
use std::fmt::Write as FmtWrite;
use std::time::SystemTime;
Expand Down Expand Up @@ -355,15 +355,45 @@ where
.await
.map_err(|e| BasebackupError::Server(e.into()))?
{
let header = new_tar_header(&path, content.len() as u64)?;
if path.starts_with("pg_replslot") {
let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
let restart_lsn = Lsn(u64::from_le_bytes(
content[offs..offs + 8].try_into().unwrap(),
));
info!("Replication slot {} restart LSN={}", path, restart_lsn);
min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn);
} else if path.starts_with("pg_logical/replorigin_checkpoint") {
let n_origins = (content.len() - 4 /* magic */ - 4 /* crc32 */) / 16 /* sizeof(ReplicationStateOnDisk) */;
let mut new_content = Vec::with_capacity(content.len());
new_content.extend_from_slice(&content[0..4]); // magic
for i in 0..n_origins {
let offs = 4 + i * 16;
let origin_id =
u16::from_le_bytes(content[offs..offs + 2].try_into().unwrap());
let origin_lsn =
u64::from_le_bytes(content[offs + 8..offs + 16].try_into().unwrap());
new_content.extend_from_slice(&content[offs..offs + 8]); // aligned origin id

// Try to get origin_lsn for this origin_id at the moment of basebackup
let key = replorigin_key(origin_id);
if let Ok(buf) = self.timeline.get(key, self.lsn, self.ctx).await {
let tx_origin_lsn = u64::from_le_bytes(buf[0..8].try_into().unwrap());
if tx_origin_lsn > origin_lsn {
new_content.extend_from_slice(&buf[0..8]);
} else {
new_content.extend_from_slice(&content[offs + 8..offs + 16]);
}
}
}
let crc32 = crc32c::crc32c(&new_content);
new_content.extend_from_slice(&crc32.to_le_bytes());
self.ar
.append(&header, &*new_content)
.await
.context("could not add aux file to basebackup tarball")?;
continue;
}
let header = new_tar_header(&path, content.len() as u64)?;
self.ar
.append(&header, &*content)
.await
Expand Down
12 changes: 9 additions & 3 deletions pageserver/src/pgdatadir_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ use enum_map::Enum;
use itertools::Itertools;
use pageserver_api::key::{
dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key,
rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
rel_key_range, rel_size_to_key, relmap_file_key, replorigin_key, slru_block_to_key,
slru_dir_to_key, slru_segment_key_range, slru_segment_size_to_key, twophase_file_key,
twophase_key_range, AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
};
use pageserver_api::keyspace::SparseKeySpace;
use pageserver_api::models::AuxFilePolicy;
Expand Down Expand Up @@ -1140,6 +1140,12 @@ impl<'a> DatadirModification<'a> {
Ok(())
}

/// Stores `origin_lsn` for the replication origin `origin_id`.
///
/// The LSN is written as an image value under `replorigin_key(origin_id)`,
/// encoded as 8 little-endian bytes. That is the layout the basebackup code
/// relies on when it decodes this value with `u64::from_le_bytes`.
///
/// Always returns `Ok(())`; the `Result` return type matches the other
/// `put_*` methods.
pub fn put_replorigin(&mut self, origin_id: u16, origin_lsn: Lsn) -> anyhow::Result<()> {
    let key = replorigin_key(origin_id);
    // Encode explicitly rather than via `origin_lsn.ser().unwrap()`: this is
    // infallible (no panic path) and pins the byte order to the one the
    // reader uses, independent of which serialization trait is in scope.
    let img = Bytes::copy_from_slice(&origin_lsn.0.to_le_bytes());
    self.put(key, Value::Image(img));
    Ok(())
}

pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
self.put(CONTROLFILE_KEY, Value::Image(img));
Ok(())
Expand Down
6 changes: 6 additions & 0 deletions pageserver/src/walingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ impl WalIngest {
modification,
&parsed_xact,
info == pg_constants::XLOG_XACT_COMMIT,
decoded.origin_id,
ctx,
)
.await?;
Expand All @@ -246,6 +247,7 @@ impl WalIngest {
modification,
&parsed_xact,
info == pg_constants::XLOG_XACT_COMMIT_PREPARED,
decoded.origin_id,
ctx,
)
.await?;
Expand Down Expand Up @@ -1178,6 +1180,7 @@ impl WalIngest {
modification: &mut DatadirModification<'_>,
parsed: &XlXactParsedRecord,
is_commit: bool,
origin_id: u16,
ctx: &RequestContext,
) -> anyhow::Result<()> {
// Record update of CLOG pages
Expand Down Expand Up @@ -1243,6 +1246,9 @@ impl WalIngest {
}
}
}
if origin_id != 0 {
modification.put_replorigin(origin_id, parsed.origin_lsn)?;
}
Ok(())
}

Expand Down
14 changes: 12 additions & 2 deletions pageserver/src/walrecord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use postgres_ffi::{MultiXactId, MultiXactOffset, MultiXactStatus, Oid, Transacti
use postgres_ffi::{XLogRecord, XLOG_SIZE_OF_XLOG_RECORD};
use serde::{Deserialize, Serialize};
use tracing::*;
use utils::bin_ser::DeserializeError;
use utils::{bin_ser::DeserializeError, lsn::Lsn};

/// Each update to a page is represented by a NeonWalRecord. It can be a wrapper
/// around a PostgreSQL WAL record, or a custom neon-specific "record".
Expand Down Expand Up @@ -116,6 +116,7 @@ pub struct DecodedWALRecord {

pub blocks: Vec<DecodedBkpBlock>,
pub main_data_offset: usize,
pub origin_id: u16,
}

#[repr(C)]
Expand Down Expand Up @@ -573,6 +574,7 @@ pub struct XlXactParsedRecord {
pub subxacts: Vec<TransactionId>,

pub xnodes: Vec<RelFileNode>,
pub origin_lsn: Lsn,
}

impl XlXactParsedRecord {
Expand Down Expand Up @@ -651,6 +653,11 @@ impl XlXactParsedRecord {
debug!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE xid {}", xid);
}

let origin_lsn = if xinfo & pg_constants::XACT_XINFO_HAS_ORIGIN != 0 {
Lsn(buf.get_u64_le())
} else {
Lsn::INVALID
};
XlXactParsedRecord {
xid,
info,
Expand All @@ -660,6 +667,7 @@ impl XlXactParsedRecord {
ts_id,
subxacts,
xnodes,
origin_lsn,
}
}
}
Expand Down Expand Up @@ -844,6 +852,7 @@ pub fn decode_wal_record(
let mut rnode_dbnode: u32 = 0;
let mut rnode_relnode: u32 = 0;
let mut got_rnode = false;
let mut origin_id: u16 = 0;

let mut buf = record.clone();

Expand Down Expand Up @@ -891,7 +900,7 @@ pub fn decode_wal_record(

pg_constants::XLR_BLOCK_ID_ORIGIN => {
// RepOriginId is uint16
buf.advance(2);
origin_id = buf.get_u16_le();
}

pg_constants::XLR_BLOCK_ID_TOPLEVEL_XID => {
Expand Down Expand Up @@ -1088,6 +1097,7 @@ pub fn decode_wal_record(
decoded.xl_info = xlogrec.xl_info;
decoded.xl_rmid = xlogrec.xl_rmid;
decoded.record = record;
decoded.origin_id = origin_id;
decoded.main_data_offset = main_data_offset;

Ok(())
Expand Down

0 comments on commit 8f47edc

Please sign in to comment.