Skip to content

Commit

Permalink
fix: interruptibility of zipWithC (via ZIO)
Browse files Browse the repository at this point in the history
  • Loading branch information
0x706b committed Jun 12, 2022
1 parent 56dd399 commit 286f472
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 42 deletions.
5 changes: 5 additions & 0 deletions .changeset/brave-crabs-change.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@fncts/io": patch
---

fix: interruptibility of zipWithC (via ZIO)
2 changes: 1 addition & 1 deletion packages/io/src/IO.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ export * from "./IO/api/sleep.js";
export * from "./IO/api/stateful.js";
export * from "./IO/api/timeout.js";
export * from "./IO/api/withChildren.js";
export * from "./IO/api/withEarlyRelease.js";
export * from "./IO/api/withFinalizer.js";
export * from "./IO/api/withFinalizerExit.js";
export * from "./IO/api/withRuntimeConfig.js";
export * from "./IO/api/zipC.js";
export * from "./IO/api/zipWithC.js";
// codegen:end
61 changes: 61 additions & 0 deletions packages/io/src/IO/api/zipC.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,69 @@
import { tuple } from "@fncts/base/data/function";
import { AtomicReference } from "@fncts/base/internal/AtomicReference";

/**
* @tsplus fluent fncts.io.IO zipC
*/
export function zipC_<R, E, A, R1, E1, B>(self: IO<R, E, A>, that: IO<R1, E1, B>): IO<R & R1, E | E1, readonly [A, B]> {
return self.zipWithC(that, tuple);
}

/**
* @tsplus fluent fncts.io.IO zipWithC
*/
export function zipWithC_<R, E, A, R1, E1, B, C>(
self: IO<R, E, A>,
that: IO<R1, E1, B>,
f: (a: A, b: B) => C,
): IO<R & R1, E | E1, C> {
return IO.descriptorWith((descriptor) =>
IO.uninterruptibleMask(({ restore }) => {
const future = Future.unsafeMake<void, C>(FiberId.none);
const ref = new AtomicReference<Maybe<Either<A, B>>>(Nothing());
return IO.transplant((graft) =>
graft(
restore(self).matchCauseIO(
(cause) => future.fail(undefined) > IO.failCauseNow(cause),
(a) =>
ref.getAndSet(Just(Either.left(a))).match(
() => IO.unit,
(value) =>
value.match(
() => IO.unit,
(b) => future.succeed(f(a, b)).asUnit,
),
),
),
)
.forkDaemon.zip(
graft(
restore(that).matchCauseIO(
(cause) => future.fail(undefined) > IO.failCauseNow(cause),
(b) =>
ref.getAndSet(Just(Either.right(b))).match(
() => IO.unit,
(value) =>
value.match(
(a) => future.succeed(f(a, b)).asUnit,
() => IO.unit,
),
),
),
).forkDaemon,
)
.flatMap(([left, right]) =>
restore(future.await).matchCauseIO(
(cause) =>
left
.interruptAs(descriptor.id)
.zipC(right.interruptAs(descriptor.id))
.flatMap(([left, right]) =>
left.zipC(right).match(IO.failCauseNow, () => IO.failCauseNow(cause.stripFailures)),
),
(c) => left.inheritRefs.zip(right.inheritRefs).as(c),
),
),
);
}),
);
}
41 changes: 0 additions & 41 deletions packages/io/src/IO/api/zipWithC.ts

This file was deleted.

18 changes: 18 additions & 0 deletions packages/io/test/IOSpec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { completes } from "@fncts/test/control/Assertion";
import { Live } from "@fncts/test/control/Live";

import { withLatch } from "./Latch.js";
Expand Down Expand Up @@ -532,6 +533,23 @@ class IOSpec extends DefaultRunnableSpec {
}),
),
),
suite(
"zipC",
testIO(
"is interruptible",
Do((Δ) => {
const future1 = Δ(Future.make<never, void>());
const future2 = Δ(Future.make<never, void>());
const left = future1.succeed(undefined) > IO.never;
const right = future2.succeed(undefined) > IO.never;
const fiber = Δ(left.zipC(right).fork);
Δ(future1.await);
Δ(future2.await);
Δ(fiber.interrupt);
return true.assert(completes);
}),
),
),
);
}

Expand Down

0 comments on commit 286f472

Please sign in to comment.