Skip to content

Commit

Permalink
update cluster examples for latest /platform (#3032)
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Jun 20, 2024
1 parent 61707b6 commit 2f619e6
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 81 deletions.
27 changes: 11 additions & 16 deletions packages/cluster-node/examples/sample-connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,11 @@ import * as StorageFile from "@effect/cluster-node/StorageFile"
import * as Serialization from "@effect/cluster/Serialization"
import * as Sharding from "@effect/cluster/Sharding"
import * as ShardingConfig from "@effect/cluster/ShardingConfig"
import * as NodeClient from "@effect/platform-node/NodeHttpClient"
import { runMain } from "@effect/platform-node/NodeRuntime"
import * as HttpClient from "@effect/platform/HttpClient"
import { HttpClient, HttpClientRequest } from "@effect/platform"
import { NodeHttpClient, NodeRuntime } from "@effect/platform-node"
import { Resolver } from "@effect/rpc"
import { HttpResolver } from "@effect/rpc-http"
import * as Effect from "effect/Effect"
import * as Layer from "effect/Layer"
import * as Logger from "effect/Logger"
import * as LogLevel from "effect/LogLevel"
import * as Ref from "effect/Ref"
import { Effect, Layer, Logger, LogLevel, Ref } from "effect"
import { CounterEntity, GetCurrent, Increment } from "./sample-common.js"

const liveLayer = Effect.gen(function*(_) {
Expand All @@ -37,30 +32,30 @@ const liveLayer = Effect.gen(function*(_) {
Layer.provide(StorageFile.storageFile),
Layer.provide(PodsRpc.podsRpc<never>((podAddress) =>
HttpResolver.make<ShardingServiceRpc.ShardingServiceRpc>(
HttpClient.client.fetchOk.pipe(
HttpClient.client.mapRequest(
HttpClient.request.prependUrl(`http://${podAddress.host}:${podAddress.port}/api/rest`)
HttpClient.fetchOk.pipe(
HttpClient.mapRequest(
HttpClientRequest.prependUrl(`http://${podAddress.host}:${podAddress.port}/api/rest`)
)
)
).pipe(Resolver.toClient)
)),
Layer.provide(ShardManagerClientRpc.shardManagerClientRpc(
(shardManagerUri) =>
HttpResolver.make<ShardingServiceRpc.ShardingServiceRpc>(
HttpClient.client.fetchOk.pipe(
HttpClient.client.mapRequest(
HttpClient.request.prependUrl(shardManagerUri)
HttpClient.fetchOk.pipe(
HttpClient.mapRequest(
HttpClientRequest.prependUrl(shardManagerUri)
)
)
).pipe(Resolver.toClient)
)),
Layer.provide(ShardingConfig.withDefaults({ shardingPort: 54322 })),
Layer.provide(Serialization.json),
Layer.provide(NodeClient.layer)
Layer.provide(NodeHttpClient.layerUndici)
)

Layer.launch(liveLayer).pipe(
Logger.withMinimumLogLevel(LogLevel.All),
Effect.tapErrorCause(Effect.logError),
runMain
NodeRuntime.runMain
)
38 changes: 15 additions & 23 deletions packages/cluster-node/examples/sample-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,58 +5,50 @@ import * as StorageFile from "@effect/cluster-node/StorageFile"
import * as ManagerConfig from "@effect/cluster/ManagerConfig"
import * as PodsHealth from "@effect/cluster/PodsHealth"
import * as ShardManager from "@effect/cluster/ShardManager"
import { NodeHttpServer } from "@effect/platform-node"
import { runMain } from "@effect/platform-node/NodeRuntime"
import * as HttpClient from "@effect/platform/HttpClient"
import * as HttpServer from "@effect/platform/HttpServer"
import { HttpClient, HttpClientRequest, HttpMiddleware, HttpRouter, HttpServer } from "@effect/platform"
import { NodeHttpServer, NodeRuntime } from "@effect/platform-node"
import { Resolver } from "@effect/rpc"
import { HttpResolver, HttpRouter } from "@effect/rpc-http"
import { Context } from "effect"
import * as Effect from "effect/Effect"
import { pipe } from "effect/Function"
import * as Layer from "effect/Layer"
import * as Logger from "effect/Logger"
import * as LogLevel from "effect/LogLevel"
import { HttpResolver, HttpRouter as RpcHttpRouter } from "@effect/rpc-http"
import { Context, Effect, Layer, Logger, LogLevel } from "effect"
import { createServer } from "node:http"

const HttpLive = Layer.flatMap(
Layer.effect(ManagerConfig.ManagerConfig, ManagerConfig.ManagerConfig),
(config) =>
HttpServer.router.empty.pipe(
HttpServer.router.post("/api/rest", HttpRouter.toHttpApp(ShardManagerServiceRpc.router)),
HttpServer.server.serve(HttpServer.middleware.logger),
HttpServer.server.withLogAddress,
HttpRouter.empty.pipe(
HttpRouter.post("/api/rest", RpcHttpRouter.toHttpApp(ShardManagerServiceRpc.router)),
HttpServer.serve(HttpMiddleware.logger),
HttpServer.withLogAddress,
Layer.provide(
NodeHttpServer.server.layer(createServer, {
NodeHttpServer.layer(createServer, {
port: Context.get(config, ManagerConfig.ManagerConfig).apiPort
})
),
Layer.discard
)
)

const liveShardingManager = pipe(
Effect.never,
const liveShardingManager = Effect.never.pipe(
Layer.scopedDiscard,
Layer.provide(HttpLive),
Layer.provide(ShardManager.live),
Layer.provide(StorageFile.storageFile),
Layer.provide(PodsHealth.local),
Layer.provide(PodsRpc.podsRpc<never>((podAddress) =>
HttpResolver.make<ShardingServiceRpc.ShardingServiceRpc>(
HttpClient.client.fetchOk.pipe(
HttpClient.client.mapRequest(
HttpClient.request.prependUrl(`http://${podAddress.host}:${podAddress.port}/api/rest`)
HttpClient.fetchOk.pipe(
HttpClient.mapRequest(
HttpClientRequest.prependUrl(`http://${podAddress.host}:${podAddress.port}/api/rest`)
)
)
).pipe(Resolver.toClient)
)),
Layer.provide(ManagerConfig.fromConfig),
Layer.provide(HttpClient.client.layer)
Layer.provide(HttpClient.layer)
)

Layer.launch(liveShardingManager).pipe(
Logger.withMinimumLogLevel(LogLevel.All),
Effect.tapErrorCause(Effect.logError),
runMain
NodeRuntime.runMain
)
53 changes: 20 additions & 33 deletions packages/cluster-node/examples/sample-shard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,23 @@ import * as RecipientBehaviour from "@effect/cluster/RecipientBehaviour"
import * as Serialization from "@effect/cluster/Serialization"
import * as Sharding from "@effect/cluster/Sharding"
import * as ShardingConfig from "@effect/cluster/ShardingConfig"
import { NodeHttpServer } from "@effect/platform-node"
import * as NodeClient from "@effect/platform-node/NodeHttpClient"
import { runMain } from "@effect/platform-node/NodeRuntime"
import * as HttpClient from "@effect/platform/HttpClient"
import * as HttpServer from "@effect/platform/HttpServer"
import { HttpClient, HttpClientRequest, HttpMiddleware, HttpRouter, HttpServer } from "@effect/platform"
import { NodeHttpClient, NodeHttpServer, NodeRuntime } from "@effect/platform-node"
import { Resolver } from "@effect/rpc"
import { HttpResolver, HttpRouter } from "@effect/rpc-http"
import * as Context from "effect/Context"
import * as Effect from "effect/Effect"
import * as Exit from "effect/Exit"
import { pipe } from "effect/Function"
import * as Layer from "effect/Layer"
import * as Logger from "effect/Logger"
import * as LogLevel from "effect/LogLevel"
import * as Ref from "effect/Ref"
import { HttpResolver, HttpRouter as RpcHttpRouter } from "@effect/rpc-http"
import { Context, Effect, Exit, Layer, Logger, LogLevel, Ref } from "effect"
import { createServer } from "node:http"
import { CounterEntity } from "./sample-common.js"

const HttpLive = Layer.flatMap(
Layer.effect(ShardingConfig.ShardingConfig, ShardingConfig.ShardingConfig),
(config) =>
HttpServer.router.empty.pipe(
HttpServer.router.post("/api/rest", HttpRouter.toHttpApp(ShardingServiceRpc.router)),
HttpServer.server.serve(HttpServer.middleware.logger),
HttpServer.server.withLogAddress,
HttpRouter.empty.pipe(
HttpRouter.post("/api/rest", RpcHttpRouter.toHttpApp(ShardingServiceRpc.router)),
HttpServer.serve(HttpMiddleware.logger),
HttpServer.withLogAddress,
Layer.provide(
NodeHttpServer.server.layer(createServer, {
NodeHttpServer.layer(createServer, {
port: Context.get(config, ShardingConfig.ShardingConfig).shardingPort
})
),
Expand All @@ -49,20 +39,17 @@ const liveLayer = Sharding.registerEntity(
(entityId, message, stateRef) => {
switch (message._tag) {
case "Increment":
return pipe(
Ref.update(stateRef, (count) => count + 1),
return Ref.update(stateRef, (count) => count + 1).pipe(
Effect.zipLeft(Effect.logInfo(`Counter ${entityId} incremented`)),
Effect.as(MessageState.Processed(Exit.void))
)
case "Decrement":
return pipe(
Ref.update(stateRef, (count) => count - 1),
return Ref.update(stateRef, (count) => count - 1).pipe(
Effect.zipLeft(Effect.logInfo(`Counter ${entityId} decremented`)),
Effect.as(MessageState.Processed(Exit.void))
)
case "GetCurrent":
return pipe(
Ref.get(stateRef),
return Ref.get(stateRef).pipe(
Effect.exit,
Effect.map((result) => MessageState.Processed(result))
)
Expand All @@ -77,30 +64,30 @@ const liveLayer = Sharding.registerEntity(
Layer.provide(StorageFile.storageFile),
Layer.provide(PodsRpc.podsRpc<never>((podAddress) =>
HttpResolver.make<ShardingServiceRpc.ShardingServiceRpc>(
HttpClient.client.fetchOk.pipe(
HttpClient.client.mapRequest(
HttpClient.request.prependUrl(`http://${podAddress.host}:${podAddress.port}/api/rest`)
HttpClient.fetchOk.pipe(
HttpClient.mapRequest(
HttpClientRequest.prependUrl(`http://${podAddress.host}:${podAddress.port}/api/rest`)
)
)
).pipe(Resolver.toClient)
)),
Layer.provide(ShardManagerClientRpc.shardManagerClientRpc(
(shardManagerUri) =>
HttpResolver.make<ShardingServiceRpc.ShardingServiceRpc>(
HttpClient.client.fetchOk.pipe(
HttpClient.client.mapRequest(
HttpClient.request.prependUrl(shardManagerUri)
HttpClient.fetchOk.pipe(
HttpClient.mapRequest(
HttpClientRequest.prependUrl(shardManagerUri)
)
)
).pipe(Resolver.toClient)
)),
Layer.provide(Serialization.json),
Layer.provide(NodeClient.layer),
Layer.provide(NodeHttpClient.layerUndici),
Layer.provide(ShardingConfig.fromConfig)
)

Layer.launch(liveLayer).pipe(
Logger.withMinimumLogLevel(LogLevel.Debug),
Effect.tapErrorCause(Effect.logError),
runMain
NodeRuntime.runMain
)
10 changes: 8 additions & 2 deletions packages/sql-drizzle/test/Mysql.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ describe.sequential("Mysql", () => {
yield* db.insert(users).values({ name: "Alice", snakeCase: "alice" })
const results = yield* db.select().from(users)
assert.deepStrictEqual(results, [{ id: 1, name: "Alice", snakeCase: "alice" }])
}).pipe(Effect.provide(MysqlContainer.DrizzleLive)), { timeout: 60000 })
}).pipe(
Effect.provide(MysqlContainer.DrizzleLive),
Effect.catchTag("ContainerError", () => Effect.void)
), { timeout: 60000 })

it.effect("remote callback", () =>
Effect.gen(function*(_) {
Expand All @@ -31,5 +34,8 @@ describe.sequential("Mysql", () => {
yield* Effect.promise(() => db.insert(users).values({ name: "Alice", snakeCase: "snake" }))
const results = yield* Effect.promise(() => db.select().from(users))
assert.deepStrictEqual(results, [{ id: 1, name: "Alice", snakeCase: "snake" }])
}).pipe(Effect.provide(MysqlContainer.DrizzleLive)), { timeout: 60000 })
}).pipe(
Effect.provide(MysqlContainer.DrizzleLive),
Effect.catchTag("ContainerError", () => Effect.void)
), { timeout: 60000 })
})
14 changes: 12 additions & 2 deletions packages/sql-drizzle/test/Pg.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@ describe.sequential("Pg", () => {
yield* db.insert(users).values({ name: "Alice", snakeCase: "alice" })
const results = yield* db.select().from(users)
assert.deepStrictEqual(results, [{ id: 1, name: "Alice", snakeCase: "alice" }])
}).pipe(Effect.provide(PgContainer.DrizzleLive)), { timeout: 60000 })
}).pipe(
Effect.provide(PgContainer.DrizzleLive),
Effect.catchTag("ContainerError", () => Effect.void)
), {
timeout: 60000
})

it.effect("remote callback", () =>
Effect.gen(function*(_) {
Expand All @@ -31,5 +36,10 @@ describe.sequential("Pg", () => {
yield* Effect.promise(() => db.insert(users).values({ name: "Alice", snakeCase: "snake" }))
const results = yield* Effect.promise(() => db.select().from(users))
assert.deepStrictEqual(results, [{ id: 1, name: "Alice", snakeCase: "snake" }])
}).pipe(Effect.provide(PgContainer.DrizzleLive)), { timeout: 60000 })
}).pipe(
Effect.provide(PgContainer.DrizzleLive),
Effect.catchTag("ContainerError", () => Effect.void)
), {
timeout: 60000
})
})
20 changes: 15 additions & 5 deletions packages/sql-drizzle/test/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ import * as Pg from "@effect/sql-pg"
import type { StartedMySqlContainer } from "@testcontainers/mysql"
import { MySqlContainer } from "@testcontainers/mysql"
import { PostgreSqlContainer, type StartedPostgreSqlContainer } from "@testcontainers/postgresql"
import { Config, Context, Effect, Layer, Secret } from "effect"
import { Config, Context, Data, Effect, Layer, Redacted } from "effect"

export class ContainerError extends Data.TaggedError("ContainerError")<{
cause: unknown
}> {}

export class PgContainer extends Context.Tag("test/PgContainer")<
PgContainer,
Expand All @@ -14,7 +18,10 @@ export class PgContainer extends Context.Tag("test/PgContainer")<
static Live = Layer.scoped(
this,
Effect.acquireRelease(
Effect.promise(() => new PostgreSqlContainer("postgres:alpine").start()),
Effect.tryPromise({
try: () => new PostgreSqlContainer("postgres:alpine").start(),
catch: (cause) => new ContainerError({ cause })
}),
(container) => Effect.promise(() => container.stop())
)
)
Expand All @@ -23,7 +30,7 @@ export class PgContainer extends Context.Tag("test/PgContainer")<
Effect.gen(function*(_) {
const container = yield* _(PgContainer)
return Pg.client.layer({
url: Config.succeed(Secret.fromString(container.getConnectionUri()))
url: Config.succeed(Redacted.make(container.getConnectionUri()))
})
})
).pipe(Layer.provide(this.Live))
Expand All @@ -38,7 +45,10 @@ export class MysqlContainer extends Context.Tag("test/MysqlContainer")<
static Live = Layer.scoped(
this,
Effect.acquireRelease(
Effect.promise(() => new MySqlContainer("mysql:lts").start()),
Effect.tryPromise({
try: () => new MySqlContainer("mysql:lts").start(),
catch: (cause) => new ContainerError({ cause })
}),
(container) => Effect.promise(() => container.stop())
)
)
Expand All @@ -47,7 +57,7 @@ export class MysqlContainer extends Context.Tag("test/MysqlContainer")<
Effect.gen(function*(_) {
const container = yield* _(MysqlContainer)
return Mysql.client.layer({
url: Config.succeed(Secret.fromString(container.getConnectionUri()))
url: Config.succeed(Redacted.make(container.getConnectionUri()))
})
})
).pipe(Layer.provide(this.Live))
Expand Down

0 comments on commit 2f619e6

Please sign in to comment.