Skip to content

Commit

Permalink
add Ndjson module to experimental (#2311)
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart authored Mar 13, 2024
1 parent 28dc61b commit 9971186
Show file tree
Hide file tree
Showing 13 changed files with 538 additions and 173 deletions.
7 changes: 7 additions & 0 deletions .changeset/honest-glasses-build.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@effect/experimental": patch
---

add Ndjson module to experimental

Allows you to encode + decode "new line delimited json"
5 changes: 5 additions & 0 deletions .changeset/itchy-months-nail.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/experimental": minor
---

propogate channel Done type in MsgPack module apis
8 changes: 8 additions & 0 deletions .changeset/silver-kids-check.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"effect": patch
---

add Channel.splitLines api

It splits strings on newlines. Handles both Windows newlines (`\r\n`) and UNIX
newlines (`\n`).
5 changes: 5 additions & 0 deletions .changeset/tame-insects-explain.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/experimental": minor
---

use Ndson for DevTools protocol (instead of msgpack)
17 changes: 17 additions & 0 deletions packages/effect/src/Channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1951,6 +1951,23 @@ export const scoped: <A, E, R>(
effect: Effect.Effect<A, E, R>
) => Channel<A, unknown, E, unknown, unknown, unknown, Exclude<R, Scope.Scope>> = channel.scoped

/**
* Splits strings on newlines. Handles both Windows newlines (`\r\n`) and UNIX
* newlines (`\n`).
*
* @since 2.0.0
* @category combinators
*/
export const splitLines: <Err, Done>() => Channel<
Chunk.Chunk<string>,
Chunk.Chunk<string>,
Err,
Err,
Done,
Done,
never
> = channel.splitLines

/**
* Constructs a channel that succeeds immediately with the specified value.
*
Expand Down
92 changes: 92 additions & 0 deletions packages/effect/src/internal/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2105,6 +2105,98 @@ export const serviceWithEffect = <T extends Context.Tag<any, any>>(tag: T) =>
): Channel.Channel<never, unknown, OutErr, unknown, OutDone, unknown, Env | Context.Tag.Identifier<T>> =>
mapEffect(service(tag), f)

/** @internal */
export const splitLines = <Err, Done>(): Channel.Channel<
Chunk.Chunk<string>,
Chunk.Chunk<string>,
Err,
Err,
Done,
Done,
never
> =>
core.suspend(() => {
let stringBuilder = ""
let midCRLF = false
const splitLinesChunk = (chunk: Chunk.Chunk<string>): Chunk.Chunk<string> => {
const chunkBuilder: Array<string> = []
Chunk.map(chunk, (str) => {
if (str.length !== 0) {
let from = 0
let indexOfCR = str.indexOf("\r")
let indexOfLF = str.indexOf("\n")
if (midCRLF) {
if (indexOfLF === 0) {
chunkBuilder.push(stringBuilder)
stringBuilder = ""
from = 1
indexOfLF = str.indexOf("\n", from)
} else {
stringBuilder = stringBuilder + "\r"
}
midCRLF = false
}
while (indexOfCR !== -1 || indexOfLF !== -1) {
if (indexOfCR === -1 || (indexOfLF !== -1 && indexOfLF < indexOfCR)) {
if (stringBuilder.length === 0) {
chunkBuilder.push(str.substring(from, indexOfLF))
} else {
chunkBuilder.push(stringBuilder + str.substring(from, indexOfLF))
stringBuilder = ""
}
from = indexOfLF + 1
indexOfLF = str.indexOf("\n", from)
} else {
if (str.length === indexOfCR + 1) {
midCRLF = true
indexOfCR = -1
} else {
if (indexOfLF === indexOfCR + 1) {
if (stringBuilder.length === 0) {
chunkBuilder.push(str.substring(from, indexOfCR))
} else {
stringBuilder = stringBuilder + str.substring(from, indexOfCR)
chunkBuilder.push(stringBuilder)
stringBuilder = ""
}
from = indexOfCR + 2
indexOfCR = str.indexOf("\r", from)
indexOfLF = str.indexOf("\n", from)
} else {
indexOfCR = str.indexOf("\r", indexOfCR + 1)
}
}
}
}
if (midCRLF) {
stringBuilder = stringBuilder + str.substring(from, str.length - 1)
} else {
stringBuilder = stringBuilder + str.substring(from, str.length)
}
}
})
return Chunk.unsafeFromArray(chunkBuilder)
}
const loop: Channel.Channel<Chunk.Chunk<string>, Chunk.Chunk<string>, Err, Err, Done, Done, never> = core
.readWithCause({
onInput: (input: Chunk.Chunk<string>) => {
const out = splitLinesChunk(input)
return Chunk.isEmpty(out)
? loop
: core.flatMap(core.write(out), () => loop)
},
onFailure: (cause) =>
stringBuilder.length === 0
? core.failCause(cause)
: core.flatMap(core.write(Chunk.of(stringBuilder)), () => core.failCause(cause)),
onDone: (done) =>
stringBuilder.length === 0
? core.succeed(done)
: core.flatMap(core.write(Chunk.of(stringBuilder)), () => core.succeed(done))
})
return loop
})

/** @internal */
export const toPubSub = <Done, Err, Elem>(
pubsub: PubSub.PubSub<Either.Either<Elem, Exit.Exit<Done, Err>>>
Expand Down
84 changes: 2 additions & 82 deletions packages/effect/src/internal/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4491,7 +4491,7 @@ export const pipeThroughChannel = dual<
<R, R2, E, E2, A, A2>(
self: Stream.Stream<A, E, R>,
channel: Channel.Channel<Chunk.Chunk<A2>, Chunk.Chunk<A>, E2, E, unknown, unknown, R2>
): Stream.Stream<A2, E2, R2 | R> => new StreamImpl(pipe(toChannel(self), core.pipeTo(channel)))
): Stream.Stream<A2, E2, R2 | R> => new StreamImpl(core.pipeTo(toChannel(self), channel))
)

/** @internal */
Expand Down Expand Up @@ -5846,87 +5846,7 @@ export const splitOnChunk = dual<

/** @internal */
export const splitLines = <E, R>(self: Stream.Stream<string, E, R>): Stream.Stream<string, E, R> =>
suspend(() => {
let stringBuilder = ""
let midCRLF = false
const splitLinesChunk = (chunk: Chunk.Chunk<string>): Chunk.Chunk<string> => {
const chunkBuilder: Array<string> = []
Chunk.map(chunk, (str) => {
if (str.length !== 0) {
let from = 0
let indexOfCR = str.indexOf("\r")
let indexOfLF = str.indexOf("\n")
if (midCRLF) {
if (indexOfLF === 0) {
chunkBuilder.push(stringBuilder)
stringBuilder = ""
from = 1
indexOfLF = str.indexOf("\n", from)
} else {
stringBuilder = stringBuilder + "\r"
}
midCRLF = false
}
while (indexOfCR !== -1 || indexOfLF !== -1) {
if (indexOfCR === -1 || (indexOfLF !== -1 && indexOfLF < indexOfCR)) {
if (stringBuilder.length === 0) {
chunkBuilder.push(str.substring(from, indexOfLF))
} else {
chunkBuilder.push(stringBuilder + str.substring(from, indexOfLF))
stringBuilder = ""
}
from = indexOfLF + 1
indexOfLF = str.indexOf("\n", from)
} else {
if (str.length === indexOfCR + 1) {
midCRLF = true
indexOfCR = -1
} else {
if (indexOfLF === indexOfCR + 1) {
if (stringBuilder.length === 0) {
chunkBuilder.push(str.substring(from, indexOfCR))
} else {
stringBuilder = stringBuilder + str.substring(from, indexOfCR)
chunkBuilder.push(stringBuilder)
stringBuilder = ""
}
from = indexOfCR + 2
indexOfCR = str.indexOf("\r", from)
indexOfLF = str.indexOf("\n", from)
} else {
indexOfCR = str.indexOf("\r", indexOfCR + 1)
}
}
}
}
if (midCRLF) {
stringBuilder = stringBuilder + str.substring(from, str.length - 1)
} else {
stringBuilder = stringBuilder + str.substring(from, str.length)
}
}
})
return Chunk.unsafeFromArray(chunkBuilder)
}
const loop: Channel.Channel<Chunk.Chunk<string>, Chunk.Chunk<string>, E, E, unknown, unknown, R> = core
.readWithCause({
onInput: (input: Chunk.Chunk<string>) => {
const out = splitLinesChunk(input)
return Chunk.isEmpty(out)
? loop
: core.flatMap(core.write(out), () => loop)
},
onFailure: (cause) =>
stringBuilder.length === 0
? core.failCause(cause)
: core.flatMap(core.write(Chunk.of(stringBuilder)), () => core.failCause(cause)),
onDone: (done) =>
stringBuilder.length === 0
? core.succeed(done)
: core.flatMap(core.write(Chunk.of(stringBuilder)), () => core.succeed(done))
})
return new StreamImpl(core.pipeTo(toChannel(self), loop))
})
pipeThroughChannel(self, channel.splitLines())

/** @internal */
export const succeed = <A>(value: A): Stream.Stream<A> => fromChunk(Chunk.of(value))
Expand Down
4 changes: 2 additions & 2 deletions packages/experimental/src/DevTools/Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import * as Queue from "effect/Queue"
import * as Schedule from "effect/Schedule"
import * as Stream from "effect/Stream"
import * as Tracer from "effect/Tracer"
import * as MsgPack from "../MsgPack.js"
import * as Ndjson from "../Ndjson.js"
import * as Domain from "./Domain.js"

/**
Expand Down Expand Up @@ -106,7 +106,7 @@ export const make: Effect.Effect<ClientImpl, never, Scope.Scope | Socket.Socket>
yield* _(
Stream.fromQueue(requests),
Stream.pipeThroughChannel(
MsgPack.duplexSchema(Socket.toChannel(socket), {
Ndjson.duplexSchema(Socket.toChannel(socket), {
inputSchema: Domain.Request,
outputSchema: Domain.Response
})
Expand Down
4 changes: 2 additions & 2 deletions packages/experimental/src/DevTools/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import * as Context from "effect/Context"
import * as Effect from "effect/Effect"
import * as Queue from "effect/Queue"
import * as Stream from "effect/Stream"
import * as MsgPack from "../MsgPack.js"
import * as Ndjson from "../Ndjson.js"
import * as SocketServer from "../SocketServer/Node.js"
import * as Domain from "./Domain.js"

Expand Down Expand Up @@ -64,7 +64,7 @@ export const make = Effect.gen(function*(_) {
yield* _(
Stream.fromQueue(responses),
Stream.pipeThroughChannel(
MsgPack.duplexSchema(Socket.toChannel(socket), {
Ndjson.duplexSchema(Socket.toChannel(socket), {
inputSchema: Domain.Response,
outputSchema: Domain.Request
})
Expand Down
Loading

0 comments on commit 9971186

Please sign in to comment.