Skip to content

Commit

Permalink
fix: ensure the task hasn't finished before interrupt it
Browse files Browse the repository at this point in the history
  • Loading branch information
jvillafanez committed Apr 10, 2024
1 parent 6a64918 commit d4ad735
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 6 deletions.
21 changes: 15 additions & 6 deletions ocis-pkg/runner/grouprunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,15 @@ func (gr *GroupRunner) Run(ctx context.Context) []*Result {
// interrupt the rest of the runners
for _, runner := range gr.runners {
if _, ok := results[runner.ID]; !ok {
// there might still be race conditions because the result might not have
// been made available even though the runner has finished. We assume
// that calling the `Interrupt` method multiple times and / or calling
// the `Interrupt` method when the task has finished is safe
runner.Interrupt()
select {
case <-runner.Finished:
// No data should be sent through the channel, so we'd be
// here only if the channel is closed. This means the task
// has finished and we don't need to interrupt. We do
// nothing in this case
default:
runner.Interrupt()
}
}
}

Expand Down Expand Up @@ -122,8 +126,13 @@ func (gr *GroupRunner) RunAsync(ch chan<- *Result) {
//
// As said, this will affect ALL the tasks in the group. It isn't possible to
// try to stop just one task.
// If a task has finished, the corresponding stopper won't be called
func (gr *GroupRunner) Interrupt() {
for _, runner := range gr.runners {
runner.Interrupt()
select {
case <-runner.Finished:
default:
runner.Interrupt()
}
}
}
10 changes: 10 additions & 0 deletions ocis-pkg/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,17 @@ import (
// The ID of the runner is public to make identification easier, and the
// Result that it will generated will contain the same ID, so we can
// know which runner provided which result.
//
// The Finished channel can be used to know when the task has finished but the
// result hasn't been made available yet. The channel will be closed (without
// sending any message) when the task has finished.
// This can be used specially with the `RunAsync` method when multiple runners
// use the same channel: results could be waiting on your side of the channel
type Runner struct {
ID string
fn Runable
interrupt Stopper
Finished <-chan struct{}
}

// New will create a new runner.
Expand All @@ -28,6 +35,7 @@ func New(id string, fn Runable, interrupt Stopper) *Runner {
ID: id,
fn: fn,
interrupt: interrupt,
Finished: make(chan struct{}),
}
}

Expand Down Expand Up @@ -90,6 +98,8 @@ func (r *Runner) Interrupt() {
func (r *Runner) doTask(ch chan<- *Result, closeChan bool) {
err := r.fn()

close(r.Finished)

result := &Result{
RunnerID: r.ID,
RunnerError: err,
Expand Down

0 comments on commit d4ad735

Please sign in to comment.