Skip to content

Commit

Permalink
Revert "update(*): use c.Cause() rather than c.Err()"
Browse files Browse the repository at this point in the history
This reverts commit 4071f7f.
  • Loading branch information
b97tsk committed Jun 28, 2024
1 parent 343c9b6 commit a07b7c3
Show file tree
Hide file tree
Showing 20 changed files with 42 additions and 37 deletions.
16 changes: 8 additions & 8 deletions blocking.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package rx
// the error.
//
// The cancellation of parent will cause BlockingFirst to immediately return
// the zero value of T and parent.Cause().
// the zero value of T and parent.Err().
func (ob Observable[T]) BlockingFirst(parent Context) (v T, err error) {
res := Error[T](ErrEmpty)
c, cancel := parent.WithCancel()
Expand Down Expand Up @@ -36,7 +36,7 @@ func (ob Observable[T]) BlockingFirst(parent Context) (v T, err error) {
select {
default:
case <-parent.Done():
return v, parent.Cause()
return v, parent.Err()
}

switch res.Kind {
Expand Down Expand Up @@ -69,7 +69,7 @@ func (ob Observable[T]) BlockingFirstOrElse(parent Context, def T) T {
// the error.
//
// The cancellation of parent will cause BlockingLast to immediately return
// the zero value of T and parent.Cause().
// the zero value of T and parent.Err().
func (ob Observable[T]) BlockingLast(parent Context) (v T, err error) {
res := Error[T](ErrEmpty)
c, cancel := parent.WithCancel()
Expand All @@ -91,7 +91,7 @@ func (ob Observable[T]) BlockingLast(parent Context) (v T, err error) {
select {
default:
case <-parent.Done():
return v, parent.Cause()
return v, parent.Err()
}

switch res.Kind {
Expand Down Expand Up @@ -124,7 +124,7 @@ func (ob Observable[T]) BlockingLastOrElse(parent Context, def T) T {
// error, it returns the zero value of T and the error.
//
// The cancellation of parent will cause BlockingSingle to immediately return
// the zero value of T and parent.Cause().
// the zero value of T and parent.Err().
func (ob Observable[T]) BlockingSingle(parent Context) (v T, err error) {
res := Error[T](ErrEmpty)
c, cancel := parent.WithCancel()
Expand Down Expand Up @@ -159,7 +159,7 @@ func (ob Observable[T]) BlockingSingle(parent Context) (v T, err error) {
select {
default:
case <-parent.Done():
return v, parent.Cause()
return v, parent.Err()
}

switch res.Kind {
Expand All @@ -177,7 +177,7 @@ func (ob Observable[T]) BlockingSingle(parent Context) (v T, err error) {
// otherwise, it returns the emitted error.
//
// The cancellation of parent will cause BlockingSubscribe to immediately
// return parent.Cause().
// return parent.Err().
func (ob Observable[T]) BlockingSubscribe(parent Context, o Observer[T]) error {
var res Notification[T]

Expand All @@ -197,7 +197,7 @@ func (ob Observable[T]) BlockingSubscribe(parent Context, o Observer[T]) error {
select {
default:
case <-parent.Done():
return parent.Cause()
return parent.Err()
}

switch res.Kind {
Expand Down
6 changes: 3 additions & 3 deletions catch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func Catch[T any](selector func(err error) Observable[T]) Operator[T, T] {
select {
default:
case <-c.Done():
o.Error(c.Cause())
o.Error(c.Err())
return
}

Expand Down Expand Up @@ -50,7 +50,7 @@ func OnErrorResumeWith[T any](ob Observable[T]) Operator[T, T] {
select {
default:
case <-c.Done():
o.Error(c.Cause())
o.Error(c.Err())
return
}

Expand Down Expand Up @@ -82,7 +82,7 @@ func OnErrorComplete[T any]() Operator[T, T] {
select {
default:
case <-c.Done():
o.Error(c.Cause())
o.Error(c.Err())
return
}

Expand Down
4 changes: 2 additions & 2 deletions concat.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (ob concatWithObservable[T]) Subscribe(c Context, o Observer[T]) {
select {
default:
case <-done:
o.Error(c.Cause())
o.Error(c.Err())
return
}

Expand Down Expand Up @@ -209,7 +209,7 @@ func (ob concatMapObservable[T, R]) SubscribeWithBuffering(c Context, o Observer
select {
default:
case <-w.Done():
n = Error[R](w.Cause())
n = Error[R](w.Err())
}
}

Expand Down
5 changes: 5 additions & 0 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ func (c Context) Done() <-chan struct{} {
return c.Context.Done()
}

// Err returns c.Context.Err().
func (c Context) Err() error {
return c.Context.Err()
}

// Wait runs c.WaitGroup.Wait().
// If c.WaitGroup is not set, Wait panics.
func (c Context) Wait() {
Expand Down
2 changes: 1 addition & 1 deletion delay.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (ob delayObservable[T]) Subscribe(c Context, o Observer[T]) {
x.Queue.Unlock()

if old != sentinel {
o.Error(w.Cause())
o.Error(w.Err())
}

return
Expand Down
4 changes: 2 additions & 2 deletions empty.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ func Empty[T any]() Observable[T] {

// Never returns an Observable that never emits anything, except
// when a context cancellation is detected, emits an error notification
// of whatever [Context.Cause] returns.
// of whatever that context reports.
func Never[T any]() Observable[T] {
return func(c Context, o Observer[T]) {
if c.Done() != nil {
c.AfterFunc(func() {
o.Error(c.Cause())
o.Error(c.Err())
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion ifempty.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func DefaultIfEmpty[T any](s ...T) Operator[T, T] {
select {
default:
case <-done:
o.Error(c.Cause())
o.Error(c.Err())
return
}

Expand Down
4 changes: 2 additions & 2 deletions merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (ob mergeWithObservable[T]) Subscribe(c Context, o Observer[T]) {
select {
default:
case <-done:
o.Error(c.Cause())
o.Error(c.Err())
return
}
}
Expand All @@ -65,7 +65,7 @@ func (ob mergeWithObservable[T]) Subscribe(c Context, o Observer[T]) {
select {
default:
case <-done:
o.Error(c.Cause())
o.Error(c.Err())
return
}
}
Expand Down
4 changes: 2 additions & 2 deletions multicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (m *multicast[T]) Subscribe(c Context, o Observer[T]) {
select {
default:
case <-done:
o.Error(c.Cause())
o.Error(c.Err())
return
}

Expand All @@ -157,7 +157,7 @@ func (m *multicast[T]) Subscribe(c Context, o Observer[T]) {
m.Mu.Lock()
m.Mobs.Delete(&o)
m.Mu.Unlock()
o.Error(c.Cause())
o.Error(c.Err())
})
}

Expand Down
4 changes: 2 additions & 2 deletions observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ package rx
//
// An Observable must honor the cancellation of the given [Context].
// When the cancellation of the given [Context] is detected, an Observable
// must emit an error notification of whatever [Context.Cause] returns, as
// a termination, to the given [Observer] as soon as possible.
// must emit a notification of error (as a termination) to the given [Observer]
// as soon as possible.
//
// If an Observable needs to start goroutines, it must use [Context.Go] to do
// so; if an Observable needs to start an asynchronous operation other than
Expand Down
2 changes: 1 addition & 1 deletion pair.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func FromMap[M ~map[K]V, K comparable, V any](m M) Observable[Pair[K, V]] {
select {
default:
case <-done:
o.Error(c.Cause())
o.Error(c.Err())
return
}

Expand Down
4 changes: 2 additions & 2 deletions range.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func Range[T constraints.Integer](low, high T) Observable[T] {
select {
default:
case <-done:
o.Error(c.Cause())
o.Error(c.Err())
return
}

Expand All @@ -33,7 +33,7 @@ func Iota[T constraints.Integer](init T) Observable[T] {
select {
default:
case <-done:
o.Error(c.Cause())
o.Error(c.Err())
return
}

Expand Down
2 changes: 1 addition & 1 deletion repeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (ob repeatObservable[T]) Subscribe(c Context, o Observer[T]) {
select {
default:
case <-done:
o.Error(c.Cause())
o.Error(c.Err())
return
}

Expand Down
2 changes: 1 addition & 1 deletion retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (ob retryObservable[T]) Subscribe(c Context, o Observer[T]) {
select {
default:
case <-done:
o.Error(c.Cause())
o.Error(c.Err())
return
}

Expand Down
2 changes: 1 addition & 1 deletion skipuntil.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (ob skipUntilObservable[T, U]) Subscribe(c Context, o Observer[T]) {
select {
default:
case <-c.Done():
terminate(Error[T](c.Cause()))
terminate(Error[T](c.Err()))
return
}

Expand Down
2 changes: 1 addition & 1 deletion slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ func FromSlice[S ~[]T, T any](s S) Observable[T] {
select {
default:
case <-done:
o.Error(c.Cause())
o.Error(c.Err())
return
}

Expand Down
2 changes: 1 addition & 1 deletion takelast.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TakeLast[T any](count int) Operator[T, T] {
select {
default:
case <-done:
o.Error(c.Cause())
o.Error(c.Err())
return
}

Expand Down
2 changes: 1 addition & 1 deletion takeuntil.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (ob takeUntilObservable[T, U]) Subscribe(c Context, o Observer[T]) {
select {
default:
case <-c.Done():
terminate(Error[T](c.Cause()))
terminate(Error[T](c.Err()))
return
}

Expand Down
4 changes: 2 additions & 2 deletions timing.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func Ticker(d time.Duration) Observable[time.Time] {
for {
select {
case <-done:
o.Error(c.Cause())
o.Error(c.Err())
return
case t := <-tk.C:
Try1(o, Next(t), func() { o.Error(ErrOops) })
Expand All @@ -36,7 +36,7 @@ func Timer(d time.Duration) Observable[time.Time] {
select {
case <-c.Done():
tm.Stop()
o.Error(c.Cause())
o.Error(c.Err())
case t := <-tm.C:
Try1(o, Next(t), func() { o.Error(ErrOops) })
o.Complete()
Expand Down
6 changes: 3 additions & 3 deletions unicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (u *unicast[T]) startEmitting(n Notification[T], unlockEarly bool) {
select {
default:
case <-u.DoneChan:
throw(context.Cause(u.Context))
throw(u.Context.Err())
return
}

Expand Down Expand Up @@ -149,7 +149,7 @@ func (u *unicast[T]) startEmitting(n Notification[T], unlockEarly bool) {
select {
default:
case <-u.DoneChan:
throw(context.Cause(u.Context))
throw(u.Context.Err())
return
}

Expand Down Expand Up @@ -245,7 +245,7 @@ func (u *unicast[T]) Subscribe(c Context, o Observer[T]) {

done := c.Done()
if done != nil {
stop := c.AfterFunc(func() { u.Emit(Error[T](c.Cause())) })
stop := c.AfterFunc(func() { u.Emit(Error[T](c.Err())) })
o = o.DoOnTermination(func() { stop() })
}

Expand Down

0 comments on commit a07b7c3

Please sign in to comment.