Skip to content

Commit

Permalink
fix: ensure runners provide a result after being interrupted
Browse files Browse the repository at this point in the history
  • Loading branch information
jvillafanez committed Apr 19, 2024
1 parent 5fe4718 commit a63bd8e
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 179 deletions.
18 changes: 9 additions & 9 deletions ocis-pkg/runner/grouprunner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ var _ = Describe("GroupRunner", func() {

task1Ch := make(chan error)
task1 := TimedTask(task1Ch, 30*time.Second)
gr.Add(runner.New("task1", task1, func() {
gr.Add(runner.New("task1", 30*time.Second, task1, func() {
task1Ch <- nil
close(task1Ch)
}))

task2Ch := make(chan error)
task2 := TimedTask(task2Ch, 20*time.Second)
gr.Add(runner.New("task2", task2, func() {
gr.Add(runner.New("task2", 30*time.Second, task2, func() {
task2Ch <- nil
close(task2Ch)
}))
Expand All @@ -35,7 +35,7 @@ var _ = Describe("GroupRunner", func() {
Describe("Add", func() {
It("Duplicated runner id panics", func() {
Expect(func() {
gr.Add(runner.New("task1", func() error {
gr.Add(runner.New("task1", 30*time.Second, func() error {
time.Sleep(6 * time.Second)
return nil
}, func() {
Expand Down Expand Up @@ -64,7 +64,7 @@ var _ = Describe("GroupRunner", func() {
task3Ch := make(chan error)
task3 := TimedTask(task3Ch, 15*time.Second)
Expect(func() {
gr.Add(runner.New("task3", task3, func() {
gr.Add(runner.New("task3", 30*time.Second, task3, func() {
task3Ch <- nil
close(task3Ch)
}))
Expand All @@ -78,7 +78,7 @@ var _ = Describe("GroupRunner", func() {
Expect(func() {
task3Ch := make(chan error)
task3 := TimedTask(task3Ch, 15*time.Second)
gr.Add(runner.New("task3", task3, func() {
gr.Add(runner.New("task3", 30*time.Second, task3, func() {
task3Ch <- nil
close(task3Ch)
}))
Expand All @@ -90,7 +90,7 @@ var _ = Describe("GroupRunner", func() {
It("Context is done", func(ctx SpecContext) {
task3Ch := make(chan error)
task3 := TimedTask(task3Ch, 15*time.Second)
gr.Add(runner.New("task3", task3, func() {
gr.Add(runner.New("task3", 30*time.Second, task3, func() {
task3Ch <- nil
close(task3Ch)
}))
Expand All @@ -117,7 +117,7 @@ var _ = Describe("GroupRunner", func() {
It("One task finishes early", func(ctx SpecContext) {
task3Ch := make(chan error)
task3 := TimedTask(task3Ch, 1*time.Second)
gr.Add(runner.New("task3", task3, func() {
gr.Add(runner.New("task3", 30*time.Second, task3, func() {
task3Ch <- nil
close(task3Ch)
}))
Expand Down Expand Up @@ -157,7 +157,7 @@ var _ = Describe("GroupRunner", func() {
It("Wait in channel", func(ctx SpecContext) {
task3Ch := make(chan error)
task3 := TimedTask(task3Ch, 1*time.Second)
gr.Add(runner.New("task3", task3, func() {
gr.Add(runner.New("task3", 30*time.Second, task3, func() {
task3Ch <- nil
close(task3Ch)
}))
Expand All @@ -182,7 +182,7 @@ var _ = Describe("GroupRunner", func() {
It("Interrupt async", func(ctx SpecContext) {
task3Ch := make(chan error)
task3 := TimedTask(task3Ch, 15*time.Second)
gr.Add(runner.New("task3", task3, func() {
gr.Add(runner.New("task3", 30*time.Second, task3, func() {
task3Ch <- nil
close(task3Ch)
}))
Expand Down
55 changes: 0 additions & 55 deletions ocis-pkg/runner/helper.go

This file was deleted.

87 changes: 0 additions & 87 deletions ocis-pkg/runner/helper_test.go

This file was deleted.

94 changes: 77 additions & 17 deletions ocis-pkg/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,53 @@ package runner

import (
"context"
"fmt"
"sync/atomic"
"time"
)

// Runner represents the one executing a long running task, such as a server
// or a service.
// The ID of the runner is public to make identification easier, and the
// Result that it generates will contain the same ID, so we can
// know which runner provided which result.
//
// Runners are intended to be used only once. Reusing them isn't possible.
// You'd need to create a new runner if you want to rerun the same task.
type Runner struct {
ID string
fn Runable
interrupt Stopper
running atomic.Bool
interrupted atomic.Bool
finished chan struct{}
ID string
interruptDur time.Duration
fn Runable
interrupt Stopper
running atomic.Bool
interrupted atomic.Bool
interruptedCh chan time.Duration
finished chan struct{}
}

// New will create a new runner.
// The runner will be created with the provided id (the id must be unique,
// otherwise undefined behavior might occur), and will run the provided
// runable task, using the "interrupt" function to stop that task if needed.
//
// The interrupt duration will be used to ensure the runner doesn't block
// forever. The interrupt duration will be used to start a timeout when the
// runner gets interrupted (either the context of the `Run` method is done
// or this runner's `Interrupt` method is called). If the timeout is reached,
// a timeout result will be returned instead of whatever result the task should
// be returning.
//
// Note that it's your responsibility to provide a proper stopper for the task.
// The runner will just call that method assuming it will be enough to
// eventually stop the task at some point.
func New(id string, fn Runable, interrupt Stopper) *Runner {
func New(id string, interruptDur time.Duration, fn Runable, interrupt Stopper) *Runner {
return &Runner{
ID: id,
fn: fn,
interrupt: interrupt,
finished: make(chan struct{}),
ID: id,
interruptDur: interruptDur,
fn: fn,
interrupt: interrupt,
interruptedCh: make(chan time.Duration),
finished: make(chan struct{}),
}
}

Expand All @@ -48,6 +64,11 @@ func New(id string, fn Runable, interrupt Stopper) *Runner {
// make the task to eventually complete.
//
// Once the task finishes, the result will be returned.
// When the context is done, or if the runner is interrupted, a timeout will
// start using the provided "interrupt duration". If this timeout is reached,
// a timeout result will be returned instead of the one from the task. This is
// intended to prevent blocking the main thread indefinitely. A suitable
// duration should be used depending on the task, usually 5, 10 or 30 seconds.
//
// Some nice things you can do:
// - Use signal.NotifyContext(...) to call the stopper and provide a clean
Expand Down Expand Up @@ -97,9 +118,24 @@ func (r *Runner) RunAsync(ch chan<- *Result) {
// in order for it to finish.
// The stopper will be called immediately, although its effects are expected
// to take a while (the task might need some time to stop).
// A timeout will start using the provided "interrupt duration". Once that
// timeout is reached, the runner will provide a result with a timeout error.
// Note that, even after returning the timeout result, the task could still
// be running and consuming resources.
// The stopper will be called only once. Further calls to this method won't
// do anything.
func (r *Runner) Interrupt() {
	// Only the first caller wins the swap, so the stopper and the timeout
	// watcher below are started at most once per runner.
	if !r.interrupted.CompareAndSwap(false, true) {
		return
	}

	go func() {
		// Use an explicit timer so it can be released as soon as the task
		// finishes, instead of staying pending for the whole duration.
		timer := time.NewTimer(r.interruptDur)
		defer timer.Stop()

		select {
		case <-r.Finished():
			// Task finished -> runner should be delivering the result
			return
		case <-timer.C:
			// timeout reached -> notify the runner below so it can abort
		}

		// interruptedCh is unbuffered, so the send only completes while the
		// runner is still waiting on it. If the task finishes in the
		// meantime, the runner may already have delivered the task's result
		// and stopped listening; give up in that case instead of blocking
		// this goroutine forever.
		select {
		case r.interruptedCh <- r.interruptDur:
			// The timeout was delivered; nothing else will be sent.
			close(r.interruptedCh)
		case <-r.Finished():
		}
	}()
	r.interrupt()
}
Expand All @@ -115,17 +151,41 @@ func (r *Runner) Finished() <-chan struct{} {

// doTask will perform this runner's task and write the result in the provided
// channel. The channel will be closed if requested.
// A result will be provided when either the task finishes naturally or we
// reach the timeout after being interrupted
func (r *Runner) doTask(ch chan<- *Result, closeChan bool) {
err := r.fn()
tmpCh := make(chan *Result)

// spawn the task and return the result in a temporary channel
go func(tmpCh chan *Result) {
err := r.fn()

close(r.finished)

close(r.finished)
result := &Result{
RunnerID: r.ID,
RunnerError: err,
}
tmpCh <- result

result := &Result{
RunnerID: r.ID,
RunnerError: err,
close(tmpCh)
}(tmpCh)

// wait for the result in the temporary channel or until we get the
// interrupted signal
var result *Result
select {
case d := <-r.interruptedCh:
result = &Result{
RunnerID: r.ID,
RunnerError: fmt.Errorf("runner %s timed out after waiting for %s", r.ID, d.String()),
}
case result = <-tmpCh:
// Just assign the received value, nothing else to do
}
ch <- result

// send the result
ch <- result
if closeChan {
close(ch)
}
Expand Down
2 changes: 1 addition & 1 deletion ocis-pkg/runner/runner_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
. "github.com/onsi/gomega"
)

func TestGraph(t *testing.T) {
func TestRunner(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Runner Suite")
}
Loading

0 comments on commit a63bd8e

Please sign in to comment.