Skip to content

Commit

Permalink
update(Share): add Context option
Browse files Browse the repository at this point in the history
  • Loading branch information
b97tsk committed Aug 12, 2024
1 parent ad6b9b8 commit 4f19df7
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 13 deletions.
10 changes: 8 additions & 2 deletions share.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (
// [Observable]. When subscribed multiple times, it guarantees that only one
// subscription is made to the source at the same time. When all subscribers
// have unsubscribed it will unsubscribe from the source.
func Share[T any](c Context) ShareOperator[T] {
func Share[T any]() ShareOperator[T] {
return ShareOperator[T]{
ts: shareConfig[T]{
context: c,
context: NewBackgroundContext(),
connector: Multicast[T],
},
}
Expand All @@ -34,6 +34,12 @@ func (op ShareOperator[T]) WithConnector(connector func() Subject[T]) ShareOpera
return op
}

// WithContext sets Context option to a given value.
func (op ShareOperator[T]) WithContext(c Context) ShareOperator[T] {
op.ts.context = c
return op
}

// Apply implements the [Operator] interface.
func (op ShareOperator[T]) Apply(source Observable[T]) Observable[T] {
ob := &shareObservable[T]{
Expand Down
20 changes: 9 additions & 11 deletions share_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,13 @@ import (
func TestShare(t *testing.T) {
t.Parallel()

ctx := rx.NewBackgroundContext()

NewTestSuite[int](t).Case(
func() rx.Observable[int] {
ob := rx.Pipe3(
rx.Ticker(Step(3)),
rx.Scan(-1, func(i int, _ time.Time) int { return i + 1 }),
rx.Take[int](4),
rx.Share[int](ctx),
rx.Share[int](),
)
return rx.Merge(
ob,
Expand All @@ -34,7 +32,7 @@ func TestShare(t *testing.T) {
ob := rx.Pipe3(
rx.Ticker(Step(3)),
rx.Scan(-1, func(i int, _ time.Time) int { return i + 1 }),
rx.Share[int](ctx),
rx.Share[int](),
rx.Take[int](4),
)
return rx.Merge(
Expand All @@ -51,11 +49,11 @@ func TestShare(t *testing.T) {
rx.Ticker(Step(3)),
rx.Scan(-1, func(i int, _ time.Time) int { return i + 1 }),
rx.Take[int](4),
rx.Share[int](ctx).WithConnector(
rx.Share[int]().WithConnector(
func() rx.Subject[int] {
return rx.MulticastBuffer[int](1)
},
),
).WithContext(rx.NewBackgroundContext()),
)
return rx.Merge(
ob,
Expand All @@ -70,11 +68,11 @@ func TestShare(t *testing.T) {
ob := rx.Pipe3(
rx.Ticker(Step(3)),
rx.Scan(-1, func(i int, _ time.Time) int { return i + 1 }),
rx.Share[int](ctx).WithConnector(
rx.Share[int]().WithConnector(
func() rx.Subject[int] {
return rx.MulticastBuffer[int](1)
},
),
).WithContext(rx.NewBackgroundContext()),
rx.Take[int](4),
)
return rx.Merge(
Expand All @@ -88,7 +86,7 @@ func TestShare(t *testing.T) {
).Case(
rx.Pipe1(
rx.Oops[int]("should not happen"),
rx.Share[int](ctx).WithConnector(
rx.Share[int]().WithConnector(
func() rx.Subject[int] {
return rx.Subject[int]{
Observable: rx.Throw[int](ErrTest),
Expand All @@ -101,7 +99,7 @@ func TestShare(t *testing.T) {
).Case(
rx.Pipe1(
rx.Oops[int]("should not happen"),
rx.Share[int](ctx).WithConnector(
rx.Share[int]().WithConnector(
func() rx.Subject[int] {
return rx.Subject[int]{
Observable: rx.Oops[int](ErrTest),
Expand All @@ -114,7 +112,7 @@ func TestShare(t *testing.T) {
).Case(
rx.Pipe1(
rx.Oops[int]("should not happen"),
rx.Share[int](ctx).WithConnector(func() rx.Subject[int] { panic(ErrTest) }),
rx.Share[int]().WithConnector(func() rx.Subject[int] { panic(ErrTest) }),
),
rx.ErrOops, ErrTest,
)
Expand Down

0 comments on commit 4f19df7

Please sign in to comment.