Skip to content

Commit

Permalink
rename Serialize function to Synchronize
Browse files Browse the repository at this point in the history
  • Loading branch information
b97tsk committed Aug 10, 2024
1 parent 52938f9 commit ce033c6
Show file tree
Hide file tree
Showing 30 changed files with 46 additions and 47 deletions.
2 changes: 1 addition & 1 deletion combinelatest2.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ func CombineLatest2[T1, T2, R any](
mapping func(v1 T1, v2 T2) R,
) Observable[R] {
return func(c Context, o Observer[R]) {
c, o = Serialize(c, o)
c, o = Synchronize(c, o)

var s combineLatestState2[T1, T2]

Expand Down
2 changes: 1 addition & 1 deletion combinelatest3.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func CombineLatest3[T1, T2, T3, R any](
mapping func(v1 T1, v2 T2, v3 T3) R,
) Observable[R] {
return func(c Context, o Observer[R]) {
c, o = Serialize(c, o)
c, o = Synchronize(c, o)

var s combineLatestState3[T1, T2, T3]

Expand Down
2 changes: 1 addition & 1 deletion combinelatest4.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func CombineLatest4[T1, T2, T3, T4, R any](
mapping func(v1 T1, v2 T2, v3 T3, v4 T4) R,
) Observable[R] {
return func(c Context, o Observer[R]) {
c, o = Serialize(c, o)
c, o = Synchronize(c, o)

var s combineLatestState4[T1, T2, T3, T4]

Expand Down
2 changes: 1 addition & 1 deletion combinelatest5.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func CombineLatest5[T1, T2, T3, T4, T5, R any](
mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5) R,
) Observable[R] {
return func(c Context, o Observer[R]) {
c, o = Serialize(c, o)
c, o = Synchronize(c, o)

var s combineLatestState5[T1, T2, T3, T4, T5]

Expand Down
2 changes: 1 addition & 1 deletion combinelatest6.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func CombineLatest6[T1, T2, T3, T4, T5, T6, R any](
mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6) R,
) Observable[R] {
return func(c Context, o Observer[R]) {
c, o = Serialize(c, o)
c, o = Synchronize(c, o)

var s combineLatestState6[T1, T2, T3, T4, T5, T6]

Expand Down
2 changes: 1 addition & 1 deletion combinelatest7.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func CombineLatest7[T1, T2, T3, T4, T5, T6, T7, R any](
mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7) R,
) Observable[R] {
return func(c Context, o Observer[R]) {
c, o = Serialize(c, o)
c, o = Synchronize(c, o)

var s combineLatestState7[T1, T2, T3, T4, T5, T6, T7]

Expand Down
2 changes: 1 addition & 1 deletion combinelatest8.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func CombineLatest8[T1, T2, T3, T4, T5, T6, T7, T8, R any](
mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7, v8 T8) R,
) Observable[R] {
return func(c Context, o Observer[R]) {
c, o = Serialize(c, o)
c, o = Synchronize(c, o)

var s combineLatestState8[T1, T2, T3, T4, T5, T6, T7, T8]

Expand Down
2 changes: 1 addition & 1 deletion combinelatest9.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func CombineLatest9[T1, T2, T3, T4, T5, T6, T7, T8, T9, R any](
mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7, v8 T8, v9 T9) R,
) Observable[R] {
return func(c Context, o Observer[R]) {
c, o = Serialize(c, o)
c, o = Synchronize(c, o)

var s combineLatestState9[T1, T2, T3, T4, T5, T6, T7, T8, T9]

Expand Down
12 changes: 6 additions & 6 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,17 +131,17 @@
//
// The following operations may cause concurrent behavior due to [Context]
// cancellation:
// - CombineLatest operators (due to use of [Serialize]);
// - CombineLatest operators (due to use of [Synchronize]);
// - [Connect] (due to use of [Multicast]);
// - Merge operators (due to use of [Serialize]);
// - Merge operators (due to use of [Synchronize]);
// - [Multicast] and other relatives (due to use of [Context.AfterFunc] and
// [Serialize]);
// [Synchronize]);
// - [Never] (due to use of [Context.AfterFunc]);
// - [Serialize] (due to use of [Context.AfterFunc]);
// - [Share] (due to use of [Context.AfterFunc] and [Multicast]);
// - [Synchronize] (due to use of [Context.AfterFunc]);
// - [Unicast] and other relatives (due to use of [Context.AfterFunc]);
// - WithLatestFrom operators (due to use of [Serialize]);
// - ZipWithBuffering operators (due to use of [Serialize]).
// - WithLatestFrom operators (due to use of [Synchronize]);
// - ZipWithBuffering operators (due to use of [Synchronize]).
//
// Since [Context] cancellations are very common in this library, and that
// a [Context] cancellation usually results in a [Stop] notification, emitted
Expand Down
6 changes: 3 additions & 3 deletions merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type mergeWithObservable[T any] struct {
}

func (ob mergeWithObservable[T]) Subscribe(c Context, o Observer[T]) {
c, o = Serialize(c, o)
c, o = Synchronize(c, o)

var num atomic.Uint32

Expand Down Expand Up @@ -156,7 +156,7 @@ func (ob mergeMapObservable[T, R]) Subscribe(c Context, o Observer[R]) {
return
}

c, o = Serialize(c, o)
c, o = Synchronize(c, o)

var x struct {
mu sync.Mutex
Expand Down Expand Up @@ -254,7 +254,7 @@ func (ob mergeMapObservable[T, R]) Subscribe(c Context, o Observer[R]) {
}

func (ob mergeMapObservable[T, R]) SubscribeWithBuffering(c Context, o Observer[R]) {
c, o = Serialize(c, o)
c, o = Synchronize(c, o)

var x struct {
mu sync.Mutex
Expand Down
2 changes: 1 addition & 1 deletion multicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (m *multicast[T]) Subscribe(c Context, o Observer[T]) {

lastn := m.lastn
if lastn.Kind == 0 {
c, o = Serialize(c, o)
c, o = Synchronize(c, o)

o := o
m.mobs.Add(&o)
Expand Down
2 changes: 1 addition & 1 deletion multicast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestMulticast(t *testing.T) {

subscribeThenComplete := rx.NewObservable(
func(c rx.Context, o rx.Observer[string]) {
c, o = rx.Serialize(c, o)
c, o = rx.Synchronize(c, o)
m.Subscribe(c, o)
o.Complete()
},
Expand Down
11 changes: 11 additions & 0 deletions observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,17 @@ func (o Observer[T]) DoOnTermination(f func()) Observer[T] {
}
}

// Synchronize returns an [Observer] that passes incoming emissions to o
// in a mutually exclusive way.
// Synchronize also returns a copy of c that will be cancelled when o is
// about to receive a notification of [Complete], [Error] or [Stop].
func Synchronize[T any](c Context, o Observer[T]) (Context, Observer[T]) {
c, cancel := c.WithCancel()
u := new(unicast[T])
u.Subscribe(c, o.DoOnTermination(cancel))
return c, u.Emit
}

// WithRuntimeFinalizer creates an Observer with a runtime finalizer set to
// run o.Stop(ErrFinalized) in a goroutine.
// o must be safe for concurrent use.
Expand Down
12 changes: 0 additions & 12 deletions serialize.go

This file was deleted.

2 changes: 1 addition & 1 deletion withlatestfrom1.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func withLatestFrom2[T1, T2, R any](
mapping func(v1 T1, v2 T2) R,
) Observable[R] {
return func(c Context, o Observer[R]) {
c, o = Serialize(c, o)
c, o = Synchronize(c, o)

var s withLatestFromState2[T1, T2]

Expand Down
2 changes: 1 addition & 1 deletion withlatestfrom2.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func withLatestFrom3[T1, T2, T3, R any](
mapping func(v1 T1, v2 T2, v3 T3) R,
) Observable[R] {
return func(c Context, o Observer[R]) {
c, o = Serialize(c, o)
c, o = Synchronize(c, o)

var s withLatestFromState3[T1, T2, T3]

Expand Down
2 changes: 1 addition & 1 deletion withlatestfrom3.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func withLatestFrom4[T1, T2, T3, T4, R any](
mapping func(v1 T1, v2 T2, v3 T3, v4 T4) R,
) Observable[R] {
return func(c Context, o Observer[R]) {
c, o = Serialize(c, o)
c, o = Synchronize(c, o)

var s withLatestFromState4[T1, T2, T3, T4]

Expand Down
2 changes: 1 addition & 1 deletion withlatestfrom4.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func withLatestFrom5[T1, T2, T3, T4, T5, R any](
mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5) R,
) Observable[R] {
return func(c Context, o Observer[R]) {
c, o = Serialize(c, o)
c, o = Synchronize(c, o)

var s withLatestFromState5[T1, T2, T3, T4, T5]

Expand Down
2 changes: 1 addition & 1 deletion withlatestfrom5.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func withLatestFrom6[T1, T2, T3, T4, T5, T6, R any](
mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6) R,
) Observable[R] {
return func(c Context, o Observer[R]) {
c, o = Serialize(c, o)
c, o = Synchronize(c, o)

var s withLatestFromState6[T1, T2, T3, T4, T5, T6]

Expand Down
2 changes: 1 addition & 1 deletion withlatestfrom6.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func withLatestFrom7[T1, T2, T3, T4, T5, T6, T7, R any](
mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7) R,
) Observable[R] {
return func(c Context, o Observer[R]) {
c, o = Serialize(c, o)
c, o = Synchronize(c, o)

var s withLatestFromState7[T1, T2, T3, T4, T5, T6, T7]

Expand Down
2 changes: 1 addition & 1 deletion withlatestfrom7.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func withLatestFrom8[T1, T2, T3, T4, T5, T6, T7, T8, R any](
mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7, v8 T8) R,
) Observable[R] {
return func(c Context, o Observer[R]) {
c, o = Serialize(c, o)
c, o = Synchronize(c, o)

var s withLatestFromState8[T1, T2, T3, T4, T5, T6, T7, T8]

Expand Down
2 changes: 1 addition & 1 deletion withlatestfrom8.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func withLatestFrom9[T1, T2, T3, T4, T5, T6, T7, T8, T9, R any](
mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7, v8 T8, v9 T9) R,
) Observable[R] {
return func(c Context, o Observer[R]) {
c, o = Serialize(c, o)
c, o = Synchronize(c, o)

var s withLatestFromState9[T1, T2, T3, T4, T5, T6, T7, T8, T9]

Expand Down
2 changes: 1 addition & 1 deletion zipwithbuffering2.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func ZipWithBuffering2[T1, T2, R any](
mapping func(v1 T1, v2 T2) R,
) Observable[R] {
return func(c Context, o Observer[R]) {
c, o = Serialize(c, o)
c, o = Synchronize(c, o)

var s zipState2[T1, T2]

Expand Down
2 changes: 1 addition & 1 deletion zipwithbuffering3.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func ZipWithBuffering3[T1, T2, T3, R any](
mapping func(v1 T1, v2 T2, v3 T3) R,
) Observable[R] {
return func(c Context, o Observer[R]) {
c, o = Serialize(c, o)
c, o = Synchronize(c, o)

var s zipState3[T1, T2, T3]

Expand Down
2 changes: 1 addition & 1 deletion zipwithbuffering4.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func ZipWithBuffering4[T1, T2, T3, T4, R any](
mapping func(v1 T1, v2 T2, v3 T3, v4 T4) R,
) Observable[R] {
return func(c Context, o Observer[R]) {
c, o = Serialize(c, o)
c, o = Synchronize(c, o)

var s zipState4[T1, T2, T3, T4]

Expand Down
2 changes: 1 addition & 1 deletion zipwithbuffering5.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func ZipWithBuffering5[T1, T2, T3, T4, T5, R any](
mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5) R,
) Observable[R] {
return func(c Context, o Observer[R]) {
c, o = Serialize(c, o)
c, o = Synchronize(c, o)

var s zipState5[T1, T2, T3, T4, T5]

Expand Down
2 changes: 1 addition & 1 deletion zipwithbuffering6.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func ZipWithBuffering6[T1, T2, T3, T4, T5, T6, R any](
mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6) R,
) Observable[R] {
return func(c Context, o Observer[R]) {
c, o = Serialize(c, o)
c, o = Synchronize(c, o)

var s zipState6[T1, T2, T3, T4, T5, T6]

Expand Down
2 changes: 1 addition & 1 deletion zipwithbuffering7.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func ZipWithBuffering7[T1, T2, T3, T4, T5, T6, T7, R any](
mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7) R,
) Observable[R] {
return func(c Context, o Observer[R]) {
c, o = Serialize(c, o)
c, o = Synchronize(c, o)

var s zipState7[T1, T2, T3, T4, T5, T6, T7]

Expand Down
2 changes: 1 addition & 1 deletion zipwithbuffering8.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func ZipWithBuffering8[T1, T2, T3, T4, T5, T6, T7, T8, R any](
mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7, v8 T8) R,
) Observable[R] {
return func(c Context, o Observer[R]) {
c, o = Serialize(c, o)
c, o = Synchronize(c, o)

var s zipState8[T1, T2, T3, T4, T5, T6, T7, T8]

Expand Down
2 changes: 1 addition & 1 deletion zipwithbuffering9.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func ZipWithBuffering9[T1, T2, T3, T4, T5, T6, T7, T8, T9, R any](
mapping func(v1 T1, v2 T2, v3 T3, v4 T4, v5 T5, v6 T6, v7 T7, v8 T8, v9 T9) R,
) Observable[R] {
return func(c Context, o Observer[R]) {
c, o = Serialize(c, o)
c, o = Synchronize(c, o)

var s zipState9[T1, T2, T3, T4, T5, T6, T7, T8, T9]

Expand Down

0 comments on commit ce033c6

Please sign in to comment.