Skip to content

Commit

Permalink
update(Multicast): send tracked values to new subscriber first
Browse files Browse the repository at this point in the history
  • Loading branch information
b97tsk committed Apr 24, 2024
1 parent 242dab7 commit f47d4ab
Showing 1 changed file with 30 additions and 21 deletions.
51 changes: 30 additions & 21 deletions multicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,30 +115,17 @@ func (m *multicast[T]) Emit(n Notification[T]) {
func (m *multicast[T]) Subscribe(c Context, o Observer[T]) {
m.Mu.Lock()

lastn := m.LastN
if lastn.Kind == 0 {
c, o = Serialize(c, o)

o := o
m.Mobs.Add(&o)

c.AfterFunc(func() {
m.Mu.Lock()
m.Mobs.Delete(&o)
m.Mu.Unlock()
o.Error(c.Cause())
})
}

buf := m.Buf
if buf != nil {
if buf := m.Buf; buf != nil {
buf.RefCount.Add(1)
defer buf.RefCount.Add(^uint32(0))
}
decrease := true
defer func() {
if decrease {
buf.RefCount.Add(^uint32(0))
}
}()

m.Mu.Unlock()
m.Mu.Unlock()

if buf != nil {
q := buf.Queue
done := c.Done()

Expand All @@ -152,8 +139,30 @@ func (m *multicast[T]) Subscribe(c Context, o Observer[T]) {

Try1(o, Next(q.At(i)), func() { o.Error(ErrOops) })
}

buf.RefCount.Add(^uint32(0))
decrease = false

m.Mu.Lock()
}

lastn := m.LastN
if lastn.Kind == 0 {
c, o = Serialize(c, o)

o := o
m.Mobs.Add(&o)

c.AfterFunc(func() {
m.Mu.Lock()
m.Mobs.Delete(&o)
m.Mu.Unlock()
o.Error(c.Cause())
})
}

m.Mu.Unlock()

switch lastn.Kind {
case KindError:
o.Error(lastn.Error)
Expand Down

0 comments on commit f47d4ab

Please sign in to comment.