Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1185,6 +1185,46 @@
let inline sum (source : AsyncSeq<'T>) : Async<'T> =
(LanguagePrimitives.GenericZero, source) ||> fold (+)

let minByAsync (projection: 'T -> Async<'Key>) (source: AsyncSeq<'T>) : Async<'T> =
async {
let! result =
source |> foldAsync (fun (acc: ('T * 'Key) option) v ->
async {
let! k = projection v
match acc with
| None -> return Some (v, k)
| Some (_, ak) -> return if k < ak then Some (v, k) else acc
}) None
match result with
| None -> return raise (System.InvalidOperationException("The input sequence was empty."))
| Some (v, _) -> return v }

let minBy (projection: 'T -> 'Key) (source: AsyncSeq<'T>) : Async<'T> =
minByAsync (projection >> async.Return) source

let maxByAsync (projection: 'T -> Async<'Key>) (source: AsyncSeq<'T>) : Async<'T> =
async {
let! result =
source |> foldAsync (fun (acc: ('T * 'Key) option) v ->
async {
let! k = projection v
match acc with
| None -> return Some (v, k)
| Some (_, ak) -> return if k > ak then Some (v, k) else acc
}) None
match result with
| None -> return raise (System.InvalidOperationException("The input sequence was empty."))
| Some (v, _) -> return v }

let maxBy (projection: 'T -> 'Key) (source: AsyncSeq<'T>) : Async<'T> =
maxByAsync (projection >> async.Return) source

let min (source: AsyncSeq<'T>) : Async<'T> =
minBy id source

let max (source: AsyncSeq<'T>) : Async<'T> =
maxBy id source

let inline sumBy (projection : 'T -> ^U) (source : AsyncSeq<'T>) : Async<^U> =
fold (fun s x -> s + projection x) LanguagePrimitives.GenericZero source

Expand Down Expand Up @@ -1747,7 +1787,7 @@
| Some rem -> async.Return rem
| None -> Async.StartChildAsTask(ie.MoveNext())
let t = Stopwatch.GetTimestamp()
let! time = Async.StartChildAsTask(Async.Sleep (max 0 rt))
let! time = Async.StartChildAsTask(Async.Sleep (Operators.max 0 rt))
let! moveOr = Async.chooseTasks move time
let delta = int ((Stopwatch.GetTimestamp() - t) * 1000L / Stopwatch.Frequency)
match moveOr with
Expand Down Expand Up @@ -2081,7 +2121,7 @@

[<CompilerMessage("The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.", 9999)>]
let groupBy (p:'a -> 'k) (s:AsyncSeq<'a>) : AsyncSeq<'k * AsyncSeq<'a>> =
groupByAsync (p >> async.Return) s

Check warning on line 2124 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.

Check warning on line 2124 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
#endif
#endif

Expand Down
18 changes: 18 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,24 @@ module AsyncSeq =
when ^T : (static member ( + ) : ^T * ^T -> ^T)
and ^T : (static member Zero : ^T)

/// Asynchronously find the element with the minimum projected value. Raises InvalidOperationException if the sequence is empty.
val minByAsync : projection:('T -> Async<'Key>) -> source:AsyncSeq<'T> -> Async<'T> when 'Key : comparison

/// Asynchronously find the element with the minimum projected value. Raises InvalidOperationException if the sequence is empty.
val minBy : projection:('T -> 'Key) -> source:AsyncSeq<'T> -> Async<'T> when 'Key : comparison

/// Asynchronously find the element with the maximum projected value. Raises InvalidOperationException if the sequence is empty.
val maxByAsync : projection:('T -> Async<'Key>) -> source:AsyncSeq<'T> -> Async<'T> when 'Key : comparison

/// Asynchronously find the element with the maximum projected value. Raises InvalidOperationException if the sequence is empty.
val maxBy : projection:('T -> 'Key) -> source:AsyncSeq<'T> -> Async<'T> when 'Key : comparison

/// Asynchronously find the minimum element. Raises InvalidOperationException if the sequence is empty.
val min : source:AsyncSeq<'T> -> Async<'T> when 'T : comparison

/// Asynchronously find the maximum element. Raises InvalidOperationException if the sequence is empty.
val max : source:AsyncSeq<'T> -> Async<'T> when 'T : comparison

/// Asynchronously sum the mapped elements of an asynchronous sequence using a synchronous projection.
val inline sumBy : projection:('T -> ^U) -> source:AsyncSeq<'T> -> Async< ^U>
when ^U : (static member ( + ) : ^U * ^U -> ^U)
Expand Down
48 changes: 48 additions & 0 deletions tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,54 @@
let expected = ls |> List.sum
Assert.True((expected = actual))

[<Test>]
let ``AsyncSeq.min returns minimum element``() =
for i in 1 .. 10 do
let ls = [ 1 .. i ] |> List.rev
let actual = AsyncSeq.ofSeq ls |> AsyncSeq.min |> Async.RunSynchronously
Assert.AreEqual(1, actual)

[<Test>]
let ``AsyncSeq.min raises on empty sequence``() =
Assert.Throws<System.InvalidOperationException>(fun () ->
AsyncSeq.empty<int> |> AsyncSeq.min |> Async.RunSynchronously |> ignore) |> ignore

[<Test>]
let ``AsyncSeq.max returns maximum element``() =
for i in 1 .. 10 do
let ls = [ 1 .. i ]
let actual = AsyncSeq.ofSeq ls |> AsyncSeq.max |> Async.RunSynchronously
Assert.AreEqual(i, actual)

[<Test>]
let ``AsyncSeq.max raises on empty sequence``() =
Assert.Throws<System.InvalidOperationException>(fun () ->
AsyncSeq.empty<int> |> AsyncSeq.max |> Async.RunSynchronously |> ignore) |> ignore

[<Test>]
let ``AsyncSeq.minBy returns element with minimum projected value``() =
let ls = [ ("b", 2); ("a", 1); ("c", 3) ]
let actual = AsyncSeq.ofSeq ls |> AsyncSeq.minBy snd |> Async.RunSynchronously
Assert.AreEqual(("a", 1), actual)

[<Test>]
let ``AsyncSeq.maxBy returns element with maximum projected value``() =
let ls = [ ("b", 2); ("a", 1); ("c", 3) ]
let actual = AsyncSeq.ofSeq ls |> AsyncSeq.maxBy snd |> Async.RunSynchronously
Assert.AreEqual(("c", 3), actual)

[<Test>]
let ``AsyncSeq.minByAsync uses async projection``() =
let ls = [ 3; 1; 4; 1; 5; 9 ]
let actual = AsyncSeq.ofSeq ls |> AsyncSeq.minByAsync (fun x -> async.Return x) |> Async.RunSynchronously
Assert.AreEqual(1, actual)

[<Test>]
let ``AsyncSeq.maxByAsync uses async projection``() =
let ls = [ 3; 1; 4; 1; 5; 9 ]
let actual = AsyncSeq.ofSeq ls |> AsyncSeq.maxByAsync (fun x -> async.Return x) |> Async.RunSynchronously
Assert.AreEqual(9, actual)

[<Test>]
let ``AsyncSeq.sumBy works``() =
for i in 0 .. 10 do
Expand Down Expand Up @@ -1916,7 +1964,7 @@
let actual =
ls
|> AsyncSeq.ofSeq
|> AsyncSeq.groupBy p

Check warning on line 1967 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand All @@ -1925,7 +1973,7 @@
let expected = asyncSeq { raise (exn("test")) }
let actual =
asyncSeq { raise (exn("test")) }
|> AsyncSeq.groupBy (fun i -> i % 3)

Check warning on line 1976 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand Down