Skip to content

Commit

Permalink
Discard mode (#4)
Browse files Browse the repository at this point in the history
* discard mode, wip

* add Discard option and update docs

* lint: unusual return of private struct

* minor refactoring
  • Loading branch information
umputun committed Apr 9, 2023
1 parent 2305c11 commit b4d22b8
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 33 deletions.
41 changes: 28 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

[![Build Status](https://github.com/go-pkgz/syncs/workflows/build/badge.svg)](https://github.com/go-pkgz/syncs/actions) [![Go Report Card](https://goreportcard.com/badge/github.com/go-pkgz/syncs)](https://goreportcard.com/report/github.com/go-pkgz/syncs) [![Coverage Status](https://coveralls.io/repos/github/go-pkgz/syncs/badge.svg?branch=master)](https://coveralls.io/github/go-pkgz/syncs?branch=master)

Package syncs provides additional synchronization primitives.
The `syncs` package offers extra synchronization primitives, such as `Semaphore`, `SizedGroup`, and `ErrSizedGroup`, to help manage concurrency in Go programs. With `syncs` package, you can efficiently manage concurrency in your Go programs using additional synchronization primitives. Use them according to your specific use-case requirements to control and limit concurrent goroutines while handling errors and early termination effectively.

## Install and update

Expand All @@ -12,7 +12,8 @@ Package syncs provides additional synchronization primitives.

### Semaphore

Implements `sync.Locker` interface but for given capacity, thread safe. Lock increases count and Unlock - decreases. Unlock on 0 count will be blocked.
`Semaphore` implements the `sync.Locker` interface with an additional `TryLock` function and a specified capacity.
It is thread-safe. The `Lock` function increases the count, while Unlock decreases it. When the count is 0, `Unlock` will block, and `Lock` will block until the count is greater than 0. The `TryLock` function will return false if locking failed (i.e. semaphore is locked) and true otherwise.

```go
sema := syncs.NewSemaphore(10) // make semaphore with 10 initial capacity
Expand All @@ -23,14 +24,17 @@ Implements `sync.Locker` interface but for given capacity, thread safe. Lock inc

// in some other place/goroutine
sema.Unlock() // decrease semaphore counter
ok := sema.TryLock() // try to lock, will return false if semaphore is locked
```

### SizedGroup

Mix semaphore and WaitGroup to provide sized waiting group. The result is a wait group allowing limited number of goroutine to run in parallel.
`SizedGroup` combines `Semaphore` and `WaitGroup` to provide a wait group that allows a limited number of goroutines to run in parallel.

By default, locking happens inside the goroutine. This means every call will be non-blocking, but some goroutines may wait if the semaphore is locked. Technically, it doesn't limit the number of goroutines but rather the number of running (active) goroutines.

To block goroutines from starting, use the `Preemptive` option. Important: With `Preemptive`, the `Go` call can block. If the maximum size is reached, the call will wait until the number of running goroutines drops below the maximum. This not only limits the number of running goroutines but also the number of waiting goroutines.

By default, the locking happens inside of goroutine, i.e. **every call will be non-blocked**, but some goroutines may wait if semaphore locked. It means - technically it doesn't limit number of goroutines, but rather number of running (active) goroutines.
In order to block goroutines from even starting use `Preemptive` option (see below).

```go
swg := syncs.NewSizedGroup(5) // wait group with max size=5
Expand All @@ -42,17 +46,27 @@ In order to block goroutines from even starting use `Preemptive` option (see bel
swg.Wait()
```

Another option is `Discard`, which will skip (won't start) goroutines if the semaphore is locked. In other words, if a defined number of goroutines are already running, the call will be discarded. `Discard` is useful when you don't care about the results of extra goroutines; i.e., you just want to run some tasks in parallel but can allow some number of them to be ignored. This flag sets `Preemptive` as well, because otherwise, it doesn't make sense.


```go
swg := syncs.NewSizedGroup(5, Discard) // wait group with max size=5 and discarding extra goroutines
for i :=0; i<10; i++ {
swg.Go(func(ctx context.Context){
doThings(ctx) // only 5 of these will run in parallel and 5 other can be discarded
})
}
swg.Wait()
```


### ErrSizedGroup

Sized error group is a SizedGroup with error control.
Works the same as errgrp.Group, i.e. returns first error.
Can work as regular errgrp.Group or with early termination.
Thread safe.
`ErrSizedGroup` is a `SizedGroup` with error control. It works the same as `errgrp.Group`, i.e., it returns the first error.
It can work as a regular errgrp.Group or with early termination. It is thread-safe.

Supports both in-goroutine-wait via `NewErrSizedGroup` as well as outside of goroutine wait with `Preemptive` option. Another options are `TermOnErr` which will skip (won't start) all other goroutines if any error returned, and `Context` for early termination/timeouts.

Important! With `Preemptive` Go call **can block**. In case if maximum size reached the call will wait till number of running goroutines
dropped under max. This way we not only limiting number of running goroutines but also number of waiting goroutines.
`ErrSizedGroup` supports both in-goroutine-wait as well as outside of goroutine wait with `Preemptive` and `Discard` options (see above). Other options include `TermOnErr`, which skips (won't start) all other goroutines if any error is returned, and `Context` for early termination/timeouts.


```go
Expand All @@ -64,4 +78,5 @@ dropped under max. This way we not only limiting number of running goroutines bu
})
}
err := ewg.Wait()
```
```

16 changes: 11 additions & 5 deletions errsizedgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
type ErrSizedGroup struct {
options
wg sync.WaitGroup
sema sync.Locker
sema Locker

err *multierror
errLock sync.RWMutex
Expand All @@ -23,7 +23,6 @@ type ErrSizedGroup struct {
// By default all goroutines will be started but will wait inside. For limited number of goroutines use Preemptive() options.
// TermOnErr will skip (won't start) all other goroutines if any error returned.
func NewErrSizedGroup(size int, options ...GroupOption) *ErrSizedGroup {

res := ErrSizedGroup{
sema: NewSemaphore(size),
err: new(multierror),
Expand All @@ -40,11 +39,18 @@ func NewErrSizedGroup(size int, options ...GroupOption) *ErrSizedGroup {
// The first call to return a non-nil error cancels the group if termOnError; its error will be
// returned by Wait. If no termOnError all errors will be collected in multierror.
func (g *ErrSizedGroup) Go(f func() error) {

g.wg.Add(1)

if g.preLock {
g.sema.Lock()
lockOk := g.sema.TryLock()
if !lockOk && g.discardIfFull {
// lock failed and discardIfFull is set, discard this goroutine
g.wg.Done()
return
}
if !lockOk && !g.discardIfFull {
g.sema.Lock() // make sure we have block until lock is acquired
}
}

go func() {
Expand Down Expand Up @@ -115,7 +121,7 @@ func (m *multierror) errorOrNil() error {
return m
}

// Error returns multierror string
// Error returns multi-error string
func (m *multierror) Error() string {
m.lock.Lock()
defer m.lock.Unlock()
Expand Down
27 changes: 23 additions & 4 deletions errsizedgroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ func TestErrorSizedGroup_Preemptive(t *testing.T) {
ewg := NewErrSizedGroup(10, Preemptive)
var c uint32

for i := 0; i < 1000; i++ {
for i := 0; i < 100; i++ {
i := i
ewg.Go(func() error {
assert.True(t, runtime.NumGoroutine() < 20, "goroutines %d", runtime.NumGoroutine())
atomic.AddUint32(&c, 1)
if i == 100 {
if i == 10 {
return errors.New("err1")
}
if i == 200 {
if i == 20 {
return errors.New("err2")
}
time.Sleep(time.Millisecond)
Expand All @@ -64,7 +64,26 @@ func TestErrorSizedGroup_Preemptive(t *testing.T) {
err := ewg.Wait()
require.NotNil(t, err)
assert.True(t, strings.HasPrefix(err.Error(), "2 error(s) occurred:"))
assert.Equal(t, uint32(1000), c, fmt.Sprintf("%d, not all routines have been executed.", c))
assert.Equal(t, uint32(100), c, fmt.Sprintf("%d, not all routines have been executed.", c))
}

func TestErrorSizedGroup_Discard(t *testing.T) {
ewg := NewErrSizedGroup(10, Discard)
var c uint32

for i := 0; i < 1000; i++ {
ewg.Go(func() error {
assert.True(t, runtime.NumGoroutine() < 20, "goroutines %d", runtime.NumGoroutine())
atomic.AddUint32(&c, 1)
time.Sleep(10 * time.Millisecond)
return nil
})
}

assert.True(t, runtime.NumGoroutine() <= 20, "goroutines %d", runtime.NumGoroutine())
err := ewg.Wait()
assert.NoError(t, err)
assert.Equal(t, uint32(10), c)
}

func TestErrorSizedGroup_NoError(t *testing.T) {
Expand Down
15 changes: 11 additions & 4 deletions group_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package syncs
import "context"

type options struct {
ctx context.Context
cancel context.CancelFunc
preLock bool
termOnError bool
ctx context.Context
cancel context.CancelFunc
preLock bool
termOnError bool
discardIfFull bool
}

// GroupOption functional option type
Expand All @@ -28,3 +29,9 @@ func Preemptive(o *options) {
func TermOnErr(o *options) {
o.termOnError = true
}

// Discard will discard new goroutines if semaphore is full, i.e. no more goroutines allowed
func Discard(o *options) {
o.discardIfFull = true
o.preLock = true // discard implies preemptive
}
20 changes: 18 additions & 2 deletions semaphore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,20 @@ package syncs

import "sync"

// Locker is a superset of sync.Locker interface with TryLock method.
type Locker interface {
sync.Locker
TryLock() bool
}

// Semaphore implementation, counted lock only. Implements sync.Locker interface, thread safe.
type semaphore struct {
sync.Locker
Locker
ch chan struct{}
}

// NewSemaphore makes Semaphore with given capacity
func NewSemaphore(capacity int) sync.Locker {
func NewSemaphore(capacity int) Locker {
if capacity <= 0 {
capacity = 1
}
Expand All @@ -25,3 +31,13 @@ func (s *semaphore) Lock() {
func (s *semaphore) Unlock() {
<-s.ch
}

// TryLock acquires semaphore if possible, returns true if acquired, false otherwise.
func (s *semaphore) TryLock() bool {
select {
case s.ch <- struct{}{}:
return true
default:
return false
}
}
15 changes: 10 additions & 5 deletions sizedgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
type SizedGroup struct {
options
wg sync.WaitGroup
sema sync.Locker
sema Locker
}

// NewSizedGroup makes wait group with limited size alive goroutines
Expand All @@ -27,7 +27,6 @@ func NewSizedGroup(size int, opts ...GroupOption) *SizedGroup {
// Go calls the given function in a new goroutine.
// Every call will be unblocked, but some goroutines may wait if semaphore locked.
func (g *SizedGroup) Go(fn func(ctx context.Context)) {

canceled := func() bool {
select {
case <-g.ctx.Done():
Expand All @@ -41,12 +40,18 @@ func (g *SizedGroup) Go(fn func(ctx context.Context)) {
return
}

g.wg.Add(1)

if g.preLock {
g.sema.Lock()
lockOk := g.sema.TryLock()
if !lockOk && g.discardIfFull {
// lock failed and discardIfFull is set, discard this goroutine
return
}
if !lockOk && !g.discardIfFull {
g.sema.Lock() // make sure we have block until lock is acquired
}
}

g.wg.Add(1)
go func() {
defer g.wg.Done()

Expand Down
15 changes: 15 additions & 0 deletions sizedgroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,21 @@ func TestSizedGroup(t *testing.T) {
assert.Equal(t, uint32(1000), c, fmt.Sprintf("%d, not all routines have been executed", c))
}

func TestSizedGroup_Discard(t *testing.T) {
swg := NewSizedGroup(10, Preemptive, Discard)
var c uint32

for i := 0; i < 100; i++ {
swg.Go(func(ctx context.Context) {
time.Sleep(5 * time.Millisecond)
atomic.AddUint32(&c, 1)
})
}
assert.True(t, runtime.NumGoroutine() < 15, "goroutines %d", runtime.NumGoroutine())
swg.Wait()
assert.Equal(t, uint32(10), c, fmt.Sprintf("%d, not all routines have been executed", c))
}

func TestSizedGroup_Preemptive(t *testing.T) {
swg := NewSizedGroup(10, Preemptive)
var c uint32
Expand Down

0 comments on commit b4d22b8

Please sign in to comment.