Skip to content

Commit

Permalink
update(Timeout): code refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
b97tsk committed Jun 26, 2024
1 parent e08bf48 commit a4e5ac0
Showing 1 changed file with 41 additions and 23 deletions.
64 changes: 41 additions & 23 deletions timeout.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package rx

import (
"context"
"errors"
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -51,39 +50,58 @@ type timeoutObservable[T any] struct {
}

func (ob timeoutObservable[T]) Subscribe(parent Context, o Observer[T]) {
c, cancel := parent.WithCancelCause()
o = o.DoOnTermination(func() { cancel(nil) })
c, cancel := parent.WithCancel()

timeout := errors.Join(context.Canceled)
tm := time.AfterFunc(ob.First, func() { cancel(timeout) })
var x struct {
Context atomic.Value
}

x.Context.Store(c.Context)

if c.WaitGroup != nil {
c.WaitGroup.Add(1)
}

tm := time.AfterFunc(ob.First, func() {
if c.WaitGroup != nil {
defer c.WaitGroup.Done()
}

if x.Context.Swap(sentinel) != sentinel {
cancel()

if ob.With != nil {
ob.With.Subscribe(parent, o)
return
}

o.Error(ErrTimeout)
}
})

ob.Source.Subscribe(c, func(n Notification[T]) {
switch n.Kind {
case KindNext:
if tm.Stop() {
o.Emit(n)
Try1(o, n, func() {
if c.WaitGroup != nil {
c.WaitGroup.Done()
}
})
tm.Reset(ob.Each)
}

case KindError:
tm.Stop()

if errors.Is(n.Error, timeout) {
if ob.With != nil {
ob.With.Subscribe(parent, o)
return
case KindError, KindComplete:
if tm.Stop() {
if c.WaitGroup != nil {
c.WaitGroup.Done()
}

o.Error(ErrTimeout)

return
}

o.Emit(n)

case KindComplete:
tm.Stop()
o.Emit(n)
if x.Context.Swap(sentinel) != sentinel {
cancel()
o.Emit(n)
}
}
})
}

0 comments on commit a4e5ac0

Please sign in to comment.