Skip to content

Commit

Permalink
update(Unicast): code refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
b97tsk committed Jul 5, 2024
1 parent e18982c commit 8fcc46d
Showing 1 changed file with 84 additions and 110 deletions.
194 changes: 84 additions & 110 deletions unicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,106 +58,6 @@ type unicast[T any] struct {
Observer Observer[T]
}

func (u *unicast[T]) startEmitting(n Notification[T], unlockEarly bool) {
var deferUnlock bool

u.Emitting = true

if unlockEarly {
u.Mu.Unlock()
} else {
deferUnlock = true
defer func() {
if deferUnlock {
u.Mu.Unlock()
}
}()
}

throw := func(err error) {
if !deferUnlock {
u.Mu.Lock()
}
deferUnlock = false
u.Emitting = false
u.LastN = Error[struct{}](err)
u.Buf.Init()
u.Mu.Unlock()
u.Cond.Broadcast()
u.Observer.Error(err)
}

oops := func() { throw(ErrOops) }

o := u.Observer

switch n.Kind {
case KindNext:
select {
default:
case <-u.DoneChan:
throw(context.Cause(u.Context))
return
}

Try1(o, n, oops)

case KindError, KindComplete:
o.Emit(n)
return
}

for first := true; ; first = false {
if !deferUnlock {
u.Mu.Lock()
}

if u.Buf.Len() == 0 {
lastn := u.LastN

deferUnlock = false
u.Emitting = false
u.Mu.Unlock()

switch lastn.Kind {
case KindError:
o.Error(lastn.Error)
case KindComplete:
o.Complete()
}

return
}

if !first && u.Waiters != 0 {
deferUnlock = false
u.Emitting = false
u.Mu.Unlock()
u.Cond.Broadcast()
return
}

var buf queue.Queue[T]

u.Buf, buf = buf, u.Buf

deferUnlock = false
u.Mu.Unlock()
u.Cond.Broadcast()

for i, j := 0, buf.Len(); i < j; i++ {
select {
default:
case <-u.DoneChan:
throw(context.Cause(u.Context))
return
}

Try1(o, Next(buf.At(i)), oops)
}
}
}

func (u *unicast[T]) Emit(n Notification[T]) {
u.Mu.Lock()

Expand Down Expand Up @@ -197,14 +97,7 @@ func (u *unicast[T]) Emit(n Notification[T]) {
}

if !u.Emitting {
if n.Kind == KindNext {
v := u.Buf.Pop()
u.Buf.Push(n.Value)
n.Value = v
}

u.startEmitting(n, false)

u.startEmitting(n)
return
}

Expand All @@ -231,7 +124,7 @@ func (u *unicast[T]) Emit(n Notification[T]) {
return
}

u.startEmitting(n, true)
u.startEmitting(n)
}

func (u *unicast[T]) Subscribe(c Context, o Observer[T]) {
Expand All @@ -258,5 +151,86 @@ func (u *unicast[T]) Subscribe(c Context, o Observer[T]) {
return
}

u.startEmitting(Notification[T]{}, false)
u.startEmitting(Notification[T]{})
}

func (u *unicast[T]) startEmitting(n Notification[T]) {
throw := func(err error) {
u.Mu.Lock()
u.Emitting = false
u.LastN = Error[struct{}](err)
u.Buf.Init()
u.Mu.Unlock()
u.Cond.Broadcast()
u.Observer.Error(err)
}

oops := func() { throw(ErrOops) }

o := u.Observer

u.Emitting = true

for {
var buf queue.Queue[T]

u.Buf, buf = buf, u.Buf

u.Mu.Unlock()
u.Cond.Broadcast()

for i, j := 0, buf.Len(); i < j; i++ {
select {
default:
case <-u.DoneChan:
throw(context.Cause(u.Context))
return
}

Try1(o, Next(buf.At(i)), oops)
}

switch n.Kind {
case 0:
case KindNext:
select {
default:
case <-u.DoneChan:
throw(context.Cause(u.Context))
return
}

Try1(o, n, oops)

n = Notification[T]{}
case KindError, KindComplete:
o.Emit(n)
return
}

u.Mu.Lock()

if u.Buf.Len() == 0 {
lastn := u.LastN

u.Emitting = false
u.Mu.Unlock()

switch lastn.Kind {
case KindError:
o.Error(lastn.Error)
case KindComplete:
o.Complete()
}

return
}

if u.Waiters != 0 {
u.Emitting = false
u.Mu.Unlock()
u.Cond.Broadcast()
return
}
}
}

0 comments on commit 8fcc46d

Please sign in to comment.