Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update cluster examples for latest /platform #3032

Merged
merged 2 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 11 additions & 16 deletions packages/cluster-node/examples/sample-connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,11 @@ import * as StorageFile from "@effect/cluster-node/StorageFile"
import * as Serialization from "@effect/cluster/Serialization"
import * as Sharding from "@effect/cluster/Sharding"
import * as ShardingConfig from "@effect/cluster/ShardingConfig"
import * as NodeClient from "@effect/platform-node/NodeHttpClient"
import { runMain } from "@effect/platform-node/NodeRuntime"
import * as HttpClient from "@effect/platform/HttpClient"
import { HttpClient, HttpClientRequest } from "@effect/platform"
import { NodeHttpClient, NodeRuntime } from "@effect/platform-node"
import { Resolver } from "@effect/rpc"
import { HttpResolver } from "@effect/rpc-http"
import * as Effect from "effect/Effect"
import * as Layer from "effect/Layer"
import * as Logger from "effect/Logger"
import * as LogLevel from "effect/LogLevel"
import * as Ref from "effect/Ref"
import { Effect, Layer, Logger, LogLevel, Ref } from "effect"
import { CounterEntity, GetCurrent, Increment } from "./sample-common.js"

const liveLayer = Effect.gen(function*(_) {
Expand All @@ -37,30 +32,30 @@ const liveLayer = Effect.gen(function*(_) {
Layer.provide(StorageFile.storageFile),
Layer.provide(PodsRpc.podsRpc<never>((podAddress) =>
HttpResolver.make<ShardingServiceRpc.ShardingServiceRpc>(
HttpClient.client.fetchOk.pipe(
HttpClient.client.mapRequest(
HttpClient.request.prependUrl(`http://${podAddress.host}:${podAddress.port}/api/rest`)
HttpClient.fetchOk.pipe(
HttpClient.mapRequest(
HttpClientRequest.prependUrl(`http://${podAddress.host}:${podAddress.port}/api/rest`)
)
)
).pipe(Resolver.toClient)
)),
Layer.provide(ShardManagerClientRpc.shardManagerClientRpc(
(shardManagerUri) =>
HttpResolver.make<ShardingServiceRpc.ShardingServiceRpc>(
HttpClient.client.fetchOk.pipe(
HttpClient.client.mapRequest(
HttpClient.request.prependUrl(shardManagerUri)
HttpClient.fetchOk.pipe(
HttpClient.mapRequest(
HttpClientRequest.prependUrl(shardManagerUri)
)
)
).pipe(Resolver.toClient)
)),
Layer.provide(ShardingConfig.withDefaults({ shardingPort: 54322 })),
Layer.provide(Serialization.json),
Layer.provide(NodeClient.layer)
Layer.provide(NodeHttpClient.layerUndici)
)

Layer.launch(liveLayer).pipe(
Logger.withMinimumLogLevel(LogLevel.All),
Effect.tapErrorCause(Effect.logError),
runMain
NodeRuntime.runMain
)
38 changes: 15 additions & 23 deletions packages/cluster-node/examples/sample-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,58 +5,50 @@ import * as StorageFile from "@effect/cluster-node/StorageFile"
import * as ManagerConfig from "@effect/cluster/ManagerConfig"
import * as PodsHealth from "@effect/cluster/PodsHealth"
import * as ShardManager from "@effect/cluster/ShardManager"
import { NodeHttpServer } from "@effect/platform-node"
import { runMain } from "@effect/platform-node/NodeRuntime"
import * as HttpClient from "@effect/platform/HttpClient"
import * as HttpServer from "@effect/platform/HttpServer"
import { HttpClient, HttpClientRequest, HttpMiddleware, HttpRouter, HttpServer } from "@effect/platform"
import { NodeHttpServer, NodeRuntime } from "@effect/platform-node"
import { Resolver } from "@effect/rpc"
import { HttpResolver, HttpRouter } from "@effect/rpc-http"
import { Context } from "effect"
import * as Effect from "effect/Effect"
import { pipe } from "effect/Function"
import * as Layer from "effect/Layer"
import * as Logger from "effect/Logger"
import * as LogLevel from "effect/LogLevel"
import { HttpResolver, HttpRouter as RpcHttpRouter } from "@effect/rpc-http"
import { Context, Effect, Layer, Logger, LogLevel } from "effect"
import { createServer } from "node:http"

const HttpLive = Layer.flatMap(
Layer.effect(ManagerConfig.ManagerConfig, ManagerConfig.ManagerConfig),
(config) =>
HttpServer.router.empty.pipe(
HttpServer.router.post("/api/rest", HttpRouter.toHttpApp(ShardManagerServiceRpc.router)),
HttpServer.server.serve(HttpServer.middleware.logger),
HttpServer.server.withLogAddress,
HttpRouter.empty.pipe(
HttpRouter.post("/api/rest", RpcHttpRouter.toHttpApp(ShardManagerServiceRpc.router)),
HttpServer.serve(HttpMiddleware.logger),
HttpServer.withLogAddress,
Layer.provide(
NodeHttpServer.server.layer(createServer, {
NodeHttpServer.layer(createServer, {
port: Context.get(config, ManagerConfig.ManagerConfig).apiPort
})
),
Layer.discard
)
)

const liveShardingManager = pipe(
Effect.never,
const liveShardingManager = Effect.never.pipe(
Layer.scopedDiscard,
Layer.provide(HttpLive),
Layer.provide(ShardManager.live),
Layer.provide(StorageFile.storageFile),
Layer.provide(PodsHealth.local),
Layer.provide(PodsRpc.podsRpc<never>((podAddress) =>
HttpResolver.make<ShardingServiceRpc.ShardingServiceRpc>(
HttpClient.client.fetchOk.pipe(
HttpClient.client.mapRequest(
HttpClient.request.prependUrl(`http://${podAddress.host}:${podAddress.port}/api/rest`)
HttpClient.fetchOk.pipe(
HttpClient.mapRequest(
HttpClientRequest.prependUrl(`http://${podAddress.host}:${podAddress.port}/api/rest`)
)
)
).pipe(Resolver.toClient)
)),
Layer.provide(ManagerConfig.fromConfig),
Layer.provide(HttpClient.client.layer)
Layer.provide(HttpClient.layer)
)

Layer.launch(liveShardingManager).pipe(
Logger.withMinimumLogLevel(LogLevel.All),
Effect.tapErrorCause(Effect.logError),
runMain
NodeRuntime.runMain
)
53 changes: 20 additions & 33 deletions packages/cluster-node/examples/sample-shard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,23 @@ import * as RecipientBehaviour from "@effect/cluster/RecipientBehaviour"
import * as Serialization from "@effect/cluster/Serialization"
import * as Sharding from "@effect/cluster/Sharding"
import * as ShardingConfig from "@effect/cluster/ShardingConfig"
import { NodeHttpServer } from "@effect/platform-node"
import * as NodeClient from "@effect/platform-node/NodeHttpClient"
import { runMain } from "@effect/platform-node/NodeRuntime"
import * as HttpClient from "@effect/platform/HttpClient"
import * as HttpServer from "@effect/platform/HttpServer"
import { HttpClient, HttpClientRequest, HttpMiddleware, HttpRouter, HttpServer } from "@effect/platform"
import { NodeHttpClient, NodeHttpServer, NodeRuntime } from "@effect/platform-node"
import { Resolver } from "@effect/rpc"
import { HttpResolver, HttpRouter } from "@effect/rpc-http"
import * as Context from "effect/Context"
import * as Effect from "effect/Effect"
import * as Exit from "effect/Exit"
import { pipe } from "effect/Function"
import * as Layer from "effect/Layer"
import * as Logger from "effect/Logger"
import * as LogLevel from "effect/LogLevel"
import * as Ref from "effect/Ref"
import { HttpResolver, HttpRouter as RpcHttpRouter } from "@effect/rpc-http"
import { Context, Effect, Exit, Layer, Logger, LogLevel, Ref } from "effect"
import { createServer } from "node:http"
import { CounterEntity } from "./sample-common.js"

const HttpLive = Layer.flatMap(
Layer.effect(ShardingConfig.ShardingConfig, ShardingConfig.ShardingConfig),
(config) =>
HttpServer.router.empty.pipe(
HttpServer.router.post("/api/rest", HttpRouter.toHttpApp(ShardingServiceRpc.router)),
HttpServer.server.serve(HttpServer.middleware.logger),
HttpServer.server.withLogAddress,
HttpRouter.empty.pipe(
HttpRouter.post("/api/rest", RpcHttpRouter.toHttpApp(ShardingServiceRpc.router)),
HttpServer.serve(HttpMiddleware.logger),
HttpServer.withLogAddress,
Layer.provide(
NodeHttpServer.server.layer(createServer, {
NodeHttpServer.layer(createServer, {
port: Context.get(config, ShardingConfig.ShardingConfig).shardingPort
})
),
Expand All @@ -49,20 +39,17 @@ const liveLayer = Sharding.registerEntity(
(entityId, message, stateRef) => {
switch (message._tag) {
case "Increment":
return pipe(
Ref.update(stateRef, (count) => count + 1),
return Ref.update(stateRef, (count) => count + 1).pipe(
Effect.zipLeft(Effect.logInfo(`Counter ${entityId} incremented`)),
Effect.as(MessageState.Processed(Exit.void))
)
case "Decrement":
return pipe(
Ref.update(stateRef, (count) => count - 1),
return Ref.update(stateRef, (count) => count - 1).pipe(
Effect.zipLeft(Effect.logInfo(`Counter ${entityId} decremented`)),
Effect.as(MessageState.Processed(Exit.void))
)
case "GetCurrent":
return pipe(
Ref.get(stateRef),
return Ref.get(stateRef).pipe(
Effect.exit,
Effect.map((result) => MessageState.Processed(result))
)
Expand All @@ -77,30 +64,30 @@ const liveLayer = Sharding.registerEntity(
Layer.provide(StorageFile.storageFile),
Layer.provide(PodsRpc.podsRpc<never>((podAddress) =>
HttpResolver.make<ShardingServiceRpc.ShardingServiceRpc>(
HttpClient.client.fetchOk.pipe(
HttpClient.client.mapRequest(
HttpClient.request.prependUrl(`http://${podAddress.host}:${podAddress.port}/api/rest`)
HttpClient.fetchOk.pipe(
HttpClient.mapRequest(
HttpClientRequest.prependUrl(`http://${podAddress.host}:${podAddress.port}/api/rest`)
)
)
).pipe(Resolver.toClient)
)),
Layer.provide(ShardManagerClientRpc.shardManagerClientRpc(
(shardManagerUri) =>
HttpResolver.make<ShardingServiceRpc.ShardingServiceRpc>(
HttpClient.client.fetchOk.pipe(
HttpClient.client.mapRequest(
HttpClient.request.prependUrl(shardManagerUri)
HttpClient.fetchOk.pipe(
HttpClient.mapRequest(
HttpClientRequest.prependUrl(shardManagerUri)
)
)
).pipe(Resolver.toClient)
)),
Layer.provide(Serialization.json),
Layer.provide(NodeClient.layer),
Layer.provide(NodeHttpClient.layerUndici),
Layer.provide(ShardingConfig.fromConfig)
)

Layer.launch(liveLayer).pipe(
Logger.withMinimumLogLevel(LogLevel.Debug),
Effect.tapErrorCause(Effect.logError),
runMain
NodeRuntime.runMain
)
10 changes: 8 additions & 2 deletions packages/sql-drizzle/test/Mysql.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ describe.sequential("Mysql", () => {
yield* db.insert(users).values({ name: "Alice", snakeCase: "alice" })
const results = yield* db.select().from(users)
assert.deepStrictEqual(results, [{ id: 1, name: "Alice", snakeCase: "alice" }])
}).pipe(Effect.provide(MysqlContainer.DrizzleLive)), { timeout: 60000 })
}).pipe(
Effect.provide(MysqlContainer.DrizzleLive),
Effect.catchTag("ContainerError", () => Effect.void)
), { timeout: 60000 })

it.effect("remote callback", () =>
Effect.gen(function*(_) {
Expand All @@ -31,5 +34,8 @@ describe.sequential("Mysql", () => {
yield* Effect.promise(() => db.insert(users).values({ name: "Alice", snakeCase: "snake" }))
const results = yield* Effect.promise(() => db.select().from(users))
assert.deepStrictEqual(results, [{ id: 1, name: "Alice", snakeCase: "snake" }])
}).pipe(Effect.provide(MysqlContainer.DrizzleLive)), { timeout: 60000 })
}).pipe(
Effect.provide(MysqlContainer.DrizzleLive),
Effect.catchTag("ContainerError", () => Effect.void)
), { timeout: 60000 })
})
14 changes: 12 additions & 2 deletions packages/sql-drizzle/test/Pg.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@ describe.sequential("Pg", () => {
yield* db.insert(users).values({ name: "Alice", snakeCase: "alice" })
const results = yield* db.select().from(users)
assert.deepStrictEqual(results, [{ id: 1, name: "Alice", snakeCase: "alice" }])
}).pipe(Effect.provide(PgContainer.DrizzleLive)), { timeout: 60000 })
}).pipe(
Effect.provide(PgContainer.DrizzleLive),
Effect.catchTag("ContainerError", () => Effect.void)
), {
timeout: 60000
})

it.effect("remote callback", () =>
Effect.gen(function*(_) {
Expand All @@ -31,5 +36,10 @@ describe.sequential("Pg", () => {
yield* Effect.promise(() => db.insert(users).values({ name: "Alice", snakeCase: "snake" }))
const results = yield* Effect.promise(() => db.select().from(users))
assert.deepStrictEqual(results, [{ id: 1, name: "Alice", snakeCase: "snake" }])
}).pipe(Effect.provide(PgContainer.DrizzleLive)), { timeout: 60000 })
}).pipe(
Effect.provide(PgContainer.DrizzleLive),
Effect.catchTag("ContainerError", () => Effect.void)
), {
timeout: 60000
})
})
20 changes: 15 additions & 5 deletions packages/sql-drizzle/test/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ import * as Pg from "@effect/sql-pg"
import type { StartedMySqlContainer } from "@testcontainers/mysql"
import { MySqlContainer } from "@testcontainers/mysql"
import { PostgreSqlContainer, type StartedPostgreSqlContainer } from "@testcontainers/postgresql"
import { Config, Context, Effect, Layer, Secret } from "effect"
import { Config, Context, Data, Effect, Layer, Redacted } from "effect"

export class ContainerError extends Data.TaggedError("ContainerError")<{
cause: unknown
}> {}

export class PgContainer extends Context.Tag("test/PgContainer")<
PgContainer,
Expand All @@ -14,7 +18,10 @@ export class PgContainer extends Context.Tag("test/PgContainer")<
static Live = Layer.scoped(
this,
Effect.acquireRelease(
Effect.promise(() => new PostgreSqlContainer("postgres:alpine").start()),
Effect.tryPromise({
try: () => new PostgreSqlContainer("postgres:alpine").start(),
catch: (cause) => new ContainerError({ cause })
}),
(container) => Effect.promise(() => container.stop())
)
)
Expand All @@ -23,7 +30,7 @@ export class PgContainer extends Context.Tag("test/PgContainer")<
Effect.gen(function*(_) {
const container = yield* _(PgContainer)
return Pg.client.layer({
url: Config.succeed(Secret.fromString(container.getConnectionUri()))
url: Config.succeed(Redacted.make(container.getConnectionUri()))
})
})
).pipe(Layer.provide(this.Live))
Expand All @@ -38,7 +45,10 @@ export class MysqlContainer extends Context.Tag("test/MysqlContainer")<
static Live = Layer.scoped(
this,
Effect.acquireRelease(
Effect.promise(() => new MySqlContainer("mysql:lts").start()),
Effect.tryPromise({
try: () => new MySqlContainer("mysql:lts").start(),
catch: (cause) => new ContainerError({ cause })
}),
(container) => Effect.promise(() => container.stop())
)
)
Expand All @@ -47,7 +57,7 @@ export class MysqlContainer extends Context.Tag("test/MysqlContainer")<
Effect.gen(function*(_) {
const container = yield* _(MysqlContainer)
return Mysql.client.layer({
url: Config.succeed(Secret.fromString(container.getConnectionUri()))
url: Config.succeed(Redacted.make(container.getConnectionUri()))
})
})
).pipe(Layer.provide(this.Live))
Expand Down