Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add some method for lsmt, rename crc32c for zfile #308

Merged
merged 1 commit into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 64 additions & 17 deletions src/overlaybd/lsmt/file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ limitations under the License.
*/
#include "file.h"
#include <cstdint>
#include <openssl/bio.h>
#include <string.h>
#include <stdarg.h>
#include <memory>
Expand Down Expand Up @@ -165,7 +164,7 @@ struct HeaderTrailer {

UUID::String uuid; // 37 bytes.
UUID::String parent_uuid; // 37 bytes.
uint16_t reserved; // Reserved.
uint16_t reserved; // Reserved.

static const uint8_t LSMT_V1 = 1; // v1 (UUID check)
static const uint8_t LSMT_SUB_V1 = 1; // .1 deprecated level range.
Expand All @@ -179,7 +178,8 @@ struct HeaderTrailer {

class LSMTReadOnlyFile;
static LSMTReadOnlyFile *open_file_ro(IFile *file, bool ownership, bool reserve_tag);
static HeaderTrailer *verify_ht(IFile *file, char *buf, bool is_trailer = false, ssize_t st_size = -1);
static HeaderTrailer *verify_ht(IFile *file, char *buf, bool is_trailer = false,
ssize_t st_size = -1);

static const uint32_t ALIGNMENT = 512; // same as trim block size.
static const uint32_t ALIGNMENT4K = 4096;
Expand Down Expand Up @@ -520,6 +520,10 @@ class LSMTReadOnlyFile : public IFileRW {
return 0;
}

// Return the list of underlying layer files (m_files).
// The vector is returned by value, so the caller gets its own copy of the
// pointer list; the IFile objects themselves are not copied.
// NOTE(review): the pointed-to files presumably remain owned by this object — confirm.
virtual std::vector<IFile *> get_lower_files() const override {
return m_files;
}

template <typename T1, typename T2, typename T3>
inline void forward(void *&buffer, T1 &offset, T2 &count, T3 step) {
(char *&)buffer += step * ALIGNMENT;
Expand Down Expand Up @@ -618,7 +622,8 @@ class LSMTReadOnlyFile : public IFileRW {
if (!pht->is_sealed()) {
LOG_ERROR_RETURN(ENOTSUP, -1, "Commit a compacted LSMTReadonlyFile is not allowed.");
}
CompactOptions opts(&m_files, (SegmentMapping*)m_index->buffer(), m_index->size(), m_vsize, &args);
CompactOptions opts(&m_files, (SegmentMapping *)m_index->buffer(), m_index->size(), m_vsize,
&args);

atomic_uint64_t _no_use_var(0);
return compact(opts, _no_use_var);
Expand Down Expand Up @@ -1053,9 +1058,9 @@ class LSMTSparseFile : public LSMTFile {
class LSMTWarpFile : public LSMTFile {
public:
const static int READ_BUFFER_SIZE = 65536;
IFile* m_target_file = nullptr;
IFile *m_target_file = nullptr;

LSMTWarpFile(){
LSMTWarpFile() {
m_filetype = LSMTFileType::WarpFile;
}
~LSMTWarpFile() {
Expand All @@ -1074,11 +1079,11 @@ class LSMTWarpFile : public LSMTFile {
};
m.tag = tag;
auto file = m_files[tag];
LOG_DEBUG("insert segment: `, filePtr: `", m,file);
LOG_DEBUG("insert segment: `, filePtr: `", m, file);
auto ret = file->pwrite(buf, count, offset);
if (ret != (ssize_t)count) {
LOG_ERRNO_RETURN(0, -1, "write failed, file:`, ret:`, pos:`, count:`",
file, ret, offset, count);
LOG_ERRNO_RETURN(0, -1, "write failed, file:`, ret:`, pos:`, count:`", file, ret,
offset, count);
}
static_cast<IMemoryIndex0 *>(m_index)->insert(m);
append_index(m);
Expand All @@ -1102,8 +1107,8 @@ class LSMTWarpFile : public LSMTFile {
while (lba.count > 0) {
SegmentMapping m;
m.offset = lba.offset / ALIGNMENT;
m.length = (Segment::MAX_LENGTH < lba.count / ALIGNMENT ?
Segment::MAX_LENGTH : lba.count / ALIGNMENT);
m.length = (Segment::MAX_LENGTH < lba.count / ALIGNMENT ? Segment::MAX_LENGTH
: lba.count / ALIGNMENT);
m.moffset = lba.roffset / ALIGNMENT;
m.tag = m_rw_tag + (uint8_t)SegmentType::remoteData;
LOG_DEBUG("insert segment: ` into findex: `", m, m_findex);
Expand Down Expand Up @@ -1211,8 +1216,8 @@ static HeaderTrailer *verify_ht(IFile *file, char *buf, bool is_trailer, ssize_t
LOG_ERRNO_RETURN(0, nullptr, "failed to read file trailer.");
if (!pht->verify_magic() || !pht->is_trailer() || !pht->is_data_file() || !pht->is_sealed())
LOG_ERROR_RETURN(0, nullptr,
"trailer magic, trailer type, "
"file type or sealedness doesn't match");
"trailer magic, trailer type, "
"file type or sealedness doesn't match");
return pht;
}

Expand Down Expand Up @@ -1266,13 +1271,14 @@ static SegmentMapping *do_load_index(IFile *file, HeaderTrailer *pheader_trailer
if (ibuf[i].offset != SegmentMapping::INVALID_OFFSET) {
ibuf[index_size] = ibuf[i];
ibuf[index_size].tag = (warp_file_tag ? ibuf[i].tag : 0);
if (min_tag > ibuf[index_size].tag) min_tag = ibuf[index_size].tag;
if (min_tag > ibuf[index_size].tag)
min_tag = ibuf[index_size].tag;
index_size++;
}
}
if (warp_file_tag) {
LOG_INFO("rebuild index tag for LSMTWarpFile.");
for (size_t i = 0; i<index_size; i++) {
for (size_t i = 0; i < index_size; i++) {
if (warp_file_tag == 1) /* only fsmeta */
ibuf[i].tag = (uint8_t)SegmentType::fsMeta;
if (warp_file_tag == 2) /* only remote data */
Expand Down Expand Up @@ -1451,8 +1457,7 @@ IFileRW *open_warpfile_rw(IFile *findex, IFile *fsmeta_file, IFile *target_file,
auto rst = new LSMTWarpFile;
rst->m_files.resize(2);
LSMT::HeaderTrailer ht;
auto p = do_load_index(findex, &ht, false,
3);
auto p = do_load_index(findex, &ht, false, 3);
auto pi = create_memory_index0(p, ht.index_size, 0, -1);
if (!pi) {
delete[] p;
Expand Down Expand Up @@ -1785,4 +1790,46 @@ IFileRW *stack_files(IFileRW *upper_layer, IFileRO *lower_layers, bool ownership
return rst;
}

// Load the segment-mapping index stored in `file` and wrap it in a memory index.
//
// The mappings are read with do_load_index(), which also fills in the
// header/trailer `ht`. The memory index is then created over them with
// mapped offsets bounded to [HeaderTrailer::SPACE, ht.index_offset) expressed
// in ALIGNMENT-sized blocks, with ownership of the mapping buffer enabled and
// the virtual size taken from `ht`.
//
// Returns the new index, or nullptr if loading or index creation fails (in
// the latter case the loaded mapping buffer is released here).
// NOTE(review): the `true` passed to do_load_index presumably selects reading
// the trailer — confirm against its declaration.
IMemoryIndex *open_file_index(IFile *file) {
    HeaderTrailer ht;
    auto mappings = do_load_index(file, &ht, true);
    if (mappings == nullptr) {
        LOG_ERROR_RETURN(0, nullptr, "failed to load index");
    }

    const auto moffset_begin = HeaderTrailer::SPACE / ALIGNMENT;
    const auto moffset_end = ht.index_offset / ALIGNMENT;
    auto index = create_memory_index(mappings, ht.index_size, moffset_begin, moffset_end,
                                     true, ht.virtual_size);
    if (index == nullptr) {
        // creation failed, so nothing took over the buffer; free it ourselves
        delete[] mappings;
        LOG_ERROR_RETURN(0, nullptr, "failed to create memory index");
    }
    return index;
}

// Build a read-only LSMT file from `n` layer files and an already-merged index.
//
// Parameters:
//   src_files  array of `n` layer files; may be null only when n == 0
//   n          number of entries in `src_files`
//   index      merged memory index covering the layers; must not be null.
//              Its vsize() becomes the virtual size of the returned file.
//   ownership  stored as the new file's m_file_ownership flag
//
// Returns the new file, or nullptr (errno = EINVAL) on invalid arguments.
// NOTE(review): the returned object keeps the raw `index` pointer; whether it
// releases the index on destruction is decided by LSMTReadOnlyFile — confirm.
IFileRO *open_files_with_merged_index(IFile **src_files, size_t n, IMemoryIndex *index,
                                      bool ownership) {
    // `index` is dereferenced below and `src_files` is read for n entries,
    // so reject null inputs up front instead of crashing.
    if (index == nullptr || (src_files == nullptr && n > 0))
        LOG_ERROR_RETURN(EINVAL, nullptr, "invalid argument(s)");

    auto rst = new LSMTReadOnlyFile;
    rst->m_index = index;
    rst->m_files.assign(src_files, src_files + n);
    rst->m_vsize = index->vsize();
    // one (default-constructed) uuid slot per layer file
    rst->m_uuid.resize(rst->m_files.size());
    rst->m_file_ownership = ownership;
    return rst;
}

// Probe whether `file` starts with an LSMT header.
//
// Reads HeaderTrailer::SPACE bytes from offset 0 and checks the magic and the
// header flag.
//
// Returns:
//    0  the file carries a valid LSMT header
//    1  the bytes were read but are not an LSMT header
//   -1  fewer than HeaderTrailer::SPACE bytes could be read
int is_lsmt(IFile *file) {
    char header[HeaderTrailer::SPACE];
    const auto nread = file->pread(header, HeaderTrailer::SPACE, 0);
    if (nread < (ssize_t)HeaderTrailer::SPACE)
        LOG_ERRNO_RETURN(0, -1, "failed to read file header.");

    auto *ht = reinterpret_cast<HeaderTrailer *>(header);
    const bool recognized = ht->verify_magic() && ht->is_header();
    if (recognized) {
        LOG_DEBUG("file: ` is lsmt object", file);
        return 0;
    }
    LOG_DEBUG("file: ` is not lsmt object", file);
    return 1;
}

} // namespace LSMT
27 changes: 19 additions & 8 deletions src/overlaybd/lsmt/file.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ IMemoryIndex -> IMemoryIndex0 -> IComboIndex -> Index0 ( set<SegmentMap> ) -> Co
#pragma once
#include <inttypes.h>
#include <cstddef>
#include <vector>
#include <photon/fs/filesystem.h>
#include <photon/fs/virtual-file.h>
#include <photon/common/uuid.h>
Expand All @@ -45,6 +46,8 @@ class IFileRO : public photon::fs::VirtualReadOnlyFile {

// return uuid of m_files[layer_idx];
virtual int get_uuid(UUID &out, size_t layer_idx = 0) const = 0;

virtual std::vector<IFile *> get_lower_files() const = 0;
};

struct CommitArgs {
Expand Down Expand Up @@ -104,23 +107,25 @@ struct LayerInfo {
UUID uuid;
char *user_tag = nullptr; // a user provided string of message, 256B at most
bool sparse_rw = false;
size_t len = 0; // len of user_tag; if it's 0, it will be detected with strlen()
LayerInfo(photon::fs::IFile *_fdata = nullptr, photon::fs::IFile *_findex = nullptr) : fdata(_fdata), findex(_findex) {
size_t len = 0; // len of user_tag; if it's 0, it will be detected with strlen()
LayerInfo(photon::fs::IFile *_fdata = nullptr, photon::fs::IFile *_findex = nullptr)
: fdata(_fdata), findex(_findex) {
parent_uuid.clear();
uuid.generate();
}
};

struct WarpFileArgs {
photon::fs::IFile *findex = nullptr;
photon::fs::IFile *fsmeta = nullptr; // sparse_file
photon::fs::IFile *target_file = nullptr; // eg. remote target, local data file
photon::fs::IFile *fsmeta = nullptr; // sparse_file
photon::fs::IFile *target_file = nullptr; // eg. remote target, local data file
uint64_t virtual_size;
UUID::String parent_uuid;
UUID uuid;
char *user_tag = nullptr; // a user provided string of message, 256B at most
size_t len = 0; // len of user_tag; if it's 0, it will be detected with strlen()
WarpFileArgs(photon::fs::IFile *findex, photon::fs::IFile *fsmeta, photon::fs::IFile *target_file)
WarpFileArgs(photon::fs::IFile *findex, photon::fs::IFile *fsmeta,
photon::fs::IFile *target_file)
: findex(findex), fsmeta(fsmeta), target_file(target_file) {
uuid.generate();
}
Expand All @@ -131,7 +136,8 @@ extern "C" IFileRW *create_file_rw(const LayerInfo &args, bool ownership = false
// open a writable LSMT file constituted by a data file and an index file,
// optionally obtaining the ownerships of the underlying files,
// thus they will be destructed automatically.
extern "C" IFileRW *open_file_rw(photon::fs::IFile *fdata, photon::fs::IFile *findex, bool ownership = false);
extern "C" IFileRW *open_file_rw(photon::fs::IFile *fdata, photon::fs::IFile *findex,
bool ownership = false);

// open a read-only LSMT file, which was created by
// `close_seal()`ing or `commit()`ing a R/W LSMT file.
Expand All @@ -148,9 +154,10 @@ extern "C" IFileRO *open_files_ro(photon::fs::IFile **files, size_t n, bool owne
extern "C" IFileRW *create_warpfile(WarpFileArgs &args, bool ownership = false);

extern "C" IFileRW *open_warpfile_rw(photon::fs::IFile *findex, photon::fs::IFile *fsmeta_file,
photon::fs::IFile *target_file, bool ownership = false);
photon::fs::IFile *target_file, bool ownership = false);

extern "C" IFileRO *open_warpfile_ro(photon::fs::IFile *warpfile, photon::fs::IFile *target_file, bool ownership = false);
extern "C" IFileRO *open_warpfile_ro(photon::fs::IFile *warpfile, photon::fs::IFile *target_file,
bool ownership = false);

// merge multiple RO files (layers) into a single RO file (layer)
// returning 0 for success, -1 otherwise
Expand All @@ -164,4 +171,8 @@ extern "C" int merge_files_ro(photon::fs::IFile **src_files, size_t n, const Com
extern "C" IFileRW *stack_files(IFileRW *upper_layer, IFileRO *lower_layers, bool ownership = false,
bool check_order = true);

IMemoryIndex *open_file_index(photon::fs::IFile *file);
IFileRO *open_files_with_merged_index(photon::fs::IFile **src_files, size_t n, IMemoryIndex *index,
bool ownership = false);
int is_lsmt(photon::fs::IFile *file);
} // namespace LSMT
23 changes: 16 additions & 7 deletions src/overlaybd/lsmt/index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class Index : public IMemoryIndex {
const SegmentMapping *pbegin = nullptr;
const SegmentMapping *pend = nullptr;
uint64_t alloc_blk = 0;
uint64_t virtual_size = 0;

inline void get_alloc_blks() {
for (auto m : mapping) {
Expand All @@ -72,16 +73,18 @@ class Index : public IMemoryIndex {
delete[] pbegin;
}
}
Index(const SegmentMapping *pmappings = nullptr, size_t n = 0, bool ownership = true)
: ownership(ownership) {
Index(const SegmentMapping *pmappings = nullptr, size_t n = 0, bool ownership = true,
uint64_t vsize = 0)
: ownership(ownership), virtual_size(vsize) {
if (n == 0 || pmappings == nullptr) {
pbegin = pend = nullptr;
return;
}
pbegin = pmappings;
pend = pbegin + n;
}
Index(vector<SegmentMapping> &&m) : mapping(std::move(m)) {
Index(vector<SegmentMapping> &&m, uint64_t vsize = 0)
: mapping(std::move(m)), virtual_size(vsize) {
if (mapping.size()) {
pbegin = &mapping[0];
pend = pbegin + mapping.size();
Expand Down Expand Up @@ -146,6 +149,10 @@ class Index : public IMemoryIndex {
m.tag += delta;
return 0;
}

// Return the virtual size recorded for this index (the `virtual_size` member,
// which is set from the constructor's `vsize` argument and defaults to 0).
uint64_t vsize() const override {
return virtual_size;
}
};

class LevelIndex : public Index {
Expand Down Expand Up @@ -385,6 +392,7 @@ class Index0 : public IComboIndex {
virtual const IMemoryIndex0 *front_index() const override {
return this;
}
UNIMPLEMENTED(size_t vsize() const override);
};

static void merge_indexes(uint8_t level, vector<SegmentMapping> &mapping, const Index **pindexes,
Expand Down Expand Up @@ -540,10 +548,10 @@ IMemoryIndex0 *create_memory_index0(const SegmentMapping *pmappings, size_t n,
}

IMemoryIndex *create_memory_index(const SegmentMapping *pmappings, size_t n, uint64_t moffset_begin,
uint64_t moffset_end, bool ownership) {
uint64_t moffset_end, bool ownership, uint64_t vsize) {
auto ok1 = verify_mapping_order(pmappings, n);
auto ok2 = verify_mapping_moffset(pmappings, n, moffset_begin, moffset_end);
return (ok1 && ok2) ? new Index(pmappings, n, ownership) : nullptr;
return (ok1 && ok2) ? new Index(pmappings, n, ownership, vsize) : nullptr;
}

IMemoryIndex *create_level_index(const SegmentMapping *pmappings, size_t n, uint64_t moffset_begin,
Expand Down Expand Up @@ -605,7 +613,8 @@ static void merge_indexes(uint8_t level, vector<SegmentMapping> &mapping, const
}
}

IComboIndex *create_combo_index(IMemoryIndex0 *index0, const IMemoryIndex *index, uint8_t ro_index_count, bool ownership) {
IComboIndex *create_combo_index(IMemoryIndex0 *index0, const IMemoryIndex *index,
uint8_t ro_index_count, bool ownership) {
if (!index0 || !index)
LOG_ERROR_RETURN(EINVAL, nullptr, "invalid argument(s)");

Expand Down Expand Up @@ -666,6 +675,6 @@ IMemoryIndex *merge_memory_indexes(const IMemoryIndex **pindexes, size_t n) {
auto pi = (const Index **)pindexes;
mapping.reserve(pi[0]->size());
merge_indexes(0, mapping, pi, n, 0, UINT64_MAX);
return new Index(std::move(mapping));
return new Index(std::move(mapping), pindexes[0]->vsize());
}
} // namespace LSMT
4 changes: 3 additions & 1 deletion src/overlaybd/lsmt/index.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ class IMemoryIndex {

// number of 512B blocks allocated
virtual uint64_t block_count() const = 0;

virtual uint64_t vsize() const = 0;
};

// the level 0 memory index, which supports write
Expand Down Expand Up @@ -169,7 +171,7 @@ inline IMemoryIndex0 *create_memory_index0() {
// the mapped offset must be within [moffset_begin, moffset_end)
extern "C" IMemoryIndex *create_memory_index(const SegmentMapping *pmappings, std::size_t n,
uint64_t moffset_begin, uint64_t moffset_end,
bool ownership = true);
bool ownership = true, uint64_t vsize = 0);

// merge multiple indexes into a single index
// the `tag` field of each element in the result is subscript of `pindexes`:
Expand Down
16 changes: 5 additions & 11 deletions src/overlaybd/zfile/zfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,7 @@ const static uint8_t FLAG_VALID_FALSE = 0;
const static uint8_t FLAG_VALID_TRUE = 1;
const static uint8_t FLAG_VALID_CRC_CHECK = 2;

// Allocate an array of T whose element count is `_size` rounded up to the
// next multiple of `alignment`.
// Only the element count is rounded; the storage comes from plain `new T[]`,
// so no extra address alignment is requested here.
template <typename T>
static std::unique_ptr<T[]> new_align_mem(size_t _size, size_t alignment = ALIGNMENT_4K) {
    const size_t blocks = (_size + alignment - 1) / alignment;
    std::unique_ptr<T[]> mem(new T[blocks * alignment]);
    return mem;
}

inline uint32_t crc32c(void *buf, size_t size) {
inline uint32_t crc32c_salt(void *buf, size_t size) {
return crc32::crc32c_extend(buf, size, NOI_WELL_KNOWN_PRIME);
}
/* ZFile Format:
Expand Down Expand Up @@ -489,7 +483,7 @@ class CompressionFile : public VirtualReadOnlyFile {
int retry = 3;
again:
if (m_ht.opt.verify) {
auto c = crc32c((void *)block.buffer(), block.compressed_size);
auto c = crc32c_salt((void *)block.buffer(), block.compressed_size);
if (c != block.crc32_code()) {
if ((valid == FLAG_VALID_TRUE) && (retry--)) {
int reload_res = block.reload();
Expand Down Expand Up @@ -563,7 +557,7 @@ ssize_t compress_data(ICompressor *compressor, const unsigned char *buf, size_t
// LOG_DEBUG("compress buffer {offset: `, count: `} into ` bytes.", i, step, ret);
compressed_len = ret;
if (gen_crc) {
auto crc32_code = crc32c(dest_buf, compressed_len);
auto crc32_code = crc32c_salt(dest_buf, compressed_len);
*((uint32_t *)&dest_buf[compressed_len]) = crc32_code;
LOG_DEBUG("append ` bytes crc32_code: `", sizeof(uint32_t), crc32_code);
compressed_len += sizeof(uint32_t);
Expand Down Expand Up @@ -1157,7 +1151,7 @@ int zfile_compress(IFile *file, IFile *as, const CompressArgs *args) {
LOG_ERRNO_RETURN(0, -1, "failed to write compressed data.");
}
if (crc32_verify) {
auto crc32_code = crc32c(&compressed_data[j * buf_size], compressed_len[j]);
auto crc32_code = crc32c_salt(&compressed_data[j * buf_size], compressed_len[j]);
LOG_DEBUG("append ` bytes crc32_code: {offset: `, count: `, crc32: `}",
sizeof(uint32_t), moffset, compressed_len[j], HEX(crc32_code).width(8));
compressed_len[j] += sizeof(uint32_t);
Expand Down Expand Up @@ -1212,7 +1206,7 @@ int zfile_decompress(IFile *src, IFile *dst) {
for (off_t offset = 0; offset < raw_data_size; offset += block_size) {
auto len = (ssize_t)std::min(block_size, (size_t)raw_data_size - offset);
auto readn = file->pread(raw_buf.get(), len, offset);
LOG_DEBUG("readn: `, crc32: `", readn, HEX(crc32c(raw_buf.get(), len)).width(8));
LOG_DEBUG("readn: `, crc32: `", readn, HEX(crc32c_salt(raw_buf.get(), len)).width(8));
if (readn != len)
return -1;
if (dst->write(raw_buf.get(), readn) != readn) {
Expand Down