Skip to content

Commit

Permalink
Revert "update(SkipUntil,TakeUntil): code refactoring"
Browse files Browse the repository at this point in the history
This reverts commit 8927cb0.
  • Loading branch information
b97tsk committed Jun 27, 2024
1 parent a4e5ac0 commit 343c9b6
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 25 deletions.
30 changes: 24 additions & 6 deletions skipuntil.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ type skipUntilObservable[T, U any] struct {
}

func (ob skipUntilObservable[T, U]) Subscribe(c Context, o Observer[T]) {
c, cancel := c.WithCancelCause()
o = o.DoOnTermination(func() { cancel(nil) })
c, cancel := c.WithCancel()
o = o.DoOnTermination(cancel)

var x struct {
Context atomic.Value
Expand Down Expand Up @@ -47,20 +47,38 @@ func (ob skipUntilObservable[T, U]) Subscribe(c Context, o Observer[T]) {
switch n.Kind {
case KindNext:
x.Context.CompareAndSwap(w.Context, c.Context)

case KindError:
cancel(n.Error)
if x.Context.CompareAndSwap(w.Context, sentinel) {
o.Error(n.Error)
}

case KindComplete:
return
}
},
func() { o.Error(ErrOops) },
func() {
if x.Context.Swap(sentinel) != sentinel {
o.Error(ErrOops)
}
},
)
}

terminate := func(n Notification[T]) {
old := x.Context.Swap(sentinel)

cancel()

if old != sentinel {
o.Emit(n)
}
}

select {
default:
case <-c.Done():
o.Error(c.Cause())
terminate(Error[T](c.Cause()))
return
}

Expand All @@ -76,7 +94,7 @@ func (ob skipUntilObservable[T, U]) Subscribe(c Context, o Observer[T]) {
o.Emit(n)
}
case KindError, KindComplete:
o.Emit(n)
terminate(n)
}
})
}
58 changes: 39 additions & 19 deletions takeuntil.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package rx

import (
"context"
"errors"
"sync"
"sync/atomic"
)

Expand All @@ -22,17 +21,19 @@ type takeUntilObservable[T, U any] struct {
}

func (ob takeUntilObservable[T, U]) Subscribe(c Context, o Observer[T]) {
c, cancel := c.WithCancelCause()
o = o.DoOnTermination(func() { cancel(nil) })
c, cancel := c.WithCancel()
o = o.DoOnTermination(cancel)

var x struct {
Context atomic.Value
Source struct {
sync.Mutex
sync.WaitGroup
}
}

x.Context.Store(c.Context)

complete := errors.Join(context.Canceled)

{
w, cancelw := c.WithCancel()

Expand All @@ -51,29 +52,48 @@ func (ob takeUntilObservable[T, U]) Subscribe(c Context, o Observer[T]) {
cancelw()

switch n.Kind {
case KindNext:
if x.Context.CompareAndSwap(c.Context, w.Context) {
cancel(complete)
}
case KindError:
if x.Context.CompareAndSwap(c.Context, w.Context) {
cancel(n.Error)
case KindNext, KindError:
if x.Context.Swap(sentinel) != sentinel {
cancel()

x.Source.Lock()
x.Source.Wait()
x.Source.Unlock()

switch n.Kind {
case KindNext:
o.Complete()
case KindError:
o.Error(n.Error)
}
}

case KindComplete:
return
}
},
func() { o.Error(ErrOops) },
func() {
if x.Context.Swap(sentinel) != sentinel {
o.Error(ErrOops)
}
},
)
}

x.Source.Lock()
x.Source.Add(1)
x.Source.Unlock()

terminate := func(n Notification[T]) {
if n.Kind == KindError && errors.Is(n.Error, complete) {
o.Complete()
return
}
defer x.Source.Done()

old := x.Context.Swap(sentinel)

o.Emit(n)
cancel()

if old != sentinel {
o.Emit(n)
}
}

select {
Expand Down

0 comments on commit 343c9b6

Please sign in to comment.