Skip to content

Commit

Permalink
update(Unicast): preserve buffers for later reuse
Browse files Browse the repository at this point in the history
  • Loading branch information
b97tsk committed Aug 9, 2024
1 parent 65bf6b2 commit 52938f9
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 18 deletions.
33 changes: 20 additions & 13 deletions internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ func (q *Queue[E]) Init() {
q.head, q.tail = nil, nil
}

// Clear clears q, preserving existing buffer.
func (q *Queue[E]) Clear() {
clear(q.head)
clear(q.tail)
s := q.tail[:0]
q.head, q.tail = s, s
}

// Cap returns the capacity of the internal buffer. If Cap() equals to Len(),
// new Push(x) causes the internal buffer to grow.
func (q *Queue[E]) Cap() int {
Expand All @@ -27,9 +35,8 @@ func (q *Queue[E]) Len() int {
// Push inserts an element at the end of q.
func (q *Queue[E]) Push(x E) {
if q.Len() == q.Cap() { // Grow if full.
buf := append(append(q.head, q.tail...), x)
q.head, q.tail = buf, buf[:0]

s := append(append(q.head, q.tail...), x)
q.head, q.tail = s, s[:0]
return
}

Expand All @@ -42,7 +49,7 @@ func (q *Queue[E]) Push(x E) {

// Pop removes and returns the first element. It panics if q is empty.
func (q *Queue[E]) Pop() E {
if len(q.head) > 0 {
if len(q.head) != 0 {
x := q.head[0]

var zero E
Expand All @@ -55,8 +62,8 @@ func (q *Queue[E]) Pop() E {
}

if n, m := q.Len(), q.Cap(); n == m>>2 && m > smallSize { // Shrink if sparse.
buf := make([]E, n<<1)
q.head, q.tail = buf[:q.CopyTo(buf)], buf[:0]
s := make([]E, n<<1)
q.head, q.tail = s[:q.CopyTo(s)], s[:0]
}

return x
Expand Down Expand Up @@ -86,7 +93,7 @@ func (q *Queue[E]) At(i int) E {

// Front returns the first element. It panics if q is empty.
func (q *Queue[E]) Front() E {
if len(q.head) > 0 {
if len(q.head) != 0 {
return q.head[0]
}

Expand All @@ -95,11 +102,11 @@ func (q *Queue[E]) Front() E {

// Back returns the last element. It panics if q is empty.
func (q *Queue[E]) Back() E {
if n := len(q.tail); n > 0 {
if n := len(q.tail); n != 0 {
return q.tail[n-1]
}

if n := len(q.head); n > 0 {
if n := len(q.head); n != 0 {
return q.head[n-1]
}

Expand All @@ -116,12 +123,12 @@ func (q *Queue[E]) CopyTo(dst []E) int {

// Clone clones q.
func (q *Queue[E]) Clone() Queue[E] {
var buf []E
var s []E

if q.head != nil {
buf = make([]E, q.Len(), q.Cap())
q.CopyTo(buf)
s = make([]E, q.Len(), q.Cap())
q.CopyTo(s)
}

return Queue[E]{buf, buf[:0]}
return Queue[E]{s, s[:0]}
}
6 changes: 4 additions & 2 deletions internal/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ func TestQueue(t *testing.T) {

var q queue.Queue[string]

q.Init()

t.Logf("Len=%v Cap=%v", q.Len(), q.Cap())

if q.Len() != 0 {
Expand Down Expand Up @@ -67,10 +69,10 @@ func TestQueue(t *testing.T) {
t.FailNow()
}

cloned.Init()
cloned.Clear()

if cloned.Len() != 0 {
t.Logf("Init(): Len=%v Cap=%v", cloned.Len(), cloned.Cap())
t.Logf("Clear(): Len=%v Cap=%v", cloned.Len(), cloned.Cap())
t.FailNow()
}

Expand Down
17 changes: 14 additions & 3 deletions unicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,14 @@ type unicast[T any] struct {
waiters int
emitting bool
lastn Notification[struct{}]
buf queue.Queue[T]
buf, alt queue.Queue[T]
context context.Context
done <-chan struct{}
observer Observer[T]
}

const bufLimit = 32

func (u *unicast[T]) Emit(n Notification[T]) {
u.mu.Lock()

Expand Down Expand Up @@ -96,7 +98,6 @@ func (u *unicast[T]) Emit(n Notification[T]) {
}

if n.Kind == KindNext {
const bufLimit = 32 // Only up to this number of values can fill into u.Buf.
Again:
for u.buf.Len() >= max(u.buf.Cap(), bufLimit) {
if n.Kind == 0 && u.waiters != 0 {
Expand Down Expand Up @@ -162,6 +163,7 @@ func (u *unicast[T]) startEmitting(n Notification[T]) {
u.emitting = false
u.lastn = Stop[struct{}](err)
u.buf.Init()
u.alt.Init()
u.mu.Unlock()
u.co.Broadcast()
u.observer.Stop(err)
Expand All @@ -176,7 +178,7 @@ func (u *unicast[T]) startEmitting(n Notification[T]) {
for {
var buf queue.Queue[T]

u.buf, buf = buf, u.buf
buf, u.buf, u.alt = u.buf, u.alt, buf

u.mu.Unlock()
u.co.Broadcast()
Expand Down Expand Up @@ -212,6 +214,15 @@ func (u *unicast[T]) startEmitting(n Notification[T]) {

u.mu.Lock()

if n := buf.Cap(); n != 0 && n <= bufLimit {
buf.Clear()
if u.buf.Cap() == 0 {
u.buf = buf
} else {
u.alt = buf
}
}

if u.buf.Len() == 0 {
lastn := u.lastn

Expand Down

0 comments on commit 52938f9

Please sign in to comment.