diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs index 909cc13..3bbf6df 100644 --- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs +++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs @@ -1087,6 +1087,21 @@ module AsyncSeq = let! moven = ie.MoveNext() b := moven } + let windowed (windowSize:int) (source:AsyncSeq<'T>) : AsyncSeq<'T[]> = + if windowSize < 1 then invalidArg (nameof windowSize) "must be positive" + asyncSeq { + let window = System.Collections.Generic.Queue<'T>(windowSize) + use ie = source.GetEnumerator() + let! move = ie.MoveNext() + let b = ref move + while b.Value.IsSome do + window.Enqueue(b.Value.Value) + if window.Count = windowSize then + yield window.ToArray() + window.Dequeue() |> ignore + let! moven = ie.MoveNext() + b := moven } + let pickAsync (f:'T -> Async<'U option>) (source:AsyncSeq<'T>) = async { use ie = source.GetEnumerator() let! v = ie.MoveNext() diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi b/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi index f801cc4..abf7da3 100644 --- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi +++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi @@ -198,6 +198,14 @@ module AsyncSeq = /// singleton input sequence. val pairwise : source:AsyncSeq<'T> -> AsyncSeq<'T * 'T> + /// Returns an asynchronous sequence that yields sliding windows of the given size + /// over the source sequence, each yielded as an array. The first window is emitted + /// once windowSize elements have been consumed; subsequent windows slide one + /// element at a time. The sequence is empty when the source has fewer than + /// windowSize elements. Raises System.ArgumentException if + /// windowSize is less than 1. + val windowed : windowSize:int -> source:AsyncSeq<'T> -> AsyncSeq<'T []> + /// Asynchronously aggregate the elements of the input asynchronous sequence using the /// specified asynchronous 'aggregation' function. val foldAsync : folder:('State -> 'T -> Async<'State>) -> state:'State -> source:AsyncSeq<'T> -> Async<'State> diff --git a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs index fcf55b2..49e2c58 100644 --- a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs +++ b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs @@ -2553,6 +2553,46 @@ let ``AsyncSeq.pairwise with three elements should produce two pairs`` () = let result = AsyncSeq.pairwise source |> AsyncSeq.toListSynchronously Assert.AreEqual([(1, 2); (2, 3)], result) +[] +let ``AsyncSeq.windowed empty sequence returns empty`` () = + let result = AsyncSeq.windowed 3 AsyncSeq.empty |> AsyncSeq.toListSynchronously + Assert.AreEqual([], result) + +[] +let ``AsyncSeq.windowed fewer elements than window returns empty`` () = + let source = asyncSeq { yield 1; yield 2 } + let result = AsyncSeq.windowed 3 source |> AsyncSeq.toListSynchronously + Assert.AreEqual([], result) + +[] +let ``AsyncSeq.windowed exact window size returns single window`` () = + let source = asyncSeq { yield 1; yield 2; yield 3 } + let result = AsyncSeq.windowed 3 source |> AsyncSeq.toListSynchronously + Assert.AreEqual([[|1; 2; 3|]], result) + +[] +let ``AsyncSeq.windowed sliding window produces correct windows`` () = + let source = asyncSeq { yield 1; yield 2; yield 3; yield 4; yield 5 } + let result = AsyncSeq.windowed 3 source |> AsyncSeq.toListSynchronously + Assert.AreEqual([[|1;2;3|]; [|2;3;4|]; [|3;4;5|]], result) + +[] +let ``AsyncSeq.windowed size 1 returns each element as singleton array`` () = + let source = asyncSeq { yield 10; yield 20; yield 30 } + let result = AsyncSeq.windowed 1 source |> AsyncSeq.toListSynchronously + Assert.AreEqual([[|10|]; [|20|]; [|30|]], result) + +[] +let ``AsyncSeq.windowed size 2 is equivalent to pairwise as arrays`` () = + let source = asyncSeq { yield 1; yield 2; yield 3; yield 4 } + let result = AsyncSeq.windowed 2 source |> AsyncSeq.toListSynchronously + Assert.AreEqual([[|1;2|]; [|2;3|]; [|3;4|]], result) + +[] +let ``AsyncSeq.windowed with size 0 raises ArgumentException`` () = + Assert.Throws(fun () -> + AsyncSeq.windowed 0 (asyncSeq { yield 1 }) |> AsyncSeq.toListSynchronously |> ignore) |> ignore + [] let ``AsyncSeq.distinctUntilChangedWith should work with custom equality`` () = let source = asyncSeq { yield "a"; yield "A"; yield "B"; yield "b"; yield "c" }