Skip to content

Commit

Permalink
fix: reuse types from it-stream-types to simplify (#68)
Browse files Browse the repository at this point in the history
Removes the sync/async types for pipe source/sink functions as they are redundant.

Reuses the duplex/transform/sink interfaces from it-stream-types
  • Loading branch information
achingbrain authored Apr 4, 2023
1 parent bde41f4 commit 9fa98e1
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 97 deletions.
10 changes: 5 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -149,15 +149,15 @@
},
"dependencies": {
"it-merge": "^3.0.0",
"it-pushable": "^3.1.0"
"it-pushable": "^3.1.2",
"it-stream-types": "^2.0.1"
},
"devDependencies": {
"aegir": "^38.1.8",
"delay": "^5.0.0",
"it-all": "^3.0.0",
"it-drain": "^3.0.0",
"it-map": "^3.0.0",
"it-stream-types": "^1.0.3",
"it-all": "^3.0.1",
"it-drain": "^3.0.1",
"it-map": "^3.0.2",
"p-defer": "^4.0.0",
"streaming-iterables": "^7.0.4"
}
Expand Down
167 changes: 77 additions & 90 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,116 +1,41 @@
import { pushable } from 'it-pushable'
import merge from 'it-merge'
import type { Duplex, Transform, Sink } from 'it-stream-types'

export const rawPipe = (...fns: any): any => {
let res
while (fns.length > 0) {
res = fns.shift()(res)
}
return res
}

const isAsyncIterable = (obj: any): obj is AsyncIterable<unknown> => {
return obj?.[Symbol.asyncIterator] != null
}

const isIterable = (obj: any): obj is Iterable<unknown> => {
return obj?.[Symbol.iterator] != null
}

const isDuplex = (obj: any): obj is Duplex => {
if (obj == null) {
return false
}

return obj.sink != null && obj.source != null
}

const duplexPipelineFn = (duplex: Duplex<any, any, any>) => {
return (source: any) => {
const p = duplex.sink(source)

if (p?.then != null) {
const stream = pushable<any>({
objectMode: true
})
p.then(() => {
stream.end()
}, (err: Error) => {
stream.end(err)
})

let sourceWrap: () => Iterable<any> | AsyncIterable<any>
const source = duplex.source

if (isAsyncIterable(source)) {
sourceWrap = async function * () {
yield * source
stream.end()
}
} else if (isIterable(source)) {
sourceWrap = function * () {
yield * source
stream.end()
}
} else {
throw new Error('Unknown duplex source type - must be Iterable or AsyncIterable')
}

return merge(stream, sourceWrap())
}

return duplex.source
}
}

export interface Duplex<TSource = unknown, TSink = unknown, RSink = unknown> {
source: TSource
sink: (source: TSink) => RSink
}

export interface FnSource<A = any, B = any> { (source: A): Iterable<B> }
export interface FnAsyncSource<A = any, B = any> { (source: A): AsyncIterable<B> }

export interface FnSink<A = any, B = any> { (source: A): B }
export interface FnAsyncSink<A = any, B = any> { (source: A): Promise<B> }
interface SourceFn<A = any> { (): A }

type PipeSource<A = any> =
Iterable<A> |
AsyncIterable<A> |
FnSource<any, A> |
FnAsyncSource<any, A> |
SourceFn<A> |
Duplex<A, any, any>

type PipeTransform<A = any, B extends Iterable<any> | AsyncIterable<any> = any> =
FnSource<A, B> |
FnAsyncSource<A, B> |
Duplex<B, A, any>
type PipeTransform<A = any, B = any> =
Transform<A, B> |
Duplex<B, A>

type PipeSink<A = any, B = any> =
FnSink<A, B> |
FnAsyncSink<A, B> |
Sink<A, B> |
Duplex<any, A, B>

type PipeOutput<A> =
A extends FnSink ? ReturnType<A> :
A extends FnAsyncSink ? ReturnType<A> :
A extends Duplex<any, any, any> ? ReturnType<A['sink']> :
never
A extends Sink<any> ? ReturnType<A> :
A extends Duplex<any, any, any> ? ReturnType<A['sink']> :
never

// single item pipe output includes pipe source types
type SingleItemPipeOutput<A> =
A extends Iterable<any> ? A :
A extends AsyncIterable<any> ? A :
A extends FnSource ? ReturnType<A> :
A extends FnAsyncSource ? ReturnType<A> :
A extends Duplex<any, any, any> ? A['source'] :
PipeOutput<A>
A extends SourceFn ? ReturnType<A> :
A extends Duplex<any, any, any> ? A['source'] :
PipeOutput<A>

type PipeFnInput<A> =
A extends Iterable<any> ? A :
A extends AsyncIterable<any> ? A :
A extends FnSource ? ReturnType<A> :
A extends FnAsyncSource ? ReturnType<A> :
A extends SourceFn ? ReturnType<A> :
A extends Transform<any, any> ? ReturnType<A> :
A extends Duplex<any, any, any> ? A['source'] :
never

Expand Down Expand Up @@ -365,3 +290,65 @@ export function pipe (first: any, ...rest: any[]): any {

return rawPipe(...fns)
}

export const rawPipe = (...fns: any): any => {
let res
while (fns.length > 0) {
res = fns.shift()(res)
}
return res
}

const isAsyncIterable = (obj: any): obj is AsyncIterable<unknown> => {
return obj?.[Symbol.asyncIterator] != null
}

const isIterable = (obj: any): obj is Iterable<unknown> => {
return obj?.[Symbol.iterator] != null
}

const isDuplex = (obj: any): obj is Duplex => {
if (obj == null) {
return false
}

return obj.sink != null && obj.source != null
}

const duplexPipelineFn = (duplex: Duplex<any, any, any>) => {
return (source: any) => {
const p = duplex.sink(source)

if (p?.then != null) {
const stream = pushable<any>({
objectMode: true
})
p.then(() => {
stream.end()
}, (err: Error) => {
stream.end(err)
})

let sourceWrap: () => Iterable<any> | AsyncIterable<any>
const source = duplex.source

if (isAsyncIterable(source)) {
sourceWrap = async function * () {
yield * source
stream.end()
}
} else if (isIterable(source)) {
sourceWrap = function * () {
yield * source
stream.end()
}
} else {
throw new Error('Unknown duplex source type - must be Iterable or AsyncIterable')
}

return merge(stream, sourceWrap())
}

return duplex.source
}
}
3 changes: 1 addition & 2 deletions test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import map from 'it-map'
import { filter, collect, consume } from 'streaming-iterables'
import delay from 'delay'
import defer from 'p-defer'
import type { Source } from 'it-stream-types'
import type { Duplex } from '../src/index.js'
import type { Source, Duplex } from 'it-stream-types'

const oneTwoThree = (): number[] => [1, 2, 3]

Expand Down

0 comments on commit 9fa98e1

Please sign in to comment.