Skip to content

Commit

Permalink
Merge remote-tracking branch 'carlsverre/main' into 2.5.0
Browse files Browse the repository at this point in the history
  • Loading branch information
marvin-j97 committed Dec 20, 2024
2 parents eae4c20 + 10804c3 commit 84afe43
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 75 deletions.
22 changes: 7 additions & 15 deletions src/batch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub mod item;

use crate::{Keyspace, PartitionHandle, PersistMode};
use item::Item;
use lsm_tree::{AbstractTree, ValueType};
use lsm_tree::{AbstractTree, UserKey, UserValue, ValueType};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
Expand Down Expand Up @@ -55,28 +55,20 @@ impl Batch {
}

/// Inserts a key-value pair into the batch
pub fn insert<K: AsRef<[u8]>, V: AsRef<[u8]>>(
pub fn insert<K: Into<UserKey>, V: Into<UserValue>>(
&mut self,
p: &PartitionHandle,
key: K,
value: V,
) {
self.data.push(Item::new(
p.name.clone(),
key.as_ref(),
value.as_ref(),
ValueType::Value,
));
self.data
.push(Item::new(p.name.clone(), key, value, ValueType::Value));
}

/// Adds a tombstone marker for a key
pub fn remove<K: AsRef<[u8]>>(&mut self, p: &PartitionHandle, key: K) {
self.data.push(Item::new(
p.name.clone(),
key.as_ref(),
vec![],
ValueType::Tombstone,
));
pub fn remove<K: Into<UserKey>>(&mut self, p: &PartitionHandle, key: K) {
self.data
.push(Item::new(p.name.clone(), key, vec![], ValueType::Tombstone));
}

/// Commits the batch to the [`Keyspace`] atomically
Expand Down
18 changes: 11 additions & 7 deletions src/partition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -860,15 +860,19 @@ impl PartitionHandle {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn insert<K: AsRef<[u8]>, V: AsRef<[u8]>>(&self, key: K, value: V) -> crate::Result<()> {
pub fn insert<K: Into<UserKey>, V: Into<UserKey>>(
&self,
key: K,
value: V,
) -> crate::Result<()> {
use std::sync::atomic::Ordering;

if self.is_deleted.load(Ordering::Relaxed) {
return Err(crate::Error::PartitionDeleted);
}

let key = key.as_ref();
let value = value.as_ref();
let key = key.into();
let value = value.into();

let mut journal_writer = self.journal.get_writer();

Expand All @@ -879,7 +883,7 @@ impl PartitionHandle {
return Err(crate::Error::Poisoned);
}

journal_writer.write_raw(&self.name, key, value, lsm_tree::ValueType::Value, seqno)?;
journal_writer.write_raw(&self.name, &key, &value, lsm_tree::ValueType::Value, seqno)?;

if !self.config.manual_journal_persist {
journal_writer
Expand Down Expand Up @@ -935,14 +939,14 @@ impl PartitionHandle {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn remove<K: AsRef<[u8]>>(&self, key: K) -> crate::Result<()> {
pub fn remove<K: Into<UserKey>>(&self, key: K) -> crate::Result<()> {
use std::sync::atomic::Ordering;

if self.is_deleted.load(Ordering::Relaxed) {
return Err(crate::Error::PartitionDeleted);
}

let key = key.as_ref();
let key = key.into();

let mut journal_writer = self.journal.get_writer();

Expand All @@ -953,7 +957,7 @@ impl PartitionHandle {
return Err(crate::Error::Poisoned);
}

journal_writer.write_raw(&self.name, key, &[], lsm_tree::ValueType::Tombstone, seqno)?;
journal_writer.write_raw(&self.name, &key, &[], lsm_tree::ValueType::Tombstone, seqno)?;

if !self.config.manual_journal_persist {
journal_writer
Expand Down
10 changes: 5 additions & 5 deletions src/tx/conflict_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,18 @@ impl ConflictManager {
}
}

pub fn mark_read(&mut self, partition: &PartitionKey, key: &Slice) {
self.push_read(partition, Read::Single(key.clone()));
pub fn mark_read(&mut self, partition: &PartitionKey, key: Slice) {
self.push_read(partition, Read::Single(key));
}

pub fn mark_conflict(&mut self, partition: &PartitionKey, key: &[u8]) {
pub fn mark_conflict(&mut self, partition: &PartitionKey, key: Slice) {
if let Some(tbl) = self.conflict_keys.get_mut(partition) {
tbl.insert(key.into());
tbl.insert(key);
} else {
self.conflict_keys
.entry(partition.clone())
.or_default()
.insert(key.into());
.insert(key);
}
}

Expand Down
28 changes: 18 additions & 10 deletions src/tx/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// (found in the LICENSE-* files in the repository)

use crate::{gc::GarbageCollection, PartitionHandle, TxKeyspace};
use lsm_tree::{gc::Report as GcReport, KvPair, UserValue};
use lsm_tree::{gc::Report as GcReport, KvPair, UserKey, UserValue};
use std::path::PathBuf;

/// Access to a partition of a transactional keyspace
Expand Down Expand Up @@ -63,7 +63,7 @@ impl TransactionalPartitionHandle {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn take<K: AsRef<[u8]>>(&self, key: K) -> crate::Result<Option<UserValue>> {
pub fn take<K: Into<UserKey>>(&self, key: K) -> crate::Result<Option<UserValue>> {
self.fetch_update(key, |_| None)
}

Expand Down Expand Up @@ -121,11 +121,13 @@ impl TransactionalPartitionHandle {
///
/// Will return `Err` if an IO error occurs.
#[allow(unused_mut)]
pub fn fetch_update<K: AsRef<[u8]>, F: FnMut(Option<&UserValue>) -> Option<UserValue>>(
pub fn fetch_update<K: Into<UserKey>, F: FnMut(Option<&UserValue>) -> Option<UserValue>>(
&self,
key: K,
mut f: F,
) -> crate::Result<Option<UserValue>> {
let key: UserKey = key.into();

#[cfg(feature = "single_writer_tx")]
{
let mut tx = self.keyspace.write_tx();
Expand All @@ -139,7 +141,7 @@ impl TransactionalPartitionHandle {
#[cfg(feature = "ssi_tx")]
loop {
let mut tx = self.keyspace.write_tx()?;
let prev = tx.fetch_update(self, key.as_ref(), &mut f)?;
let prev = tx.fetch_update(self, key.clone(), &mut f)?;
if tx.commit()?.is_ok() {
return Ok(prev);
}
Expand Down Expand Up @@ -200,11 +202,13 @@ impl TransactionalPartitionHandle {
///
/// Will return `Err` if an IO error occurs.
#[allow(unused_mut)]
pub fn update_fetch<K: AsRef<[u8]>, F: FnMut(Option<&UserValue>) -> Option<UserValue>>(
pub fn update_fetch<K: Into<UserKey>, F: FnMut(Option<&UserValue>) -> Option<UserValue>>(
&self,
key: K,
mut f: F,
) -> crate::Result<Option<UserValue>> {
let key = key.into();

#[cfg(feature = "single_writer_tx")]
{
let mut tx = self.keyspace.write_tx();
Expand All @@ -217,7 +221,7 @@ impl TransactionalPartitionHandle {
#[cfg(feature = "ssi_tx")]
loop {
let mut tx = self.keyspace.write_tx()?;
let updated = tx.update_fetch(self, key.as_ref(), &mut f)?;
let updated = tx.update_fetch(self, key.clone(), &mut f)?;
if tx.commit()?.is_ok() {
return Ok(updated);
}
Expand Down Expand Up @@ -251,7 +255,11 @@ impl TransactionalPartitionHandle {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn insert<K: AsRef<[u8]>, V: AsRef<[u8]>>(&self, key: K, value: V) -> crate::Result<()> {
pub fn insert<K: Into<UserKey>, V: Into<UserValue>>(
&self,
key: K,
value: V,
) -> crate::Result<()> {
#[cfg(feature = "single_writer_tx")]
{
let mut tx = self.keyspace.write_tx();
Expand All @@ -263,7 +271,7 @@ impl TransactionalPartitionHandle {
#[cfg(feature = "ssi_tx")]
{
let mut tx = self.keyspace.write_tx()?;
tx.insert(self, key.as_ref(), value.as_ref());
tx.insert(self, key, value);
tx.commit()?.expect("blind insert should not conflict ever");
Ok(())
}
Expand Down Expand Up @@ -296,7 +304,7 @@ impl TransactionalPartitionHandle {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn remove<K: AsRef<[u8]>>(&self, key: K) -> crate::Result<()> {
pub fn remove<K: Into<UserKey>>(&self, key: K) -> crate::Result<()> {
#[cfg(feature = "single_writer_tx")]
{
let mut tx = self.keyspace.write_tx();
Expand All @@ -308,7 +316,7 @@ impl TransactionalPartitionHandle {
#[cfg(feature = "ssi_tx")]
{
let mut tx = self.keyspace.write_tx()?;
tx.remove(self, key.as_ref());
tx.remove(self, key);
tx.commit()?.expect("blind remove should not conflict ever");
Ok(())
}
Expand Down
34 changes: 18 additions & 16 deletions src/tx/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl BaseTransaction {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub(super) fn take<K: AsRef<[u8]>>(
pub(super) fn take<K: Into<UserKey>>(
&mut self,
partition: &TxPartitionHandle,
key: K,
Expand All @@ -75,24 +75,25 @@ impl BaseTransaction {
///
/// Will return `Err` if an IO error occurs.
pub(super) fn update_fetch<
K: AsRef<[u8]>,
K: Into<UserKey>,
F: FnMut(Option<&UserValue>) -> Option<UserValue>,
>(
&mut self,
partition: &TxPartitionHandle,
key: K,
mut f: F,
) -> crate::Result<Option<UserValue>> {
let key = key.into();
let prev = self.get(partition, &key)?;
let updated = f(prev.as_ref());

if let Some(value) = &updated {
if let Some(value) = updated.clone() {
// NOTE: Skip insert if the value hasn't changed
if updated != prev {
self.insert(partition, &key, value);
if prev.as_ref() != Some(&value) {
self.insert(partition, key, value);
}
} else if prev.is_some() {
self.remove(partition, &key);
self.remove(partition, key);
}

Ok(updated)
Expand All @@ -106,24 +107,25 @@ impl BaseTransaction {
///
/// Will return `Err` if an IO error occurs.
pub(super) fn fetch_update<
K: AsRef<[u8]>,
K: Into<UserKey>,
F: FnMut(Option<&UserValue>) -> Option<UserValue>,
>(
&mut self,
partition: &TxPartitionHandle,
key: K,
mut f: F,
) -> crate::Result<Option<UserValue>> {
let key = key.into();
let prev = self.get(partition, &key)?;
let updated = f(prev.as_ref());

if let Some(value) = &updated {
if let Some(value) = updated {
// NOTE: Skip insert if the value hasn't changed
if updated != prev {
self.insert(partition, &key, value);
if prev.as_ref() != Some(&value) {
self.insert(partition, key, value);
}
} else if prev.is_some() {
self.remove(partition, &key);
self.remove(partition, key);
}

Ok(prev)
Expand Down Expand Up @@ -350,7 +352,7 @@ impl BaseTransaction {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub(super) fn insert<K: AsRef<[u8]>, V: AsRef<[u8]>>(
pub(super) fn insert<K: Into<UserKey>, V: Into<UserValue>>(
&mut self,
partition: &TxPartitionHandle,
key: K,
Expand All @@ -361,8 +363,8 @@ impl BaseTransaction {
.entry(partition.inner.name.clone())
.or_default()
.insert(lsm_tree::InternalValue::from_components(
key.as_ref(),
value.as_ref(),
key,
value,
// NOTE: Just take the max seqno, which should never be reached
// that way, the write is definitely always the newest
SeqNo::MAX,
Expand All @@ -378,13 +380,13 @@ impl BaseTransaction {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub(super) fn remove<K: AsRef<[u8]>>(&mut self, partition: &TxPartitionHandle, key: K) {
pub(super) fn remove<K: Into<UserKey>>(&mut self, partition: &TxPartitionHandle, key: K) {
// TODO: PERF: slow??
self.memtables
.entry(partition.inner.name.clone())
.or_default()
.insert(lsm_tree::InternalValue::new_tombstone(
key.as_ref(),
key,
// NOTE: Just take the max seqno, which should never be reached
// that way, the write is definitely always the newest
SeqNo::MAX,
Expand Down
10 changes: 5 additions & 5 deletions src/tx/write/single_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl<'a> WriteTransaction<'a> {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn take<K: AsRef<[u8]>>(
pub fn take<K: Into<UserKey>>(
&mut self,
partition: &TxPartitionHandle,
key: K,
Expand Down Expand Up @@ -116,7 +116,7 @@ impl<'a> WriteTransaction<'a> {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn update_fetch<K: AsRef<[u8]>, F: FnMut(Option<&UserValue>) -> Option<UserValue>>(
pub fn update_fetch<K: Into<UserKey>, F: FnMut(Option<&UserValue>) -> Option<UserValue>>(
&mut self,
partition: &TxPartitionHandle,
key: K,
Expand Down Expand Up @@ -175,7 +175,7 @@ impl<'a> WriteTransaction<'a> {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn fetch_update<K: AsRef<[u8]>, F: FnMut(Option<&UserValue>) -> Option<UserValue>>(
pub fn fetch_update<K: Into<UserKey>, F: FnMut(Option<&UserValue>) -> Option<UserValue>>(
&mut self,
partition: &TxPartitionHandle,
key: K,
Expand Down Expand Up @@ -555,7 +555,7 @@ impl<'a> WriteTransaction<'a> {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn insert<K: AsRef<[u8]>, V: AsRef<[u8]>>(
pub fn insert<K: Into<UserKey>, V: Into<UserValue>>(
&mut self,
partition: &TxPartitionHandle,
key: K,
Expand Down Expand Up @@ -598,7 +598,7 @@ impl<'a> WriteTransaction<'a> {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn remove<K: AsRef<[u8]>>(&mut self, partition: &TxPartitionHandle, key: K) {
pub fn remove<K: Into<UserKey>>(&mut self, partition: &TxPartitionHandle, key: K) {
self.inner.remove(partition, key);
}

Expand Down
Loading

0 comments on commit 84afe43

Please sign in to comment.