From 317b5b8e8c8c2207469b3ebfcf72bf3a9f7cbc60 Mon Sep 17 00:00:00 2001 From: Tim Date: Fri, 22 Mar 2024 13:16:20 +1300 Subject: [PATCH] add Redis & TimeToLive to Persistence module (#2383) --- .changeset/few-garlics-attend.md | 18 +++ .changeset/few-zebras-cry.md | 5 + .changeset/many-tools-live.md | 5 + .changeset/young-spoons-remain.md | 5 + packages/effect/src/Duration.ts | 6 + .../examples/redis/docker-compose.yaml | 5 + .../experimental/examples/redis/resolver.ts | 46 ++++++ packages/experimental/package.json | 5 + packages/experimental/src/Persistence.ts | 134 +++++++++++++----- packages/experimental/src/Persistence/Lmdb.ts | 37 +++-- .../experimental/src/Persistence/Redis.ts | 129 +++++++++++++++++ packages/experimental/src/TimeToLive.ts | 54 +++++++ packages/experimental/src/index.ts | 5 + .../experimental/test/RequestResolver.test.ts | 108 +++++++++++--- packages/experimental/tsconfig.test.json | 3 +- pnpm-lock.yaml | 58 ++++++++ 16 files changed, 554 insertions(+), 69 deletions(-) create mode 100644 .changeset/few-garlics-attend.md create mode 100644 .changeset/few-zebras-cry.md create mode 100644 .changeset/many-tools-live.md create mode 100644 .changeset/young-spoons-remain.md create mode 100644 packages/experimental/examples/redis/docker-compose.yaml create mode 100644 packages/experimental/examples/redis/resolver.ts create mode 100644 packages/experimental/src/Persistence/Redis.ts create mode 100644 packages/experimental/src/TimeToLive.ts diff --git a/.changeset/few-garlics-attend.md b/.changeset/few-garlics-attend.md new file mode 100644 index 0000000000..16d464b40b --- /dev/null +++ b/.changeset/few-garlics-attend.md @@ -0,0 +1,18 @@ +--- +"@effect/experimental": patch +--- + +add TimeToLive module to @effect/experimental + +A trait for attaching expiry information to objects. + +```ts +import * as TimeToLive from "@effect/experimental"; +import { Duration, Exit } from "effect"; + +class User { + [TimeToLive.symbol](exit: Exit.Exit) { + return Exit.isSuccess(exit) ? Duration.seconds(60) : Duration.zero; + } +} +``` diff --git a/.changeset/few-zebras-cry.md b/.changeset/few-zebras-cry.md new file mode 100644 index 0000000000..f5679a3236 --- /dev/null +++ b/.changeset/few-zebras-cry.md @@ -0,0 +1,5 @@ +--- +"@effect/experimental": patch +--- + +add Redis Persistence module diff --git a/.changeset/many-tools-live.md b/.changeset/many-tools-live.md new file mode 100644 index 0000000000..2abc7769ed --- /dev/null +++ b/.changeset/many-tools-live.md @@ -0,0 +1,5 @@ +--- +"@effect/experimental": minor +--- + +add TimeToLive support to Persistence module diff --git a/.changeset/young-spoons-remain.md b/.changeset/young-spoons-remain.md new file mode 100644 index 0000000000..db171d1a2b --- /dev/null +++ b/.changeset/young-spoons-remain.md @@ -0,0 +1,5 @@ +--- +"effect": patch +--- + +add Duration.isFinite api, to determine if a duration is not Infinity diff --git a/packages/effect/src/Duration.ts b/packages/effect/src/Duration.ts index aba2237fec..19ccba2178 100644 --- a/packages/effect/src/Duration.ts +++ b/packages/effect/src/Duration.ts @@ -185,6 +185,12 @@ const make = (input: number | bigint): Duration => { */ export const isDuration = (u: unknown): u is Duration => hasProperty(u, TypeId) +/** + * @since 2.0.0 + * @category guards + */ +export const isFinite = (self: Duration): boolean => self.value._tag !== "Infinity" + /** * @since 2.0.0 * @category constructors diff --git a/packages/experimental/examples/redis/docker-compose.yaml b/packages/experimental/examples/redis/docker-compose.yaml new file mode 100644 index 0000000000..14e7625e49 --- /dev/null +++ b/packages/experimental/examples/redis/docker-compose.yaml @@ -0,0 +1,5 @@ +services: + redis: + image: redis:alpine + ports: + - "6379:6379" diff --git a/packages/experimental/examples/redis/resolver.ts b/packages/experimental/examples/redis/resolver.ts new file mode 100644 index 0000000000..09183c84bd --- /dev/null +++ b/packages/experimental/examples/redis/resolver.ts @@ -0,0 +1,46 @@ +import * as Redis from "@effect/experimental/Persistence/Redis" +import { persisted } from "@effect/experimental/RequestResolver" +import * as TimeToLive from "@effect/experimental/TimeToLive" +import { runMain } from "@effect/platform-node/NodeRuntime" +import { Schema } from "@effect/schema" +import { Effect, Exit, PrimaryKey, ReadonlyArray, RequestResolver } from "effect" + +class User extends Schema.Class("User")({ + id: Schema.number, + name: Schema.string +}) {} + +class GetUserById extends Schema.TaggedRequest()("GetUserById", Schema.string, User, { + id: Schema.number +}) { + [PrimaryKey.symbol]() { + return `GetUserById:${this.id}` + } + [TimeToLive.symbol](exit: Exit.Exit) { + return Exit.isSuccess(exit) ? 30000 : 0 + } +} + +Effect.gen(function*(_) { + const resolver = yield* _( + RequestResolver.fromEffectTagged()({ + GetUserById: (reqs) => { + console.log("uncached requests", reqs.length) + return Effect.forEach(reqs, (req) => Effect.succeed(new User({ id: req.id, name: "John" }))) + } + }), + persisted("users") + ) + + const users = yield* _( + Effect.forEach(ReadonlyArray.range(1, 5), (id) => Effect.request(new GetUserById({ id }), resolver), { + batching: true + }) + ) + + console.log(users) +}).pipe( + Effect.scoped, + Effect.provide(Redis.layerResult({})), + runMain +) diff --git a/packages/experimental/package.json b/packages/experimental/package.json index 012cc85ba7..b67cb430b6 100644 --- a/packages/experimental/package.json +++ b/packages/experimental/package.json @@ -47,6 +47,7 @@ "@effect/platform-node": "workspace:^", "@effect/schema": "workspace:^", "effect": "workspace:^", + "ioredis": "^5.3.2", "lmdb": "^2.9.2", "ws": "^8.14" }, @@ -54,6 +55,9 @@ "@effect/platform-node": { "optional": true }, + "ioredis": { + "optional": true + }, "lmdb": { "optional": true }, @@ -63,6 +67,7 @@ }, "devDependencies": { "@types/ws": "^8.5.10", + "ioredis": "^5.3.2", "lmdb": "^2.9.2", "vitest-websocket-mock": "^0.3.0", "ws": "^8.16.0" diff --git a/packages/experimental/src/Persistence.ts b/packages/experimental/src/Persistence.ts index fc9942e745..9f9f8ca62e 100644 --- a/packages/experimental/src/Persistence.ts +++ b/packages/experimental/src/Persistence.ts @@ -1,35 +1,56 @@ /** * @since 1.0.0 */ +import { RefailError, TypeIdError } from "@effect/platform/Error" import * as KeyValueStore from "@effect/platform/KeyValueStore" import type * as ParseResult from "@effect/schema/ParseResult" import * as Serializable from "@effect/schema/Serializable" import * as TreeFormatter from "@effect/schema/TreeFormatter" import * as Context from "effect/Context" -import * as Data from "effect/Data" +import type * as Duration from "effect/Duration" import * as Effect from "effect/Effect" import type * as Exit from "effect/Exit" import { identity } from "effect/Function" import * as Layer from "effect/Layer" import * as Option from "effect/Option" -import * as Predicate from "effect/Predicate" import * as PrimaryKey from "effect/PrimaryKey" import type * as Scope from "effect/Scope" +import * as TimeToLive from "./TimeToLive.js" + +/** + * @since 1.0.0 + * @category type ids + */ +export const ErrorTypeId = Symbol.for("@effect/experimental/PersistenceError") + +/** + * @since 1.0.0 + * @category type ids + */ +export type ErrorTypeId = typeof ErrorTypeId /** * @since 1.0.0 * @category errors */ -export type PersistenceError = PersistenceSchemaError | PersistenceBackingError +export type PersistenceError = PersistenceParseError | PersistenceBackingError /** * @since 1.0.0 * @category errors */ -export class PersistenceSchemaError extends Data.TaggedError("PersistenceSchemaError")<{ +export class PersistenceParseError extends TypeIdError(ErrorTypeId, "PersistenceError")<{ + readonly reason: "ParseError" readonly method: string readonly error: ParseResult.ParseError["error"] }> { + /** + * @since 1.0.0 + */ + static make(method: string, error: ParseResult.ParseError["error"]) { + return new PersistenceParseError({ reason: "ParseError", method, error }) + } + get message() { return TreeFormatter.formatIssue(this.error) } @@ -39,13 +60,19 @@ export class PersistenceSchemaError extends Data.TaggedError("PersistenceSchemaE * @since 1.0.0 * @category errors */ -export class PersistenceBackingError extends Data.TaggedError("PersistenceBackingError")<{ +export class PersistenceBackingError extends RefailError(ErrorTypeId, "PersistenceError")<{ + readonly reason: "BackingError" readonly method: string - readonly error: unknown }> { + /** + * @since 1.0.0 + */ + static make(method: string, error: unknown) { + return new PersistenceBackingError({ reason: "BackingError", method, error }) + } + get message() { - const errorString = String(Predicate.hasProperty(this.error, "message") ? this.error.message : this.error) - return `${this.method}: ${errorString}` + return `${this.method}: ${super.message}` } } @@ -77,7 +104,11 @@ export interface BackingPersistence { export interface BackingPersistenceStore { readonly get: (key: string) => Effect.Effect, PersistenceError> readonly getMany: (key: Array) => Effect.Effect>, PersistenceError> - readonly set: (key: string, value: unknown) => Effect.Effect + readonly set: ( + key: string, + value: unknown, + ttl: Option.Option + ) => Effect.Effect readonly remove: (key: string) => Effect.Effect readonly clear: Effect.Effect } @@ -183,7 +214,7 @@ export const layerResult = Layer.effect( ) => Effect.mapError( Serializable.deserializeExit(key, value), - (_) => new PersistenceSchemaError({ method, error: _.error }) + (_) => PersistenceParseError.make(method, _.error) ) const encode = ( method: string, @@ -192,7 +223,7 @@ export const layerResult = Layer.effect( ) => Effect.mapError( Serializable.serializeExit(key, value), - (_) => new PersistenceSchemaError({ method, error: _.error }) + (_) => PersistenceParseError.make(method, _.error) ) const makeKey = ( key: ResultPersistence.Key @@ -224,7 +255,7 @@ export const layerResult = Layer.effect( ), set: (key, value) => encode("set", key, value).pipe( - Effect.flatMap((_) => storage.set(makeKey(key), _)) + Effect.flatMap((_) => storage.set(makeKey(key), _, TimeToLive.getFinite(key, value))) ), remove: (key) => storage.remove(makeKey(key)), clear: storage.clear @@ -238,22 +269,43 @@ export const layerResult = Layer.effect( * @since 1.0.0 * @category layers */ -export const layerMemory: Layer.Layer = Layer.succeed( +export const layerMemory: Layer.Layer = Layer.sync( BackingPersistence, - BackingPersistence.of({ - [BackingPersistenceTypeId]: BackingPersistenceTypeId, - make: (_storeId) => - Effect.sync(() => { - const map = new Map() - return identity({ - get: (key) => Effect.sync(() => Option.fromNullable(map.get(key))), - getMany: (keys) => Effect.sync(() => keys.map((key) => Option.fromNullable(map.get(key)))), - set: (key, value) => Effect.sync(() => map.set(key, value)), - remove: (key) => Effect.sync(() => map.delete(key)), - clear: Effect.sync(() => map.clear()) + () => { + const stores = new Map>() + const getStore = (storeId: string) => { + let store = stores.get(storeId) + if (store === undefined) { + store = new Map() + stores.set(storeId, store) + } + return store + } + return BackingPersistence.of({ + [BackingPersistenceTypeId]: BackingPersistenceTypeId, + make: (storeId) => + Effect.map(Effect.clock, (clock) => { + const map = getStore(storeId) + const unsafeGet = (key: string): Option.Option => { + const value = map.get(key) + if (value === undefined) { + return Option.none() + } else if (value[1] !== null && value[1] <= clock.unsafeCurrentTimeMillis()) { + map.delete(key) + return Option.none() + } + return Option.some(value[0]) + } + return identity({ + get: (key) => Effect.sync(() => unsafeGet(key)), + getMany: (keys) => Effect.sync(() => keys.map(unsafeGet)), + set: (key, value, ttl) => Effect.sync(() => map.set(key, [value, TimeToLive.unsafeToExpires(clock, ttl)])), + remove: (key) => Effect.sync(() => map.delete(key)), + clear: Effect.sync(() => map.clear()) + }) }) - }) - }) + }) + } ) /** @@ -267,46 +319,54 @@ export const layerKeyValueStore: Layer.Layer - Effect.sync(() => { + Effect.map(Effect.clock, (clock) => { const store = KeyValueStore.prefix(backing, storeId) const get = (method: string, key: string) => Effect.flatMap( Effect.mapError( store.get(key), - (error) => new PersistenceBackingError({ method, error }) + (error) => PersistenceBackingError.make(method, error) ), Option.match({ onNone: () => Effect.succeedNone, onSome: (s) => - Effect.asSome( + Effect.flatMap( Effect.try({ try: () => JSON.parse(s), - catch: (error) => new PersistenceBackingError({ method, error }) - }) + catch: (error) => PersistenceBackingError.make(method, error) + }), + (_) => { + if (!Array.isArray(_)) return Effect.succeedNone + const [value, expires] = _ as [unknown, number | null] + if (expires !== null && expires <= clock.unsafeCurrentTimeMillis()) { + return Effect.as(Effect.ignore(store.remove(key)), Option.none()) + } + return Effect.succeed(Option.some(value)) + } ) }) ) return identity({ get: (key) => get("get", key), getMany: (keys) => Effect.forEach(keys, (key) => get("getMany", key)), - set: (key, value) => + set: (key, value, ttl) => Effect.flatMap( Effect.try({ - try: () => JSON.stringify(value), - catch: (error) => new PersistenceBackingError({ method: "set", error }) + try: () => JSON.stringify([value, TimeToLive.unsafeToExpires(clock, ttl)]), + catch: (error) => PersistenceBackingError.make("set", error) }), (u) => Effect.mapError( store.set(key, u), - (error) => new PersistenceBackingError({ method: "set", error }) + (error) => PersistenceBackingError.make("set", error) ) ), remove: (key) => Effect.mapError( store.remove(key), - (error) => new PersistenceBackingError({ method: "remove", error }) + (error) => PersistenceBackingError.make("remove", error) ), - clear: Effect.mapError(store.clear, (error) => new PersistenceBackingError({ method: "clear", error })) + clear: Effect.mapError(store.clear, (error) => PersistenceBackingError.make("clear", error)) }) }) }) diff --git a/packages/experimental/src/Persistence/Lmdb.ts b/packages/experimental/src/Persistence/Lmdb.ts index 1d8eb63ed8..f176164111 100644 --- a/packages/experimental/src/Persistence/Lmdb.ts +++ b/packages/experimental/src/Persistence/Lmdb.ts @@ -5,8 +5,10 @@ import * as Effect from "effect/Effect" import { identity } from "effect/Function" import * as Layer from "effect/Layer" import * as Option from "effect/Option" +import * as ReadonlyArray from "effect/ReadonlyArray" import * as Lmdb from "lmdb" import * as Persistence from "../Persistence.js" +import * as TimeToLive from "../TimeToLive.js" /** * @since 1.0.0 @@ -23,34 +25,47 @@ export const make = (options: Lmdb.RootDatabaseOptionsWithPath) => [Persistence.BackingPersistenceTypeId]: Persistence.BackingPersistenceTypeId, make: (storeId) => Effect.gen(function*(_) { + const clock = yield* _(Effect.clock) const store = yield* _(Effect.acquireRelease( Effect.sync(() => lmdb.openDB({ name: storeId })), (store) => Effect.promise(() => store.close()) )) + const valueToOption = (key: string, _: any) => { + if (!Array.isArray(_)) return Option.none() + const [value, expires] = _ as [unknown, number | null] + if (expires !== null && expires <= clock.unsafeCurrentTimeMillis()) { + store.remove(key) + return Option.none() + } + return Option.some(value) + } return identity({ get: (key) => Effect.try({ - try: () => Option.fromNullable(store.get(key)), - catch: (error) => new Persistence.PersistenceBackingError({ method: "get", error }) + try: () => valueToOption(key, store.get(key)), + catch: (error) => Persistence.PersistenceBackingError.make("get", error) }), getMany: (keys) => + Effect.map( + Effect.tryPromise({ + try: () => store.getMany(keys), + catch: (error) => Persistence.PersistenceBackingError.make("getMany", error) + }), + ReadonlyArray.map((value, i) => valueToOption(keys[i], value)) + ), + set: (key, value, ttl) => Effect.tryPromise({ - try: () => store.getMany(keys).then((_) => _.map(Option.fromNullable)), - catch: (error) => new Persistence.PersistenceBackingError({ method: "getMany", error }) - }), - set: (key, value) => - Effect.tryPromise({ - try: () => store.put(key, value), - catch: (error) => new Persistence.PersistenceBackingError({ method: "set", error }) + try: () => store.put(key, [value, TimeToLive.unsafeToExpires(clock, ttl)]), + catch: (error) => Persistence.PersistenceBackingError.make("set", error) }), remove: (key) => Effect.tryPromise({ try: () => store.remove(key), - catch: (error) => new Persistence.PersistenceBackingError({ method: "remove", error }) + catch: (error) => Persistence.PersistenceBackingError.make("remove", error) }), clear: Effect.tryPromise({ try: () => store.clearAsync(), - catch: (error) => new Persistence.PersistenceBackingError({ method: "clear", error }) + catch: (error) => Persistence.PersistenceBackingError.make("clear", error) }) }) }) diff --git a/packages/experimental/src/Persistence/Redis.ts b/packages/experimental/src/Persistence/Redis.ts new file mode 100644 index 0000000000..958ad60b09 --- /dev/null +++ b/packages/experimental/src/Persistence/Redis.ts @@ -0,0 +1,129 @@ +/** + * @since 1.0.0 + */ +import * as Config from "effect/Config" +import type { ConfigError } from "effect/ConfigError" +import * as Duration from "effect/Duration" +import * as Effect from "effect/Effect" +import { identity } from "effect/Function" +import * as Layer from "effect/Layer" +import * as Option from "effect/Option" +import type { RedisOptions } from "ioredis" +import { Redis } from "ioredis" +import * as Persistence from "../Persistence.js" + +/** + * @since 1.0.0 + * @category constructors + */ +export const make = (options: RedisOptions) => + Effect.gen(function*(_) { + const redis = yield* _(Effect.acquireRelease( + Effect.sync(() => new Redis(options)), + (redis) => Effect.promise(() => redis.quit()) + )) + return Persistence.BackingPersistence.of({ + [Persistence.BackingPersistenceTypeId]: Persistence.BackingPersistenceTypeId, + make: (prefix) => + Effect.sync(() => { + const prefixed = (key: string) => `${prefix}:${key}` + const parse = (method: string) => (str: string | null) => { + console.log("parse", method, str) + if (str === null) { + return Effect.succeedNone + } + return Effect.try({ + try: () => Option.some(JSON.parse(str)), + catch: (error) => Persistence.PersistenceBackingError.make(method, error) + }) + } + return identity({ + get: (key) => + Effect.flatMap( + Effect.tryPromise({ + try: () => redis.get(prefixed(key)), + catch: (error) => Persistence.PersistenceBackingError.make("get", error) + }), + parse("get") + ), + getMany: (keys) => + Effect.flatMap( + Effect.tryPromise({ + try: () => redis.mget(keys.map(prefixed)), + catch: (error) => Persistence.PersistenceBackingError.make("getMany", error) + }), + Effect.forEach(parse("getMany")) + ), + set: (key, value, ttl) => + Effect.tryMapPromise( + Effect.try({ + try: () => JSON.stringify(value), + catch: (error) => Persistence.PersistenceBackingError.make("set", error) + }), + { + try: (value) => + ttl._tag === "None" + ? redis.set(prefixed(key), value) + : redis.set(prefixed(key), value, "PX", Duration.toMillis(ttl.value)), + catch: (error) => Persistence.PersistenceBackingError.make("set", error) + } + ), + remove: (key) => + Effect.tryPromise({ + try: () => redis.del(prefixed(key)), + catch: (error) => Persistence.PersistenceBackingError.make("remove", error) + }), + clear: Effect.tryPromise({ + try: () => redis.keys(`${prefix}:*`).then((keys) => redis.del(keys)), + catch: (error) => Persistence.PersistenceBackingError.make("clear", error) + }) + }) + }) + }) + }) + +/** + * @since 1.0.0 + * @category layers + */ +export const layer = ( + options: RedisOptions +): Layer.Layer => + Layer.scoped( + Persistence.BackingPersistence, + make(options) + ) + +/** + * @since 1.0.0 + * @category layers + */ +export const layerConfig = ( + options: Config.Config.Wrap +): Layer.Layer => + Layer.scoped( + Persistence.BackingPersistence, + Effect.flatMap(Config.unwrap(options), make) + ) + +/** + * @since 1.0.0 + * @category layers + */ +export const layerResult = ( + options: RedisOptions +): Layer.Layer => + Persistence.layerResult.pipe( + Layer.provide(layer(options)) + ) + +/** + * @since 1.0.0 + * @category layers + */ +export const layerResultConfig = ( + options: Config.Config.Wrap +): Layer.Layer => + Persistence.layerResult.pipe( + Layer.provide(layerConfig(options)) + ) diff --git a/packages/experimental/src/TimeToLive.ts b/packages/experimental/src/TimeToLive.ts new file mode 100644 index 0000000000..6a243ceb66 --- /dev/null +++ b/packages/experimental/src/TimeToLive.ts @@ -0,0 +1,54 @@ +/** + * @since 1.0.0 + */ +import type { Clock } from "effect/Clock" +import * as Duration from "effect/Duration" +import * as Exit from "effect/Exit" +import * as Option from "effect/Option" +import * as Predicate from "effect/Predicate" + +/** + * @since 1.0.0 + * @category symbols + */ +export const symbol = Symbol.for("@effect/experimental/TimeToLive") + +/** + * @since 1.0.0 + * @category refinements + */ +export const isTimeToLive = (u: unknown): u is TimeToLive => Predicate.hasProperty(u, symbol) + +/** + * @since 1.0.0 + * @category models + */ +export interface TimeToLive { + readonly [symbol]: (exit: Exit.Exit) => Duration.DurationInput +} + +/** + * @since 1.0.0 + * @category accessors + */ +export const get = (u: unknown, exit: Exit.Exit = Exit.unit as any): Duration.Duration => + isTimeToLive(u) ? Duration.decode(u[symbol](exit)) : Duration.infinity + +/** + * @since 1.0.0 + * @category accessors + */ +export const getFinite = ( + u: unknown, + exit: Exit.Exit = Exit.unit as any +): Option.Option => { + const value = get(u, exit) + return Duration.isFinite(value) ? Option.some(value) : Option.none() +} + +/** + * @since 1.0.0 + * @category accessors + */ +export const unsafeToExpires = (clock: Clock, ttl: Option.Option): number | null => + ttl._tag === "None" ? null : clock.unsafeCurrentTimeMillis() + Duration.toMillis(ttl.value) diff --git a/packages/experimental/src/index.ts b/packages/experimental/src/index.ts index 7cb8524cc8..86e751f41a 100644 --- a/packages/experimental/src/index.ts +++ b/packages/experimental/src/index.ts @@ -32,3 +32,8 @@ export * as RequestResolver from "./RequestResolver.js" * @since 1.0.0 */ export * as SocketServer from "./SocketServer.js" + +/** + * @since 1.0.0 + */ +export * as TimeToLive from "./TimeToLive.js" diff --git a/packages/experimental/test/RequestResolver.test.ts b/packages/experimental/test/RequestResolver.test.ts index d90af2a070..a8ca65ad96 100644 --- a/packages/experimental/test/RequestResolver.test.ts +++ b/packages/experimental/test/RequestResolver.test.ts @@ -1,35 +1,54 @@ import * as Persistence from "@effect/experimental/Persistence" import * as PersistenceLmdb from "@effect/experimental/Persistence/Lmdb" import * as RequestResolverX from "@effect/experimental/RequestResolver" +import * as TimeToLive from "@effect/experimental/TimeToLive" import { FileSystem, KeyValueStore } from "@effect/platform" import { NodeContext } from "@effect/platform-node" import { Schema } from "@effect/schema" -import { Effect, Layer, PrimaryKey, ReadonlyArray, RequestResolver } from "effect" -import { assert, describe, test } from "vitest" +import * as it from "@effect/vitest" +import { Effect, Exit, Layer, PrimaryKey, ReadonlyArray, Request, RequestResolver, TestClock } from "effect" +import { assert, describe } from "vitest" + +class User extends Schema.Class("User")({ + id: Schema.number, + name: Schema.string +}) {} + +class MyRequest extends Schema.TaggedRequest()("MyRequest", Schema.string, User, { + id: Schema.number +}) { + [PrimaryKey.symbol]() { + return `MyRequest:${this.id}` + } +} + +class TTLRequest extends Schema.TaggedRequest()("TTLRequest", Schema.string, User, { + id: Schema.number +}) { + [PrimaryKey.symbol]() { + return `TTLRequest:${this.id}` + } + [TimeToLive.symbol](exit: Exit.Exit) { + return Exit.isSuccess(exit) ? 5000 : 1 + } +} describe("RequestResolver", () => { describe("persisted", () => { - class User extends Schema.Class("User")({ - id: Schema.number, - name: Schema.string - }) {} - class MyRequest extends Schema.TaggedRequest()("MyRequest", Schema.string, User, { - id: Schema.number - }) { - [PrimaryKey.symbol]() { - return `MyRequest:${this.id}` - } - } - const testsuite = ( storeId: "memory" | "kvs" | "lmdb", layer: Layer.Layer ) => - test(storeId, () => + it.effect( + storeId, Effect.gen(function*(_) { - const baseResolver = RequestResolver.fromEffectTagged()({ - MyRequest: (reqs) => - Effect.succeed(ReadonlyArray.map(reqs, (req) => new User({ id: req.id, name: "John" }))) + let count = 0 + const baseResolver = RequestResolver.makeBatched((reqs: Array) => { + count += reqs.length + return Effect.forEach(reqs, (req) => { + if (req.id === -1) return Request.fail(req, "not found") + return Request.succeed(req, new User({ id: req.id, name: "John" })) + }, { discard: true }) }) const persisted = yield* _(RequestResolverX.persisted(baseResolver, storeId)) let users = yield* _( @@ -37,25 +56,74 @@ describe("RequestResolver", () => { batching: true }) ) + assert.strictEqual(count, 5) assert.strictEqual(users.length, 5) users = yield* _( Effect.forEach(ReadonlyArray.range(1, 5), (id) => Effect.request(new MyRequest({ id }), persisted), { batching: true }) ) + assert.strictEqual(count, 5) assert.strictEqual(users.length, 5) + // ttl + let results = yield* _( + Effect.forEach(ReadonlyArray.range(-1, 3), (id) => + Effect.exit(Effect.request(new TTLRequest({ id }), persisted)), { + batching: true + }) + ) + assert.strictEqual(count, 10) + assert.strictEqual(results.length, 5) + assert(Exit.isFailure(results[0])) + assert(Exit.isSuccess(results[1])) + + results = yield* _( + Effect.forEach(ReadonlyArray.range(-1, 3), (id) => + Effect.exit(Effect.request(new TTLRequest({ id }), persisted)), { + batching: true + }) + ) + assert.strictEqual(count, 10) + assert.strictEqual(results.length, 5) + + yield* _(TestClock.adjust(1)) + + results = yield* _( + Effect.forEach(ReadonlyArray.range(-1, 3), (id) => + Effect.exit(Effect.request(new TTLRequest({ id }), persisted)), { + batching: true + }) + ) + assert.strictEqual(count, 11) + assert.strictEqual(results.length, 5) + + yield* _(TestClock.adjust(5000)) + + results = yield* _( + Effect.forEach(ReadonlyArray.range(-1, 3), (id) => + Effect.exit(Effect.request(new TTLRequest({ id }), persisted)), { + batching: true + }) + ) + assert.strictEqual(count, 16) + assert.strictEqual(results.length, 5) + + // clear const persistence = yield* _(Persistence.ResultPersistence) const store = yield* _(persistence.make(storeId)) yield* _(store.clear) users = yield* _( - Effect.forEach(ReadonlyArray.range(1, 5), (id) => Effect.request(new MyRequest({ id }), persisted), { + Effect.forEach(ReadonlyArray.range(1, 5), (id) => + Effect.request(new MyRequest({ id }), persisted), { batching: true }) ) + assert.strictEqual(count, 21) assert.strictEqual(users.length, 5) - }).pipe(Effect.scoped, Effect.provide(layer), Effect.runPromise)) + }).pipe(Effect.scoped, Effect.provide(layer)) + ) testsuite("memory", Persistence.layerResultMemory) testsuite("kvs", Persistence.layerResultKeyValueStore.pipe(Layer.provide(KeyValueStore.layerMemory))) diff --git a/packages/experimental/tsconfig.test.json b/packages/experimental/tsconfig.test.json index f99579de52..a08c5c9767 100644 --- a/packages/experimental/tsconfig.test.json +++ b/packages/experimental/tsconfig.test.json @@ -5,7 +5,8 @@ { "path": "tsconfig.src.json" }, { "path": "../effect" }, { "path": "../schema" }, - { "path": "../platform-node" } + { "path": "../platform-node" }, + { "path": "../vitest" } ], "compilerOptions": { "tsBuildInfoFile": ".tsbuildinfo/test.tsbuildinfo", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 2400b94739..a4382f51dd 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -212,6 +212,9 @@ importers: '@types/ws': specifier: ^8.5.10 version: 8.5.10 + ioredis: + specifier: ^5.3.2 + version: 5.3.2 lmdb: specifier: ^2.9.2 version: 2.9.2 @@ -1557,6 +1560,10 @@ packages: resolution: {integrity: sha512-6EwiSjwWYP7pTckG6I5eyFANjPhmPjUX9JRLUSfNPC7FX7zK9gyZAfUEaECL6ALTpGX5AjnBq3C9XmVWPitNpw==} dev: true + /@ioredis/commands@1.2.0: + resolution: {integrity: sha512-Sx1pU8EM64o2BrqNpEO1CNLtKQwyhuXuqyfH7oGKCk+1a33d2r5saW8zNwm3j6BTExtjrv2BxTgzzkMwts6vGg==} + dev: true + /@isaacs/cliui@8.0.2: resolution: {integrity: sha512-O8jcjabXaleOG9DQ0+ARXWZBTfnP4WNAqzuiJK7ll44AmxGKv/J2M4TPjxjY3znBCfvBXFzucm1twdyFybFqEA==} engines: {node: '>=12'} @@ -3224,6 +3231,11 @@ packages: engines: {node: '>=0.8'} dev: true + /cluster-key-slot@1.1.2: + resolution: {integrity: sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==} + engines: {node: '>=0.10.0'} + dev: true + /color-convert@1.9.3: resolution: {integrity: sha512-QfAUtd+vFdAtFQcC8CCyYt1fYWxSqAiK2cSD6zDB8N3cpsEBAvRxp9zOGg6G/SHHJYAT88/az/IuDGALsNVbGg==} dependencies: @@ -3424,6 +3436,11 @@ packages: object-keys: 1.1.1 dev: true + /denque@2.1.0: + resolution: {integrity: sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==} + engines: {node: '>=0.10'} + dev: true + /dependency-tree@10.0.9: resolution: {integrity: sha512-dwc59FRIsht+HfnTVM0BCjJaEWxdq2YAvEDy4/Hn6CwS3CBWMtFnL3aZGAkQn3XCYxk/YcTDE4jX2Q7bFTwCjA==} engines: {node: '>=14'} @@ -4679,6 +4696,23 @@ packages: fp-ts: 2.16.2 dev: true + /ioredis@5.3.2: + resolution: {integrity: sha512-1DKMMzlIHM02eBBVOFQ1+AolGjs6+xEcM4PDL7NqOS6szq7H9jSaEkIUH6/a5Hl241LzW6JLSiAbNvTQjUupUA==} + engines: {node: '>=12.22.0'} + dependencies: + '@ioredis/commands': 1.2.0 + cluster-key-slot: 1.1.2 + debug: 4.3.4 + denque: 2.1.0 + lodash.defaults: 4.2.0 + lodash.isarguments: 3.1.0 + redis-errors: 1.2.0 + redis-parser: 3.0.0 + standard-as-callback: 2.1.0 + transitivePeerDependencies: + - supports-color + dev: true + /is-alphabetical@1.0.4: resolution: {integrity: sha512-DwzsA04LQ10FHTZuL0/grVDk4rFoVH1pjAToYwBrHSxcrBIGQuXrQMtD5U1b0U2XVgKZCTLLP8u2Qxqhy3l2Vg==} dev: true @@ -5302,6 +5336,14 @@ packages: resolution: {integrity: sha512-xYHt68QRoYGjeeM/XOE1uJtvXQAgvszfBhjV4yvsQH0u2i9I6cI6c6/eG4Hh3UAOVn0y/xAXwmTzEay49Q//HA==} dev: true + /lodash.defaults@4.2.0: + resolution: {integrity: sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ==} + dev: true + + /lodash.isarguments@3.1.0: + resolution: {integrity: sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==} + dev: true + /lodash.merge@4.6.2: resolution: {integrity: sha512-0KpjqXRVvrYyCsX1swR/XTK0va6VQkQM6MNo7PqW77ByjAhoARA8EfrP1N4+KlKj8YS0ZUCtRT/YUuhyYDujIQ==} dev: true @@ -6389,6 +6431,18 @@ packages: strip-indent: 3.0.0 dev: true + /redis-errors@1.2.0: + resolution: {integrity: sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==} + engines: {node: '>=4'} + dev: true + + /redis-parser@3.0.0: + resolution: {integrity: sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==} + engines: {node: '>=4'} + dependencies: + redis-errors: 1.2.0 + dev: true + /regenerator-runtime@0.14.1: resolution: {integrity: sha512-dYnhHh0nJoMfnkZs6GmmhFknAGRrLznOu5nc9ML+EJxGvrx6H7teuevqVqCuPcPK//3eDrrjQhehXVx9cnkGdw==} dev: true @@ -6789,6 +6843,10 @@ packages: resolution: {integrity: sha512-1XMJE5fQo1jGH6Y/7ebnwPOBEkIEnT4QF32d5R1+VXdXveM0IBMJt8zfaxX1P3QhVwrYe+576+jkANtSS2mBbw==} dev: true + /standard-as-callback@2.1.0: + resolution: {integrity: sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==} + dev: true + /std-env@3.7.0: resolution: {integrity: sha512-JPbdCEQLj1w5GilpiHAx3qJvFndqybBysA3qUOnznweH4QbNYUsW/ea8QzSrnh0vNsezMMw5bcVool8lM0gwzg==} dev: true