Skip to content

Commit

Permalink
update(*): rename struct fields
Browse files Browse the repository at this point in the history
  • Loading branch information
b97tsk committed Aug 7, 2024
1 parent 2b4b409 commit 86f4ab0
Show file tree
Hide file tree
Showing 50 changed files with 1,230 additions and 1,229 deletions.
56 changes: 28 additions & 28 deletions audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,34 +31,34 @@ func Audit[T, U any](durationSelector func(v T) Observable[U]) Operator[T, T] {
}

type auditObservable[T, U any] struct {
Source Observable[T]
DurationSelector func(T) Observable[U]
source Observable[T]
durationSelector func(T) Observable[U]
}

func (ob auditObservable[T, U]) Subscribe(c Context, o Observer[T]) {
c, cancel := c.WithCancel()
o = o.DoOnTermination(cancel)

var x struct {
Context atomic.Value
Complete atomic.Bool
Latest struct {
context atomic.Value
complete atomic.Bool
latest struct {
sync.Mutex
Value T
value T
}
Worker struct {
worker struct {
sync.WaitGroup
}
}

x.Context.Store(c.Context)
x.context.Store(c.Context)

startWorker := func(v T) {
obs := ob.DurationSelector(v)
obs := ob.durationSelector(v)
w, cancelw := c.WithCancel()

x.Context.Store(w.Context)
x.Worker.Add(1)
x.context.Store(w.Context)
x.worker.Add(1)

var noop bool

Expand All @@ -67,65 +67,65 @@ func (ob auditObservable[T, U]) Subscribe(c Context, o Observer[T]) {
return
}

defer x.Worker.Done()
defer x.worker.Done()

noop = true
cancelw()

switch n.Kind {
case KindNext:
x.Latest.Lock()
value := x.Latest.Value
x.Latest.Unlock()
x.latest.Lock()
value := x.latest.value
x.latest.Unlock()

Try1(o, Next(value), func() {
if x.Context.Swap(sentinel) != sentinel {
if x.context.Swap(sentinel) != sentinel {
o.Error(ErrOops)
}
})

if x.Context.CompareAndSwap(w.Context, c.Context) && x.Complete.Load() && x.Context.CompareAndSwap(c.Context, sentinel) {
if x.context.CompareAndSwap(w.Context, c.Context) && x.complete.Load() && x.context.CompareAndSwap(c.Context, sentinel) {
o.Complete()
}

case KindError:
if x.Context.Swap(sentinel) != sentinel {
if x.context.Swap(sentinel) != sentinel {
o.Error(n.Error)
}

case KindComplete:
if x.Context.CompareAndSwap(w.Context, c.Context) && x.Complete.Load() && x.Context.CompareAndSwap(c.Context, sentinel) {
if x.context.CompareAndSwap(w.Context, c.Context) && x.complete.Load() && x.context.CompareAndSwap(c.Context, sentinel) {
o.Complete()
}
}
})
}

ob.Source.Subscribe(c, func(n Notification[T]) {
ob.source.Subscribe(c, func(n Notification[T]) {
switch n.Kind {
case KindNext:
x.Latest.Lock()
x.Latest.Value = n.Value
x.Latest.Unlock()
x.latest.Lock()
x.latest.value = n.Value
x.latest.Unlock()

if x.Context.Load() == c.Context {
if x.context.Load() == c.Context {
startWorker(n.Value)
}

case KindError:
old := x.Context.Swap(sentinel)
old := x.context.Swap(sentinel)

cancel()
x.Worker.Wait()
x.worker.Wait()

if old != sentinel {
o.Emit(n)
}

case KindComplete:
x.Complete.Store(true)
x.complete.Store(true)

if x.Context.CompareAndSwap(c.Context, sentinel) {
if x.context.CompareAndSwap(c.Context, sentinel) {
o.Emit(n)
}
}
Expand Down
32 changes: 16 additions & 16 deletions buffercount.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ package rx
func BufferCount[T any](bufferSize int) BufferCountOperator[T] {
return BufferCountOperator[T]{
ts: bufferCountConfig{
BufferSize: bufferSize,
StartBufferEvery: bufferSize,
bufferSize: bufferSize,
startBufferEvery: bufferSize,
},
}
}

type bufferCountConfig struct {
BufferSize int
StartBufferEvery int
bufferSize int
startBufferEvery int
}

// BufferCountOperator is an [Operator] type for [BufferCount].
Expand All @@ -28,33 +28,33 @@ type BufferCountOperator[T any] struct {

// WithStartBufferEvery sets StartBufferEvery option to a given value.
func (op BufferCountOperator[T]) WithStartBufferEvery(n int) BufferCountOperator[T] {
op.ts.StartBufferEvery = n
op.ts.startBufferEvery = n
return op
}

// Apply implements the Operator interface.
func (op BufferCountOperator[T]) Apply(source Observable[T]) Observable[[]T] {
if op.ts.BufferSize <= 0 {
if op.ts.bufferSize <= 0 {
return Oops[[]T]("BufferCount: BufferSize <= 0")
}

if op.ts.StartBufferEvery <= 0 {
if op.ts.startBufferEvery <= 0 {
return Oops[[]T]("BufferCount: StartBufferEvery <= 0")
}

return bufferCountObservable[T]{source, op.ts}.Subscribe
}

type bufferCountObservable[T any] struct {
Source Observable[T]
source Observable[T]
bufferCountConfig
}

func (ob bufferCountObservable[T]) Subscribe(c Context, o Observer[[]T]) {
s := make([]T, 0, ob.BufferSize)
s := make([]T, 0, ob.bufferSize)
skip := 0

ob.Source.Subscribe(c, func(n Notification[T]) {
ob.source.Subscribe(c, func(n Notification[T]) {
switch n.Kind {
case KindNext:
if skip > 0 {
Expand All @@ -64,17 +64,17 @@ func (ob bufferCountObservable[T]) Subscribe(c Context, o Observer[[]T]) {

s = append(s, n.Value)

if len(s) < ob.BufferSize {
if len(s) < ob.bufferSize {
return
}

o.Next(s)

if ob.StartBufferEvery < ob.BufferSize {
s = append(s[:0], s[ob.StartBufferEvery:]...)
if ob.startBufferEvery < ob.bufferSize {
s = append(s[:0], s[ob.startBufferEvery:]...)
} else {
s = s[:0]
skip = ob.StartBufferEvery - ob.BufferSize
skip = ob.startBufferEvery - ob.bufferSize
}

case KindError:
Expand All @@ -85,11 +85,11 @@ func (ob bufferCountObservable[T]) Subscribe(c Context, o Observer[[]T]) {
for {
Try1(o, Next(s), func() { o.Error(ErrOops) })

if len(s) <= ob.StartBufferEvery {
if len(s) <= ob.startBufferEvery {
break
}

s = s[ob.StartBufferEvery:]
s = s[ob.startBufferEvery:]
}
}

Expand Down
32 changes: 16 additions & 16 deletions combinelatest2.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,18 @@ func CombineLatest2[T1, T2, R any](
var s combineLatestState2[T1, T2]

_ = true &&
ob1.satcc(c, func(n Notification[T1]) { combineLatestEmit2(o, n, mapping, &s, &s.V1, 1) }) &&
ob2.satcc(c, func(n Notification[T2]) { combineLatestEmit2(o, n, mapping, &s, &s.V2, 2) })
ob1.satcc(c, func(n Notification[T1]) { combineLatestEmit2(o, n, mapping, &s, &s.v1, 1) }) &&
ob2.satcc(c, func(n Notification[T2]) { combineLatestEmit2(o, n, mapping, &s, &s.v2, 2) })
}
}

type combineLatestState2[T1, T2 any] struct {
sync.Mutex
mu sync.Mutex

NBits, CBits uint8
nbits, cbits uint8

V1 T1
V2 T2
v1 T1
v2 T2
}

func combineLatestEmit2[T1, T2, R, X any](
Expand All @@ -42,30 +42,30 @@ func combineLatestEmit2[T1, T2, R, X any](

switch n.Kind {
case KindNext:
s.Lock()
s.mu.Lock()
*v = n.Value
nbits := s.NBits
nbits := s.nbits
nbits |= bit
s.NBits = nbits
s.nbits = nbits

if nbits == FullBits {
v := Try21(mapping, s.V1, s.V2, s.Unlock)
s.Unlock()
v := Try21(mapping, s.v1, s.v2, s.mu.Unlock)
s.mu.Unlock()
o.Next(v)
return
}

s.Unlock()
s.mu.Unlock()

case KindError:
o.Error(n.Error)

case KindComplete:
s.Lock()
cbits := s.CBits
s.mu.Lock()
cbits := s.cbits
cbits |= bit
s.CBits = cbits
s.Unlock()
s.cbits = cbits
s.mu.Unlock()

if cbits == FullBits {
o.Complete()
Expand Down
36 changes: 18 additions & 18 deletions combinelatest3.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,20 @@ func CombineLatest3[T1, T2, T3, R any](
var s combineLatestState3[T1, T2, T3]

_ = true &&
ob1.satcc(c, func(n Notification[T1]) { combineLatestEmit3(o, n, mapping, &s, &s.V1, 1) }) &&
ob2.satcc(c, func(n Notification[T2]) { combineLatestEmit3(o, n, mapping, &s, &s.V2, 2) }) &&
ob3.satcc(c, func(n Notification[T3]) { combineLatestEmit3(o, n, mapping, &s, &s.V3, 4) })
ob1.satcc(c, func(n Notification[T1]) { combineLatestEmit3(o, n, mapping, &s, &s.v1, 1) }) &&
ob2.satcc(c, func(n Notification[T2]) { combineLatestEmit3(o, n, mapping, &s, &s.v2, 2) }) &&
ob3.satcc(c, func(n Notification[T3]) { combineLatestEmit3(o, n, mapping, &s, &s.v3, 4) })
}
}

type combineLatestState3[T1, T2, T3 any] struct {
sync.Mutex
mu sync.Mutex

NBits, CBits uint8
nbits, cbits uint8

V1 T1
V2 T2
V3 T3
v1 T1
v2 T2
v3 T3
}

func combineLatestEmit3[T1, T2, T3, R, X any](
Expand All @@ -45,30 +45,30 @@ func combineLatestEmit3[T1, T2, T3, R, X any](

switch n.Kind {
case KindNext:
s.Lock()
s.mu.Lock()
*v = n.Value
nbits := s.NBits
nbits := s.nbits
nbits |= bit
s.NBits = nbits
s.nbits = nbits

if nbits == FullBits {
v := Try31(mapping, s.V1, s.V2, s.V3, s.Unlock)
s.Unlock()
v := Try31(mapping, s.v1, s.v2, s.v3, s.mu.Unlock)
s.mu.Unlock()
o.Next(v)
return
}

s.Unlock()
s.mu.Unlock()

case KindError:
o.Error(n.Error)

case KindComplete:
s.Lock()
cbits := s.CBits
s.mu.Lock()
cbits := s.cbits
cbits |= bit
s.CBits = cbits
s.Unlock()
s.cbits = cbits
s.mu.Unlock()

if cbits == FullBits {
o.Complete()
Expand Down
Loading

0 comments on commit 86f4ab0

Please sign in to comment.