Skip to content

Commit

Permalink
a potential way of cancelling correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
udsamani committed Jul 4, 2024
1 parent c242c5c commit b90dbc0
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 21 deletions.
17 changes: 9 additions & 8 deletions pkg/compute/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,6 @@ func (s BaseEndpoint) CancelExecution(ctx context.Context, request CancelExecuti
localExecutionState.Execution.ID, localExecutionState.State)
}

if localExecutionState.State.IsExecuting() {
err = s.executor.Cancel(ctx, localExecutionState)
if err != nil {
return CancelExecutionResponse{}, err
}
}

err = s.executionStore.UpdateExecutionState(ctx, store.UpdateExecutionStateRequest{
ExecutionID: request.ExecutionID,
NewState: store.ExecutionStateCancelled,
Expand All @@ -155,13 +148,21 @@ func (s BaseEndpoint) CancelExecution(ctx context.Context, request CancelExecuti
if err != nil {
return CancelExecutionResponse{}, err
}

if localExecutionState.State.IsExecuting() {
err = s.executor.Cancel(ctx, localExecutionState)
if err != nil {
return CancelExecutionResponse{}, err
}
}
return CancelExecutionResponse{
ExecutionMetadata: NewExecutionMetadata(localExecutionState.Execution),
}, nil
}

func (s BaseEndpoint) ExecutionLogs(ctx context.Context, request ExecutionLogsRequest) (
<-chan *concurrency.AsyncResult[models.ExecutionLog], error) {
<-chan *concurrency.AsyncResult[models.ExecutionLog], error,
) {
return s.logServer.GetLogStream(ctx, executor.LogStreamRequest{
ExecutionID: request.ExecutionID,
Tail: request.Tail,
Expand Down
15 changes: 9 additions & 6 deletions pkg/compute/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/system"
)

const StorageDirectoryPerms = 0755
const StorageDirectoryPerms = 0o755

type BaseExecutorParams struct {
ID string
Expand Down Expand Up @@ -66,7 +66,8 @@ func prepareInputVolumes(
ctx context.Context,
strgprovider storage.StorageProvider,
storageDirectory string, inputSources ...*models.InputSource) (
[]storage.PreparedStorage, func(context.Context) error, error) {
[]storage.PreparedStorage, func(context.Context) error, error,
) {
inputVolumes, err := storage.ParallelPrepareStorage(ctx, strgprovider, storageDirectory, inputSources...)
if err != nil {
return nil, nil, err
Expand All @@ -80,7 +81,8 @@ func prepareWasmVolumes(
ctx context.Context,
strgprovider storage.StorageProvider,
storageDirectory string, wasmEngine wasmmodels.EngineSpec) (
map[string][]storage.PreparedStorage, func(context.Context) error, error) {
map[string][]storage.PreparedStorage, func(context.Context) error, error,
) {
importModuleVolumes, err := storage.ParallelPrepareStorage(ctx, strgprovider, storageDirectory, wasmEngine.ImportModules...)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -300,7 +302,7 @@ func (e *BaseExecutor) Run(ctx context.Context, state store.LocalExecutionState)
stopwatch := telemetry.Timer(ctx, jobDurationMilliseconds, state.Execution.Job.MetricAttributes()...)
topic := EventTopicExecutionRunning
defer func() {
if err != nil {
if err.Error() != executor.ErrAlreadyCancelled.Error() {
e.handleFailure(ctx, state, err, topic)
}
dur := stopwatch()
Expand Down Expand Up @@ -352,7 +354,7 @@ func (e *BaseExecutor) Run(ctx context.Context, state store.LocalExecutionState)
return err
}
if result.ErrorMsg != "" {
return fmt.Errorf("execution error: %s", result.ErrorMsg)
return fmt.Errorf(result.ErrorMsg)
}
jobsCompleted.Add(ctx, 1)

Expand Down Expand Up @@ -427,7 +429,8 @@ func (e *BaseExecutor) Run(ctx context.Context, state store.LocalExecutionState)

// Publish the result of an execution after it has been verified.
func (e *BaseExecutor) publish(ctx context.Context, localExecutionState store.LocalExecutionState,
resultFolder string) (publishedResult models.SpecConfig, err error) {
resultFolder string,
) (publishedResult models.SpecConfig, err error) {
execution := localExecutionState.Execution
log.Ctx(ctx).Debug().Msgf("Publishing execution %s", execution.ID)

Expand Down
11 changes: 7 additions & 4 deletions pkg/executor/docker/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ func (e *Executor) Start(ctx context.Context, request *executor.RunCommandReques
// bacalhau execution label _before_ we do anything else. If we are able to find one then we
// will use that container in the executionHandler that we create.
containerID, err := e.FindRunningContainer(ctx, request.ExecutionID)

if err != nil {
// Unable to find a running container for this execution, we will instead check for a handler, and
// failing that will create a new container.
Expand Down Expand Up @@ -153,6 +152,7 @@ func (e *Executor) Start(ctx context.Context, request *executor.RunCommandReques
containerID = jobContainer.ID
}

childCtx, cancel := context.WithCancelCause(ctx)
handler := &executionHandler{
client: e.client,
logger: log.With().
Expand All @@ -169,12 +169,13 @@ func (e *Executor) Start(ctx context.Context, request *executor.RunCommandReques
waitCh: make(chan bool),
activeCh: make(chan bool),
running: atomic.NewBool(false),
cancelFunc: cancel,
}

// register the handler for this executionID
e.handlers.Put(request.ExecutionID, handler)
// run the container.
go handler.run(ctx)
go handler.run(childCtx)
return nil
}

Expand Down Expand Up @@ -234,7 +235,8 @@ func (e *Executor) Cancel(ctx context.Context, executionID string) error {
if !found {
return fmt.Errorf("canceling execution (%s): %w", executionID, executor.ErrNotFound)
}
return handler.kill(ctx)
handler.cancelFunc(executor.ErrAlreadyCancelled)
return nil
}

// GetLogStream provides a stream of output logs for a specific execution.
Expand Down Expand Up @@ -438,7 +440,8 @@ func configureDevices(ctx context.Context, resources *models.Resources) ([]conta
}

func makeContainerMounts(
ctx context.Context, inputs []storage.PreparedStorage, outputs []*models.ResultPath, resultsDir string) ([]mount.Mount, error) {
ctx context.Context, inputs []storage.PreparedStorage, outputs []*models.ResultPath, resultsDir string,
) ([]mount.Mount, error) {
// the actual mounts we will give to the container
// these are paths for both input and output data
var mounts []mount.Mount
Expand Down
11 changes: 8 additions & 3 deletions pkg/executor/docker/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type executionHandler struct {
waitCh chan bool
// true until the run method returns
running *atomic.Bool
// cancel function
cancelFunc context.CancelCauseFunc

//
// results
Expand Down Expand Up @@ -89,9 +91,12 @@ func (h *executionHandler) run(ctx context.Context) {
select {
case <-ctx.Done():
// failure case, the context has been canceled. We are aborting this execution
reason := fmt.Errorf("context canceled while waiting on container status: %w", ctx.Err())
h.logger.Err(reason).Msg("cancel waiting on container status")
h.result = executor.NewFailedResult(reason.Error())
cause := context.Cause(ctx)
if cause == nil {
cause = fmt.Errorf("context canceled while waiting on container status: %w", ctx.Err())
}
h.logger.Err(cause).Msg("cancel waiting on container status")
h.result = executor.NewFailedResult(cause.Error())
// the context was canceled, bail.
return
case err := <-errCh:
Expand Down
2 changes: 2 additions & 0 deletions pkg/executor/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ var (
// ErrAlreadyStarted is returned when trying to start an already started execution.
ErrAlreadyStarted = fmt.Errorf("execution already started")

ErrAlreadyCancelled = fmt.Errorf("execution already cancelled")

// ErrAlreadyComplete is returned when action is attempted on an execution that is already complete.
ErrAlreadyComplete = fmt.Errorf("execution already complete")

Expand Down

0 comments on commit b90dbc0

Please sign in to comment.