Skip to content

Commit

Permalink
add Stop notification
Browse files Browse the repository at this point in the history
  • Loading branch information
b97tsk committed Aug 8, 2024
1 parent 86f4ab0 commit 65bf6b2
Show file tree
Hide file tree
Showing 159 changed files with 2,382 additions and 1,354 deletions.
45 changes: 25 additions & 20 deletions audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

// AuditTime ignores source values for a duration, then emits the most recent
// value from the source Observable, then repeats this process.
// value from the source [Observable], then repeats this process.
//
// When it sees a source value, it ignores that plus the next ones for a
// duration, and then it emits the most recent value from the source.
Expand All @@ -16,12 +16,12 @@ func AuditTime[T any](d time.Duration) Operator[T, T] {
return Audit(func(T) Observable[time.Time] { return ob })
}

// Audit ignores source values for a duration determined by another Observable,
// then emits the most recent value from the source Observable, then repeats
// this process.
// Audit ignores source values for a duration determined by another
// [Observable], then emits the most recent value from the source
// [Observable], then repeats this process.
//
// It's like [AuditTime], but the silencing duration is determined by a second
// Observable.
// It's like [AuditTime], but the silencing duration is determined by
// a second [Observable].
func Audit[T, U any](durationSelector func(v T) Observable[U]) Operator[T, T] {
return NewOperator(
func(source Observable[T]) Observable[T] {
Expand Down Expand Up @@ -80,23 +80,28 @@ func (ob auditObservable[T, U]) Subscribe(c Context, o Observer[T]) {

Try1(o, Next(value), func() {
if x.context.Swap(sentinel) != sentinel {
o.Error(ErrOops)
o.Stop(ErrOops)
}
})

if x.context.CompareAndSwap(w.Context, c.Context) && x.complete.Load() && x.context.CompareAndSwap(c.Context, sentinel) {
o.Complete()
}

case KindError:
if x.context.Swap(sentinel) != sentinel {
o.Error(n.Error)
}

case KindComplete:
if x.context.CompareAndSwap(w.Context, c.Context) && x.complete.Load() && x.context.CompareAndSwap(c.Context, sentinel) {
o.Complete()
}

case KindError, KindStop:
if x.context.Swap(sentinel) != sentinel {
switch n.Kind {
case KindError:
o.Error(n.Error)
case KindStop:
o.Stop(n.Error)
}
}
}
})
}
Expand All @@ -112,7 +117,14 @@ func (ob auditObservable[T, U]) Subscribe(c Context, o Observer[T]) {
startWorker(n.Value)
}

case KindError:
case KindComplete:
x.complete.Store(true)

if x.context.CompareAndSwap(c.Context, sentinel) {
o.Emit(n)
}

case KindError, KindStop:
old := x.context.Swap(sentinel)

cancel()
Expand All @@ -121,13 +133,6 @@ func (ob auditObservable[T, U]) Subscribe(c Context, o Observer[T]) {
if old != sentinel {
o.Emit(n)
}

case KindComplete:
x.complete.Store(true)

if x.context.CompareAndSwap(c.Context, sentinel) {
o.Emit(n)
}
}
})
}
16 changes: 10 additions & 6 deletions audit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,16 @@ func TestAudit(t *testing.T) {
),
ErrTest,
).Case(
rx.Pipe1(
rx.Pipe2(
rx.Just("A", "B", "C", "D", "E"),
DelaySubscription[string](1),
rx.Audit(
func(string) rx.Observable[int] {
return rx.Pipe1(
rx.Throw[int](ErrTest),
DelaySubscription[int](1),
)
return rx.Oops[int](ErrTest)
},
),
),
ErrTest,
rx.ErrOops, ErrTest,
).Case(
rx.Pipe2(
rx.Just("A", "B", "C", "D", "E"),
Expand All @@ -90,6 +88,12 @@ func TestAudit(t *testing.T) {
rx.AuditTime[string](Step(3)),
),
ErrTest,
).Case(
rx.Pipe1(
rx.Oops[string](ErrTest),
rx.AuditTime[string](Step(3)),
),
rx.ErrOops, ErrTest,
).Case(
rx.Pipe1(
rx.Just("A", "B", "C", "D", "E"),
Expand Down
44 changes: 21 additions & 23 deletions backpressure.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package rx

import "github.com/b97tsk/rx/internal/queue"

// OnBackpressureBuffer mirrors the source Observable, buffering emissions
// OnBackpressureBuffer mirrors the source [Observable], buffering emissions
// if the source emits too fast, and terminating the subscription with
// a notification of ErrBufferOverflow if the buffer is full.
// an [Error] notification of [ErrBufferOverflow] if the buffer is full.
func OnBackpressureBuffer[T any](capacity int) Operator[T, T] {
return Channelize(
func(upstream <-chan Notification[T], downstream chan<- Notification[T]) {
Expand Down Expand Up @@ -37,16 +37,16 @@ func OnBackpressureBuffer[T any](capacity int) Operator[T, T] {
}

buf.Push(n)
case KindError:
downstream <- n
return
case KindComplete:
complete = true

if buf.Len() == 0 {
downstream <- n
return
}
case KindError, KindStop:
downstream <- n
return
}
case out <- outv:
buf.Pop()
Expand All @@ -61,9 +61,8 @@ func OnBackpressureBuffer[T any](capacity int) Operator[T, T] {
)
}

// OnBackpressureCongest mirrors the source Observable, buffering emissions
// if the source emits too fast, and blocking the source if the buffer is
// full.
// OnBackpressureCongest mirrors the source [Observable], buffering emissions
// if the source emits too fast, and blocking the source if the buffer is full.
func OnBackpressureCongest[T any](capacity int) Operator[T, T] {
return Channelize(
func(upstream <-chan Notification[T], downstream chan<- Notification[T]) {
Expand Down Expand Up @@ -97,16 +96,16 @@ func OnBackpressureCongest[T any](capacity int) Operator[T, T] {
switch n.Kind {
case KindNext:
buf.Push(n)
case KindError:
downstream <- n
return
case KindComplete:
complete = true

if buf.Len() == 0 {
downstream <- n
return
}
case KindError, KindStop:
downstream <- n
return
}
case out <- outv:
buf.Pop()
Expand All @@ -121,9 +120,8 @@ func OnBackpressureCongest[T any](capacity int) Operator[T, T] {
)
}

// OnBackpressureDrop mirrors the source Observable, buffering emissions
// if the source emits too fast, and dropping emissions if the buffer is
// full.
// OnBackpressureDrop mirrors the source [Observable], buffering emissions
// if the source emits too fast, and dropping emissions if the buffer is full.
func OnBackpressureDrop[T any](capacity int) Operator[T, T] {
return Channelize(
func(upstream <-chan Notification[T], downstream chan<- Notification[T]) {
Expand Down Expand Up @@ -153,16 +151,16 @@ func OnBackpressureDrop[T any](capacity int) Operator[T, T] {
if buf.Len() < capacity {
buf.Push(n)
}
case KindError:
downstream <- n
return
case KindComplete:
complete = true

if buf.Len() == 0 {
downstream <- n
return
}
case KindError, KindStop:
downstream <- n
return
}
case out <- outv:
buf.Pop()
Expand All @@ -177,9 +175,9 @@ func OnBackpressureDrop[T any](capacity int) Operator[T, T] {
)
}

// OnBackpressureLatest mirrors the source Observable, buffering emissions
// if the source emits too fast, and dropping oldest emissions from
// the buffer if it is full.
// OnBackpressureLatest mirrors the source [Observable], buffering emissions
// if the source emits too fast, and dropping oldest emissions from the buffer
// if it is full.
func OnBackpressureLatest[T any](capacity int) Operator[T, T] {
return Channelize(
func(upstream <-chan Notification[T], downstream chan<- Notification[T]) {
Expand Down Expand Up @@ -211,16 +209,16 @@ func OnBackpressureLatest[T any](capacity int) Operator[T, T] {
}

buf.Push(n)
case KindError:
downstream <- n
return
case KindComplete:
complete = true

if buf.Len() == 0 {
downstream <- n
return
}
case KindError, KindStop:
downstream <- n
return
}
case out <- outv:
buf.Pop()
Expand Down
24 changes: 24 additions & 0 deletions backpressure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ func TestOnBackpressure(t *testing.T) {
rx.OnBackpressureBuffer[int](3),
),
ErrTest,
).Case(
rx.Pipe1(
rx.Oops[int](ErrTest),
rx.OnBackpressureBuffer[int](3),
),
rx.ErrOops, ErrTest,
).Case(
rx.Pipe1(
rx.Empty[int](),
Expand Down Expand Up @@ -87,6 +93,12 @@ func TestOnBackpressure(t *testing.T) {
rx.OnBackpressureCongest[int](3),
),
ErrTest,
).Case(
rx.Pipe1(
rx.Oops[int](ErrTest),
rx.OnBackpressureCongest[int](3),
),
rx.ErrOops, ErrTest,
).Case(
rx.Pipe1(
rx.Empty[int](),
Expand Down Expand Up @@ -127,6 +139,12 @@ func TestOnBackpressure(t *testing.T) {
rx.OnBackpressureDrop[int](3),
),
ErrTest,
).Case(
rx.Pipe1(
rx.Oops[int](ErrTest),
rx.OnBackpressureDrop[int](3),
),
rx.ErrOops, ErrTest,
).Case(
rx.Pipe1(
rx.Empty[int](),
Expand Down Expand Up @@ -167,6 +185,12 @@ func TestOnBackpressure(t *testing.T) {
rx.OnBackpressureLatest[int](3),
),
ErrTest,
).Case(
rx.Pipe1(
rx.Oops[int](ErrTest),
rx.OnBackpressureLatest[int](3),
),
rx.ErrOops, ErrTest,
).Case(
rx.Pipe1(
rx.Empty[int](),
Expand Down
Loading

0 comments on commit 65bf6b2

Please sign in to comment.