Skip to content

Commit

Permalink
fix: add group runner's timeout and make some channels buffered.
Browse files Browse the repository at this point in the history
  • Loading branch information
jvillafanez committed Apr 29, 2024
1 parent 08c4763 commit 05f684a
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 52 deletions.
158 changes: 114 additions & 44 deletions ocis-pkg/runner/grouprunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package runner
import (
"context"
"sync"
"sync/atomic"
"time"
)

// GroupRunner represents a group of tasks that need to run together.
Expand All @@ -17,21 +19,44 @@ import (
// providing a piece of functionality, however, if any of them fails, the
// feature provided by them would be incomplete or broken.
//
// The interrupt duration for the group can be set through the
// `WithInterruptDuration` option. If the option isn't supplied, the default
// value (15 secs) will be used.
//
// It's recommended that the timeouts are handled by each runner individually,
// meaning that each runner's timeout should be less than the group runner's
// timeout. This way, we can know which runner timed out.
// If the group timeout is reached, the remaining results will have the
// runner's id as "_unknown_".
//
// Note that, as services, the tasks aren't expected to stop by default.
// This means that, if a task finishes naturally, the rest of the tasks will
// be asked to stop as well.
type GroupRunner struct {
runners sync.Map
runnersCount int
isRunning bool
runningMutex sync.Mutex
runners sync.Map
runnersCount int
isRunning bool
interruptDur time.Duration
interrupted atomic.Bool
interruptedCh chan time.Duration
runningMutex sync.Mutex
}

// NewGroup will create a GroupRunner
func NewGroup() *GroupRunner {
func NewGroup(opts ...Option) *GroupRunner {
options := Options{
InterruptDuration: DefaultGroupInterruptDuration,
}

for _, o := range opts {
o(&options)
}

return &GroupRunner{
runners: sync.Map{},
runningMutex: sync.Mutex{},
runners: sync.Map{},
runningMutex: sync.Mutex{},
interruptDur: options.InterruptDuration,
interruptedCh: make(chan time.Duration, 1),
}
}

Expand Down Expand Up @@ -85,7 +110,7 @@ func (gr *GroupRunner) Run(ctx context.Context) []*Result {
gr.isRunning = true
gr.runningMutex.Unlock()

results := make(map[string]*Result)
results := make([]*Result, 0, gr.runnersCount)

ch := make(chan *Result, gr.runnersCount) // no need to block writing results
gr.runners.Range(func(_, value any) bool {
Expand All @@ -94,45 +119,46 @@ func (gr *GroupRunner) Run(ctx context.Context) []*Result {
return true
})

var d time.Duration
// wait for a result or for the context to be done
select {
case result := <-ch:
results[result.RunnerID] = result
results = append(results, result)
case d = <-gr.interruptedCh:
results = append(results, &Result{
RunnerID: "_unknown_",
RunnerError: NewGroupTimeoutError(d),
})
case <-ctx.Done():
// Do nothing
}

// interrupt the rest of the runners
gr.runners.Range(func(_, value any) bool {
r := value.(*Runner)
if _, ok := results[r.ID]; !ok {
select {
case <-r.Finished():
// No data should be sent through the channel, so we'd be
// here only if the channel is closed. This means the task
// has finished and we don't need to interrupt. We do
// nothing in this case
default:
r.Interrupt()
}
}
return true
})
gr.Interrupt()

// Having interrupted the remaining runners, we still need to wait for the
// rest of the results
for i := len(results); i < gr.runnersCount; i++ {
result := <-ch
results[result.RunnerID] = result
select {
case result := <-ch:
results = append(results, result)
case d2, ok := <-gr.interruptedCh:
if ok {
d = d2
}
results = append(results, &Result{
RunnerID: "_unknown_",
RunnerError: NewGroupTimeoutError(d),
})
}
}

close(ch)

values := make([]*Result, 0, gr.runnersCount)
for _, val := range results {
values = append(values, val)
}
return values
// Even if we reach the group timeout and bail out early, tasks might
// still be running and eventually deliver their results through the
// channel. We rely on the channel being buffered so those tasks won't
// block, and the data can eventually be garbage-collected along with the
// unused channel; that's why we don't close the channel here.
return results
}

// RunAsync will execute the tasks in the group asynchronously.
Expand All @@ -158,12 +184,38 @@ func (gr *GroupRunner) RunAsync(ch chan<- *Result) {
})

go func() {
result := <-interCh
var result *Result
var d time.Duration

select {
case result = <-interCh:
// result already assigned, so do nothing
case d = <-gr.interruptedCh:
// we aren't tracking which runners have finished and which are still
// running, so we'll use "_unknown_" as runner id
result = &Result{
RunnerID: "_unknown_",
RunnerError: NewGroupTimeoutError(d),
}
}
gr.Interrupt()

ch <- result
for i := 1; i < gr.runnersCount; i++ {
result = <-interCh
select {
case result = <-interCh:
// result already assigned, so do nothing
case d2, ok := <-gr.interruptedCh:
// if ok is true, d2 will have a good value; if false, the channel
// is closed and we get a default value
if ok {
d = d2
}
result = &Result{
RunnerID: "_unknown_",
RunnerError: NewGroupTimeoutError(d),
}
}
ch <- result
}
}()
Expand All @@ -179,14 +231,32 @@ func (gr *GroupRunner) RunAsync(ch chan<- *Result) {
// As said, this will affect ALL the tasks in the group. It isn't possible to
// try to stop just one task.
// If a task has finished, the corresponding stopper won't be called.
//
// The interrupt timeout for the group will start after all the runners in the
// group have been notified. Note that, if the task's stopper for a runner
// takes a lot of time to return, it will delay the timeout's start, so it's
// advised that the stopper either returns fast or is run asynchronously.
func (gr *GroupRunner) Interrupt() {
gr.runners.Range(func(_, value any) bool {
r := value.(*Runner)
select {
case <-r.Finished():
default:
r.Interrupt()
}
return true
})
if gr.interrupted.CompareAndSwap(false, true) {
gr.runners.Range(func(_, value any) bool {
r := value.(*Runner)
select {
case <-r.Finished():
// No data should be sent through the channel, so we'd be
// here only if the channel is closed. This means the task
// has finished and we don't need to interrupt. We do
// nothing in this case
default:
r.Interrupt()
}
return true
})

_ = time.AfterFunc(gr.interruptDur, func() {
// timeout reached -> send it through the channel so our runner
// can abort
gr.interruptedCh <- gr.interruptDur
close(gr.interruptedCh)
})
}
}
101 changes: 97 additions & 4 deletions ocis-pkg/runner/grouprunner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ var _ = Describe("GroupRunner", func() {
)))

task3Ch := make(chan error)
task3 := TimedTask(task3Ch, 15*time.Second)
task3 := TimedTask(task3Ch, 6*time.Second)
Expect(func() {
gr.Add(runner.New("task3", task3, func() {
task3Ch <- nil
Expand All @@ -77,7 +77,7 @@ var _ = Describe("GroupRunner", func() {

Expect(func() {
task3Ch := make(chan error)
task3 := TimedTask(task3Ch, 15*time.Second)
task3 := TimedTask(task3Ch, 6*time.Second)
gr.Add(runner.New("task3", task3, func() {
task3Ch <- nil
close(task3Ch)
Expand All @@ -89,7 +89,7 @@ var _ = Describe("GroupRunner", func() {
Describe("Run", func() {
It("Context is done", func(ctx SpecContext) {
task3Ch := make(chan error)
task3 := TimedTask(task3Ch, 15*time.Second)
task3 := TimedTask(task3Ch, 6*time.Second)
gr.Add(runner.New("task3", task3, func() {
task3Ch <- nil
close(task3Ch)
Expand Down Expand Up @@ -141,6 +141,75 @@ var _ = Describe("GroupRunner", func() {
)))
}, SpecTimeout(5*time.Second))

It("Context done and group timeout reached", func(ctx SpecContext) {
gr := runner.NewGroup(runner.WithInterruptDuration(2 * time.Second))

gr.Add(runner.New("task1", func() error {
time.Sleep(6 * time.Second)
return nil
}, func() {
}))

gr.Add(runner.New("task2", func() error {
time.Sleep(6 * time.Second)
return nil
}, func() {
}))

// context will be done in 1 second
myCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()

// spawn a new goroutine and return the result in the channel
ch2 := make(chan []*runner.Result)
go func(ch2 chan []*runner.Result) {
ch2 <- gr.Run(myCtx)
close(ch2)
}(ch2)

// context finishes in 1 sec, tasks will be interrupted
// group timeout will be reached after 2 extra seconds
Eventually(ctx, ch2).Should(Receive(ContainElements(
&runner.Result{RunnerID: "_unknown_", RunnerError: runner.NewGroupTimeoutError(2 * time.Second)},
&runner.Result{RunnerID: "_unknown_", RunnerError: runner.NewGroupTimeoutError(2 * time.Second)},
)))
}, SpecTimeout(5*time.Second))

It("Interrupted and group timeout reached", func(ctx SpecContext) {
gr := runner.NewGroup(runner.WithInterruptDuration(2 * time.Second))

gr.Add(runner.New("task1", func() error {
time.Sleep(6 * time.Second)
return nil
}, func() {
}))

gr.Add(runner.New("task2", func() error {
time.Sleep(6 * time.Second)
return nil
}, func() {
}))

// context will be done in 10 seconds
myCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

// spawn a new goroutine and return the result in the channel
ch2 := make(chan []*runner.Result)
go func(ch2 chan []*runner.Result) {
ch2 <- gr.Run(myCtx)
close(ch2)
}(ch2)
gr.Interrupt()

// tasks will be interrupted
// group timeout will be reached after 2 extra seconds
Eventually(ctx, ch2).Should(Receive(ContainElements(
&runner.Result{RunnerID: "_unknown_", RunnerError: runner.NewGroupTimeoutError(2 * time.Second)},
&runner.Result{RunnerID: "_unknown_", RunnerError: runner.NewGroupTimeoutError(2 * time.Second)},
)))
}, SpecTimeout(5*time.Second))

It("Doble run panics", func(ctx SpecContext) {
// context will be done in 1 second
myCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
Expand Down Expand Up @@ -181,7 +250,7 @@ var _ = Describe("GroupRunner", func() {

It("Interrupt async", func(ctx SpecContext) {
task3Ch := make(chan error)
task3 := TimedTask(task3Ch, 15*time.Second)
task3 := TimedTask(task3Ch, 6*time.Second)
gr.Add(runner.New("task3", task3, func() {
task3Ch <- nil
close(task3Ch)
Expand All @@ -196,5 +265,29 @@ var _ = Describe("GroupRunner", func() {
Eventually(ctx, ch2).Should(Receive())
Eventually(ctx, ch2).Should(Receive())
}, SpecTimeout(5*time.Second))

It("Interrupt async group timeout reached", func(ctx SpecContext) {
gr := runner.NewGroup(runner.WithInterruptDuration(2 * time.Second))

gr.Add(runner.New("task1", func() error {
time.Sleep(6 * time.Second)
return nil
}, func() {
}))

gr.Add(runner.New("task2", func() error {
time.Sleep(6 * time.Second)
return nil
}, func() {
}))

ch2 := make(chan *runner.Result)
gr.RunAsync(ch2)
gr.Interrupt()

// group timeout will be reached after 2 extra seconds
Eventually(ctx, ch2).Should(Receive(Equal(&runner.Result{RunnerID: "_unknown_", RunnerError: runner.NewGroupTimeoutError(2 * time.Second)})))
Eventually(ctx, ch2).Should(Receive(Equal(&runner.Result{RunnerID: "_unknown_", RunnerError: runner.NewGroupTimeoutError(2 * time.Second)})))
}, SpecTimeout(5*time.Second))
})
})
5 changes: 4 additions & 1 deletion ocis-pkg/runner/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ import (

var (
// DefaultInterruptDuration is the default value for the `WithInterruptDuration`
// This global value can be adjusted if needed.
// for the "regular" runners. This global value can be adjusted if needed.
DefaultInterruptDuration = 10 * time.Second
// DefaultGroupInterruptDuration is the default value for the `WithInterruptDuration`
// for the group runners. This global value can be adjusted if needed.
DefaultGroupInterruptDuration = 15 * time.Second
)

// Option defines a single option function.
Expand Down
Loading

0 comments on commit 05f684a

Please sign in to comment.