Skip to content

Commit

Permalink
update(GroupBy): add GroupByOperator
Browse files Browse the repository at this point in the history
  • Loading branch information
b97tsk committed Aug 11, 2024
1 parent b7283c1 commit ad6b9b8
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 19 deletions.
2 changes: 1 addition & 1 deletion doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@
// The following operations may cause concurrent behavior due to [Context]
// cancellation:
// - CombineLatest operators (due to use of [Synchronize]);
// - [Connect] (due to use of [Multicast]);
// - [Connect] and [GroupBy] (due to use of [Multicast]);
// - Merge operators (due to use of [Synchronize]);
// - [Multicast] and other relatives (due to use of [Context.AfterFunc] and
// [Synchronize]);
Expand Down
42 changes: 30 additions & 12 deletions groupby.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,39 @@ package rx
// GroupBy groups the values emitted by the source [Observable] according to
// a specified criterion, and emits these grouped values as Pairs, one [Pair]
// per group.
func GroupBy[T any, K comparable](
keySelector func(v T) K,
groupFactory func() Subject[T],
) Operator[T, Pair[K, Observable[T]]] {
return NewOperator(
func(source Observable[T]) Observable[Pair[K, Observable[T]]] {
return groupByObservable[T, K]{source, keySelector, groupFactory}.Subscribe
func GroupBy[T any, K comparable](keySelector func(v T) K) GroupByOperator[T, K] {
return GroupByOperator[T, K]{
ts: groupByConfig[T, K]{
keySelector: keySelector,
groupSupplier: Multicast[T],
},
)
}
}

type groupByConfig[T any, K comparable] struct {
keySelector func(T) K
groupSupplier func() Subject[T]
}

// GroupByOperator is an [Operator] type for [GroupBy].
type GroupByOperator[T any, K comparable] struct {
ts groupByConfig[T, K]
}

// WithGroupSupplier sets GroupSupplier option to a given value.
func (op GroupByOperator[T, K]) WithGroupSupplier(groupSupplier func() Subject[T]) GroupByOperator[T, K] {
op.ts.groupSupplier = groupSupplier
return op
}

// Apply implements the [Operator] interface.
func (op GroupByOperator[T, K]) Apply(source Observable[T]) Observable[Pair[K, Observable[T]]] {
return groupByObservable[T, K]{source, op.ts}.Subscribe
}

type groupByObservable[T any, K comparable] struct {
source Observable[T]
keySelector func(T) K
groupFactory func() Subject[T]
source Observable[T]
groupByConfig[T, K]
}

func (ob groupByObservable[T, K]) Subscribe(c Context, o Observer[Pair[K, Observable[T]]]) {
Expand All @@ -30,7 +48,7 @@ func (ob groupByObservable[T, K]) Subscribe(c Context, o Observer[Pair[K, Observ
group, exists := groups[key]

if !exists {
g := ob.groupFactory()
g := ob.groupSupplier()
group = g.Observer
groups[key] = group
o.Next(NewPair(key, g.Observable))
Expand Down
9 changes: 3 additions & 6 deletions groupby_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ func TestGroupBy(t *testing.T) {

group := rx.GroupBy(
func(v string) string { return v },
rx.MulticastBufferAll[string],
)
).WithGroupSupplier(rx.MulticastBufferAll[string])

count := rx.ConcatMap(
func(g rx.Pair[string, rx.Observable[string]]) rx.Observable[rx.Pair[string, int]] {
Expand Down Expand Up @@ -56,8 +55,7 @@ func TestGroupBy(t *testing.T) {
source,
rx.GroupBy(
func(v string) string { panic(ErrTest) },
rx.MulticastBufferAll[string],
),
).WithGroupSupplier(rx.MulticastBufferAll[string]),
count,
tostring,
),
Expand All @@ -67,8 +65,7 @@ func TestGroupBy(t *testing.T) {
source,
rx.GroupBy(
func(v string) string { return v },
func() rx.Subject[string] { panic(ErrTest) },
),
).WithGroupSupplier(func() rx.Subject[string] { panic(ErrTest) }),
count,
tostring,
),
Expand Down

0 comments on commit ad6b9b8

Please sign in to comment.