Skip to content

Commit

Permalink
update(CombineLatest*,WithLatestFrom*,ZipWithBuffering*): code refact…
Browse files Browse the repository at this point in the history
…oring
  • Loading branch information
b97tsk committed Jun 25, 2024
1 parent f47d4ab commit e08bf48
Show file tree
Hide file tree
Showing 26 changed files with 714 additions and 1,206 deletions.
62 changes: 27 additions & 35 deletions combinelatest2.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package rx

import "sync"

// CombineLatest2 combines multiple Observables to create an Observable
// that emits mappings of the latest values emitted by each of its input
// Observables.
Expand All @@ -9,38 +11,19 @@ func CombineLatest2[T1, T2, R any](
mapping func(v1 T1, v2 T2) R,
) Observable[R] {
return func(c Context, o Observer[R]) {
c, cancel := c.WithCancel()
noop := make(chan struct{})
o = o.DoOnTermination(func() {
cancel()
close(noop)
})

chan1 := make(chan Notification[T1])
chan2 := make(chan Notification[T2])

c.Go(func() {
var s combineLatestState2[T1, T2]

cont := true
c, o = Serialize(c, o)

for cont {
select {
case n := <-chan1:
cont = combineLatestEmit2(o, n, mapping, &s, &s.V1, 1)
case n := <-chan2:
cont = combineLatestEmit2(o, n, mapping, &s, &s.V2, 2)
}
}
})
var s combineLatestState2[T1, T2]

_ = true &&
subscribeChannel(c, ob1, chan1, noop) &&
subscribeChannel(c, ob2, chan2, noop)
ob1.satcc(c, func(n Notification[T1]) { combineLatestEmit2(o, n, mapping, &s, &s.V1, 1) }) &&
ob2.satcc(c, func(n Notification[T2]) { combineLatestEmit2(o, n, mapping, &s, &s.V2, 2) })
}
}

type combineLatestState2[T1, T2 any] struct {
sync.Mutex

NBits, CBits uint8

V1 T1
Expand All @@ -54,29 +37,38 @@ func combineLatestEmit2[T1, T2, R, X any](
s *combineLatestState2[T1, T2],
v *X,
bit uint8,
) bool {
) {
const FullBits = 3

switch n.Kind {
case KindNext:
s.Lock()
*v = n.Value
nbits := s.NBits
nbits |= bit
s.NBits = nbits

if s.NBits |= bit; s.NBits == FullBits {
oops := func() { o.Error(ErrOops) }
v := Try21(mapping, s.V1, s.V2, oops)
Try1(o, Next(v), oops)
if nbits == FullBits {
v := Try21(mapping, s.V1, s.V2, s.Unlock)
s.Unlock()
o.Next(v)
return
}

s.Unlock()

case KindError:
o.Error(n.Error)
return false

case KindComplete:
if s.CBits |= bit; s.CBits == FullBits {
s.Lock()
cbits := s.CBits
cbits |= bit
s.CBits = cbits
s.Unlock()

if cbits == FullBits {
o.Complete()
return false
}
}

return true
}
67 changes: 28 additions & 39 deletions combinelatest3.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package rx

import "sync"

// CombineLatest3 combines multiple Observables to create an Observable
// that emits mappings of the latest values emitted by each of its input
// Observables.
Expand All @@ -10,42 +12,20 @@ func CombineLatest3[T1, T2, T3, R any](
mapping func(v1 T1, v2 T2, v3 T3) R,
) Observable[R] {
return func(c Context, o Observer[R]) {
c, cancel := c.WithCancel()
noop := make(chan struct{})
o = o.DoOnTermination(func() {
cancel()
close(noop)
})

chan1 := make(chan Notification[T1])
chan2 := make(chan Notification[T2])
chan3 := make(chan Notification[T3])

c.Go(func() {
var s combineLatestState3[T1, T2, T3]

cont := true
c, o = Serialize(c, o)

for cont {
select {
case n := <-chan1:
cont = combineLatestEmit3(o, n, mapping, &s, &s.V1, 1)
case n := <-chan2:
cont = combineLatestEmit3(o, n, mapping, &s, &s.V2, 2)
case n := <-chan3:
cont = combineLatestEmit3(o, n, mapping, &s, &s.V3, 4)
}
}
})
var s combineLatestState3[T1, T2, T3]

_ = true &&
subscribeChannel(c, ob1, chan1, noop) &&
subscribeChannel(c, ob2, chan2, noop) &&
subscribeChannel(c, ob3, chan3, noop)
ob1.satcc(c, func(n Notification[T1]) { combineLatestEmit3(o, n, mapping, &s, &s.V1, 1) }) &&
ob2.satcc(c, func(n Notification[T2]) { combineLatestEmit3(o, n, mapping, &s, &s.V2, 2) }) &&
ob3.satcc(c, func(n Notification[T3]) { combineLatestEmit3(o, n, mapping, &s, &s.V3, 4) })
}
}

type combineLatestState3[T1, T2, T3 any] struct {
sync.Mutex

NBits, CBits uint8

V1 T1
Expand All @@ -60,29 +40,38 @@ func combineLatestEmit3[T1, T2, T3, R, X any](
s *combineLatestState3[T1, T2, T3],
v *X,
bit uint8,
) bool {
) {
const FullBits = 7

switch n.Kind {
case KindNext:
s.Lock()
*v = n.Value
nbits := s.NBits
nbits |= bit
s.NBits = nbits

if s.NBits |= bit; s.NBits == FullBits {
oops := func() { o.Error(ErrOops) }
v := Try31(mapping, s.V1, s.V2, s.V3, oops)
Try1(o, Next(v), oops)
if nbits == FullBits {
v := Try31(mapping, s.V1, s.V2, s.V3, s.Unlock)
s.Unlock()
o.Next(v)
return
}

s.Unlock()

case KindError:
o.Error(n.Error)
return false

case KindComplete:
if s.CBits |= bit; s.CBits == FullBits {
s.Lock()
cbits := s.CBits
cbits |= bit
s.CBits = cbits
s.Unlock()

if cbits == FullBits {
o.Complete()
return false
}
}

return true
}
72 changes: 29 additions & 43 deletions combinelatest4.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package rx

import "sync"

// CombineLatest4 combines multiple Observables to create an Observable
// that emits mappings of the latest values emitted by each of its input
// Observables.
Expand All @@ -11,46 +13,21 @@ func CombineLatest4[T1, T2, T3, T4, R any](
mapping func(v1 T1, v2 T2, v3 T3, v4 T4) R,
) Observable[R] {
return func(c Context, o Observer[R]) {
c, cancel := c.WithCancel()
noop := make(chan struct{})
o = o.DoOnTermination(func() {
cancel()
close(noop)
})

chan1 := make(chan Notification[T1])
chan2 := make(chan Notification[T2])
chan3 := make(chan Notification[T3])
chan4 := make(chan Notification[T4])

c.Go(func() {
var s combineLatestState4[T1, T2, T3, T4]

cont := true
c, o = Serialize(c, o)

for cont {
select {
case n := <-chan1:
cont = combineLatestEmit4(o, n, mapping, &s, &s.V1, 1)
case n := <-chan2:
cont = combineLatestEmit4(o, n, mapping, &s, &s.V2, 2)
case n := <-chan3:
cont = combineLatestEmit4(o, n, mapping, &s, &s.V3, 4)
case n := <-chan4:
cont = combineLatestEmit4(o, n, mapping, &s, &s.V4, 8)
}
}
})
var s combineLatestState4[T1, T2, T3, T4]

_ = true &&
subscribeChannel(c, ob1, chan1, noop) &&
subscribeChannel(c, ob2, chan2, noop) &&
subscribeChannel(c, ob3, chan3, noop) &&
subscribeChannel(c, ob4, chan4, noop)
ob1.satcc(c, func(n Notification[T1]) { combineLatestEmit4(o, n, mapping, &s, &s.V1, 1) }) &&
ob2.satcc(c, func(n Notification[T2]) { combineLatestEmit4(o, n, mapping, &s, &s.V2, 2) }) &&
ob3.satcc(c, func(n Notification[T3]) { combineLatestEmit4(o, n, mapping, &s, &s.V3, 4) }) &&
ob4.satcc(c, func(n Notification[T4]) { combineLatestEmit4(o, n, mapping, &s, &s.V4, 8) })
}
}

type combineLatestState4[T1, T2, T3, T4 any] struct {
sync.Mutex

NBits, CBits uint8

V1 T1
Expand All @@ -66,29 +43,38 @@ func combineLatestEmit4[T1, T2, T3, T4, R, X any](
s *combineLatestState4[T1, T2, T3, T4],
v *X,
bit uint8,
) bool {
) {
const FullBits = 15

switch n.Kind {
case KindNext:
s.Lock()
*v = n.Value
nbits := s.NBits
nbits |= bit
s.NBits = nbits

if s.NBits |= bit; s.NBits == FullBits {
oops := func() { o.Error(ErrOops) }
v := Try41(mapping, s.V1, s.V2, s.V3, s.V4, oops)
Try1(o, Next(v), oops)
if nbits == FullBits {
v := Try41(mapping, s.V1, s.V2, s.V3, s.V4, s.Unlock)
s.Unlock()
o.Next(v)
return
}

s.Unlock()

case KindError:
o.Error(n.Error)
return false

case KindComplete:
if s.CBits |= bit; s.CBits == FullBits {
s.Lock()
cbits := s.CBits
cbits |= bit
s.CBits = cbits
s.Unlock()

if cbits == FullBits {
o.Complete()
return false
}
}

return true
}
Loading

0 comments on commit e08bf48

Please sign in to comment.