Skip to content

Commit

Permalink
Implement take, truncate, skip and drop
Browse files Browse the repository at this point in the history
  • Loading branch information
abelbraaksma committed Dec 18, 2023
1 parent aabde2f commit 04ecbf2
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 22 deletions.
28 changes: 15 additions & 13 deletions src/FSharp.Control.TaskSeq/TaskSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,15 @@ open System.Threading.Tasks

#nowarn "57"

// Just for convenience
module Internal = TaskSeqInternal

[<AutoOpen>]
module TaskSeqExtensions =
// these need to be in a module, not a type for proper auto-initialization of generic values
module TaskSeq =
let empty<'T> = Internal.empty<'T>

let empty<'T> =
{ new IAsyncEnumerable<'T> with
member _.GetAsyncEnumerator(_) =
{ new IAsyncEnumerator<'T> with
member _.MoveNextAsync() = ValueTask.False
member _.Current = Unchecked.defaultof<'T>
member _.DisposeAsync() = ValueTask.CompletedTask
}
}

// Just for convenience
module Internal = TaskSeqInternal

[<Sealed; AbstractClass>]
type TaskSeq private () =
Expand Down Expand Up @@ -195,7 +188,7 @@ type TaskSeq private () =
static member append (source1: TaskSeq<'T>) (source2: TaskSeq<'T>) =
Internal.checkNonNull (nameof source1) source1
Internal.checkNonNull (nameof source2) source2

taskSeq {
yield! source1
yield! source2
Expand Down Expand Up @@ -289,18 +282,27 @@ type TaskSeq private () =

static member choose chooser source = Internal.choose (TryPick chooser) source
static member chooseAsync chooser source = Internal.choose (TryPickAsync chooser) source

static member filter predicate source = Internal.filter (Predicate predicate) source
static member filterAsync predicate source = Internal.filter (PredicateAsync predicate) source

static member skip count source = Internal.skipOrTake Skip count source
static member drop count source = Internal.skipOrTake Drop count source
static member take count source = Internal.skipOrTake Take count source
static member truncate count source = Internal.skipOrTake Truncate count source

static member takeWhile predicate source = Internal.takeWhile Exclusive (Predicate predicate) source
static member takeWhileAsync predicate source = Internal.takeWhile Exclusive (PredicateAsync predicate) source
static member takeWhileInclusive predicate source = Internal.takeWhile Inclusive (Predicate predicate) source
static member takeWhileInclusiveAsync predicate source = Internal.takeWhile Inclusive (PredicateAsync predicate) source

static member tryPick chooser source = Internal.tryPick (TryPick chooser) source
static member tryPickAsync chooser source = Internal.tryPick (TryPickAsync chooser) source
static member tryFind predicate source = Internal.tryFind (Predicate predicate) source
static member tryFindAsync predicate source = Internal.tryFind (PredicateAsync predicate) source
static member tryFindIndex predicate source = Internal.tryFindIndex (Predicate predicate) source
static member tryFindIndexAsync predicate source = Internal.tryFindIndex (PredicateAsync predicate) source

static member except itemsToExclude source = Internal.except itemsToExclude source
static member exceptOfSeq itemsToExclude source = Internal.exceptOfSeq itemsToExclude source

Expand Down
60 changes: 60 additions & 0 deletions src/FSharp.Control.TaskSeq/TaskSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,66 @@ type TaskSeq =
/// <exception cref="T:ArgumentNullException">Thrown when the input task sequence is null.</exception>
static member filterAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> TaskSeq<'T>

/// <summary>
/// Returns a task sequence that, when iterated, skips <paramref name="count" /> elements of the
/// underlying sequence, and then returns the remainder of the elements. Raises an exception if there are not enough
/// elements in the sequence. See <see cref="drop" /> for a version that does not raise an exception.
/// See also <see cref="take" /> for the inverse of this operation.
/// </summary>
///
/// <param name="count">The number of items to skip.</param>
/// <param name="source">The input task sequence.</param>
/// <returns>The resulting task sequence.</returns>
/// <exception cref="T:ArgumentNullException">Thrown when the input task sequence is null.</exception>
/// <exception cref="T:ArgumentException">Thrown when <paramref name="count" /> is less than zero.</exception>
/// <exception cref="T:InvalidOperationException">Thrown when count exceeds the number of elements in the sequence.</exception>
static member skip: count: int -> source: TaskSeq<'T> -> TaskSeq<'T>


/// <summary>
/// Returns a task sequence that, when iterated, drops at most <paramref name="count" /> elements of the
/// underlying sequence, and then returns the remainder of the elements, if any.
/// See <see cref="skip" /> for a version that raises an exception if there
/// are not enough elements. See also <see cref="truncate" /> for the inverse of this operation.
/// </summary>
///
/// <param name="count">The number of items to drop.</param>
/// <param name="source">The input task sequence.</param>
/// <returns>The resulting task sequence.</returns>
/// <exception cref="T:ArgumentNullException">Thrown when the input task sequence is null.</exception>
/// <exception cref="T:ArgumentException">Thrown when <paramref name="count" /> is less than zero.</exception>
static member drop: count: int -> source: TaskSeq<'T> -> TaskSeq<'T>

/// <summary>
/// Returns a task sequence that, when iterated, yields <paramref name="count" /> elements of the
/// underlying sequence, and then returns no further elements. Raises an exception if there are not enough
/// elements in the sequence. See <see cref="truncate" /> for a version that does not raise an exception.
/// See also <see cref="skip" /> for the inverse of this operation.
/// </summary>
///
/// <param name="count">The number of items to take.</param>
/// <param name="source">The input task sequence.</param>
/// <returns>The resulting task sequence.</returns>
/// <exception cref="T:ArgumentNullException">Thrown when the input task sequence is null.</exception>
/// <exception cref="T:ArgumentException">Thrown when <paramref name="count" /> is less than zero.</exception>
/// <exception cref="T:InvalidOperationException">Thrown when count exceeds the number of elements in the sequence.</exception>
static member take: count: int -> source: TaskSeq<'T> -> TaskSeq<'T>

/// <summary>
/// Returns a task sequence that, when iterated, yields at most <paramref name="count" /> elements of the underlying
/// sequence, truncating the remainder, if any.
/// See <see cref="take" /> for a version that raises an exception if there are not enough elements in the
/// sequence. See also <see cref="drop" /> for the inverse of this operation.
/// </summary>
///
/// <param name="count">The maximum number of items to enumerate.</param>
/// <param name="source">The input task sequence.</param>
/// <returns>The resulting task sequence.</returns>
/// <exception cref="T:ArgumentNullException">Thrown when the input task sequence is null.</exception>
/// <exception cref="T:ArgumentException">Thrown when <paramref name="count" /> is less than zero.</exception>
static member truncate: count: int -> source: TaskSeq<'T> -> TaskSeq<'T>


/// <summary>
/// Returns a task sequence that, when iterated, yields elements of the underlying sequence while the
/// given function <paramref name="predicate" /> returns <see cref="true" />, and then returns no further elements.
Expand Down
123 changes: 114 additions & 9 deletions src/FSharp.Control.TaskSeq/TaskSeqInternal.fs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@ type internal WhileKind =
/// The item under test is always excluded
| Exclusive

[<Struct>]
type internal TakeOrSkipKind =
/// use the Seq.take semantics, raises exception if not enough elements
| Take
/// use the Seq.skip semantics, raises exception if not enough elements
| Skip
/// use the Seq.truncate semantics, safe operation, returns all if count exceeds the seq
| Truncate
/// no Seq equiv, but like Stream.drop in Scala: safe operation, return empty if not enough elements
| Drop

[<Struct>]
type internal Action<'T, 'U, 'TaskU when 'TaskU :> Task<'U>> =
| CountableAction of countable_action: (int -> 'T -> 'U)
Expand Down Expand Up @@ -52,19 +63,16 @@ module internal TaskSeqInternal =
nullArg argName

let inline raiseEmptySeq () =
ArgumentException("The asynchronous input sequence was empty.", "source")
|> raise
invalidArg "source" "The input task sequence was empty."

let inline raiseCannotBeNegative (name: string) =
ArgumentException("The value cannot be negative", name)
|> raise
let inline raiseCannotBeNegative name =
invalidArg name "The value must be non-negative"

let inline raiseInsufficient () =
ArgumentException("The asynchronous input sequence was has an insufficient number of elements.", "source")
|> raise
invalidArg "source" "The input task sequence was has an insufficient number of elements."

let inline raiseNotFound () =
KeyNotFoundException("The predicate function or index did not satisfy any item in the async sequence.")
KeyNotFoundException("The predicate function or index did not satisfy any item in the task sequence.")
|> raise

let isEmpty (source: TaskSeq<_>) =
Expand All @@ -76,6 +84,16 @@ module internal TaskSeqInternal =
return not step
}

let empty<'T> =
{ new IAsyncEnumerable<'T> with
member _.GetAsyncEnumerator(_) =
{ new IAsyncEnumerator<'T> with
member _.MoveNextAsync() = ValueTask.False
member _.Current = Unchecked.defaultof<'T>
member _.DisposeAsync() = ValueTask.CompletedTask
}
}

let singleton (value: 'T) =
{ new IAsyncEnumerable<'T> with
member _.GetAsyncEnumerator(_) =
Expand Down Expand Up @@ -613,9 +631,96 @@ module internal TaskSeqInternal =
| false -> ()
}

let takeWhile whileKind predicate (source: TaskSeq<_>) =

let skipOrTake skipOrTake count (source: TaskSeq<_>) =
checkNonNull (nameof source) source
if count < 0 then raiseCannotBeNegative (nameof count)

match skipOrTake with
| Skip ->
// don't create a new sequence if count = 0
if count = 0 then source
else
taskSeq {
use e = source.GetAsyncEnumerator CancellationToken.None

for _ in 1..count do
let! step = e.MoveNextAsync()
if not step then
raiseInsufficient()

let mutable cont = true

while cont do
yield e.Current
let! moveNext = e.MoveNextAsync()
cont <- moveNext

}
| Drop ->
// don't create a new sequence if count = 0
if count = 0 then source
else
taskSeq {
use e = source.GetAsyncEnumerator CancellationToken.None

let! step = e.MoveNextAsync()
let mutable cont = step
let mutable pos = 0

// skip, or stop looping if we reached the end
while cont do
pos <- pos + 1
let! moveNext = e.MoveNextAsync()
cont <- moveNext && pos <= count

// return the rest
while cont do
yield e.Current
let! moveNext = e.MoveNextAsync()
cont <- moveNext

}
| Take ->
// don't initialize an empty task sequence
if count = 0 then empty
else
taskSeq {
use e = source.GetAsyncEnumerator CancellationToken.None

for _ in count .. - 1 .. 1 do
let! step = e.MoveNextAsync()
if not step then
raiseInsufficient()

yield e.Current
}

| Truncate ->
// don't create a new sequence if count = 0
if count = 0 then empty
else
taskSeq {
use e = source.GetAsyncEnumerator CancellationToken.None

let! step = e.MoveNextAsync()
let mutable cont = step
let mutable pos = 0

// return items until we've exhausted the seq
// report this line, weird error:
//while! e.MoveNextAsync() && pos < 1 do
while cont do
yield e.Current
pos <- pos + 1
let! moveNext = e.MoveNextAsync()
cont <- moveNext && pos <= count

}

let takeWhile whileKind predicate (source: TaskSeq<_>) =
checkNonNull (nameof source) source

taskSeq {
use e = source.GetAsyncEnumerator CancellationToken.None
let! step = e.MoveNextAsync()
Expand Down

0 comments on commit 04ecbf2

Please sign in to comment.