Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1060,6 +1060,28 @@
| None -> return def
| Some v -> return v }

let exactlyOne (source : AsyncSeq<'T>) = async {
use ie = source.GetEnumerator()
let! first = ie.MoveNext()
match first with
| None -> return raise (System.InvalidOperationException("The input sequence was empty."))
| Some v ->
let! second = ie.MoveNext()
match second with
| None -> return v
| Some _ -> return raise (System.InvalidOperationException("The input sequence contains more than one element.")) }

let tryExactlyOne (source : AsyncSeq<'T>) = async {
use ie = source.GetEnumerator()
let! first = ie.MoveNext()
match first with
| None -> return None
| Some v ->
let! second = ie.MoveNext()
match second with
| None -> return Some v
| Some _ -> return None }

let scanAsync f (state:'TState) (source : AsyncSeq<'T>) = asyncSeq {
yield state
let z = ref state
Expand Down Expand Up @@ -1252,6 +1274,22 @@
return LanguagePrimitives.DivideByInt sum count
}

let countByAsync (projection : 'T -> Async<'Key>) (source : AsyncSeq<'T>) : Async<('Key * int) array> =
async {
let! dict =
source |> foldAsync (fun (d : System.Collections.Generic.Dictionary<'Key,int>) v ->
async {
let! k = projection v
let mutable cnt = 0
if d.TryGetValue(k, &cnt) then d.[k] <- cnt + 1
else d.[k] <- 1
return d
}) (System.Collections.Generic.Dictionary<'Key,int>())
return dict |> Seq.map (fun kv -> kv.Key, kv.Value) |> Seq.toArray }

let countBy (projection : 'T -> 'Key) (source : AsyncSeq<'T>) : Async<('Key * int) array> =
countByAsync (projection >> async.Return) source

let scan f (state:'State) (source : AsyncSeq<'T>) =
scanAsync (fun st v -> f st v |> async.Return) state source

Expand Down Expand Up @@ -1997,6 +2035,19 @@
let distinctUntilChanged (s:AsyncSeq<'T>) : AsyncSeq<'T> =
distinctUntilChangedWith ((=)) s

let distinctByAsync (projection : 'T -> Async<'Key>) (source : AsyncSeq<'T>) : AsyncSeq<'T> = asyncSeq {
let seen = System.Collections.Generic.HashSet<'Key>()
for v in source do
let! k = projection v
if seen.Add(k) then
yield v }

let distinctBy (projection : 'T -> 'Key) (source : AsyncSeq<'T>) : AsyncSeq<'T> =
distinctByAsync (projection >> async.Return) source

let distinct (source : AsyncSeq<'T>) : AsyncSeq<'T> =
distinctBy id source

let getIterator (s:AsyncSeq<'T>) : (unit -> Async<'T option>) =
let curr = s.GetEnumerator()
fun () -> curr.MoveNext()
Expand Down Expand Up @@ -2121,7 +2172,7 @@

[<CompilerMessage("The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.", 9999)>]
let groupBy (p:'a -> 'k) (s:AsyncSeq<'a>) : AsyncSeq<'k * AsyncSeq<'a>> =
groupByAsync (p >> async.Return) s

Check warning on line 2175 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.

Check warning on line 2175 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
#endif
#endif

Expand Down
28 changes: 28 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,14 @@ module AsyncSeq =
/// given asynchronous sequence (or None if the sequence is empty).
val tryFirst : source:AsyncSeq<'T> -> Async<'T option>

/// Asynchronously returns the only element of the asynchronous sequence.
/// Raises InvalidOperationException if the sequence is empty or contains more than one element.
val exactlyOne : source:AsyncSeq<'T> -> Async<'T>

/// Asynchronously returns the only element of the asynchronous sequence, or None if the
/// sequence is empty or contains more than one element.
val tryExactlyOne : source:AsyncSeq<'T> -> Async<'T option>

/// Aggregates the elements of the input asynchronous sequence using the
/// specified 'aggregation' function. The result is an asynchronous
/// sequence of intermediate aggregation result.
Expand Down Expand Up @@ -276,6 +284,14 @@ module AsyncSeq =
and ^U : (static member DivideByInt : ^U * int -> ^U)
and ^U : (static member Zero : ^U)

/// Asynchronously count the elements of the input asynchronous sequence grouped by the result of the given asynchronous key projection.
/// Returns an array of (key, count) pairs.
val countByAsync : projection:('T -> Async<'Key>) -> source:AsyncSeq<'T> -> Async<('Key * int) array> when 'Key : equality

/// Asynchronously count the elements of the input asynchronous sequence grouped by the result of the given key projection.
/// Returns an array of (key, count) pairs.
val countBy : projection:('T -> 'Key) -> source:AsyncSeq<'T> -> Async<('Key * int) array> when 'Key : equality

/// Asynchronously determine if the sequence contains the given value
val contains : value:'T -> source:AsyncSeq<'T> -> Async<bool> when 'T : equality

Expand Down Expand Up @@ -594,6 +610,18 @@ module AsyncSeq =
/// Returns an async sequence which contains no contiguous duplicate elements.
val distinctUntilChanged : source:AsyncSeq<'T> -> AsyncSeq<'T> when 'T : equality

/// Returns an async sequence containing only distinct elements, determined by the given asynchronous key projection.
/// Elements are compared using structural equality on the projected key.
val distinctByAsync : projection:('T -> Async<'Key>) -> source:AsyncSeq<'T> -> AsyncSeq<'T> when 'Key : equality

/// Returns an async sequence containing only distinct elements, determined by the given key projection.
/// Elements are compared using structural equality on the projected key.
val distinctBy : projection:('T -> 'Key) -> source:AsyncSeq<'T> -> AsyncSeq<'T> when 'Key : equality

/// Returns an async sequence containing only distinct elements.
/// Elements are compared using structural equality.
val distinct : source:AsyncSeq<'T> -> AsyncSeq<'T> when 'T : equality

#if FABLE_COMPILER
[<System.Obsolete("Use .GetEnumerator directly") >]
#endif
Expand Down
103 changes: 103 additions & 0 deletions tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1964,7 +1964,7 @@
let actual =
ls
|> AsyncSeq.ofSeq
|> AsyncSeq.groupBy p

Check warning on line 1967 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand All @@ -1973,7 +1973,7 @@
let expected = asyncSeq { raise (exn("test")) }
let actual =
asyncSeq { raise (exn("test")) }
|> AsyncSeq.groupBy (fun i -> i % 3)

Check warning on line 1976 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand Down Expand Up @@ -2907,3 +2907,106 @@
} |> Async.RunSynchronously



// ===== distinct / distinctBy / distinctByAsync =====

[<Test>]
let ``AsyncSeq.distinct removes all duplicates`` () =
let source = asyncSeq { yield 1; yield 2; yield 1; yield 3; yield 2 }
let result = AsyncSeq.distinct source |> AsyncSeq.toListSynchronously
Assert.AreEqual([1; 2; 3], result)

[<Test>]
let ``AsyncSeq.distinct empty sequence returns empty`` () =
let result = AsyncSeq.distinct AsyncSeq.empty<int> |> AsyncSeq.toListSynchronously
Assert.AreEqual([], result)

[<Test>]
let ``AsyncSeq.distinct all unique elements returns all`` () =
let source = asyncSeq { yield 1; yield 2; yield 3 }
let result = AsyncSeq.distinct source |> AsyncSeq.toListSynchronously
Assert.AreEqual([1; 2; 3], result)

[<Test>]
let ``AsyncSeq.distinctBy removes duplicates by key`` () =
let source = asyncSeq { yield (1, "a"); yield (2, "b"); yield (1, "c") }
let result = AsyncSeq.distinctBy fst source |> AsyncSeq.toListSynchronously
Assert.AreEqual([(1, "a"); (2, "b")], result)

[<Test>]
let ``AsyncSeq.distinctByAsync removes duplicates by async key`` () =
async {
let source = asyncSeq { yield 1; yield 2; yield 1; yield 3 }
let result = AsyncSeq.distinctByAsync (fun x -> async { return x % 2 }) source |> AsyncSeq.toListSynchronously
Assert.AreEqual([1; 2], result)
} |> Async.RunSynchronously

// ===== countBy / countByAsync =====

[<Test>]
let ``AsyncSeq.countBy counts elements by key`` () =
async {
let source = asyncSeq { yield 1; yield 2; yield 1; yield 3; yield 2; yield 2 }
let result = AsyncSeq.countBy id source |> Async.RunSynchronously
let sorted = result |> Array.sortBy fst
Assert.AreEqual([| (1, 2); (2, 3); (3, 1) |], sorted)
} |> Async.RunSynchronously

[<Test>]
let ``AsyncSeq.countBy empty sequence returns empty array`` () =
async {
let result = AsyncSeq.countBy id AsyncSeq.empty<int> |> Async.RunSynchronously
Assert.AreEqual([||], result)
} |> Async.RunSynchronously

[<Test>]
let ``AsyncSeq.countByAsync counts elements by async key`` () =
async {
let source = asyncSeq { yield 1; yield 2; yield 3; yield 4 }
let result = AsyncSeq.countByAsync (fun x -> async { return x % 2 }) source |> Async.RunSynchronously
let sorted = result |> Array.sortBy fst
Assert.AreEqual([| (0, 2); (1, 2) |], sorted)
} |> Async.RunSynchronously

// ===== exactlyOne / tryExactlyOne =====

[<Test>]
let ``AsyncSeq.exactlyOne returns single element`` () =
async {
let source = asyncSeq { yield 42 }
let result = AsyncSeq.exactlyOne source |> Async.RunSynchronously
Assert.AreEqual(42, result)
} |> Async.RunSynchronously

[<Test>]
let ``AsyncSeq.exactlyOne raises on empty sequence`` () =
Assert.Throws<InvalidOperationException>(fun () ->
AsyncSeq.exactlyOne AsyncSeq.empty<int> |> Async.RunSynchronously |> ignore) |> ignore

[<Test>]
let ``AsyncSeq.exactlyOne raises on sequence with more than one element`` () =
Assert.Throws<InvalidOperationException>(fun () ->
asyncSeq { yield 1; yield 2 } |> AsyncSeq.exactlyOne |> Async.RunSynchronously |> ignore) |> ignore

[<Test>]
let ``AsyncSeq.tryExactlyOne returns Some for single element`` () =
async {
let source = asyncSeq { yield 42 }
let result = AsyncSeq.tryExactlyOne source |> Async.RunSynchronously
Assert.AreEqual(Some 42, result)
} |> Async.RunSynchronously

[<Test>]
let ``AsyncSeq.tryExactlyOne returns None for empty sequence`` () =
async {
let result = AsyncSeq.tryExactlyOne AsyncSeq.empty<int> |> Async.RunSynchronously
Assert.AreEqual(None, result)
} |> Async.RunSynchronously

[<Test>]
let ``AsyncSeq.tryExactlyOne returns None for sequence with more than one element`` () =
async {
let source = asyncSeq { yield 1; yield 2 }
let result = AsyncSeq.tryExactlyOne source |> Async.RunSynchronously
Assert.AreEqual(None, result)
} |> Async.RunSynchronously