diff --git a/tools/rp_storage_tool/src/bucket_reader.rs b/tools/rp_storage_tool/src/bucket_reader.rs
index 41e5b108cc8fb..0f59cb81b7d96 100644
--- a/tools/rp_storage_tool/src/bucket_reader.rs
+++ b/tools/rp_storage_tool/src/bucket_reader.rs
@@ -1,12 +1,14 @@
 use crate::batch_reader::BatchStream;
 use crate::error::BucketReaderError;
 use crate::fundamental::{
-    raw_to_kafka, KafkaOffset, RaftTerm, RawOffset, Timestamp, NTP, NTPR, NTR,
+    raw_to_kafka, KafkaOffset, LabeledNTPR, LabeledNTR, RaftTerm, RawOffset, Timestamp, NTP, NTPR,
+    NTR,
 };
 use crate::ntp_mask::NTPFilter;
 use crate::remote_types::{
     parse_segment_shortname, ArchivePartitionManifest, ClusterMetadataManifest, LifecycleMarker,
-    LifecycleStatus, PartitionManifest, PartitionManifestSegment, RpSerde, TopicManifest,
+    LifecycleStatus, PartitionManifest, PartitionManifestSegment, RemoteLabel, RpSerde,
+    TopicManifest,
 };
 use crate::repair::{maybe_adjust_manifest, project_repairs, RepairEdit};
 use async_stream::stream;
@@ -236,10 +238,10 @@ pub struct Anomalies {
     pub malformed_topic_manifests: HashSet<String>,
 
     /// NTPR that had segment objects, but no partition manifest
-    pub ntpr_no_manifest: HashSet<NTPR>,
+    pub ntpr_no_manifest: HashSet<LabeledNTPR>,
 
     /// NTR that had segment objects and/or partition manifests, but no topic manifest
-    pub ntr_no_topic_manifest: HashSet<NTR>,
+    pub ntr_no_topic_manifest: HashSet<LabeledNTR>,
 
     /// Keys that do not look like any object we expect
     pub unknown_keys: HashSet<String>,
@@ -248,17 +250,17 @@ pub struct Anomalies {
     pub missing_segments: HashSet<String>,
 
     /// NTPR that failed consistency checks on its segments' metadata
-    pub ntpr_bad_deltas: HashSet<NTPR>,
+    pub ntpr_bad_deltas: HashSet<LabeledNTPR>,
 
     /// Consistency checks found overlapping segments, which may be readable but
     /// indicate a bug in the code that wrote them.
-    pub ntpr_overlap_offsets: HashSet<NTPR>,
+    pub ntpr_overlap_offsets: HashSet<LabeledNTPR>,
 
     /// Where a partition manifest has two segments whose committed+base offsets
     /// are discontinuous. This gap is reported as an anomaly, and may also be
    /// used to cue subsequent data scans.
     /// Ref Incident 259
-    pub metadata_offset_gaps: HashMap<NTPR, Vec<MetadataGap>>,
+    pub metadata_offset_gaps: HashMap<LabeledNTPR, Vec<MetadataGap>>,
 
     /// Files referenced by the cluster manifest with the highest metadata ID which do not exist in
     /// the bucket.
@@ -290,7 +292,7 @@ pub struct PartitionMetadataSummary {
 #[derive(Serialize)]
 pub struct MetadataSummary {
     pub anomalies: Anomalies,
-    pub partitions: BTreeMap<NTPR, PartitionMetadataSummary>,
+    pub partitions: BTreeMap<LabeledNTPR, PartitionMetadataSummary>,
 }
 
 impl Anomalies {
@@ -508,10 +510,10 @@ pub struct ClusterMetadata {
 
 /// Find all the partitions and their segments within a bucket
 pub struct BucketReader {
-    pub partitions: HashMap<NTPR, PartitionObjects>,
-    pub partition_manifests: HashMap<NTPR, PartitionMetadata>,
-    pub topic_manifests: HashMap<NTR, TopicManifest>,
-    pub lifecycle_markers: HashMap<NTR, LifecycleMarker>,
+    pub partitions: HashMap<LabeledNTPR, PartitionObjects>,
+    pub partition_manifests: HashMap<LabeledNTPR, PartitionMetadata>,
+    pub topic_manifests: HashMap<LabeledNTR, TopicManifest>,
+    pub lifecycle_markers: HashMap<LabeledNTR, LifecycleMarker>,
     pub cluster_metadata: HashMap<String, ClusterMetadata>,
     pub anomalies: Anomalies,
     pub client: Arc<dyn ObjectStore>,
@@ -519,10 +521,10 @@
 #[derive(Serialize, Deserialize)]
 struct SavedBucketReader {
-    pub partitions: HashMap<NTPR, PartitionObjects>,
-    pub partition_manifests: HashMap<NTPR, PartitionMetadata>,
-    pub topic_manifests: HashMap<NTR, TopicManifest>,
-    pub lifecycle_markers: HashMap<NTR, LifecycleMarker>,
+    pub partitions: HashMap<LabeledNTPR, PartitionObjects>,
+    pub partition_manifests: HashMap<LabeledNTPR, PartitionMetadata>,
+    pub topic_manifests: HashMap<LabeledNTR, TopicManifest>,
+    pub lifecycle_markers: HashMap<LabeledNTR, LifecycleMarker>,
     pub cluster_metadata: HashMap<String, ClusterMetadata>,
 }
 
@@ -585,19 +587,19 @@ impl BucketReader {
         self.partitions = self
             .partitions
             .drain()
-            .filter(|i| filter.match_ntpr(&i.0))
+            .filter(|i| filter.match_lntpr(&i.0))
             .collect();
 
         self.partition_manifests = self
             .partition_manifests
             .drain()
-            .filter(|i| filter.match_ntpr(&i.0))
+            .filter(|i| filter.match_lntpr(&i.0))
             .collect();
 
         self.topic_manifests = self
             .topic_manifests
             .drain()
-            .filter(|i| filter.match_ntr(&i.0))
+            .filter(|i| filter.match_lntr(&i.0))
             .collect();
     }
 
@@ -747,17 +749,21 @@ impl BucketReader {
     pub fn get_summary(&self) -> MetadataSummary {
         let mut partitions = BTreeMap::new();
-        for (ntpr, partition_meta) in &self.partition_manifests {
+        for (lntpr, partition_meta) in &self.partition_manifests {
             if let Some(manifest) = &partition_meta.head_manifest {
                 let ntr = NTR {
                     namespace: manifest.namespace.clone(),
                     topic: manifest.topic.clone(),
                     revision_id: manifest.revision,
                 };
+                let labeled_ntr = LabeledNTR {
+                    ntr,
+                    label: lntpr.label.clone(),
+                };
                 let kafka_offsets = manifest.kafka_watermarks();
                 partitions.insert(
-                    ntpr.clone(),
+                    lntpr.clone(),
                     PartitionMetadataSummary {
                         bytes: manifest.get_size_bytes(),
                         raw_start_offset: manifest.start_offsets().0,
@@ -766,7 +772,7 @@ impl BucketReader {
                         kafka_hwm: kafka_offsets.map(|x| x.1),
                         lifecycle_status: self
                             .lifecycle_markers
-                            .get(&ntr)
+                            .get(&labeled_ntr)
                             .and_then(|m| Some(m.status.clone())),
                     },
                 );
@@ -787,33 +793,39 @@ impl BucketReader {
         // to deleted topics. To avoid making output hard to read when filtering on NTP (without R),
         // suppress old revisions.
         let mut latest_revision: HashMap<(String, String), i64> = HashMap::new();
-        for ntpr in self.partitions.keys() {
+        for lntpr in self.partitions.keys() {
             // FIXME: these clones are gratuitous
-            let nt = (ntpr.ntp.namespace.clone(), ntpr.ntp.topic.clone());
+            let nt = (
+                lntpr.ntpr.ntp.namespace.clone(),
+                lntpr.ntpr.ntp.topic.clone(),
+            );
             if let Some(current_max) = latest_revision.get(&nt) {
-                if current_max >= &ntpr.revision_id {
+                if current_max >= &lntpr.ntpr.revision_id {
                     continue;
                 }
             }
-            latest_revision.insert(nt, ntpr.revision_id);
+            latest_revision.insert(nt, lntpr.ntpr.revision_id);
         }
 
-        let match_ntpr = |ntpr: &NTPR| {
+        let match_ntpr = |lntpr: &LabeledNTPR| {
             // FIXME: these clones are gratuitous
-            let nt = (ntpr.ntp.namespace.clone(), ntpr.ntp.topic.clone());
+            let nt = (
+                lntpr.ntpr.ntp.namespace.clone(),
+                lntpr.ntpr.ntp.topic.clone(),
+            );
             if let Some(latest) = latest_revision.get(&nt) {
-                ntpr.revision_id == *latest
+                lntpr.ntpr.revision_id == *latest
             } else {
                 true
             }
         };
 
-        let match_ntr = |ntr: &NTR| {
+        let match_ntr = |lntr: &LabeledNTR| {
             // FIXME: these clones are gratuitous
-            let nt = (ntr.namespace.clone(), ntr.topic.clone());
+            let nt = (lntr.ntr.namespace.clone(), lntr.ntr.topic.clone());
             if let Some(latest) = latest_revision.get(&nt) {
-                ntr.revision_id == *latest
+                lntr.ntr.revision_id == *latest
             } else {
                 true
             }
@@ -845,27 +857,28 @@ impl BucketReader {
     pub async fn repair_manifest_ntp(
         &mut self,
         gaps: &Vec<MetadataGap>,
-        ntpr: &NTPR,
+        lntpr: &LabeledNTPR,
     ) -> Result<Vec<RepairEdit>, BucketReaderError> {
-        let initial_repairs = maybe_adjust_manifest(&ntpr, &gaps, self).await?;
+        let initial_repairs = maybe_adjust_manifest(&lntpr, &gaps, self).await?;
         info!(
             "[{}] Found {} repairs to manifest, projecting.",
-            ntpr,
+            lntpr,
             initial_repairs.len()
         );
 
-        let objects = if let Some(o) = self.partitions.get_mut(&ntpr) {
+        let objects = if let Some(o) = self.partitions.get_mut(&lntpr) {
             o
         } else {
             return Ok(Vec::new());
         };
 
-        if let Some(metadata) = self.partition_manifests.get_mut(&ntpr) {
+        if let Some(metadata) = self.partition_manifests.get_mut(&lntpr) {
             if let Some(manifest) = metadata.head_manifest.as_mut() {
                 project_repairs(manifest, &initial_repairs);
 
+                let remote_label = RemoteLabel::from_string(&lntpr.label);
                 for seg in manifest.segments.values() {
-                    if let Some(segment_key) = manifest.segment_key(seg) {
+                    if let Some(segment_key) = manifest.segment_key(seg, &remote_label) {
                         // Our repair might mean that a segment from the 'dropped' list
                         // is now referenced by the manifest: use the 'adjust' side effect
                         // of this function to swap that segment into the main list.
@@ -888,25 +901,25 @@ impl BucketReader {
         // during the initial bucket scan. Accumulate them here for ingesting afterwards.
         let mut discovered_objects: Vec<SegmentObject> = vec![];
 
-        for (ntpr, partition_objects) in &mut self.partitions {
-            if !filter.match_ntpr(ntpr) {
+        for (lntpr, partition_objects) in &mut self.partitions {
+            if !filter.match_lntpr(lntpr) {
                 continue;
             }
-            if ntpr.ntp.partition_id == 0 {
-                let t_manifest_o = self.topic_manifests.get(&ntpr.to_ntr());
+            if lntpr.ntpr.ntp.partition_id == 0 {
+                let t_manifest_o = self.topic_manifests.get(&lntpr.to_ntr());
                 if let None = t_manifest_o {
-                    self.anomalies.ntr_no_topic_manifest.insert(ntpr.to_ntr());
+                    self.anomalies.ntr_no_topic_manifest.insert(lntpr.to_ntr());
                 }
             }
 
-            let p_metadata_o = self.partition_manifests.get(ntpr);
+            let p_metadata_o = self.partition_manifests.get(lntpr);
             match p_metadata_o {
                 None => {
                     // The manifest may be missing because we couldn't load it, in which
                     // case that is already tracked in malformed_manifests
-                    let manifest_key_bin = PartitionManifest::manifest_key(ntpr, "bin");
-                    let manifest_key_json = PartitionManifest::manifest_key(ntpr, "json");
+                    let manifest_key_bin = PartitionManifest::manifest_key(&lntpr, "bin");
+                    let manifest_key_json = PartitionManifest::manifest_key(&lntpr, "json");
                     if self
                         .anomalies
                         .malformed_manifests
@@ -916,9 +929,9 @@ impl BucketReader {
                         .malformed_manifests
                         .contains(&manifest_key_json)
                     {
-                        debug!("Not reporting {} as missing because it's already reported as malformed", ntpr);
+                        debug!("Not reporting {} as missing because it's already reported as malformed", lntpr);
                     } else {
-                        self.anomalies.ntpr_no_manifest.insert(ntpr.clone());
+                        self.anomalies.ntpr_no_manifest.insert(lntpr.clone());
                     }
                 }
                 Some(p_metadata) => {
@@ -941,10 +954,10 @@ impl BucketReader {
         }
 
         let mut new_anomalies: Anomalies = Default::default();
-        for (ntpr, partition_metadata) in &self.partition_manifests {
-            let mut raw_objects = self.partitions.get_mut(&ntpr);
+        for (lntpr, partition_metadata) in &self.partition_manifests {
+            let mut raw_objects = self.partitions.get_mut(&lntpr);
 
-            if !filter.match_ntpr(&ntpr) {
+            if !filter.match_lntpr(&lntpr) {
                 continue;
             }
 
@@ -955,14 +968,14 @@ impl BucketReader {
                 for am in &partition_metadata.archive_manifests {
                     self.anomalies
                         .archive_manifests_outside_manifest
-                        .insert(am.key(&ntpr));
+                        .insert(am.key(&lntpr));
                 }
             }
 
             if partition_metadata.head_manifest.is_some() {
                 let anomalies = Self::analyze_manifest(
                     self.client.clone(),
-                    &ntpr,
+                    &lntpr,
                     partition_metadata.head_manifest.as_ref().unwrap(),
                     &mut raw_objects,
                     &mut discovered_objects,
@@ -975,7 +988,7 @@ impl BucketReader {
                 for archive_manifest in &partition_metadata.archive_manifests {
                     let anomalies = Self::analyze_archive_manifest(
                         self.client.clone(),
-                        &ntpr,
+                        &lntpr,
                         partition_metadata.head_manifest.as_ref().unwrap(),
                         archive_manifest,
                         &mut raw_objects,
@@ -988,13 +1001,13 @@ impl BucketReader {
 
         self.anomalies.merge(new_anomalies);
 
-        for (ntpr, _) in &mut self.partition_manifests {
-            if !filter.match_ntpr(ntpr) {
+        for (lntpr, _) in &mut self.partition_manifests {
+            if !filter.match_lntpr(lntpr) {
                 continue;
             }
-            let t_manifest_o = self.topic_manifests.get(&ntpr.to_ntr());
+            let t_manifest_o = self.topic_manifests.get(&lntpr.to_ntr());
             if let None = t_manifest_o {
-                self.anomalies.ntr_no_topic_manifest.insert(ntpr.to_ntr());
+                self.anomalies.ntr_no_topic_manifest.insert(lntpr.to_ntr());
             }
         }
 
@@ -1045,7 +1058,7 @@ impl BucketReader {
     async fn analyze_archive_manifest(
         client: Arc<dyn ObjectStore>,
-        ntpr: &NTPR,
+        lntpr: &LabeledNTPR,
         head_manifest: &PartitionManifest,
         archive_manifest: &ArchivePartitionManifest,
         raw_objects: &mut Option<&mut PartitionObjects>,
@@ -1054,7 +1067,7 @@ impl BucketReader {
         // TODO(vlad): validate parsed name against contents
         Self::analyze_manifest(
             client,
-            ntpr,
+            lntpr,
            &archive_manifest.manifest,
             raw_objects,
             discovered,
@@ -1065,7 +1078,7 @@ impl BucketReader {
 
     async fn analyze_manifest(
         client: Arc<dyn ObjectStore>,
-        ntpr: &NTPR,
+        lntpr: &LabeledNTPR,
         partition_manifest: &PartitionManifest,
         raw_objects: &mut Option<&mut PartitionObjects>,
         discovered: &mut Vec<SegmentObject>,
@@ -1097,7 +1110,8 @@ impl BucketReader {
                 partition_manifest.ntp(),
                 segment_short_name
             );
-            if let Some(expect_key) = partition_manifest.segment_key(segment) {
+            let remote_label = RemoteLabel::from_string(&lntpr.label);
+            if let Some(expect_key) = partition_manifest.segment_key(segment, &remote_label) {
                 debug!("Calculated segment {}", expect_key);
                 let so = archive_start_offset.unwrap_or(RawOffset::MIN);
                 let bo = segment.base_offset;
@@ -1137,17 +1151,17 @@ impl BucketReader {
                     // as well.
                     warn!(
                         "[{}] Segment {} has missing delta_offset",
-                        ntpr, segment.base_offset
+                        lntpr, segment.base_offset
                     );
-                    anomalies.ntpr_bad_deltas.insert(ntpr.clone());
+                    anomalies.ntpr_bad_deltas.insert(lntpr.clone());
                 }
                 Some(seg_delta) => {
                     if seg_delta < last_delta {
                         warn!(
                             "[{}] Segment {} has delta lower than previous",
-                            ntpr, segment.base_offset
+                            lntpr, segment.base_offset
                         );
-                        anomalies.ntpr_bad_deltas.insert(ntpr.clone());
+                        anomalies.ntpr_bad_deltas.insert(lntpr.clone());
                     }
                 }
             }
@@ -1159,9 +1173,9 @@ impl BucketReader {
                 if d_off > d_off_end {
                     warn!(
                         "[{}] Segment {} has end delta lower than base delta",
-                        ntpr, segment.base_offset
+                        lntpr, segment.base_offset
                    );
-                    anomalies.ntpr_bad_deltas.insert(ntpr.clone());
+                    anomalies.ntpr_bad_deltas.insert(lntpr.clone());
                 }
             }
 
@@ -1174,7 +1188,7 @@ impl BucketReader {
                     warn!(
                         "[{}] Segment {} has gap between base offset and previous segment's committed offset ({}). Missing {} records, from ts {} to ts {} ({})",
-                        ntpr,
+                        lntpr,
                         segment.base_offset,
                         last_committed_offset,
                         segment.base_offset as RawOffset - (last_committed_offset + 1),
@@ -1183,7 +1197,7 @@ impl BucketReader {
                         dt.to_rfc3339());
                     let gap_list = anomalies
                         .metadata_offset_gaps
-                        .entry(ntpr.clone())
+                        .entry(lntpr.clone())
                         .or_insert_with(|| Vec::new());
 
                     let kafka_gap_begin =
@@ -1203,9 +1217,9 @@ impl BucketReader {
                 } else if (segment.base_offset as RawOffset) < last_committed_offset + 1 {
                     warn!(
                         "[{}] Segment {} has overlap between base offset and previous segment's committed offset ({})",
-                        ntpr, segment.base_offset, last_committed_offset
+                        lntpr, segment.base_offset, last_committed_offset
                     );
-                    anomalies.ntpr_overlap_offsets.insert(ntpr.clone());
+                    anomalies.ntpr_overlap_offsets.insert(lntpr.clone());
                 }
             }
 
@@ -1286,7 +1300,7 @@ impl BucketReader {
 fn maybe_stash_partition_key(keys: &mut Vec<FetchKey>, k: FetchKey, filter: &NTPFilter) {
     lazy_static! {
         static ref META_NTP_PREFIX: Regex =
-            Regex::new("[a-f0-9]+/meta/([^]]+)/([^]]+)/(\\d+)_(\\d+)/.+").unwrap();
+            Regex::new("[-a-f0-9]+/meta/([^]]+)/([^]]+)/(\\d+)_(\\d+)/.+").unwrap();
     }
     if let Some(grps) = META_NTP_PREFIX.captures(k.as_str()) {
         let ns = grps.get(1).unwrap().as_str();
@@ -1309,10 +1323,10 @@ impl BucketReader {
 fn maybe_stash_topic_key(keys: &mut Vec<FetchKey>, k: FetchKey, filter: &NTPFilter) {
     lazy_static! {
-        static ref META_NTP_PREFIX: Regex =
+        static ref META_TP_PREFIX: Regex =
             Regex::new("[a-f0-9]+/meta/([^]]+)/([^]]+)/.+").unwrap();
     }
-    if let Some(grps) = META_NTP_PREFIX.captures(k.as_str()) {
+    if let Some(grps) = META_TP_PREFIX.captures(k.as_str()) {
         let ns = grps.get(1).unwrap().as_str();
         let topic = grps.get(2).unwrap().as_str();
 
@@ -1402,14 +1416,14 @@ impl BucketReader {
     /// Yield a byte stream for each segment
     pub fn stream(
         &self,
-        ntpr: &NTPR,
+        lntpr: &LabeledNTPR,
         seek: Option<RawOffset>,
         //) -> Pin<Box<dyn Stream<Item = Result<BatchStream, BucketReaderError>> + '_>>
     ) -> impl Stream<Item = SegmentStream> + '_ {
         // TODO error handling for partition DNE
         // TODO go via metadata: if we have no manifest, we should synthesize one and validate
         // rather than just stepping through objects naively.
-        let partition_objects = self.partitions.get(ntpr).unwrap();
+        let partition_objects = self.partitions.get(lntpr).unwrap();
         // Box::pin(
         //     futures::stream::iter(0..partition_objects.segment_objects.len())
         //         .then(|i| self.stream_one(&partition_objects.segment_objects[i])),
@@ -1527,13 +1541,16 @@ impl BucketReader {
         };
 
         let key = PartitionManifest::manifest_key(
-            &NTPR {
-                ntp: NTP {
-                    namespace: manifest.namespace,
-                    topic: manifest.topic,
-                    partition_id: manifest.partition,
+            &LabeledNTPR {
+                ntpr: NTPR {
+                    ntp: NTP {
+                        namespace: manifest.namespace,
+                        topic: manifest.topic,
+                        partition_id: manifest.partition,
+                    },
+                    revision_id: manifest.revision,
                 },
-                revision_id: manifest.revision,
+                label: None,
             },
             extension,
         );
@@ -1548,16 +1565,17 @@ impl BucketReader {
     ) -> Result<(), BucketReaderError> {
         lazy_static! {
             static ref PARTITION_MANIFEST_KEY: Regex =
-                Regex::new("[a-f0-9]+/meta/([^]]+)/([^]]+)/(\\d+)_(\\d+)/manifest.(json|bin)")
+                Regex::new("([-a-f0-9]+)/meta/([^]]+)/([^]]+)/(\\d+)_(\\d+)/manifest.(json|bin)")
                     .unwrap();
         }
         if let Some(grps) = PARTITION_MANIFEST_KEY.captures(key) {
+            let prefix = grps.get(1).unwrap().as_str().to_string();
             // Group::get calls are safe to unwrap() because regex always has those groups if it matched
-            let ns = grps.get(1).unwrap().as_str().to_string();
-            let topic = grps.get(2).unwrap().as_str().to_string();
+            let ns = grps.get(2).unwrap().as_str().to_string();
+            let topic = grps.get(3).unwrap().as_str().to_string();
             // (TODO: these aren't really-truly safe to unwrap because the string might have had too many digits)
-            let partition_id = grps.get(3).unwrap().as_str().parse::<u32>().unwrap();
-            let partition_revision = grps.get(4).unwrap().as_str().parse::<i64>().unwrap();
+            let partition_id = grps.get(4).unwrap().as_str().parse::<u32>().unwrap();
+            let partition_revision = grps.get(5).unwrap().as_str().parse::<i64>().unwrap();
             let ntpr = NTPR {
                 ntp: NTP {
                     namespace: ns,
@@ -1566,7 +1584,7 @@ impl BucketReader {
                 },
                 revision_id: partition_revision,
             };
-
+            let labeled_ntpr = LabeledNTPR::from_ntpr_and_prefix(ntpr, prefix);
             let manifest = match Self::decode_partition_manifest(key, body) {
                 Ok(m) => m,
                 Err(e) => {
@@ -1577,7 +1595,7 @@ impl BucketReader {
             };
 
             // Note: assuming memory is sufficient for manifests
-            match self.partition_manifests.get_mut(&ntpr) {
+            match self.partition_manifests.get_mut(&labeled_ntpr) {
                 Some(meta) => {
                     // Avoid overwriting a binary manifest with a JSON manifest
                     if meta.head_manifest.is_none()
@@ -1588,7 +1606,7 @@ impl BucketReader {
                 }
                 None => {
                     self.partition_manifests.insert(
-                        ntpr,
+                        labeled_ntpr,
                         PartitionMetadata {
                             head_manifest: Some(manifest),
                             archive_manifests: vec![],
@@ -1610,22 +1628,23 @@ impl BucketReader {
     ) -> Result<(), BucketReaderError> {
         lazy_static! {
             static ref PARTITION_MANIFEST_KEY: Regex =
-                Regex::new("[a-f0-9]+/meta/([^]]+)/([^]]+)/(\\d+)_(\\d+)/manifest.(?:json|bin)\\.(\\d+)\\.(\\d+)\\.(\\d+)\\.(\\d+)\\.(\\d+)\\.(\\d+)").unwrap();
+                Regex::new("([-a-f0-9]+)/meta/([^]]+)/([^]]+)/(\\d+)_(\\d+)/manifest.(?:json|bin)\\.(\\d+)\\.(\\d+)\\.(\\d+)\\.(\\d+)\\.(\\d+)\\.(\\d+)").unwrap();
         }
         if let Some(grps) = PARTITION_MANIFEST_KEY.captures(key) {
+            let prefix = grps.get(1).unwrap().as_str().to_string();
             // Group::get calls are safe to unwrap() because regex always has those groups if it matched
-            let ns = grps.get(1).unwrap().as_str().to_string();
-            let topic = grps.get(2).unwrap().as_str().to_string();
+            let ns = grps.get(2).unwrap().as_str().to_string();
+            let topic = grps.get(3).unwrap().as_str().to_string();
             // (TODO: these aren't really-truly safe to unwrap because the string might have had too many digits)
-            let partition_id = grps.get(3).unwrap().as_str().parse::<u32>().unwrap();
-            let partition_revision = grps.get(4).unwrap().as_str().parse::<i64>().unwrap();
+            let partition_id = grps.get(4).unwrap().as_str().parse::<u32>().unwrap();
+            let partition_revision = grps.get(5).unwrap().as_str().parse::<i64>().unwrap();
 
-            let base_offset = grps.get(5).unwrap().as_str().parse::<RawOffset>().unwrap();
-            let committed_offset = grps.get(6).unwrap().as_str().parse::<RawOffset>().unwrap();
-            let base_kafka_offset = grps.get(7).unwrap().as_str().parse::<KafkaOffset>().unwrap();
-            let next_kafka_offset = grps.get(8).unwrap().as_str().parse::<KafkaOffset>().unwrap();
-            let base_ts = grps.get(9).unwrap().as_str().parse::<Timestamp>().unwrap();
-            let last_ts = grps.get(10).unwrap().as_str().parse::<Timestamp>().unwrap();
+            let base_offset = grps.get(6).unwrap().as_str().parse::<RawOffset>().unwrap();
+            let committed_offset = grps.get(7).unwrap().as_str().parse::<RawOffset>().unwrap();
+            let base_kafka_offset = grps.get(8).unwrap().as_str().parse::<KafkaOffset>().unwrap();
+            let next_kafka_offset = grps.get(9).unwrap().as_str().parse::<KafkaOffset>().unwrap();
+            let base_ts = grps.get(10).unwrap().as_str().parse::<Timestamp>().unwrap();
+            let last_ts = grps.get(11).unwrap().as_str().parse::<Timestamp>().unwrap();
 
             let ntpr = NTPR {
                 ntp: NTP {
@@ -1635,9 +1654,13 @@ impl BucketReader {
                 },
                 revision_id: partition_revision,
             };
+            let labeled_ntpr = LabeledNTPR::from_ntpr_and_prefix(ntpr, prefix);
 
             // Note: assuming memory is sufficient for manifests
-            debug!("Storing archive manifest for {} from key {}", ntpr, key);
+            debug!(
+                "Storing archive manifest for {} from key {}",
+                labeled_ntpr, key
+            );
 
             let manifest: PartitionManifest = match Self::decode_partition_manifest(key, body) {
                 Ok(m) => m,
@@ -1660,13 +1683,13 @@ impl BucketReader {
             };
 
             // Note: assuming memory is sufficient for manifests
-            match self.partition_manifests.get_mut(&ntpr) {
+            match self.partition_manifests.get_mut(&labeled_ntpr) {
                 Some(meta) => {
                     meta.archive_manifests.push(archive_manifest);
                 }
                 None => {
                     self.partition_manifests.insert(
-                        ntpr,
+                        labeled_ntpr,
                         PartitionMetadata {
                             head_manifest: None,
                             archive_manifests: vec![archive_manifest],
@@ -1759,10 +1782,18 @@ impl BucketReader {
                 topic,
                 revision_id: manifest.revision_id as i64,
             };
+            // TODO: this tool doesn't support binary format manifests at all.
+            let labeled_ntr = LabeledNTR {
+                ntr: ntr.clone(),
+                label: None,
+            };
 
-            debug!("Storing topic manifest for {} from key {}", ntr, key);
+            debug!(
+                "Storing topic manifest for {} from key {}",
+                &labeled_ntr, key
+            );
 
-            if let Some(_) = self.topic_manifests.insert(ntr, manifest) {
+            if self.topic_manifests.insert(labeled_ntr, manifest).is_some() {
                 warn!("Two topic manifests for same NTR seen ({})", key);
             }
         } else {
@@ -1806,6 +1837,11 @@ impl BucketReader {
                 topic,
                 revision_id: initial_revision as i64,
             };
+            // TODO: this tool doesn't support binary format manifests at all.
+            let labeled_ntr = LabeledNTR {
+                ntr: ntr.clone(),
+                label: None,
+            };
 
             let mut cursor = std::io::Cursor::new(body.as_ref());
             let marker = match LifecycleMarker::from_bytes(&mut cursor) {
@@ -1821,7 +1857,7 @@ impl BucketReader {
 
             debug!("Storing lifecycle marker for {} from key {}", ntr, key);
 
-            if let Some(_) = self.lifecycle_markers.insert(ntr, marker) {
+            if self.lifecycle_markers.insert(labeled_ntr, marker).is_some() {
                 warn!("Two lifecycle markers for same NTR seen ({})", key);
             }
         } else {
@@ -1845,21 +1881,22 @@ impl BucketReader {
         lazy_static! {
             static ref SEGMENT_KEY: Regex = Regex::new(
-                "[a-f0-9]+/([^]]+)/([^]]+)/(\\d+)_(\\d+)/(\\d+)-(\\d+)-(\\d+)-(\\d+)-v1.log.(\\d+)"
+                "([-a-f0-9]+)/([^]]+)/([^]]+)/(\\d+)_(\\d+)/(\\d+)-(\\d+)-(\\d+)-(\\d+)-v1.log.(\\d+)"
             )
             .unwrap();
         }
-        let (ntpr, segment) = if let Some(grps) = SEGMENT_KEY.captures(key) {
-            let ns = grps.get(1).unwrap().as_str().to_string();
-            let topic = grps.get(2).unwrap().as_str().to_string();
+        let (lntpr, segment) = if let Some(grps) = SEGMENT_KEY.captures(key) {
+            let prefix = grps.get(1).unwrap().as_str().to_string();
+            let ns = grps.get(2).unwrap().as_str().to_string();
+            let topic = grps.get(3).unwrap().as_str().to_string();
             // (TODO: these aren't really-truly safe to unwrap because the string might have had too many digits)
-            let partition_id = grps.get(3).unwrap().as_str().parse::<u32>().unwrap();
-            let partition_revision = grps.get(4).unwrap().as_str().parse::<i64>().unwrap();
-            let start_offset = grps.get(5).unwrap().as_str().parse::<RawOffset>().unwrap();
-            let _committed_offset = grps.get(6).unwrap().as_str();
-            let size_bytes = grps.get(7).unwrap().as_str().parse::<u64>().unwrap();
-            let original_term = grps.get(8).unwrap().as_str().parse::<RaftTerm>().unwrap();
-            let upload_term = grps.get(9).unwrap().as_str().parse::<RaftTerm>().unwrap();
+            let partition_id = grps.get(4).unwrap().as_str().parse::<u32>().unwrap();
+            let partition_revision = grps.get(5).unwrap().as_str().parse::<i64>().unwrap();
+            let start_offset = grps.get(6).unwrap().as_str().parse::<RawOffset>().unwrap();
+            let _committed_offset = grps.get(7).unwrap().as_str();
+            let size_bytes = grps.get(8).unwrap().as_str().parse::<u64>().unwrap();
+            let original_term = grps.get(9).unwrap().as_str().parse::<RaftTerm>().unwrap();
+            let upload_term = grps.get(10).unwrap().as_str().parse::<RaftTerm>().unwrap();
             debug!(
                 "ingest_segment v2+ {}/{}/{} {} (key {})",
                 ns, topic, partition_id, start_offset, key
@@ -1873,13 +1910,13 @@ impl BucketReader {
                 },
                 revision_id: partition_revision,
             };
-
-            if !filter.match_ntp(&ntpr.ntp) {
+            let labeled_ntpr = LabeledNTPR::from_ntpr_and_prefix(ntpr, prefix);
+            if !filter.match_ntp(&labeled_ntpr.ntpr.ntp) {
                 return;
             }
 
             (
-                ntpr,
+                labeled_ntpr,
                 SegmentObject {
                     key: key.to_string(),
                     base_offset: start_offset,
@@ -1910,13 +1947,14 @@ impl BucketReader {
                 },
                 revision_id: partition_revision,
             };
+            let labeled_ntpr = LabeledNTPR { ntpr, label: None };
 
-            if !filter.match_ntp(&ntpr.ntp) {
+            if !filter.match_ntp(&labeled_ntpr.ntpr.ntp) {
                 return;
            }
 
             (
-                ntpr,
+                labeled_ntpr,
                 SegmentObject {
                     key: key.to_string(),
                     base_offset: start_offset,
@@ -1934,7 +1972,7 @@ impl BucketReader {
 
         let values = self
             .partitions
-            .entry(ntpr)
+            .entry(lntpr)
             .or_insert_with(|| PartitionObjects::new());
         values.push(segment);
     }
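// Editor's illustrative sketch (not part of the diff): with this change, object
// keys may begin either with the legacy 8-hex-char xxh32 prefix or with a
// cluster-uuid label, so the prefix character classes widen from "[a-f0-9]+" to
// "[-a-f0-9]+" (uuids contain dashes) and, where the prefix is needed, become a
// capture group, shifting every later group index up by one. The keys below are
// hypothetical examples, not keys from a real bucket.
use regex::Regex;

fn main() {
    let segment_key = Regex::new(
        "([-a-f0-9]+)/([^]]+)/([^]]+)/(\\d+)_(\\d+)/(\\d+)-(\\d+)-(\\d+)-(\\d+)-v1.log.(\\d+)",
    )
    .unwrap();
    let legacy = "a0000000/kafka/mytopic/0_9/100-5-1024-1-v1.log.1";
    let labeled = "deadbeef-0000-0000-0000-000000000000/kafka/mytopic/0_9/100-5-1024-1-v1.log.1";
    for key in [legacy, labeled] {
        let grps = segment_key.captures(key).unwrap();
        // Group 1 is now the prefix; namespace and topic start at group 2.
        println!("prefix={} ns={} topic={}", &grps[1], &grps[2], &grps[3]);
    }
}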
diff --git a/tools/rp_storage_tool/src/fundamental.rs b/tools/rp_storage_tool/src/fundamental.rs
index 7ef6bfde16b2d..b6b104dfc8d21 100644
--- a/tools/rp_storage_tool/src/fundamental.rs
+++ b/tools/rp_storage_tool/src/fundamental.rs
@@ -95,6 +95,175 @@ impl<'de> Deserialize<'de> for NTR {
     }
 }
 
+#[derive(Clone, Eq, PartialEq, Hash, Debug, Ord, PartialOrd)]
+pub struct LabeledNTR {
+    pub ntr: NTR,
+    pub label: Option<String>,
+}
+
+impl LabeledNTR {
+    pub fn from_str(s: &str) -> Self {
+        lazy_static! {
+            static ref NTP_MASK_EXPR: Regex =
+                Regex::new("([^]]+/)?([^]]+)/([^]]+)_([^_]+)").unwrap();
+        }
+
+        if let Some(grps) = NTP_MASK_EXPR.captures(&s) {
+            // Remove the trailing /.
+            let label = grps
+                .get(1)
+                .map(|s| s.as_str().chars().take(s.len() - 1).collect());
+
+            let namespace = grps.get(2).unwrap().as_str().to_string();
+            let topic = grps.get(3).unwrap().as_str().to_string();
+            let revision = grps.get(4).unwrap().as_str().to_string();
+
+            let ntr = NTR {
+                namespace,
+                topic,
+                revision_id: revision.parse::<i64>().unwrap(),
+            };
+            Self { ntr, label }
+        } else {
+            panic!("Malformed NTR query string");
+        }
+    }
+}
+
+impl fmt::Display for LabeledNTR {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let maybe_label = self
+            .label
+            .as_ref()
+            .map_or("".to_owned(), |l| format!("{}/", l));
+        write!(
+            f,
+            "{}{}/{}_{}",
+            maybe_label, self.ntr.namespace, self.ntr.topic, self.ntr.revision_id
+        )
+    }
+}
+
+impl Serialize for LabeledNTR {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        let stringized = format!("{}", self);
+        stringized.serialize(serializer)
+    }
+}
+
+impl<'de> Deserialize<'de> for LabeledNTR {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        let stringized = String::deserialize(deserializer)?;
+        Ok(Self::from_str(stringized.as_str()))
+    }
+}
+
+#[derive(Clone, Eq, PartialEq, Hash, Debug, Ord, PartialOrd)]
+pub struct LabeledNTPR {
+    pub ntpr: NTPR,
+    pub label: Option<String>,
+}
+
+impl LabeledNTPR {
+    pub fn to_ntr(&self) -> LabeledNTR {
+        return LabeledNTR {
+            ntr: NTR {
+                namespace: self.ntpr.ntp.namespace.clone(),
+                topic: self.ntpr.ntp.topic.clone(),
+                revision_id: self.ntpr.revision_id,
+            },
+            label: self.label.clone(),
+        };
+    }
+
+    // Takes an NTPR and the prefix of a manifest or segment path, and constructs a labeled
+    // NTPR.
+    pub fn from_ntpr_and_prefix(ntpr: NTPR, prefix: String) -> Self {
+        Self {
+            ntpr,
+            // Prefixes of exactly 8 characters are hashes rather than labels.
+            label: if prefix.len() != 8 {
+                Some(prefix)
+            } else {
+                None
+            },
+        }
+    }
+
+    pub fn from_str(s: &str) -> Self {
+        lazy_static! {
+            static ref NTP_MASK_EXPR: Regex =
+                Regex::new("([^]]+/)?([^]]+)/([^]]+)/([^_]+)_([^_]+)").unwrap();
+        }
+
+        if let Some(grps) = NTP_MASK_EXPR.captures(&s) {
+            // Remove the trailing /.
+            let label = grps
+                .get(1)
+                .map(|s| s.as_str().chars().take(s.len() - 1).collect());
+
+            let namespace = grps.get(2).unwrap().as_str().to_string();
+            let topic = grps.get(3).unwrap().as_str().to_string();
+            let partition = grps.get(4).unwrap().as_str().to_string();
+            let revision = grps.get(5).unwrap().as_str().to_string();
+
+            Self {
+                ntpr: NTPR {
+                    ntp: NTP {
+                        namespace,
+                        topic,
+                        partition_id: partition.parse::<u32>().unwrap(),
+                    },
+                    revision_id: revision.parse::<i64>().unwrap(),
+                },
+                label,
+            }
+        } else {
+            panic!("Malformed NTPR query string");
+        }
+    }
+}
+
+impl Serialize for LabeledNTPR {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        let stringized = format!("{}", self);
+        stringized.serialize(serializer)
+    }
+}
+
+impl<'de> Deserialize<'de> for LabeledNTPR {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        let stringized = String::deserialize(deserializer)?;
+        Ok(Self::from_str(stringized.as_str()))
+    }
+}
+
+impl fmt::Display for LabeledNTPR {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let maybe_label = if let Some(l) = &self.label {
+            format!("{}/", l)
+        } else {
+            "".to_string()
+        };
+        f.write_fmt(format_args!(
+            "{}{}_{}",
+            maybe_label, self.ntpr.ntp, self.ntpr.revision_id
+        ))
+    }
+}
+
 /// A Partition, uniquely identified by its revision ID
 #[derive(Eq, PartialEq, Hash, Debug, Clone, Ord, PartialOrd)]
 pub struct NTPR {
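// Editor's illustrative sketch (not part of the diff): the 8-character rule in
// from_ntpr_and_prefix is a heuristic. Legacy hash prefixes are always exactly
// eight hex characters (formatted "{:08x}"), while cluster-uuid labels are
// longer, so any other length is treated as a label. The values below are made
// up, and the types are assumed to be in scope.
#[test]
fn label_prefix_heuristic() {
    let ntpr = NTPR {
        ntp: NTP {
            namespace: "kafka".to_string(),
            topic: "mytopic".to_string(),
            partition_id: 0,
        },
        revision_id: 9,
    };
    // Exactly eight characters: assumed to be an xxh32 hash, so no label.
    assert_eq!(
        LabeledNTPR::from_ntpr_and_prefix(ntpr.clone(), "a0000000".to_string()).label,
        None
    );
    // Any other prefix (e.g. a cluster uuid): carried as the label.
    assert_eq!(
        LabeledNTPR::from_ntpr_and_prefix(ntpr, "deadbeef-0000".to_string()).label,
        Some("deadbeef-0000".to_string())
    );
}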
diff --git a/tools/rp_storage_tool/src/main.rs b/tools/rp_storage_tool/src/main.rs
index cc9ab34f7f7c8..56c5c581393d4 100644
--- a/tools/rp_storage_tool/src/main.rs
+++ b/tools/rp_storage_tool/src/main.rs
@@ -21,10 +21,13 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
 
 use crate::bucket_reader::{AnomalyStatus, BucketReader, MetadataGap, PartitionObjects};
 use crate::error::BucketReaderError;
-use crate::fundamental::{raw_to_kafka, KafkaOffset, RaftTerm, RawOffset, NTPR, NTR};
+use crate::fundamental::{
+    raw_to_kafka, KafkaOffset, LabeledNTPR, LabeledNTR, RaftTerm, RawOffset, NTPR, NTR,
+};
 use crate::ntp_mask::NTPFilter;
 use crate::remote_types::{
-    segment_shortname, LifecycleMarker, PartitionManifest, PartitionManifestSegment, TopicManifest,
+    segment_shortname, LifecycleMarker, PartitionManifest, PartitionManifestSegment, RemoteLabel,
+    TopicManifest,
 };
 use crate::repair::{
     DataAddNullSegment, ManifestEditAlterSegment, ManifestSegmentDiff, RepairEdit,
@@ -259,8 +262,8 @@ impl DataScanTopicSummary {
 
 #[derive(Serialize)]
 pub struct DataScanReport {
-    ntps: BTreeMap<NTPR, NTPDataScanResult>,
-    summary: BTreeMap<NTR, DataScanTopicSummary>,
+    ntps: BTreeMap<LabeledNTPR, NTPDataScanResult>,
+    summary: BTreeMap<LabeledNTR, DataScanTopicSummary>,
 }
 
 impl NTPDataScanResult {
@@ -320,7 +323,7 @@ impl NTPDataScanResult {
 }
 
 async fn seek(
-    ntpr: &NTPR,
+    ntpr: &LabeledNTPR,
     objects: &PartitionObjects,
     manifest: &PartitionManifest,
     bounds: (RawOffset, RawOffset),
@@ -360,7 +363,7 @@ async fn seek(
 }
 
 async fn scan_data_ntp(
-    ntpr: &NTPR,
+    ntpr: &LabeledNTPR,
     objects: &PartitionObjects,
     bucket_reader: &BucketReader,
     bounds: Option<(RawOffset, RawOffset)>,
@@ -592,7 +595,7 @@ async fn scan_data_ntp(
                         delta_offset_end: curr_seg.delta_offset,
                         max_timestamp: curr_seg.base_timestamp,
                         base_timestamp: prev_seg.max_timestamp,
-                        ntp_revision: Some(ntpr.revision_id as u64),
+                        ntp_revision: Some(ntpr.ntpr.revision_id as u64),
                         sname_format: curr_seg.sname_format,
                         segment_term: curr_seg.segment_term,
                         archiver_term: curr_seg.archiver_term,
@@ -605,8 +608,11 @@ async fn scan_data_ntp(
                    );
 
                     // Unwrap safe because we know we have current_segment_meta
-                    let object_key =
-                        manifest_opt.unwrap().segment_key(&null_seg).unwrap();
+                    let remote_label = RemoteLabel::from_string(&ntpr.label);
+                    let object_key = manifest_opt
+                        .unwrap()
+                        .segment_key(&null_seg, &remote_label)
+                        .unwrap();
 
                     ntp_report.proposed_repairs.push(
                         RepairEdit::AddNullSegment(DataAddNullSegment {
@@ -770,12 +776,12 @@ async fn scan_data(
     // TODO: wire up the batch/record read to consider any EOFs etc as errors
     // when reading from S3, and set failed=true here
 
-    let mut report: BTreeMap<NTPR, NTPDataScanResult> = BTreeMap::new();
+    let mut report: BTreeMap<LabeledNTPR, NTPDataScanResult> = BTreeMap::new();
 
-    let ntprs: Vec<NTPR> = bucket_reader
+    let ntprs: Vec<LabeledNTPR> = bucket_reader
         .partitions
         .keys()
-        .filter(|k| cli.filter.match_ntpr(k))
+        .filter(|k| cli.filter.match_lntpr(k))
         .map(|k| k.clone())
         .collect();
 
@@ -806,7 +812,7 @@ async fn scan_data(
     // TODO: validate index objects
     // TODO: validate tx manifest objects
 
-    let mut topic_summaries: BTreeMap<NTR, DataScanTopicSummary> = BTreeMap::new();
+    let mut topic_summaries: BTreeMap<LabeledNTR, DataScanTopicSummary> = BTreeMap::new();
     for (ntpr, ntp_report) in &report {
         let ntr = ntpr.to_ntr();
         if !topic_summaries.contains_key(&ntr) {
@@ -858,7 +864,7 @@ async fn scan_gaps(
         scan_result: NTPDataScanResult,
     }
 
-    let mut results: HashMap<NTPR, GapScanResult> = HashMap::new();
+    let mut results: HashMap<LabeledNTPR, GapScanResult> = HashMap::new();
 
     let mut offset_gaps = HashMap::new();
     std::mem::swap(&mut offset_gaps, &mut reader.anomalies.metadata_offset_gaps);
@@ -980,7 +986,7 @@ async fn extract(
     let sink_client = object_store::local::LocalFileSystem::new_with_prefix(sink)?;
 
     for (ntpr, _objects) in bucket_reader.partitions.iter() {
-        if !cli.filter.match_ntpr(ntpr) {
+        if !cli.filter.match_lntpr(ntpr) {
             // If metadata was loaded from a file, it might not be filtered
             // in a way that lines up with cli.filter: re-filter so that one
             // can have a monolithic metadata file but extract individual partitions
@@ -1019,12 +1025,14 @@ async fn extract(
     }
 
     for (ntr, _tp_man) in &bucket_reader.topic_manifests {
-        if !cli.filter.match_ntr(ntr) {
+        if !cli.filter.match_lntr(ntr) {
             continue;
         }
 
-        let path =
-            object_store::path::Path::from(TopicManifest::manifest_key(&ntr.namespace, &ntr.topic));
+        let path = object_store::path::Path::from(TopicManifest::manifest_key(
+            &ntr.ntr.namespace,
+            &ntr.ntr.topic,
+        ));
         let get_r = bucket_reader.client.get(&path).await?;
         let bytes = get_r.bytes().await?;
         sink_client.put(&path, bytes).await?;
@@ -1032,7 +1040,7 @@ async fn extract(
 
     if !metadata_only {
         for (ntpr, objects) in bucket_reader.partitions.iter() {
-            if !cli.filter.match_ntpr(ntpr) {
+            if !cli.filter.match_lntpr(ntpr) {
                 continue;
             }
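// Editor's illustrative sketch (not part of the diff): the report maps above can
// switch their keys to LabeledNTPR/LabeledNTR because both types serialize as
// their Display strings and deserialize back through from_str, so JSON report
// keys stay human-readable. Assuming NTP displays as "namespace/topic/partition",
// a key renders with or without its label prefix; the strings below are
// hypothetical.
#[test]
fn labeled_key_round_trip() {
    let plain = LabeledNTPR::from_str("kafka/mytopic/0_9");
    assert_eq!(plain.label, None);
    let labeled = LabeledNTPR::from_str("deadbeef-0000/kafka/mytopic/0_9");
    assert_eq!(labeled.label, Some("deadbeef-0000".to_string()));
    // Rolling per-partition results up into per-topic summaries keeps the label,
    // so the same topic under two cluster labels stays distinct.
    assert_eq!(labeled.to_ntr().label, Some("deadbeef-0000".to_string()));
}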
diff --git a/tools/rp_storage_tool/src/ntp_mask.rs b/tools/rp_storage_tool/src/ntp_mask.rs
index 4fc7bcc5bf7ce..1807006a90531 100644
--- a/tools/rp_storage_tool/src/ntp_mask.rs
+++ b/tools/rp_storage_tool/src/ntp_mask.rs
@@ -1,4 +1,4 @@
-use crate::fundamental::{NTP, NTR};
+use crate::fundamental::{LabeledNTPR, LabeledNTR, NTP, NTR};
 use crate::NTPR;
 use regex::Regex;
 use std::fmt::{Display, Formatter};
@@ -167,6 +167,15 @@ impl NTPFilter {
         self.match_parts(&ntr.namespace, &ntr.topic, None, Some(ntr.revision_id))
     }
 
+    pub fn match_lntr(&self, lntr: &LabeledNTR) -> bool {
+        self.match_parts(
+            &lntr.ntr.namespace,
+            &lntr.ntr.topic,
+            None,
+            Some(lntr.ntr.revision_id),
+        )
+    }
+
     pub fn match_ntp(&self, ntp: &NTP) -> bool {
         self.match_parts(&ntp.namespace, &ntp.topic, Some(ntp.partition_id), None)
     }
@@ -179,6 +188,14 @@ impl NTPFilter {
             Some(ntpr.revision_id),
         )
     }
+
+    pub fn match_lntpr(&self, lntpr: &LabeledNTPR) -> bool {
+        self.match_parts(
+            &lntpr.ntpr.ntp.namespace,
+            &lntpr.ntpr.ntp.topic,
+            Some(lntpr.ntpr.ntp.partition_id),
+            Some(lntpr.ntpr.revision_id),
+        )
+    }
 }
 
 impl std::fmt::Display for NTPFilter {
diff --git a/tools/rp_storage_tool/src/remote_types.rs b/tools/rp_storage_tool/src/remote_types.rs
index 588314e44f340..ede49704cb2fc 100644
--- a/tools/rp_storage_tool/src/remote_types.rs
+++ b/tools/rp_storage_tool/src/remote_types.rs
@@ -1,6 +1,7 @@
 use crate::error::BucketReaderError;
 use crate::fundamental::{
-    raw_to_kafka, DeltaOffset, KafkaOffset, RaftTerm, RawOffset, Timestamp, NTP, NTPR, NTR,
+    raw_to_kafka, DeltaOffset, KafkaOffset, LabeledNTPR, RaftTerm, RawOffset, Timestamp, NTP, NTPR,
+    NTR,
 };
 use deltafor::envelope::{SerdeEnvelope, SerdeEnvelopeContext};
 use deltafor::{DeltaAlg, DeltaDelta, DeltaFORDecoder, DeltaXor};
@@ -13,6 +14,19 @@ use std::collections::HashMap;
 use std::marker::PhantomData;
 use xxhash_rust::xxh32::xxh32;
 
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct RemoteLabel {
+    pub cluster_uuid: String,
+}
+
+impl RemoteLabel {
+    pub fn from_string(s_opt: &Option<String>) -> Option<RemoteLabel> {
+        s_opt.as_ref().map(|s| RemoteLabel {
+            cluster_uuid: s.clone(),
+        })
+    }
+}
+
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct ClusterMetadataManifest {
     pub version: u32,
@@ -857,21 +871,28 @@ impl PartitionManifest {
         }
     }
 
-    pub fn manifest_key(ntpr: &NTPR, extension: &str) -> String {
+    pub fn manifest_key(ntpr: &LabeledNTPR, extension: &str) -> String {
         let path = format!(
             "{}/{}/{}_{}",
-            ntpr.ntp.namespace, ntpr.ntp.topic, ntpr.ntp.partition_id, ntpr.revision_id
+            ntpr.ntpr.ntp.namespace,
+            ntpr.ntpr.ntp.topic,
+            ntpr.ntpr.ntp.partition_id,
+            ntpr.ntpr.revision_id
         );
         let bitmask = 0xf0000000;
         let hash = xxh32(path.as_bytes(), 0);
-        format!(
-            "{:08x}/meta/{}/manifest.{}",
-            hash & bitmask,
-            path,
-            extension
-        )
+        let prefix = if let Some(label) = &ntpr.label {
+            label.clone()
+        } else {
+            format!("{:08x}", hash & bitmask)
+        };
+        format!("{}/meta/{}/manifest.{}", prefix, path, extension)
     }
 
-    pub fn segment_key(&self, segment: &PartitionManifestSegment) -> Option<String> {
+    pub fn segment_key(
+        &self,
+        segment: &PartitionManifestSegment,
+        label: &Option<RemoteLabel>,
+    ) -> Option<String> {
         let sname_format = match segment.sname_format {
             None => SegmentNameFormat::V1,
             Some(1) => SegmentNameFormat::V1,
@@ -916,6 +937,12 @@ impl PartitionManifest {
             "{}/{}/{}_{}/{}",
             self.namespace, self.topic, self.partition, self.revision, name
         );
+        if let Some(remote_label) = label {
+            return Some(format!(
+                "{}/{}.{}",
+                remote_label.cluster_uuid, path, segment.archiver_term
+            ));
+        }
 
         let hash = xxh32(path.as_bytes(), 0);
 
@@ -937,16 +964,24 @@ pub struct ArchivePartitionManifest {
 }
 
 impl ArchivePartitionManifest {
-    pub fn key(&self, ntpr: &NTPR) -> String {
+    pub fn key(&self, ntpr: &LabeledNTPR) -> String {
         let path = format!(
             "{}/{}/{}_{}",
-            ntpr.ntp.namespace, ntpr.ntp.topic, ntpr.ntp.partition_id, ntpr.revision_id
+            ntpr.ntpr.ntp.namespace,
+            ntpr.ntpr.ntp.topic,
+            ntpr.ntpr.ntp.partition_id,
+            ntpr.ntpr.revision_id
        );
         let bitmask = 0xf0000000;
         let hash = xxh32(path.as_bytes(), 0);
+        let prefix = if let Some(label) = &ntpr.label {
+            label.clone()
+        } else {
+            format!("{:08x}", hash & bitmask)
+        };
         format!(
-            "{:08x}/meta/{}/manifest.json_{}_{}_{}_{}_{}_{}",
-            hash & bitmask,
+            "{}/meta/{}/manifest.json_{}_{}_{}_{}_{}_{}",
+            prefix,
             path,
             self.base_offset,
             self.committed_offset,
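// Editor's illustrative sketch (not part of the diff): the two manifest key
// layouts produced by manifest_key above, under a hypothetical partition path.
// Legacy buckets prefix keys with the masked xxh32 of the path; labeled buckets
// prefix with the cluster uuid. Both keep the "/meta/" component, so the
// reader's prefix regexes match either form.
use xxhash_rust::xxh32::xxh32;

fn prefix_for(path: &str, label: Option<&str>) -> String {
    match label {
        Some(l) => l.to_string(),
        None => format!("{:08x}", xxh32(path.as_bytes(), 0) & 0xf0000000),
    }
}

fn main() {
    let path = "kafka/mytopic/0_9";
    // e.g. "c0000000/meta/kafka/mytopic/0_9/manifest.bin" (hash value illustrative)
    println!("{}/meta/{}/manifest.bin", prefix_for(path, None), path);
    // "deadbeef-0000-0000-0000-000000000000/meta/kafka/mytopic/0_9/manifest.bin"
    println!(
        "{}/meta/{}/manifest.bin",
        prefix_for(path, Some("deadbeef-0000-0000-0000-000000000000")),
        path
    );
}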
diff --git a/tools/rp_storage_tool/src/repair.rs b/tools/rp_storage_tool/src/repair.rs
index 1649fd5d87554..5a12fc329dc9a 100644
--- a/tools/rp_storage_tool/src/repair.rs
+++ b/tools/rp_storage_tool/src/repair.rs
@@ -1,7 +1,9 @@
 use crate::bucket_reader::{BucketReader, MetadataGap, SegmentObject};
 use crate::error::BucketReaderError;
-use crate::fundamental::{DeltaOffset, RaftTerm, RawOffset, NTPR};
-use crate::remote_types::{segment_shortname, PartitionManifest, PartitionManifestSegment};
+use crate::fundamental::{DeltaOffset, LabeledNTPR, RaftTerm, RawOffset, NTPR};
+use crate::remote_types::{
+    segment_shortname, PartitionManifest, PartitionManifestSegment, RemoteLabel,
+};
 use log::{info, warn};
 use serde::Serialize;
 
@@ -98,7 +100,7 @@ pub fn project_repairs(manifest: &mut PartitionManifest, repairs: &Vec<RepairEdit>)
 
 pub async fn maybe_adjust_manifest(
-    ntpr: &NTPR,
+    lntpr: &LabeledNTPR,
     gaps: &Vec<MetadataGap>,
     bucket_reader: &BucketReader,
 ) -> Result<Vec<RepairEdit>, BucketReaderError> {
@@ -131,8 +133,11 @@ pub async fn maybe_adjust_manifest(
             // segment's metadata
             let current_metadata = manifest.get_segment_by_offset(prev_seg).unwrap();
 
+            let remote_label = RemoteLabel::from_string(&lntpr.label);
             let current_segment = SegmentObject {
-                key: manifest.segment_key(current_metadata).unwrap(),
+                key: manifest
+                    .segment_key(current_metadata, &remote_label)
+                    .unwrap(),
                 size_bytes: current_metadata.size_bytes as u64,
                 base_offset: current_metadata.base_offset as i64,
                 upload_term: current_metadata.archiver_term as RaftTerm,
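// Editor's illustrative sketch (not part of the diff): repairs thread the
// partition's label through RemoteLabel::from_string so that recomputed segment
// keys land under the same prefix as the rest of the partition. The uuid below
// is made up.
#[test]
fn remote_label_from_optional_string() {
    let label: Option<String> = Some("deadbeef-0000".to_string());
    let remote = RemoteLabel::from_string(&label);
    assert_eq!(
        remote.map(|l| l.cluster_uuid),
        Some("deadbeef-0000".to_string())
    );
    // No label means segment_key falls back to the legacy hash-prefixed layout.
    assert!(RemoteLabel::from_string(&None).is_none());
}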