Skip to content

Commit

Permalink
feat(daemon): implement pool link counting
Browse files Browse the repository at this point in the history
This implements link counting for the pool. This is needed to track when package should be removed from the pool.

The counter increases each time a package that references the pool entry is added to the repository and decreases when a package is removed. When the counter reaches zero, the package is removed from the pool.
  • Loading branch information
LordTermor committed Jun 29, 2024
1 parent c0e8397 commit 0104e5b
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 23 deletions.
12 changes: 5 additions & 7 deletions daemon/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ void setup_di_container(kgr::container& container) {

container.service<di::Persistence::Box::BoxRepository>();

container.invoke<di::Persistence::Box::BoxRepository,
di::Persistence::Box::Pool>(
[](auto& box_repo, auto& pool) { pool.count_links(box_repo); });

container.service<di::Core::Application::AuthService>();
container.service<di::Core::Application::PermissionService>();

Expand Down Expand Up @@ -185,7 +189,7 @@ void setup_defaults(kgr::container& container) {
using namespace bxt;
auto& unit_of_work_factory =
container.service<di::Core::Domain::UnitOfWorkBaseFactory>();
auto uow = coro::sync_wait(unit_of_work_factory());
auto uow = coro::sync_wait(unit_of_work_factory(true));

auto& repository = container.service<di::Core::Domain::UserRepository>();

Expand All @@ -200,12 +204,6 @@ void setup_defaults(kgr::container& container) {
User default_user(Name("default"), "ILoveMacarons");
default_user.set_permissions({Permission("*")});

const auto result = coro::sync_wait(uow->begin_async());
if (!result.has_value()) {
bxt::loge(result.error().what());
abort();
}

const auto add_result =
coro::sync_wait(repository.add_async(default_user, uow));
if (!add_result.has_value()) {
Expand Down
138 changes: 130 additions & 8 deletions daemon/persistence/box/pool/Pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ namespace bxt::Persistence::Box {
std::string
Pool::format_target_path(Core::Domain::PoolLocation location,
const std::string& arch,
const std::optional<std::string>& filename) {
const std::optional<std::string>& filename) const {
std::string template_string = "{location}/{arch}";

if (m_options.templates.contains({arch})) {
Expand Down Expand Up @@ -80,8 +80,8 @@ Pool::Pool(BoxOptions& box_options,
: m_pool_path(box_options.box_path / "pool"),
m_options(options),
m_uow_factory(uow_factory) {
const auto sections = coro::sync_wait(
section_repository.all_async(coro::sync_wait(m_uow_factory())));
auto uow = coro::sync_wait(m_uow_factory());
const auto sections = coro::sync_wait(section_repository.all_async(uow));

if (!sections.has_value()) {
loge("Pool: Can't get available sections, the reason is \"{}\". "
Expand Down Expand Up @@ -112,28 +112,150 @@ Pool::Pool(BoxOptions& box_options,
}
}
}

Pool::Result<PackageRecord> Pool::move_to(const PackageRecord& package) {
PackageRecord result = package;
for (auto& [location, description] : result.descriptions) {
std::filesystem::path target =
std::error_code ec;
auto canonical_path =
std::filesystem::weakly_canonical(description.filepath, ec);
if (ec) { return bxt::make_error<FsError>(ec); }

std::filesystem::path target = std::filesystem::weakly_canonical(
format_target_path(location, package.id.section.architecture,
description.filepath.filename().string());
canonical_path.filename().string()));

move_file(description.filepath, target);
move_file(canonical_path, target);
logd("Pool: Moving file from {} to {}", canonical_path.string(),
target.string());

description.filepath = target;

m_pool_package_link_counts.lazy_emplace_l(
target,
[&](auto& p) {
p.second += 1;
logd("Pool: Incrementing link count for {}", target.string());
},
[&](const auto& ctor) {
ctor(target, 1);
logd("Pool: Adding new link count for {}", target.string());
});

if (!description.signature_path.has_value()) { continue; }

std::filesystem::path signature_target = format_target_path(
location, package.id.section.architecture,
fmt::format("{}.sig", description.filepath.filename().string()));
fmt::format("{}.sig", target.filename().string()));

move_file(*description.signature_path, signature_target);
logd("Pool: Moving signature file from {} to {}",
description.signature_path->string(), signature_target.string());

description.signature_path = signature_target;
}
return result;
}

PoolBase::Result<void> Pool::remove(const PackageRecord& package) {
std::error_code ec;
for (const auto& [location, description] : package.descriptions) {
std::error_code ec;
auto canonical_path =
std::filesystem::weakly_canonical(description.filepath, ec);
if (ec) { return bxt::make_error<FsError>(ec); }

auto has_value = m_pool_package_link_counts.modify_if(
canonical_path, [&](auto& count) {
if (count.second > 0) {
count.second -= 1;
logd("Pool: Decrementing link count for {}",
canonical_path.string());
} else {
logw("Pool: {} is being removed but the link count already "
"was 0. "
"Removing anyway.",
canonical_path.string());
}

if (count.second != 0) { return; }

logd("Pool: No more links for {}, removing",
canonical_path.string());

std::filesystem::remove(canonical_path, ec);
if (ec) {
loge("Pool: Failed to remove file {}, error: {}",
canonical_path.string(), ec.message());
return;
}
logd("Pool: Removed file {}", canonical_path.string());

if (!description.signature_path.has_value()) { return; }

std::filesystem::remove(*description.signature_path, ec);
if (ec) {
loge("Pool: Failed to remove signature file {}, error: {}",
description.signature_path->string(), ec.message());
return;
}
logd("Pool: Removed signature file {}",
description.signature_path->string());
});

if (has_value && m_pool_package_link_counts[canonical_path] == 0) {
m_pool_package_link_counts.erase(canonical_path);
}
}

return {};
}

PoolBase::Result<PackageRecord>
Pool::path_for_package(const PackageRecord& package) const {
PackageRecord result = package;
for (auto& [location, description] : result.descriptions) {
description.filepath = std::filesystem::weakly_canonical(
format_target_path(location, package.id.section.architecture,
description.filepath.filename().string()));

if (description.signature_path.has_value()) {
description.signature_path =
std::filesystem::weakly_canonical(format_target_path(
location, package.id.section.architecture,
fmt::format("{}.sig",
description.filepath.filename().string())));
}
}
return result;
}

void Pool::count_links(PackageRepositoryBase& package_repository) {
auto packages = coro::sync_wait(
package_repository.all_async(coro::sync_wait(m_uow_factory())));

if (!packages.has_value()) {
loge("Pool: Can't get available packages, the reason is \"{}\". ",
packages.error().what());
exit(1);
}

for (const auto& package : *packages) {
for (const auto& [location, desc] : package.pool_entries()) {
std::error_code ec;
auto canonical_path =
std::filesystem::canonical(desc.file_path(), ec);
if (ec) {
loge("Pool: Can't get canonical path for {}, the reason is "
"\"{}\". ",
desc.file_path().string(), ec.message());
exit(1);
}
m_pool_package_link_counts.lazy_emplace_l(
canonical_path, [&](auto& p) { p.second += 1; },
[&](const auto& ctor) { ctor(canonical_path, 1); });
}
}
logd("Pool: Counted links for {} paths", m_pool_package_link_counts.size());
}

} // namespace bxt::Persistence::Box
20 changes: 16 additions & 4 deletions daemon/persistence/box/pool/Pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@

#include "PoolOptions.h"
#include "core/domain/enums/PoolLocation.h"
#include "core/domain/repositories/PackageRepositoryBase.h"
#include "core/domain/repositories/RepositoryBase.h"
#include "core/domain/repositories/UnitOfWorkBase.h"
#include "persistence/box/BoxOptions.h"
#include "persistence/box/pool/PoolBase.h"

#include <filesystem>
#include <parallel_hashmap/phmap.h>
#include <string>

namespace bxt::Persistence::Box {
Expand All @@ -30,16 +32,26 @@ class Pool : public PoolBase {
PoolBase::Result<PackageRecord>
move_to(const PackageRecord& package) override;

PoolBase::Result<void> remove(const PackageRecord& package) override;

PoolBase::Result<PackageRecord>
path_for_package(const PackageRecord& package) const override;

void count_links(PackageRepositoryBase& package_repository);

private:
std::string
format_target_path(Core::Domain::PoolLocation location,
const std::string& arch,
const std::optional<std::string>& filename = {});
std::string format_target_path(
Core::Domain::PoolLocation location,
const std::string& arch,
const std::optional<std::string>& filename = {}) const;

std::filesystem::path m_pool_path;
std::set<std::string> m_architectures;
PoolOptions& m_options;
UnitOfWorkBaseFactory& m_uow_factory;

phmap::parallel_flat_hash_map<std::filesystem::path, size_t>
m_pool_package_link_counts;
};

} // namespace bxt::Persistence::Box
4 changes: 4 additions & 0 deletions daemon/persistence/box/pool/PoolBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,9 @@ struct PoolBase {
BXT_DECLARE_RESULT(FsError);

virtual Result<PackageRecord> move_to(const PackageRecord& package) = 0;
virtual Result<void> remove(const PackageRecord& package) = 0;

virtual Result<PackageRecord>
path_for_package(const PackageRecord& package) const = 0;
};
} // namespace bxt::Persistence::Box
25 changes: 21 additions & 4 deletions daemon/persistence/box/store/LMDBPackageStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "core/domain/repositories/UnitOfWorkBase.h"
#include "persistence/box/pool/PoolBase.h"
#include "persistence/box/record/PackageRecord.h"
#include "persistence/lmdb/LmdbUnitOfWork.h"

#include <filesystem>
Expand All @@ -32,11 +33,11 @@ coro::task<std::expected<void, DatabaseError>>
DatabaseError::ErrorType::InvalidArgument);
}

auto moved_package = m_pool.move_to(package);
auto package_after_move = m_pool.path_for_package(package);

if (!moved_package.has_value()) {
if (!package_after_move.has_value()) {
co_return bxt::make_error_with_source<DatabaseError>(
std::move(moved_package.error()),
std::move(package_after_move.error()),
DatabaseError::ErrorType::InvalidArgument);
}

Expand All @@ -48,14 +49,19 @@ coro::task<std::expected<void, DatabaseError>>
DatabaseError::ErrorType::AlreadyExists);
}

auto result = co_await m_db.put(lmdb_uow->txn().value, key, *moved_package);
auto result =
co_await m_db.put(lmdb_uow->txn().value, key, *package_after_move);

if (!result.has_value()) {
co_return bxt::make_error_with_source<DatabaseError>(
std::move(result.error()),
DatabaseError::ErrorType::InvalidArgument);
}

lmdb_uow->hook([this, package = std::move(package)] {
m_pool.move_to(std::move(package));
});

co_return {};
}

Expand All @@ -68,6 +74,14 @@ coro::task<std::expected<void, DatabaseError>>
DatabaseError::ErrorType::InvalidArgument);
}

auto package_to_delete =
co_await m_db.get(lmdb_uow->txn().value, package_id.to_string());

if (!package_to_delete.has_value()) {
co_return bxt::make_error<DatabaseError>(
DatabaseError::ErrorType::EntityNotFound);
}

auto result =
co_await m_db.del(lmdb_uow->txn().value, package_id.to_string());

Expand All @@ -76,6 +90,9 @@ coro::task<std::expected<void, DatabaseError>>
std::move(result.error()),
DatabaseError::ErrorType::InvalidArgument);
}
lmdb_uow->hook([this, package_to_delete = std::move(*package_to_delete)] {
return m_pool.remove(std::move(package_to_delete)).has_value();
});

co_return {};
}
Expand Down
11 changes: 11 additions & 0 deletions daemon/persistence/lmdb/LmdbUnitOfWork.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include <coro/task.hpp>
#include <memory>
#include <queue>
namespace bxt::Persistence {

class LmdbUnitOfWork : public Core::Domain::UnitOfWorkBase {
Expand All @@ -22,11 +23,18 @@ class LmdbUnitOfWork : public Core::Domain::UnitOfWorkBase {
virtual ~LmdbUnitOfWork() = default;

coro::task<Result<void>> commit_async() override {
while (!m_hooks.empty()) {
m_hooks.front()();
m_hooks.pop();
}

m_txn->value.commit();
co_return {};
}

coro::task<Result<void>> rollback_async() override {
m_hooks = {};

m_txn->value.abort();
co_return {};
}
Expand All @@ -41,9 +49,12 @@ class LmdbUnitOfWork : public Core::Domain::UnitOfWorkBase {
co_return {};
}

void hook(std::function<void()>&& hook) { m_hooks.push(std::move(hook)); }

Utilities::locked<lmdb::txn>& txn() const { return *m_txn; }

private:
std::queue<std::function<void()>> m_hooks;
std::shared_ptr<Utilities::LMDB::Environment> m_env;
std::unique_ptr<Utilities::locked<lmdb::txn>> m_txn;
};
Expand Down

0 comments on commit 0104e5b

Please sign in to comment.