Skip to content

Commit

Permalink
update(Unicast): do not unlock mutex early in some cases
Browse files Browse the repository at this point in the history
  • Loading branch information
b97tsk committed Apr 23, 2024
1 parent 31d1ce2 commit 242dab7
Showing 1 changed file with 27 additions and 7 deletions.
34 changes: 27 additions & 7 deletions unicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,27 @@ type unicast[T any] struct {
Observer Observer[T]
}

func (u *unicast[T]) startEmitting(n Notification[T]) {
func (u *unicast[T]) startEmitting(n Notification[T], unlockEarly bool) {
var deferUnlock bool

u.Emitting = true
u.Mu.Unlock()

if unlockEarly {
u.Mu.Unlock()
} else {
deferUnlock = true
defer func() {
if deferUnlock {
u.Mu.Unlock()
}
}()
}

throw := func(err error) {
u.Mu.Lock()
if !deferUnlock {
u.Mu.Lock()
}
deferUnlock = false
u.Emitting = false
u.LastN = Error[struct{}](err)
u.Buf.Init()
Expand Down Expand Up @@ -93,11 +108,14 @@ func (u *unicast[T]) startEmitting(n Notification[T]) {
}

for first := true; ; first = false {
u.Mu.Lock()
if !deferUnlock {
u.Mu.Lock()
}

if u.Buf.Len() == 0 {
lastn := u.LastN

deferUnlock = false
u.Emitting = false
u.Mu.Unlock()

Expand All @@ -112,6 +130,7 @@ func (u *unicast[T]) startEmitting(n Notification[T]) {
}

if !first && u.Waiters != 0 {
deferUnlock = false
u.Emitting = false
u.Mu.Unlock()
u.Cond.Broadcast()
Expand All @@ -122,6 +141,7 @@ func (u *unicast[T]) startEmitting(n Notification[T]) {

u.Buf, buf = buf, u.Buf

deferUnlock = false
u.Mu.Unlock()
u.Cond.Broadcast()

Expand Down Expand Up @@ -183,7 +203,7 @@ func (u *unicast[T]) Emit(n Notification[T]) {
n.Value = v
}

u.startEmitting(n)
u.startEmitting(n, false)

return
}
Expand Down Expand Up @@ -211,7 +231,7 @@ func (u *unicast[T]) Emit(n Notification[T]) {
return
}

u.startEmitting(n)
u.startEmitting(n, true)
}

func (u *unicast[T]) Subscribe(c Context, o Observer[T]) {
Expand All @@ -238,5 +258,5 @@ func (u *unicast[T]) Subscribe(c Context, o Observer[T]) {
return
}

u.startEmitting(Notification[T]{})
u.startEmitting(Notification[T]{}, false)
}

0 comments on commit 242dab7

Please sign in to comment.