Skip to content

Commit

Permalink
feat: persistent operations
Browse files Browse the repository at this point in the history
  • Loading branch information
patroza committed Jul 1, 2024
1 parent 5094d04 commit d7a4881
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 58 deletions.
5 changes: 5 additions & 0 deletions .changeset/eleven-chefs-shop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect-app/infra": minor
---

Add persistent Operations
10 changes: 10 additions & 0 deletions packages/infra/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,16 @@
"default": "./_cjs/services/Operations.cjs"
}
},
"./services/OperationsRepo": {
"import": {
"types": "./dist/services/OperationsRepo.d.ts",
"default": "./dist/services/OperationsRepo.js"
},
"require": {
"types": "./dist/services/OperationsRepo.d.ts",
"default": "./_cjs/services/OperationsRepo.cjs"
}
},
"./services/QueueMaker/errors": {
"import": {
"types": "./dist/services/QueueMaker/errors.d.ts",
Expand Down
103 changes: 45 additions & 58 deletions packages/infra/src/services/Operations.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { annotateLogscoped, flatMap } from "@effect-app/core/Effect"
import { dual } from "@effect-app/core/Function"
import { dual, pipe } from "@effect-app/core/Function"
import type { RequestFiberSet } from "@effect-app/infra-adapters/RequestFiberSet"
import { reportError } from "@effect-app/infra/errorReporter"
import { NonEmptyString2k } from "@effect-app/schema"
Expand All @@ -10,76 +10,63 @@ import type { OperationProgress } from "effect-app/Operations"
import { Failure, Operation, OperationId, Success } from "effect-app/Operations"
import { MainFiberSet } from "effect-app/services/MainFiberSet"
import * as Scope from "effect/Scope"
import { batch } from "src/rateLimit.js"
import { forkDaemonReportRequestUnexpected } from "../api/reportError.js"
import { OperationsRepo } from "./OperationsRepo.js"
import { where } from "./query.js"

const reportAppError = reportError("Operations.Cleanup")

const make = Effect.sync(() => {
const ops = new Map<OperationId, Operation>()
const make = Effect.gen(function*() {
const repo = yield* OperationsRepo
const makeOp = Effect.sync(() => OperationId.make())

const cleanup = Effect
.sync(() => {
const before = subHours(new Date(), 1)
;[...ops
.entries()]
.forEach(([id, op]) => {
const lastChanged = Option.fromNullable(op.updatedAt).pipe(Option.getOrElse(() => op.createdAt))
if (lastChanged < before) {
ops.delete(id)
}
})
})
.pipe(Effect.withSpan("Operations.cleanup"))
const cleanup = Effect.sync(() => subHours(new Date(), 1)).pipe(
Effect.andThen((before) => repo.query(where("updatedAt", "lt", before.toISOString()))),
Effect.andThen((ops) => pipe(ops, batch(100, Effect.succeed, (items) => repo.removeAndPublish(items)))),
Effect.withSpan("Operations.cleanup")
)

function addOp(id: OperationId, title: NonEmptyString2k) {
return Effect.sync(() => {
ops.set(id, new Operation({ id, title }))
})
return repo.save(new Operation({ id, title })).pipe(Effect.orDie)
}
function findOp(id: OperationId) {
return Effect.sync(() => Option.fromNullable(ops.get(id)))
return repo.find(id)
}
function finishOp(id: OperationId, exit: Exit<unknown, unknown>) {
return Effect.flatMap(findOp(id), (_) =>
Effect.sync(() => {
if (Option.isNone(_)) {
throw new Error("Not found")
}
ops.set(
id,
copy(_.value, {
updatedAt: new Date(),
result: Exit.isSuccess(exit)
? new Success()
: new Failure({
message: Cause.isInterrupted(exit.cause)
? NonEmptyString2k("Interrupted")
: Cause.isDie(exit.cause)
? NonEmptyString2k("Unknown error")
: Cause
.failureOption(exit.cause)
.pipe(
Option.flatMap((_) =>
typeof _ === "object" && _ !== null && "message" in _ && S.is(NonEmptyString2k)(_.message)
? Option.some(_.message)
: Option.none()
),
Option.getOrNull
)
})
})
)
}))
return Effect
.flatMap(repo.get(id).pipe(Effect.orDie), (_) =>
repo
.save(
copy(_, {
updatedAt: new Date(),
result: Exit.isSuccess(exit)
? new Success()
: new Failure({
message: Cause.isInterrupted(exit.cause)
? NonEmptyString2k("Interrupted")
: Cause.isDie(exit.cause)
? NonEmptyString2k("Unknown error")
: Cause
.failureOption(exit.cause)
.pipe(
Option.flatMap((_) =>
typeof _ === "object" && _ !== null && "message" in _ && S.is(NonEmptyString2k)(_.message)
? Option.some(_.message)
: Option.none()
),
Option.getOrNull
)
})
})
)
.pipe(Effect.orDie))
}
function update(id: OperationId, progress: OperationProgress) {
return Effect.flatMap(findOp(id), (_) =>
Effect.sync(() => {
if (Option.isNone(_)) {
throw new Error("Not found")
}
ops.set(id, copy(_.value, { updatedAt: new Date(), progress }))
}))
return Effect.flatMap(
repo.get(id).pipe(Effect.orDie),
(_) => repo.save(copy(_, { updatedAt: new Date(), progress })).pipe(Effect.orDie)
)
}
return {
cleanup,
Expand All @@ -93,7 +80,7 @@ const make = Effect.sync(() => {
)
),

all: Effect.sync(() => [...ops.values()]),
all: repo.all,
find: findOp,
update
}
Expand Down
7 changes: 7 additions & 0 deletions packages/infra/src/services/OperationsRepo.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { Operation } from "effect-app/Operations"
import { RepositoryDefaultImpl } from "./RepositoryBase.js"

export class OperationsRepo extends RepositoryDefaultImpl<OperationsRepo>()(
"Operation",
Operation
) {}

0 comments on commit d7a4881

Please sign in to comment.