Skip to content

Commit

Permalink
update(Unicast): do not have one goroutine busy for too long
Browse files Browse the repository at this point in the history
  • Loading branch information
b97tsk committed Apr 20, 2024
1 parent aedaf14 commit dd4dc9a
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 3 deletions.
55 changes: 52 additions & 3 deletions unicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ func UnicastBuffer[T any](n int) Subject[T] {

type unicast[T any] struct {
Mu sync.Mutex
Cond sync.Cond
Cap int
Waiters int
Emitting bool
LastN Notification[struct{}]
Buf queue.Queue[T]
Expand All @@ -66,6 +68,7 @@ func (u *unicast[T]) startEmitting(n Notification[T]) {
u.LastN = Error[struct{}](err)
u.Buf.Init()
u.Mu.Unlock()
u.Cond.Broadcast()
u.Observer.Error(err)
}

Expand All @@ -89,7 +92,7 @@ func (u *unicast[T]) startEmitting(n Notification[T]) {
return
}

for {
for first := true; ; first = false {
u.Mu.Lock()

if u.Buf.Len() == 0 {
Expand All @@ -108,11 +111,19 @@ func (u *unicast[T]) startEmitting(n Notification[T]) {
return
}

if !first && u.Waiters != 0 {
u.Emitting = false
u.Mu.Unlock()
u.Cond.Signal()
return
}

var buf queue.Queue[T]

u.Buf, buf = buf, u.Buf

u.Mu.Unlock()
u.Cond.Broadcast()

for i, j := 0, buf.Len(); i < j; i++ {
select {
Expand Down Expand Up @@ -156,9 +167,47 @@ func (u *unicast[T]) Emit(n Notification[T]) {
return
}

if u.Emitting {
if u.Emitting || u.Waiters != 0 {
if n.Kind == KindNext {
u.Buf.Push(n.Value)
const bufLimit = 32 // Only up to this number of values can fill into u.Buf.
Again:
for u.Buf.Len() >= max(u.Buf.Cap(), bufLimit) {
if n.Kind == 0 && u.Waiters != 0 {
if !u.Emitting {
u.Cond.Signal()
}

break
}

if !u.Emitting {
if n.Kind == KindNext {
v := u.Buf.Pop()
u.Buf.Push(n.Value)
n.Value = v
}

u.startEmitting(n)

return
}

if u.Cond.L == nil {
u.Cond.L = &u.Mu
}

u.Waiters++
u.Cond.Wait()
u.Waiters--
}

if n.Kind == KindNext && u.LastN.Kind == 0 {
u.Buf.Push(n.Value)
if u.Waiters == 0 {
n = Notification[T]{}
goto Again
}
}
}

u.Mu.Unlock()
Expand Down
42 changes: 42 additions & 0 deletions unicast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rx_test
import (
"context"
"runtime"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -185,4 +186,45 @@ func TestUnicast(t *testing.T) {
t.Fatal("timeout waiting for running finalizer")
}
})

t.Run("Race", func(t *testing.T) {
t.Parallel()

u := rx.UnicastBufferAll[int]()

go func() {
const N = 5

var wg sync.WaitGroup

wg.Add(N)

for range N {
go func() {
defer wg.Done()
for i := 1; i <= 100; i++ {
u.Next(i)
runtime.Gosched()
}
}()
}

wg.Wait()
u.Complete()
}()

tk := time.NewTicker(10 * time.Millisecond)
defer tk.Stop()

NewTestSuite[int](t).Case(
rx.Pipe1(
u.Observable,
rx.Reduce(0, func(a, b int) int {
<-tk.C
return a + b
}),
),
25250, ErrComplete,
)
})
}

0 comments on commit dd4dc9a

Please sign in to comment.