Skip to content

Commit

Permalink
add Redis & TimeToLive to Persistence module (#2383)
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart authored Mar 22, 2024
1 parent 37ca592 commit 317b5b8
Show file tree
Hide file tree
Showing 16 changed files with 554 additions and 69 deletions.
18 changes: 18 additions & 0 deletions .changeset/few-garlics-attend.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
"@effect/experimental": patch
---

add TimeToLive module to @effect/experimental

A trait for attaching expiry information to objects.

```ts
import * as TimeToLive from "@effect/experimental";
import { Duration, Exit } from "effect";

class User {
[TimeToLive.symbol](exit: Exit.Exit<unknown, unknown>) {
return Exit.isSuccess(exit) ? Duration.seconds(60) : Duration.zero;
}
}
```
5 changes: 5 additions & 0 deletions .changeset/few-zebras-cry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/experimental": patch
---

add Redis Persistence module
5 changes: 5 additions & 0 deletions .changeset/many-tools-live.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/experimental": minor
---

add TimeToLive support to Persistence module
5 changes: 5 additions & 0 deletions .changeset/young-spoons-remain.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": patch
---

add Duration.isFinite api, to determine if a duration is not Infinity
6 changes: 6 additions & 0 deletions packages/effect/src/Duration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ const make = (input: number | bigint): Duration => {
*/
export const isDuration = (u: unknown): u is Duration => hasProperty(u, TypeId)

/**
* @since 2.0.0
* @category guards
*/
export const isFinite = (self: Duration): boolean => self.value._tag !== "Infinity"

/**
* @since 2.0.0
* @category constructors
Expand Down
5 changes: 5 additions & 0 deletions packages/experimental/examples/redis/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
services:
redis:
image: redis:alpine
ports:
- "6379:6379"
46 changes: 46 additions & 0 deletions packages/experimental/examples/redis/resolver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import * as Redis from "@effect/experimental/Persistence/Redis"
import { persisted } from "@effect/experimental/RequestResolver"
import * as TimeToLive from "@effect/experimental/TimeToLive"
import { runMain } from "@effect/platform-node/NodeRuntime"
import { Schema } from "@effect/schema"
import { Effect, Exit, PrimaryKey, ReadonlyArray, RequestResolver } from "effect"

class User extends Schema.Class<User>("User")({
id: Schema.number,
name: Schema.string
}) {}

class GetUserById extends Schema.TaggedRequest<GetUserById>()("GetUserById", Schema.string, User, {
id: Schema.number
}) {
[PrimaryKey.symbol]() {
return `GetUserById:${this.id}`
}
[TimeToLive.symbol](exit: Exit.Exit<User, string>) {
return Exit.isSuccess(exit) ? 30000 : 0
}
}

Effect.gen(function*(_) {
const resolver = yield* _(
RequestResolver.fromEffectTagged<GetUserById>()({
GetUserById: (reqs) => {
console.log("uncached requests", reqs.length)
return Effect.forEach(reqs, (req) => Effect.succeed(new User({ id: req.id, name: "John" })))
}
}),
persisted("users")
)

const users = yield* _(
Effect.forEach(ReadonlyArray.range(1, 5), (id) => Effect.request(new GetUserById({ id }), resolver), {
batching: true
})
)

console.log(users)
}).pipe(
Effect.scoped,
Effect.provide(Redis.layerResult({})),
runMain
)
5 changes: 5 additions & 0 deletions packages/experimental/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,17 @@
"@effect/platform-node": "workspace:^",
"@effect/schema": "workspace:^",
"effect": "workspace:^",
"ioredis": "^5.3.2",
"lmdb": "^2.9.2",
"ws": "^8.14"
},
"peerDependenciesMeta": {
"@effect/platform-node": {
"optional": true
},
"ioredis": {
"optional": true
},
"lmdb": {
"optional": true
},
Expand All @@ -63,6 +67,7 @@
},
"devDependencies": {
"@types/ws": "^8.5.10",
"ioredis": "^5.3.2",
"lmdb": "^2.9.2",
"vitest-websocket-mock": "^0.3.0",
"ws": "^8.16.0"
Expand Down
134 changes: 97 additions & 37 deletions packages/experimental/src/Persistence.ts
Original file line number Diff line number Diff line change
@@ -1,35 +1,56 @@
/**
* @since 1.0.0
*/
import { RefailError, TypeIdError } from "@effect/platform/Error"
import * as KeyValueStore from "@effect/platform/KeyValueStore"
import type * as ParseResult from "@effect/schema/ParseResult"
import * as Serializable from "@effect/schema/Serializable"
import * as TreeFormatter from "@effect/schema/TreeFormatter"
import * as Context from "effect/Context"
import * as Data from "effect/Data"
import type * as Duration from "effect/Duration"
import * as Effect from "effect/Effect"
import type * as Exit from "effect/Exit"
import { identity } from "effect/Function"
import * as Layer from "effect/Layer"
import * as Option from "effect/Option"
import * as Predicate from "effect/Predicate"
import * as PrimaryKey from "effect/PrimaryKey"
import type * as Scope from "effect/Scope"
import * as TimeToLive from "./TimeToLive.js"

/**
* @since 1.0.0
* @category type ids
*/
export const ErrorTypeId = Symbol.for("@effect/experimental/PersistenceError")

/**
* @since 1.0.0
* @category type ids
*/
export type ErrorTypeId = typeof ErrorTypeId

/**
* @since 1.0.0
* @category errors
*/
export type PersistenceError = PersistenceSchemaError | PersistenceBackingError
export type PersistenceError = PersistenceParseError | PersistenceBackingError

/**
* @since 1.0.0
* @category errors
*/
export class PersistenceSchemaError extends Data.TaggedError("PersistenceSchemaError")<{
export class PersistenceParseError extends TypeIdError(ErrorTypeId, "PersistenceError")<{
readonly reason: "ParseError"
readonly method: string
readonly error: ParseResult.ParseError["error"]
}> {
/**
* @since 1.0.0
*/
static make(method: string, error: ParseResult.ParseError["error"]) {
return new PersistenceParseError({ reason: "ParseError", method, error })
}

get message() {
return TreeFormatter.formatIssue(this.error)
}
Expand All @@ -39,13 +60,19 @@ export class PersistenceSchemaError extends Data.TaggedError("PersistenceSchemaE
* @since 1.0.0
* @category errors
*/
export class PersistenceBackingError extends Data.TaggedError("PersistenceBackingError")<{
export class PersistenceBackingError extends RefailError(ErrorTypeId, "PersistenceError")<{
readonly reason: "BackingError"
readonly method: string
readonly error: unknown
}> {
/**
* @since 1.0.0
*/
static make(method: string, error: unknown) {
return new PersistenceBackingError({ reason: "BackingError", method, error })
}

get message() {
const errorString = String(Predicate.hasProperty(this.error, "message") ? this.error.message : this.error)
return `${this.method}: ${errorString}`
return `${this.method}: ${super.message}`
}
}

Expand Down Expand Up @@ -77,7 +104,11 @@ export interface BackingPersistence {
export interface BackingPersistenceStore {
readonly get: (key: string) => Effect.Effect<Option.Option<unknown>, PersistenceError>
readonly getMany: (key: Array<string>) => Effect.Effect<Array<Option.Option<unknown>>, PersistenceError>
readonly set: (key: string, value: unknown) => Effect.Effect<void, PersistenceError>
readonly set: (
key: string,
value: unknown,
ttl: Option.Option<Duration.Duration>
) => Effect.Effect<void, PersistenceError>
readonly remove: (key: string) => Effect.Effect<void, PersistenceError>
readonly clear: Effect.Effect<void, PersistenceError>
}
Expand Down Expand Up @@ -183,7 +214,7 @@ export const layerResult = Layer.effect(
) =>
Effect.mapError(
Serializable.deserializeExit(key, value),
(_) => new PersistenceSchemaError({ method, error: _.error })
(_) => PersistenceParseError.make(method, _.error)
)
const encode = <R, IE, E, IA, A>(
method: string,
Expand All @@ -192,7 +223,7 @@ export const layerResult = Layer.effect(
) =>
Effect.mapError(
Serializable.serializeExit(key, value),
(_) => new PersistenceSchemaError({ method, error: _.error })
(_) => PersistenceParseError.make(method, _.error)
)
const makeKey = <R, IE, E, IA, A>(
key: ResultPersistence.Key<R, IE, E, IA, A>
Expand Down Expand Up @@ -224,7 +255,7 @@ export const layerResult = Layer.effect(
),
set: (key, value) =>
encode("set", key, value).pipe(
Effect.flatMap((_) => storage.set(makeKey(key), _))
Effect.flatMap((_) => storage.set(makeKey(key), _, TimeToLive.getFinite(key, value)))
),
remove: (key) => storage.remove(makeKey(key)),
clear: storage.clear
Expand All @@ -238,22 +269,43 @@ export const layerResult = Layer.effect(
* @since 1.0.0
* @category layers
*/
export const layerMemory: Layer.Layer<BackingPersistence> = Layer.succeed(
export const layerMemory: Layer.Layer<BackingPersistence> = Layer.sync(
BackingPersistence,
BackingPersistence.of({
[BackingPersistenceTypeId]: BackingPersistenceTypeId,
make: (_storeId) =>
Effect.sync(() => {
const map = new Map<string, unknown>()
return identity<BackingPersistenceStore>({
get: (key) => Effect.sync(() => Option.fromNullable(map.get(key))),
getMany: (keys) => Effect.sync(() => keys.map((key) => Option.fromNullable(map.get(key)))),
set: (key, value) => Effect.sync(() => map.set(key, value)),
remove: (key) => Effect.sync(() => map.delete(key)),
clear: Effect.sync(() => map.clear())
() => {
const stores = new Map<string, Map<string, readonly [unknown, expires: number | null]>>()
const getStore = (storeId: string) => {
let store = stores.get(storeId)
if (store === undefined) {
store = new Map<string, readonly [unknown, expires: number | null]>()
stores.set(storeId, store)
}
return store
}
return BackingPersistence.of({
[BackingPersistenceTypeId]: BackingPersistenceTypeId,
make: (storeId) =>
Effect.map(Effect.clock, (clock) => {
const map = getStore(storeId)
const unsafeGet = (key: string): Option.Option<unknown> => {
const value = map.get(key)
if (value === undefined) {
return Option.none()
} else if (value[1] !== null && value[1] <= clock.unsafeCurrentTimeMillis()) {
map.delete(key)
return Option.none()
}
return Option.some(value[0])
}
return identity<BackingPersistenceStore>({
get: (key) => Effect.sync(() => unsafeGet(key)),
getMany: (keys) => Effect.sync(() => keys.map(unsafeGet)),
set: (key, value, ttl) => Effect.sync(() => map.set(key, [value, TimeToLive.unsafeToExpires(clock, ttl)])),
remove: (key) => Effect.sync(() => map.delete(key)),
clear: Effect.sync(() => map.clear())
})
})
})
})
})
}
)

/**
Expand All @@ -267,46 +319,54 @@ export const layerKeyValueStore: Layer.Layer<BackingPersistence, never, KeyValue
return BackingPersistence.of({
[BackingPersistenceTypeId]: BackingPersistenceTypeId,
make: (storeId) =>
Effect.sync(() => {
Effect.map(Effect.clock, (clock) => {
const store = KeyValueStore.prefix(backing, storeId)
const get = (method: string, key: string) =>
Effect.flatMap(
Effect.mapError(
store.get(key),
(error) => new PersistenceBackingError({ method, error })
(error) => PersistenceBackingError.make(method, error)
),
Option.match({
onNone: () => Effect.succeedNone,
onSome: (s) =>
Effect.asSome(
Effect.flatMap(
Effect.try({
try: () => JSON.parse(s),
catch: (error) => new PersistenceBackingError({ method, error })
})
catch: (error) => PersistenceBackingError.make(method, error)
}),
(_) => {
if (!Array.isArray(_)) return Effect.succeedNone
const [value, expires] = _ as [unknown, number | null]
if (expires !== null && expires <= clock.unsafeCurrentTimeMillis()) {
return Effect.as(Effect.ignore(store.remove(key)), Option.none())
}
return Effect.succeed(Option.some(value))
}
)
})
)
return identity<BackingPersistenceStore>({
get: (key) => get("get", key),
getMany: (keys) => Effect.forEach(keys, (key) => get("getMany", key)),
set: (key, value) =>
set: (key, value, ttl) =>
Effect.flatMap(
Effect.try({
try: () => JSON.stringify(value),
catch: (error) => new PersistenceBackingError({ method: "set", error })
try: () => JSON.stringify([value, TimeToLive.unsafeToExpires(clock, ttl)]),
catch: (error) => PersistenceBackingError.make("set", error)
}),
(u) =>
Effect.mapError(
store.set(key, u),
(error) => new PersistenceBackingError({ method: "set", error })
(error) => PersistenceBackingError.make("set", error)
)
),
remove: (key) =>
Effect.mapError(
store.remove(key),
(error) => new PersistenceBackingError({ method: "remove", error })
(error) => PersistenceBackingError.make("remove", error)
),
clear: Effect.mapError(store.clear, (error) => new PersistenceBackingError({ method: "clear", error }))
clear: Effect.mapError(store.clear, (error) => PersistenceBackingError.make("clear", error))
})
})
})
Expand Down
Loading

0 comments on commit 317b5b8

Please sign in to comment.