diff --git a/daemon/application.cpp b/daemon/application.cpp index f57fea56..a488fa14 100644 --- a/daemon/application.cpp +++ b/daemon/application.cpp @@ -147,6 +147,10 @@ void setup_di_container(kgr::container& container) { container.service(); + container.invoke( + [](auto& box_repo, auto& pool) { pool.count_links(box_repo); }); + container.service(); container.service(); @@ -185,7 +189,7 @@ void setup_defaults(kgr::container& container) { using namespace bxt; auto& unit_of_work_factory = container.service(); - auto uow = coro::sync_wait(unit_of_work_factory()); + auto uow = coro::sync_wait(unit_of_work_factory(true)); auto& repository = container.service(); @@ -200,12 +204,6 @@ void setup_defaults(kgr::container& container) { User default_user(Name("default"), "ILoveMacarons"); default_user.set_permissions({Permission("*")}); - const auto result = coro::sync_wait(uow->begin_async()); - if (!result.has_value()) { - bxt::loge(result.error().what()); - abort(); - } - const auto add_result = coro::sync_wait(repository.add_async(default_user, uow)); if (!add_result.has_value()) { diff --git a/daemon/persistence/box/pool/Pool.cpp b/daemon/persistence/box/pool/Pool.cpp index e04002cb..55d40838 100644 --- a/daemon/persistence/box/pool/Pool.cpp +++ b/daemon/persistence/box/pool/Pool.cpp @@ -50,7 +50,7 @@ namespace bxt::Persistence::Box { std::string Pool::format_target_path(Core::Domain::PoolLocation location, const std::string& arch, - const std::optional& filename) { + const std::optional& filename) const { std::string template_string = "{location}/{arch}"; if (m_options.templates.contains({arch})) { @@ -80,8 +80,8 @@ Pool::Pool(BoxOptions& box_options, : m_pool_path(box_options.box_path / "pool"), m_options(options), m_uow_factory(uow_factory) { - const auto sections = coro::sync_wait( - section_repository.all_async(coro::sync_wait(m_uow_factory()))); + auto uow = coro::sync_wait(m_uow_factory()); + const auto sections = coro::sync_wait(section_repository.all_async(uow)); if (!sections.has_value()) { loge("Pool: Can't get available sections, the reason is \"{}\". " @@ -112,28 +112,150 @@ Pool::Pool(BoxOptions& box_options, } } } - Pool::Result Pool::move_to(const PackageRecord& package) { PackageRecord result = package; for (auto& [location, description] : result.descriptions) { - std::filesystem::path target = + std::error_code ec; + auto canonical_path = + std::filesystem::weakly_canonical(description.filepath, ec); + if (ec) { return bxt::make_error(ec); } + + std::filesystem::path target = std::filesystem::weakly_canonical( format_target_path(location, package.id.section.architecture, - description.filepath.filename().string()); + canonical_path.filename().string())); - move_file(description.filepath, target); + move_file(canonical_path, target); + logd("Pool: Moving file from {} to {}", canonical_path.string(), + target.string()); description.filepath = target; + m_pool_package_link_counts.lazy_emplace_l( + target, + [&](auto& p) { + p.second += 1; + logd("Pool: Incrementing link count for {}", target.string()); + }, + [&](const auto& ctor) { + ctor(target, 1); + logd("Pool: Adding new link count for {}", target.string()); + }); + if (!description.signature_path.has_value()) { continue; } std::filesystem::path signature_target = format_target_path( location, package.id.section.architecture, - fmt::format("{}.sig", description.filepath.filename().string())); + fmt::format("{}.sig", target.filename().string())); move_file(*description.signature_path, signature_target); + logd("Pool: Moving signature file from {} to {}", + description.signature_path->string(), signature_target.string()); description.signature_path = signature_target; } return result; } + +PoolBase::Result Pool::remove(const PackageRecord& package) { + std::error_code ec; + for (const auto& [location, description] : package.descriptions) { + std::error_code ec; + auto canonical_path = + std::filesystem::weakly_canonical(description.filepath, ec); + if (ec) { return bxt::make_error(ec); } + + auto has_value = m_pool_package_link_counts.modify_if( + canonical_path, [&](auto& count) { + if (count.second > 0) { + count.second -= 1; + logd("Pool: Decrementing link count for {}", + canonical_path.string()); + } else { + logw("Pool: {} is being removed but the link count already " + "was 0. " + "Removing anyway.", + canonical_path.string()); + } + + if (count.second != 0) { return; } + + logd("Pool: No more links for {}, removing", + canonical_path.string()); + + std::filesystem::remove(canonical_path, ec); + if (ec) { + loge("Pool: Failed to remove file {}, error: {}", + canonical_path.string(), ec.message()); + return; + } + logd("Pool: Removed file {}", canonical_path.string()); + + if (!description.signature_path.has_value()) { return; } + + std::filesystem::remove(*description.signature_path, ec); + if (ec) { + loge("Pool: Failed to remove signature file {}, error: {}", + description.signature_path->string(), ec.message()); + return; + } + logd("Pool: Removed signature file {}", + description.signature_path->string()); + }); + + if (has_value && m_pool_package_link_counts[canonical_path] == 0) { + m_pool_package_link_counts.erase(canonical_path); + } + } + + return {}; +} + +PoolBase::Result + Pool::path_for_package(const PackageRecord& package) const { + PackageRecord result = package; + for (auto& [location, description] : result.descriptions) { + description.filepath = std::filesystem::weakly_canonical( + format_target_path(location, package.id.section.architecture, + description.filepath.filename().string())); + + if (description.signature_path.has_value()) { + description.signature_path = + std::filesystem::weakly_canonical(format_target_path( + location, package.id.section.architecture, + fmt::format("{}.sig", + description.filepath.filename().string()))); + } + } + return result; +} + +void Pool::count_links(PackageRepositoryBase& package_repository) { + auto packages = coro::sync_wait( + package_repository.all_async(coro::sync_wait(m_uow_factory()))); + + if (!packages.has_value()) { + loge("Pool: Can't get available packages, the reason is \"{}\". ", + packages.error().what()); + exit(1); + } + + for (const auto& package : *packages) { + for (const auto& [location, desc] : package.pool_entries()) { + std::error_code ec; + auto canonical_path = + std::filesystem::canonical(desc.file_path(), ec); + if (ec) { + loge("Pool: Can't get canonical path for {}, the reason is " + "\"{}\". ", + desc.file_path().string(), ec.message()); + exit(1); + } + m_pool_package_link_counts.lazy_emplace_l( + canonical_path, [&](auto& p) { p.second += 1; }, + [&](const auto& ctor) { ctor(canonical_path, 1); }); + } + } + logd("Pool: Counted links for {} paths", m_pool_package_link_counts.size()); +} + } // namespace bxt::Persistence::Box diff --git a/daemon/persistence/box/pool/Pool.h b/daemon/persistence/box/pool/Pool.h index e023957f..1c555cf9 100644 --- a/daemon/persistence/box/pool/Pool.h +++ b/daemon/persistence/box/pool/Pool.h @@ -9,12 +9,14 @@ #include "PoolOptions.h" #include "core/domain/enums/PoolLocation.h" +#include "core/domain/repositories/PackageRepositoryBase.h" #include "core/domain/repositories/RepositoryBase.h" #include "core/domain/repositories/UnitOfWorkBase.h" #include "persistence/box/BoxOptions.h" #include "persistence/box/pool/PoolBase.h" #include +#include #include namespace bxt::Persistence::Box { @@ -30,16 +32,26 @@ class Pool : public PoolBase { PoolBase::Result move_to(const PackageRecord& package) override; + PoolBase::Result remove(const PackageRecord& package) override; + + PoolBase::Result + path_for_package(const PackageRecord& package) const override; + + void count_links(PackageRepositoryBase& package_repository); + private: - std::string - format_target_path(Core::Domain::PoolLocation location, - const std::string& arch, - const std::optional& filename = {}); + std::string format_target_path( + Core::Domain::PoolLocation location, + const std::string& arch, + const std::optional& filename = {}) const; std::filesystem::path m_pool_path; std::set m_architectures; PoolOptions& m_options; UnitOfWorkBaseFactory& m_uow_factory; + + phmap::parallel_flat_hash_map + m_pool_package_link_counts; }; } // namespace bxt::Persistence::Box diff --git a/daemon/persistence/box/pool/PoolBase.h b/daemon/persistence/box/pool/PoolBase.h index 2fa7a122..45bba889 100644 --- a/daemon/persistence/box/pool/PoolBase.h +++ b/daemon/persistence/box/pool/PoolBase.h @@ -20,5 +20,9 @@ struct PoolBase { BXT_DECLARE_RESULT(FsError); virtual Result move_to(const PackageRecord& package) = 0; + virtual Result remove(const PackageRecord& package) = 0; + + virtual Result + path_for_package(const PackageRecord& package) const = 0; }; } // namespace bxt::Persistence::Box diff --git a/daemon/persistence/box/store/LMDBPackageStore.cpp b/daemon/persistence/box/store/LMDBPackageStore.cpp index d4bad452..224c0e09 100644 --- a/daemon/persistence/box/store/LMDBPackageStore.cpp +++ b/daemon/persistence/box/store/LMDBPackageStore.cpp @@ -8,6 +8,7 @@ #include "core/domain/repositories/UnitOfWorkBase.h" #include "persistence/box/pool/PoolBase.h" +#include "persistence/box/record/PackageRecord.h" #include "persistence/lmdb/LmdbUnitOfWork.h" #include @@ -32,11 +33,11 @@ coro::task> DatabaseError::ErrorType::InvalidArgument); } - auto moved_package = m_pool.move_to(package); + auto package_after_move = m_pool.path_for_package(package); - if (!moved_package.has_value()) { + if (!package_after_move.has_value()) { co_return bxt::make_error_with_source( - std::move(moved_package.error()), + std::move(package_after_move.error()), DatabaseError::ErrorType::InvalidArgument); } @@ -48,7 +49,8 @@ coro::task> DatabaseError::ErrorType::AlreadyExists); } - auto result = co_await m_db.put(lmdb_uow->txn().value, key, *moved_package); + auto result = + co_await m_db.put(lmdb_uow->txn().value, key, *package_after_move); if (!result.has_value()) { co_return bxt::make_error_with_source( @@ -56,6 +58,10 @@ coro::task> DatabaseError::ErrorType::InvalidArgument); } + lmdb_uow->hook([this, package = std::move(package)] { + m_pool.move_to(std::move(package)); + }); + co_return {}; } @@ -68,6 +74,14 @@ coro::task> DatabaseError::ErrorType::InvalidArgument); } + auto package_to_delete = + co_await m_db.get(lmdb_uow->txn().value, package_id.to_string()); + + if (!package_to_delete.has_value()) { + co_return bxt::make_error( + DatabaseError::ErrorType::EntityNotFound); + } + auto result = co_await m_db.del(lmdb_uow->txn().value, package_id.to_string()); @@ -76,6 +90,9 @@ coro::task> std::move(result.error()), DatabaseError::ErrorType::InvalidArgument); } + lmdb_uow->hook([this, package_to_delete = std::move(*package_to_delete)] { + return m_pool.remove(std::move(package_to_delete)).has_value(); + }); co_return {}; } diff --git a/daemon/persistence/lmdb/LmdbUnitOfWork.h b/daemon/persistence/lmdb/LmdbUnitOfWork.h index 69b01511..9947aaf4 100644 --- a/daemon/persistence/lmdb/LmdbUnitOfWork.h +++ b/daemon/persistence/lmdb/LmdbUnitOfWork.h @@ -12,6 +12,7 @@ #include #include +#include namespace bxt::Persistence { class LmdbUnitOfWork : public Core::Domain::UnitOfWorkBase { @@ -22,11 +23,18 @@ class LmdbUnitOfWork : public Core::Domain::UnitOfWorkBase { virtual ~LmdbUnitOfWork() = default; coro::task> commit_async() override { + while (!m_hooks.empty()) { + m_hooks.front()(); + m_hooks.pop(); + } + m_txn->value.commit(); co_return {}; } coro::task> rollback_async() override { + m_hooks = {}; + m_txn->value.abort(); co_return {}; } @@ -41,9 +49,12 @@ class LmdbUnitOfWork : public Core::Domain::UnitOfWorkBase { co_return {}; } + void hook(std::function&& hook) { m_hooks.push(std::move(hook)); } + Utilities::locked& txn() const { return *m_txn; } private: + std::queue> m_hooks; std::shared_ptr m_env; std::unique_ptr> m_txn; };