From 0430368e2f3937986328928f825e717fce6ef8b8 Mon Sep 17 00:00:00 2001 From: Grzegorz Godlewski Date: Fri, 28 Jun 2024 10:26:06 +0200 Subject: [PATCH] fix(market): fixed demand refresh logic that produced unusable counter-offers and agreements * added MarketModuleOptions, and allowed configuring demandRefreshIntervalSec * switched publishAndRefresh demand to use this setting instead of the expiry time of the demand (deprecated and can be removed) * the `golem.srv.comp.expiration` property of the agreement offer is now set by the workload director and not the basic director * the allocation is now created with +15min more expiry time than the market order itself * fixed the issue where we fail to accept the invoice for an agreement that was terminated due to expiry with an allocation that expired together with the agreement (change: the allocation expiry time is set to +15 minutes more than the configured rental duration) --- examples/advanced/manual-pools.ts | 19 ++++-- examples/advanced/reuse-allocation.ts | 15 ++++- examples/advanced/step-by-step.ts | 17 ++++-- src/experimental/deployment/deployment.ts | 7 ++- src/golem-network/golem-network.ts | 61 ++++++++++++++----- src/market/agreement/agreement.test.ts | 1 - src/market/demand/demand.ts | 3 +- .../basic-demand-director-config.test.ts | 13 ++-- .../directors/basic-demand-director-config.ts | 11 +--- .../demand/directors/basic-demand-director.ts | 1 - .../workload-demand-director-config.ts | 25 ++++++-- .../workload-demand-director.test.ts | 14 +++++ .../directors/workload-demand-director.ts | 2 + src/market/market.module.test.ts | 53 +++++++++------- src/market/market.module.ts | 55 ++++++++++++----- src/resource-rental/rental.module.ts | 2 +- src/resource-rental/resource-rental-pool.ts | 16 ++++- .../yagna/adapters/market-api-adapter.test.ts | 18 ++---- .../yagna/adapters/market-api-adapter.ts | 11 +++- tests/e2e/resourceRentalPool.spec.ts | 7 +++ 20 files changed, 246 insertions(+), 105 deletions(-) diff --git a/examples/advanced/manual-pools.ts b/examples/advanced/manual-pools.ts index eb8334bc0..6e0d2e11a 100644 --- a/examples/advanced/manual-pools.ts +++ b/examples/advanced/manual-pools.ts @@ -1,7 +1,8 @@ import { Allocation, DraftOfferProposalPool, GolemNetwork } from "@golem-sdk/golem-js"; import { pinoPrettyLogger } from "@golem-sdk/pino-logger"; -const RENT_HOURS = 0.25; +const RENTAL_DURATION_HOURS = 0.25; +const ALLOCATION_DURATION_HOURS = RENTAL_DURATION_HOURS + 0.25; const demandOptions = { demand: { @@ -13,7 +14,7 @@ const demandOptions = { }, }, market: { - rentHours: RENT_HOURS, + rentHours: RENTAL_DURATION_HOURS, pricing: { model: "linear", maxStartPrice: 1, @@ -31,15 +32,25 @@ const demandOptions = { const glm = new GolemNetwork({ logger, }); + + console.assert( + ALLOCATION_DURATION_HOURS > RENTAL_DURATION_HOURS, + "Always create allocations that will live longer than the planned rental duration", + ); + let allocation: Allocation | undefined; try { await glm.connect(); - allocation = await glm.payment.createAllocation({ budget: 1, expirationSec: RENT_HOURS * 60 * 60 }); + allocation = await glm.payment.createAllocation({ budget: 1, expirationSec: ALLOCATION_DURATION_HOURS * 60 * 60 }); const proposalPool = new DraftOfferProposalPool({ minCount: 1 }); - const demandSpecification = await glm.market.buildDemandDetails(demandOptions.demand, allocation); + const demandSpecification = await glm.market.buildDemandDetails( + demandOptions.demand, + demandOptions.market, + allocation, + ); const draftProposal$ = glm.market.collectDraftOfferProposals({ demandSpecification, diff --git a/examples/advanced/reuse-allocation.ts b/examples/advanced/reuse-allocation.ts index baa3710a3..e2b212b18 100644 --- a/examples/advanced/reuse-allocation.ts +++ b/examples/advanced/reuse-allocation.ts @@ -4,7 +4,16 @@ */ import { MarketOrderSpec, GolemNetwork } from "@golem-sdk/golem-js"; import { pinoPrettyLogger } from "@golem-sdk/pino-logger"; + (async () => { + const ALLOCATION_DURATION_HOURS = 1; + const RENTAL_DURATION_HOURS = 0.5; + + console.assert( + ALLOCATION_DURATION_HOURS > RENTAL_DURATION_HOURS, + "Always create allocations that will live longer than the planned rental duration", + ); + const glm = new GolemNetwork({ logger: pinoPrettyLogger({ level: "info", @@ -16,7 +25,7 @@ import { pinoPrettyLogger } from "@golem-sdk/pino-logger"; const allocation = await glm.payment.createAllocation({ budget: 1, - expirationSec: 3600, + expirationSec: ALLOCATION_DURATION_HOURS * 60 * 60, }); const firstOrder: MarketOrderSpec = { @@ -24,7 +33,7 @@ import { pinoPrettyLogger } from "@golem-sdk/pino-logger"; workload: { imageTag: "golem/alpine:latest" }, }, market: { - rentHours: 0.5, + rentHours: RENTAL_DURATION_HOURS, pricing: { model: "burn-rate", avgGlmPerHour: 0.5, @@ -40,7 +49,7 @@ import { pinoPrettyLogger } from "@golem-sdk/pino-logger"; workload: { imageTag: "golem/alpine:latest" }, }, market: { - rentHours: 0.5, + rentHours: RENTAL_DURATION_HOURS, pricing: { model: "burn-rate", avgGlmPerHour: 0.5, diff --git a/examples/advanced/step-by-step.ts b/examples/advanced/step-by-step.ts index 140d12d84..12fcf4bbf 100644 --- a/examples/advanced/step-by-step.ts +++ b/examples/advanced/step-by-step.ts @@ -24,11 +24,20 @@ import { filter, map, switchMap, take } from "rxjs"; logger, }); + const RENTAL_DURATION_HOURS = 5 / 60; + const ALLOCATION_DURATION_HOURS = RENTAL_DURATION_HOURS + 0.25; + + console.assert( + ALLOCATION_DURATION_HOURS > RENTAL_DURATION_HOURS, + "Always create allocations that will live longer than the planned rental duration", + ); + let allocation: Allocation | undefined; try { await glm.connect(); // Define the order that we're going to place on the market + const order: MarketOrderSpec = { demand: { workload: { @@ -36,11 +45,10 @@ import { filter, map, switchMap, take } from "rxjs"; minCpuCores: 1, minMemGib: 2, }, - expirationSec: 30 * 60, }, market: { // We're only going to rent the provider for 5 minutes max - rentHours: 5 / 60, + rentHours: RENTAL_DURATION_HOURS, pricing: { model: "linear", maxStartPrice: 1, @@ -51,13 +59,14 @@ import { filter, map, switchMap, take } from "rxjs"; }; // Allocate funds to cover the order, we will only pay for the actual usage // so any unused funds will be returned to us at the end + allocation = await glm.payment.createAllocation({ budget: glm.market.estimateBudget({ order, maxAgreements: 1 }), - expirationSec: order.market.rentHours * 60 * 60, + expirationSec: ALLOCATION_DURATION_HOURS * 60 * 60, }); // Convert the human-readable order to a protocol-level format that we will publish on the network - const demandSpecification = await glm.market.buildDemandDetails(order.demand, allocation); + const demandSpecification = await glm.market.buildDemandDetails(order.demand, order.market, allocation); // Publish the order on the market // This methods creates and observable that publishes the order and refreshes it every 30 minutes. diff --git a/src/experimental/deployment/deployment.ts b/src/experimental/deployment/deployment.ts index fadb21a0c..6b3e98c6f 100644 --- a/src/experimental/deployment/deployment.ts +++ b/src/experimental/deployment/deployment.ts @@ -153,7 +153,12 @@ export class Deployment { ? this.networks.get(pool.options?.deployment.network) : undefined; - const demandSpecification = await this.modules.market.buildDemandDetails(pool.options.demand, allocation); + const demandSpecification = await this.modules.market.buildDemandDetails( + pool.options.demand, + pool.options.market, + allocation, + ); + const proposalPool = new DraftOfferProposalPool({ logger: this.logger, validateOfferProposal: pool.options.market.offerProposalFilter, diff --git a/src/golem-network/golem-network.ts b/src/golem-network/golem-network.ts index d12efd2f0..52c6abe4e 100644 --- a/src/golem-network/golem-network.ts +++ b/src/golem-network/golem-network.ts @@ -5,8 +5,9 @@ import { IMarketApi, MarketModule, MarketModuleImpl, - MarketOptions, + MarketModuleOptions, OfferProposal, + OrderMarketOptions, } from "../market"; import { Allocation, IPaymentApi, PaymentModule, PaymentModuleImpl, PaymentModuleOptions } from "../payment"; import { ActivityModule, ActivityModuleImpl, ExeUnitOptions, IActivityApi, IFileServer } from "../activity"; @@ -27,7 +28,7 @@ import { AgreementRepository } from "../shared/yagna/repository/agreement-reposi import { ProposalRepository } from "../shared/yagna/repository/proposal-repository"; import { CacheService } from "../shared/cache/CacheService"; import { DemandRepository } from "../shared/yagna/repository/demand-repository"; -import { BuildDemandOptions, IDemandRepository } from "../market/demand/demand"; +import { IDemandRepository, OrderDemandOptions } from "../market/demand"; import { GftpServerAdapter } from "../shared/storage/GftpServerAdapter"; import { GftpStorageProvider, @@ -77,6 +78,7 @@ export interface GolemNetworkOptions { * `DEBUG` environment variable to `golem-js:*`. */ logger?: Logger; + /** * Set the API key and URL for the Yagna API. */ @@ -84,17 +86,29 @@ export interface GolemNetworkOptions { key?: string; url?: string; }; + /** * Set payment-related options. + * * This is where you can specify the network, payment driver and more. * By default, the network is set to the `holesky` test network. */ payment?: Partial; + + /** + * Set market related options. + * + * This is where you can globally specify several options that determine how the SDK will + * interact with the market. + */ + market?: Partial; + /** * Set the data transfer protocol to use for file transfers. * Default is `gftp`. */ dataTransferProtocol?: DataTransferProtocol; + /** * Override some of the services used by the GolemNetwork instance. * This is useful for testing or when you want to provide your own implementation of some services. @@ -124,10 +138,11 @@ type AllocationOptions = { * Represents the order specifications which will result in access to ResourceRental. */ export interface MarketOrderSpec { - demand: BuildDemandOptions; - market: MarketOptions; + demand: OrderDemandOptions; + market: OrderMarketOptions; activity?: ResourceRentalOptions["activity"]; payment?: ResourceRentalOptions["payment"] & AllocationOptions; + /** The network that should be used for communication between the resources rented as part of this order */ network?: Network; } @@ -273,13 +288,13 @@ export class GolemNetwork { fileServer: this.options.override?.fileServer || new GftpServerAdapter(this.storageProvider), }; this.network = getFactory(NetworkModuleImpl, this.options.override?.network)(this.services); - this.market = getFactory( - MarketModuleImpl, - this.options.override?.market, - )({ - ...this.services, - networkModule: this.network, - }); + this.market = getFactory(MarketModuleImpl, this.options.override?.market)( + { + ...this.services, + networkModule: this.network, + }, + this.options.market, + ); this.payment = getFactory(PaymentModuleImpl, this.options.override?.payment)(this.services, this.options.payment); this.activity = getFactory(ActivityModuleImpl, this.options.override?.activity)(this.services); this.rental = getFactory( @@ -362,14 +377,27 @@ export class GolemNetwork { }): Promise { if (!order.payment?.allocation) { const budget = this.market.estimateBudget({ order, maxAgreements }); + + /** + * We need to create allocations that will exist longer than the agreements made. + * + * Without this in the event of agreement termination due to its expiry, + * the invoice for the agreement arrives, and we try to accept the invoice with + * an allocation that already expired (had the same expiration time as the agreement), + * which leads to unpaid invoices. + */ + const EXPIRATION_BUFFER_MINUTES = 15; + return this.payment.createAllocation({ budget, - expirationSec: order.market.rentHours * 60 * 60, + expirationSec: order.market.rentHours * (60 + EXPIRATION_BUFFER_MINUTES) * 60, }); } + if (typeof order.payment.allocation === "string") { return this.payment.getAllocation(order.payment.allocation); } + return order.payment.allocation; } @@ -434,7 +462,7 @@ export class GolemNetwork { allocation = await this.getAllocationFromOrder({ order, maxAgreements: 1 }); signal.throwIfAborted(); - const demandSpecification = await this.market.buildDemandDetails(order.demand, allocation); + const demandSpecification = await this.market.buildDemandDetails(order.demand, order.market, allocation); const draftProposal$ = this.market.collectDraftOfferProposals({ demandSpecification, pricing: order.market.pricing, @@ -554,7 +582,7 @@ export class GolemNetwork { allocation = await this.getAllocationFromOrder({ order, maxAgreements }); signal.throwIfAborted(); - const demandSpecification = await this.market.buildDemandDetails(order.demand, allocation); + const demandSpecification = await this.market.buildDemandDetails(order.demand, order.market, allocation); const draftProposal$ = this.market.collectDraftOfferProposals({ demandSpecification, @@ -563,6 +591,8 @@ export class GolemNetwork { }); subscription = proposalPool.readFrom(draftProposal$); + const rentSeconds = order.market.rentHours * 60 * 60; + resourceRentalPool = this.rental.createResourceRentalPool(proposalPool, allocation, { poolSize, network: order.network, @@ -572,9 +602,10 @@ export class GolemNetwork { exeUnit: { setup, teardown }, }, agreementOptions: { - expirationSec: order.market.rentHours * 60 * 60, + expirationSec: rentSeconds, }, }); + this.cleanupTasks.push(cleanup); return resourceRentalPool; diff --git a/src/market/agreement/agreement.test.ts b/src/market/agreement/agreement.test.ts index 0e7eb62ca..7b04c7759 100644 --- a/src/market/agreement/agreement.test.ts +++ b/src/market/agreement/agreement.test.ts @@ -33,7 +33,6 @@ const demand = new Demand( properties: [], }, "erc20-holesky-tglm", - 30 * 60, ), ); diff --git a/src/market/demand/demand.ts b/src/market/demand/demand.ts index 7ca49811a..42a26052b 100644 --- a/src/market/demand/demand.ts +++ b/src/market/demand/demand.ts @@ -82,7 +82,7 @@ export interface BasicDemandPropertyConfig { midAgreementPaymentTimeoutSec: number; } -export type BuildDemandOptions = Partial<{ +export type OrderDemandOptions = Partial<{ /** Demand properties related to the activities that will be executed on providers */ workload: Partial; /** Demand properties that determine payment related terms & conditions of the agreement */ @@ -104,7 +104,6 @@ export class DemandSpecification { /** Represents the low level demand request body that will be used to subscribe for offers matching our "computational resource needs" */ public readonly prototype: DemandBodyPrototype, public readonly paymentPlatform: string, - public readonly expirationSec: number, ) {} } diff --git a/src/market/demand/directors/basic-demand-director-config.test.ts b/src/market/demand/directors/basic-demand-director-config.test.ts index 8028947e0..f3757fa5e 100644 --- a/src/market/demand/directors/basic-demand-director-config.test.ts +++ b/src/market/demand/directors/basic-demand-director-config.test.ts @@ -1,12 +1,11 @@ import { BasicDemandDirectorConfig } from "./basic-demand-director-config"; describe("BasicDemandDirectorConfig", () => { - test("should throw user error if expiration option is invalid", () => { - expect(() => { - new BasicDemandDirectorConfig({ - expirationSec: -3, - subnetTag: "public", - }); - }).toThrow("The demand expiration time has to be a positive integer"); + test("it sets the subnet tag property", () => { + const config = new BasicDemandDirectorConfig({ + subnetTag: "public", + }); + + expect(config.subnetTag).toBe("public"); }); }); diff --git a/src/market/demand/directors/basic-demand-director-config.ts b/src/market/demand/directors/basic-demand-director-config.ts index 46041b9c5..796171d77 100644 --- a/src/market/demand/directors/basic-demand-director-config.ts +++ b/src/market/demand/directors/basic-demand-director-config.ts @@ -1,28 +1,19 @@ import { BaseConfig } from "./base-config"; -import { GolemConfigError } from "../../../shared/error/golem-error"; import * as EnvUtils from "../../../shared/utils/env"; export interface BasicDemandDirectorConfigOptions { - expirationSec: number; + /** Determines which subnet tag should be used for the offer/demand matching */ subnetTag: string; } export class BasicDemandDirectorConfig extends BaseConfig implements BasicDemandDirectorConfigOptions { - public readonly expirationSec = 30 * 60; // 30 minutes public readonly subnetTag: string = EnvUtils.getYagnaSubnet(); constructor(options?: Partial) { super(); - if (options?.expirationSec) { - this.expirationSec = options.expirationSec; - } if (options?.subnetTag) { this.subnetTag = options.subnetTag; } - - if (!this.isPositiveInt(this.expirationSec)) { - throw new GolemConfigError("The demand expiration time has to be a positive integer"); - } } } diff --git a/src/market/demand/directors/basic-demand-director.ts b/src/market/demand/directors/basic-demand-director.ts index f6a8208d0..b329e5925 100644 --- a/src/market/demand/directors/basic-demand-director.ts +++ b/src/market/demand/directors/basic-demand-director.ts @@ -8,7 +8,6 @@ export class BasicDemandDirector implements IDemandDirector { apply(builder: DemandBodyBuilder) { builder .addProperty("golem.srv.caps.multi-activity", true) - .addProperty("golem.srv.comp.expiration", Date.now() + this.config.expirationSec * 1000) .addProperty("golem.node.debug.subnet", this.config.subnetTag); builder diff --git a/src/market/demand/directors/workload-demand-director-config.ts b/src/market/demand/directors/workload-demand-director-config.ts index a0f4c973a..1c8c2d6cd 100644 --- a/src/market/demand/directors/workload-demand-director-config.ts +++ b/src/market/demand/directors/workload-demand-director-config.ts @@ -1,11 +1,17 @@ import { WorkloadDemandDirectorConfigOptions } from "../options"; import { GolemConfigError } from "../../../shared/error/golem-error"; +import { BaseConfig } from "./base-config"; export enum PackageFormat { GVMKitSquash = "gvmkit-squash", } -export class WorkloadDemandDirectorConfig { +type RequiredWorkloadDemandConfigOptions = { + /** Number of seconds after which the agreement resulting from this demand will no longer be valid */ + expirationSec: number; +}; + +export class WorkloadDemandDirectorConfig extends BaseConfig { readonly packageFormat: string = PackageFormat.GVMKitSquash; readonly engine: string = "vm"; readonly minMemGib: number = 0.5; @@ -13,6 +19,9 @@ export class WorkloadDemandDirectorConfig { readonly minCpuThreads: number = 1; readonly minCpuCores: number = 1; readonly capabilities: string[] = []; + + readonly expirationSec: number; + readonly manifest?: string; readonly manifestSig?: string; readonly manifestSigAlgorithm?: string; @@ -22,10 +31,12 @@ export class WorkloadDemandDirectorConfig { readonly imageTag?: string; readonly imageUrl?: string; - constructor(options?: Partial) { - if (options) { - Object.assign(this, options); - } + constructor(options: Partial & RequiredWorkloadDemandConfigOptions) { + super(); + + Object.assign(this, options); + + this.expirationSec = options.expirationSec; if (!this.imageHash && !this.manifest && !this.imageTag && !this.imageUrl) { throw new GolemConfigError("You must define a package or manifest option"); @@ -34,5 +45,9 @@ export class WorkloadDemandDirectorConfig { if (this.imageUrl && !this.imageHash) { throw new GolemConfigError("If you provide an imageUrl, you must also provide it's SHA3-224 hash in imageHash"); } + + if (!this.isPositiveInt(this.expirationSec)) { + throw new GolemConfigError("The expirationSec param has to be a positive integer"); + } } } diff --git a/src/market/demand/directors/workload-demand-director.test.ts b/src/market/demand/directors/workload-demand-director.test.ts index d40c5f295..91cd2f5a6 100644 --- a/src/market/demand/directors/workload-demand-director.test.ts +++ b/src/market/demand/directors/workload-demand-director.test.ts @@ -9,6 +9,7 @@ describe("ActivityDemandDirector", () => { const director = new WorkloadDemandDirector( new WorkloadDemandDirectorConfig({ imageHash: "529f7fdaf1cf46ce3126eb6bbcd3b213c314fe8fe884914f5d1106d4", + expirationSec: 600, }), ); await director.apply(builder); @@ -43,6 +44,7 @@ describe("ActivityDemandDirector", () => { manifestCert, manifestSigAlgorithm, capabilities, + expirationSec: 600, }), ); await director.apply(builder); @@ -71,4 +73,16 @@ describe("ActivityDemandDirector", () => { ]), ); }); + + test("should throw an error if user providers a negative expirationSec value", () => { + expect( + () => + new WorkloadDemandDirector( + new WorkloadDemandDirectorConfig({ + imageHash: "529f7fdaf1cf46ce3126eb6bbcd3b213c314fe8fe884914f5d1106d4", + expirationSec: -3, + }), + ), + ).toThrow("The expirationSec param has to be a positive integer"); + }); }); diff --git a/src/market/demand/directors/workload-demand-director.ts b/src/market/demand/directors/workload-demand-director.ts index 4254fa688..97ebbf034 100644 --- a/src/market/demand/directors/workload-demand-director.ts +++ b/src/market/demand/directors/workload-demand-director.ts @@ -8,6 +8,8 @@ export class WorkloadDemandDirector implements IDemandDirector { constructor(private config: WorkloadDemandDirectorConfig) {} public async apply(builder: DemandBodyBuilder) { + builder.addProperty("golem.srv.comp.expiration", Date.now() + this.config.expirationSec * 1000); + builder .addProperty("golem.srv.comp.vm.package_format", this.config.packageFormat) .addConstraint("golem.runtime.name", this.config.engine); diff --git a/src/market/market.module.test.ts b/src/market/market.module.test.ts index 8a85f846c..35cb21bc2 100644 --- a/src/market/market.module.test.ts +++ b/src/market/market.module.test.ts @@ -24,6 +24,8 @@ const testAgreementEvent$ = new Subject(); let marketModule: MarketModuleImpl; +const DEMAND_REFRESH_INTERVAL_SEC = 60; + beforeEach(() => { jest.useFakeTimers(); jest.resetAllMocks(); @@ -33,17 +35,22 @@ beforeEach(() => { when(mockMarketApiAdapter.collectAgreementEvents()).thenReturn(testAgreementEvent$); - marketModule = new MarketModuleImpl({ - activityApi: instance(imock()), - paymentApi: instance(imock()), - networkApi: instance(imock()), - yagna: instance(mockYagna), - logger: instance(imock()), - marketApi: instance(mockMarketApiAdapter), - fileServer: instance(imock()), - storageProvider: instance(imock()), - networkModule: instance(imock()), - }); + marketModule = new MarketModuleImpl( + { + activityApi: instance(imock()), + paymentApi: instance(imock()), + networkApi: instance(imock()), + yagna: instance(mockYagna), + logger: instance(imock()), + marketApi: instance(mockMarketApiAdapter), + fileServer: instance(imock()), + storageProvider: instance(imock()), + networkModule: instance(imock()), + }, + { + demandRefreshIntervalSec: DEMAND_REFRESH_INTERVAL_SEC, + }, + ); }); describe("Market module", () => { @@ -53,6 +60,7 @@ describe("Market module", () => { id: "allocation-id", paymentPlatform: "erc20-holesky-tglm", } as Allocation; + when(mockMarketApiAdapter.getPaymentRelatedDemandDecorations("allocation-id")).thenResolve({ properties: [ { @@ -70,19 +78,26 @@ describe("Market module", () => { ], }); + const rentalDurationHours = 1; const demandSpecification = await marketModule.buildDemandDetails( { workload: { imageHash: "AAAAHASHAAAA", imageUrl: "https://custom.image.url/", }, - expirationSec: 42, payment: { debitNotesAcceptanceTimeoutSec: 42, midAgreementDebitNoteIntervalSec: 42, midAgreementPaymentTimeoutSec: 42, }, }, + { + rentHours: rentalDurationHours, + pricing: { + model: "burn-rate", + avgGlmPerHour: 1, + }, + }, allocation, ); @@ -103,14 +118,14 @@ describe("Market module", () => { key: "golem.srv.caps.multi-activity", value: true, }, - { - key: "golem.srv.comp.expiration", - value: Date.now() + 42 * 1000, - }, { key: "golem.node.debug.subnet", value: "public", }, + { + key: "golem.srv.comp.expiration", + value: Date.now() + rentalDurationHours * 60 * 60 * 1000, + }, { key: "golem.srv.comp.vm.package_format", value: "gvmkit-squash", @@ -142,7 +157,6 @@ describe("Market module", () => { ]; expect(demandSpecification.paymentPlatform).toBe(allocation.paymentPlatform); - expect(demandSpecification.expirationSec).toBe(42); expect(demandSpecification.prototype.constraints).toEqual(expect.arrayContaining(expectedConstraints)); expect(demandSpecification.prototype.properties).toEqual(expectedProperties); }); @@ -171,7 +185,6 @@ describe("Market module", () => { it("should emit a new demand every specified interval", (done) => { const mockSpecification = mock(DemandSpecification); - when(mockSpecification.expirationSec).thenReturn(10); const mockSpecificationInstance = instance(mockSpecification); const mockDemand0 = new Demand("demand-id-0", mockSpecificationInstance); const mockDemand1 = new Demand("demand-id-1", mockSpecificationInstance); @@ -188,7 +201,7 @@ describe("Market module", () => { demand$.pipe(take(3)).subscribe({ next: (demand) => { demands.push(demand); - jest.advanceTimersByTime(10 * 1000); + jest.advanceTimersByTime(DEMAND_REFRESH_INTERVAL_SEC * 1000); }, complete: () => { try { @@ -265,7 +278,6 @@ describe("Market module", () => { constraints: [], }, "erc20-holesky-tglm", - 60 * 60, ); const providerInfo: ProviderInfo = { @@ -354,7 +366,6 @@ describe("Market module", () => { constraints: [], }, "erc20-holesky-tglm", - 60 * 60, ); const providerInfo: ProviderInfo = { diff --git a/src/market/market.module.ts b/src/market/market.module.ts index e50ff3da8..2afd94051 100644 --- a/src/market/market.module.ts +++ b/src/market/market.module.ts @@ -26,7 +26,7 @@ import { OfferProposalFilter, ProposalsBatch, } from "./proposal"; -import { BuildDemandOptions, DemandBodyBuilder, DemandSpecification } from "./demand"; +import { OrderDemandOptions, DemandBodyBuilder, DemandSpecification } from "./demand"; import { IActivityApi, IFileServer } from "../activity"; import { StorageProvider } from "../shared/storage"; import { WorkloadDemandDirectorConfig } from "./demand/directors/workload-demand-director-config"; @@ -55,7 +55,7 @@ export type PricingOptions = avgGlmPerHour: number; }; -export interface MarketOptions { +export interface OrderMarketOptions { /** How long you want to rent the resources in hours */ rentHours: number; @@ -69,6 +69,16 @@ export interface MarketOptions { offerProposalSelector?: OfferProposalSelector; } +export interface MarketModuleOptions { + /** + * Number of seconds after which the demand will be un-subscribed and subscribed again to get fresh + * offers from the market + * + * @default 30 minutes + */ + demandRefreshIntervalSec: number; +} + export interface MarketModule { events: EventEmitter; @@ -78,7 +88,11 @@ export interface MarketModule { * The method returns a DemandSpecification that can be used to publish the demand to the market, * for example using the `publishDemand` method. */ - buildDemandDetails(options: BuildDemandOptions, allocation: Allocation): Promise; + buildDemandDetails( + demandOptions: OrderDemandOptions, + orderOptions: OrderMarketOptions, + allocation: Allocation, + ): Promise; /** * Publishes the demand to the market and handles refreshing it when needed. @@ -209,6 +223,7 @@ export class MarketModuleImpl implements MarketModule { private readonly logger = defaultLogger("market"); private readonly marketApi: IMarketApi; private fileServer: IFileServer; + private options: MarketModuleOptions; constructor( private readonly deps: { @@ -222,39 +237,51 @@ export class MarketModuleImpl implements MarketModule { fileServer: IFileServer; storageProvider: StorageProvider; }, + options?: Partial, ) { this.logger = deps.logger; this.marketApi = deps.marketApi; this.fileServer = deps.fileServer; + this.options = { + ...{ demandRefreshIntervalSec: 30 * 60 }, + ...options, + }; + this.collectAndEmitAgreementEvents(); } - async buildDemandDetails(options: BuildDemandOptions, allocation: Allocation): Promise { + async buildDemandDetails( + demandOptions: OrderDemandOptions, + orderOptions: OrderMarketOptions, + allocation: Allocation, + ): Promise { const builder = new DemandBodyBuilder(); // Instruct the builder what's required const basicConfig = new BasicDemandDirectorConfig({ - expirationSec: options.expirationSec, - subnetTag: options.subnetTag, + subnetTag: demandOptions.subnetTag, }); const basicDirector = new BasicDemandDirector(basicConfig); basicDirector.apply(builder); - const workloadOptions = options.workload - ? await this.applyLocalGVMIServeSupport(options.workload) - : options.workload; + const workloadOptions = demandOptions.workload + ? await this.applyLocalGVMIServeSupport(demandOptions.workload) + : demandOptions.workload; - const workloadConfig = new WorkloadDemandDirectorConfig(workloadOptions); + const workloadConfig = new WorkloadDemandDirectorConfig({ + ...workloadOptions, + expirationSec: orderOptions.rentHours * 60 * 60, // hours to seconds + }); const workloadDirector = new WorkloadDemandDirector(workloadConfig); await workloadDirector.apply(builder); - const paymentConfig = new PaymentDemandDirectorConfig(options.payment); + const paymentConfig = new PaymentDemandDirectorConfig(demandOptions.payment); const paymentDirector = new PaymentDemandDirector(allocation, this.deps.marketApi, paymentConfig); await paymentDirector.apply(builder); - return new DemandSpecification(builder.getProduct(), allocation.paymentPlatform, basicConfig.expirationSec); + return new DemandSpecification(builder.getProduct(), allocation.paymentPlatform); } /** @@ -340,7 +367,7 @@ export class MarketModuleImpl implements MarketModule { this.logger.error("Error while re-publishing demand for offers", err); subscriber.error(err); }); - }, demandSpecification.expirationSec * 1000); + }, this.options.demandRefreshIntervalSec * 1000); return () => { clearInterval(interval); @@ -640,7 +667,7 @@ export class MarketModuleImpl implements MarketModule { } if (!isPriceValid) { this.events.emit("offerProposalRejectedByPriceFilter", proposal); - this.logger.debug("The offer was rejected because the price was too high", { + this.logger.debug("The offer was ignored because the price was too high", { id: proposal.id, pricing: proposal.pricing, }); diff --git a/src/resource-rental/rental.module.ts b/src/resource-rental/rental.module.ts index 0562005e0..a8fac6a00 100644 --- a/src/resource-rental/rental.module.ts +++ b/src/resource-rental/rental.module.ts @@ -20,7 +20,7 @@ export interface RentalModule { createResourceRentalPool( draftPool: DraftOfferProposalPool, allocation: Allocation, - options?: ResourceRentalPoolOptions, + options: ResourceRentalPoolOptions, ): ResourceRentalPool; } diff --git a/src/resource-rental/resource-rental-pool.ts b/src/resource-rental/resource-rental-pool.ts index 4a26efc20..d6379380a 100644 --- a/src/resource-rental/resource-rental-pool.ts +++ b/src/resource-rental/resource-rental-pool.ts @@ -9,6 +9,7 @@ import type { ResourceRental, ResourceRentalOptions } from "./resource-rental"; import { Network, NetworkModule } from "../network"; import { RentalModule } from "./rental.module"; import { AgreementOptions } from "../market/agreement/agreement"; +import { GolemAbortError } from "../shared/error/golem-error"; export interface ResourceRentalPoolDependencies { allocation: Allocation; @@ -29,13 +30,21 @@ export interface ResourceRentalPoolOptions { } export interface ResourceRentalPoolEvents { + /** Triggered when the pool has the minimal number of rentals prepared for operations */ ready: () => void; + /** Triggered when the pool is emptied from all rentals */ end: () => void; + acquired: (agreement: Agreement) => void; released: (agreement: Agreement) => void; created: (agreement: Agreement) => void; destroyed: (agreement: Agreement) => void; + + /** Fired when the pool will encounter an error */ error: (error: GolemMarketError) => void; + + /** Triggered when the pool enters the "draining" state */ + draining: () => void; } const MAX_POOL_SIZE = 100; @@ -197,17 +206,21 @@ export class ResourceRentalPool { */ async acquire(signalOrTimeout?: number | AbortSignal): Promise { if (this.isDraining) { - throw new Error("The pool is in draining mode"); + throw new GolemAbortError("The pool is in draining mode, you cannot acquire new resources"); } + let resourceRental = await this.takeValidResourceRental(); + if (!resourceRental) { if (!this.canCreateMoreResourceRentals()) { return this.enqueueAcquire(); } resourceRental = await this.createNewResourceRental(signalOrTimeout); } + this.borrowed.add(resourceRental); this.events.emit("acquired", resourceRental.agreement); + return resourceRental; } @@ -267,6 +280,7 @@ export class ResourceRentalPool { private async startDrain() { try { this.abortController.abort("The pool is in draining mode"); + this.events.emit("draining"); this.acquireQueue = []; const allResourceRentals = Array.from(this.borrowed) .concat(Array.from(this.lowPriority)) diff --git a/src/shared/yagna/adapters/market-api-adapter.test.ts b/src/shared/yagna/adapters/market-api-adapter.test.ts index 029de7317..55ab5a14d 100644 --- a/src/shared/yagna/adapters/market-api-adapter.test.ts +++ b/src/shared/yagna/adapters/market-api-adapter.test.ts @@ -75,7 +75,7 @@ describe("Market API Adapter", () => { describe("publishDemandSpecification()", () => { it("should publish a demand", async () => { - const specification = new DemandSpecification(samplePrototype, "my-selected-payment-platform", 60 * 60 * 1000); + const specification = new DemandSpecification(samplePrototype, "my-selected-payment-platform"); when(mockMarket.subscribeDemand(deepEqual(expectedBody))).thenResolve("demand-id"); @@ -88,7 +88,7 @@ describe("Market API Adapter", () => { }); it("should throw an error if the demand is not published", async () => { - const specification = new DemandSpecification(samplePrototype, "my-selected-payment-platform", 60 * 60 * 1000); + const specification = new DemandSpecification(samplePrototype, "my-selected-payment-platform"); when(mockMarket.subscribeDemand(deepEqual(expectedBody))).thenResolve({ message: "error publishing demand", @@ -102,10 +102,7 @@ describe("Market API Adapter", () => { describe("unpublishDemand()", () => { it("should unpublish a demand", async () => { - const demand = new Demand( - "demand-id", - new DemandSpecification(samplePrototype, "my-selected-payment-platform", 60 * 60 * 1000), - ); + const demand = new Demand("demand-id", new DemandSpecification(samplePrototype, "my-selected-payment-platform")); when(mockMarket.unsubscribeDemand("demand-id")).thenResolve({}); @@ -115,10 +112,7 @@ describe("Market API Adapter", () => { }); it("should throw an error if the demand is not unpublished", async () => { - const demand = new Demand( - "demand-id", - new DemandSpecification(samplePrototype, "my-selected-payment-platform", 60 * 60 * 1000), - ); + const demand = new Demand("demand-id", new DemandSpecification(samplePrototype, "my-selected-payment-platform")); when(mockMarket.unsubscribeDemand("demand-id")).thenResolve({ message: "error unpublishing demand", @@ -132,7 +126,7 @@ describe("Market API Adapter", () => { describe("counterProposal()", () => { it("should negotiate a proposal with the selected payment platform", async () => { - const specification = new DemandSpecification(samplePrototype, "my-selected-payment-platform", 60 * 60 * 1000); + const specification = new DemandSpecification(samplePrototype, "my-selected-payment-platform"); const receivedProposal = new OfferProposal( { @@ -172,7 +166,7 @@ describe("Market API Adapter", () => { ).once(); }); it("should throw an error if the counter proposal fails", async () => { - const specification = new DemandSpecification(samplePrototype, "my-selected-payment-platform", 60 * 60 * 1000); + const specification = new DemandSpecification(samplePrototype, "my-selected-payment-platform"); const receivedProposal = new OfferProposal( { ...expectedBody, diff --git a/src/shared/yagna/adapters/market-api-adapter.ts b/src/shared/yagna/adapters/market-api-adapter.ts index 4ae6559fd..f109a65cb 100644 --- a/src/shared/yagna/adapters/market-api-adapter.ts +++ b/src/shared/yagna/adapters/market-api-adapter.ts @@ -215,9 +215,11 @@ export class MarketApiAdapter implements IMarketApi { try { // FIXME #yagna, If we don't provide the app-session ID when confirming the agreement, we won't be able to collect invoices with that app-session-id // it's hard to know when the appSessionId is mandatory and when it isn't + this.logger.debug("Confirming agreement by Requestor", { agreementId: agreement.id }); await this.yagnaApi.market.confirmAgreement(agreement.id, this.yagnaApi.appSessionId); + this.logger.debug("Waiting for agreement approval by Provider", { agreementId: agreement.id }); await this.yagnaApi.market.waitForApproval(agreement.id, options?.waitingForApprovalTimeoutSec || 60); - this.logger.debug(`Agreement approved`, { id: agreement.id }); + this.logger.debug(`Agreement approved by Provider`, { agreementId: agreement.id }); // Get fresh copy return this.agreementRepo.getById(agreement.id); @@ -232,6 +234,7 @@ export class MarketApiAdapter implements IMarketApi { async createAgreement(proposal: OfferProposal, options?: AgreementOptions): Promise { const expirationSec = options?.expirationSec || 3600; + try { const agreementProposalRequest = { proposalId: proposal.id, @@ -247,13 +250,15 @@ export class MarketApiAdapter implements IMarketApi { ); } + const agreement = await this.agreementRepo.getById(agreementId); + this.logger.debug(`Agreement created`, { - agreementId: agreementId, + agreement, proposalId: proposal.id, demandId: proposal.demand.id, }); - return this.agreementRepo.getById(agreementId); + return agreement; } catch (error) { const message = getMessageFromApiError(error); throw new GolemMarketError( diff --git a/tests/e2e/resourceRentalPool.spec.ts b/tests/e2e/resourceRentalPool.spec.ts index 6242e457c..6a30acfa3 100644 --- a/tests/e2e/resourceRentalPool.spec.ts +++ b/tests/e2e/resourceRentalPool.spec.ts @@ -29,6 +29,13 @@ describe("ResourceRentalPool", () => { imageTag: "golem/alpine:latest", }, }, + { + rentHours: 1, + pricing: { + model: "burn-rate", + avgGlmPerHour: 1, + }, + }, allocation, );