Skip to content

Commit

Permalink
Merge pull request #8782 from kobergj/FixPostprocessingRestart
Browse files Browse the repository at this point in the history
Repair Restart Postprocessing Logic
  • Loading branch information
kobergj committed Apr 9, 2024
2 parents 4fb4b5a + ce3e863 commit 5f96680
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 21 deletions.
8 changes: 8 additions & 0 deletions changelog/unreleased/fix-postprocessing-restart.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Bugfix: Fix restarting of postprocessing

When an upload is not found, the logic to restart postprocessing was bunked. Additionally we extended the upload sessions
command to be able to restart the uploads without using a second command.

NOTE: This also includes a breaking fix for the deprecated `ocis storage-users uploads list` command

https://github.com/owncloud/ocis/pull/8782
11 changes: 8 additions & 3 deletions services/postprocessing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,17 @@ See the [cs3 org](https://github.com/cs3org/reva/blob/edge/pkg/events/postproces

If postprocessing fails in one step due to an unforseen error, current uploads will not be retried automatically. A system admin can instead run a CLI command to retry the failed upload which is a two step process:

- First find the upload ID of the failed upload.
- First list ongoing upload sessions
```bash
ocis storage-users uploads list
ocis storage-users uploads sessions
```

- Then use the restart command to resume postprocessing of the ID selected.
- If you want to restart all uploads just rerun the command with the `--restart` flag
```bash
ocis storage-users uploads sessions --restart
```

- If you want to restart only one upload use the postprocessing restart command
```bash
ocis postprocessing restart -u <uploadID>
```
Expand Down
5 changes: 3 additions & 2 deletions services/postprocessing/pkg/config/defaults/defaultconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ func DefaultConfig() *config.Config {
MaxRetries: 14,
},
Store: config.Store{
Store: "memory",
Store: "nats-js-kv",
Nodes: []string{"127.0.0.1:9233"},
Database: "postprocessing",
Table: "postprocessing",
Table: "",
},
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func New(config config.Postprocessing) *Postprocessing {
}

// Init is the first step of the postprocessing
func (pp *Postprocessing) Init(ev events.BytesReceived) interface{} {
func (pp *Postprocessing) Init(_ events.BytesReceived) interface{} {
if len(pp.Steps) == 0 {
return pp.finished(events.PPOutcomeContinue)
}
Expand Down
37 changes: 23 additions & 14 deletions services/postprocessing/pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ type PostprocessingService struct {
}

var (
// errFatal is returned when a fatal error occurs and we want to exit.
errFatal = errors.New("fatal error")
// ErrFatal is returned when a fatal error occurs and we want to exit.
ErrFatal = errors.New("fatal error")
// ErrEvent is returned when something went wrong with a specific event.
errEvent = errors.New("event error")
ErrEvent = errors.New("event error")
// ErrNotFound is returned when a postprocessing is not found in the store.
ErrNotFound = errors.New("postprocessing not found")
)

// NewPostprocessingService returns a new instance of a postprocessing service
Expand Down Expand Up @@ -67,9 +69,9 @@ func (pps *PostprocessingService) Run() error {
err := pps.processEvent(e)
if err != nil {
switch {
case errors.Is(err, errFatal):
case errors.Is(err, ErrFatal):
return err
case errors.Is(err, errEvent):
case errors.Is(err, ErrEvent):
continue
default:
pps.log.Fatal().Err(err).Msg("unknown error - exiting")
Expand Down Expand Up @@ -111,7 +113,7 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
pp, err = pps.getPP(pps.store, ev.UploadID)
if err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
return fmt.Errorf("%w: cannot get upload", errEvent)
return fmt.Errorf("%w: cannot get upload", ErrEvent)
}
next = pp.NextStep(ev)

Expand Down Expand Up @@ -143,7 +145,7 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
pp, err = pps.getPP(pps.store, ev.UploadID)
if err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
return fmt.Errorf("%w: cannot get upload", errEvent)
return fmt.Errorf("%w: cannot get upload", ErrEvent)
}
next = pp.Delay()
case events.UploadReady:
Expand All @@ -155,7 +157,7 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
// the storage provider thinks the upload is done - so no need to keep it any more
if err := pps.store.Delete(ev.UploadID); err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot delete upload")
return fmt.Errorf("%w: cannot delete upload", errEvent)
return fmt.Errorf("%w: cannot delete upload", ErrEvent)
}
case events.ResumePostprocessing:
return pps.handleResumePPEvent(ctx, ev)
Expand All @@ -166,14 +168,14 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {

if err := storePP(pps.store, pp); err != nil {
pps.log.Error().Str("uploadID", pp.ID).Err(err).Msg("cannot store upload")
return fmt.Errorf("%w: cannot store upload", errEvent)
return fmt.Errorf("%w: cannot store upload", ErrEvent)
}
}

if next != nil {
if err := events.Publish(ctx, pps.pub, next); err != nil {
pps.log.Error().Err(err).Msg("unable to publish event")
return fmt.Errorf("%w: unable to publish event", errFatal) // we can't publish -> we are screwed
return fmt.Errorf("%w: unable to publish event", ErrFatal) // we can't publish -> we are screwed
}
}
return nil
Expand All @@ -182,10 +184,17 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
func (pps *PostprocessingService) getPP(sto store.Store, uploadID string) (*postprocessing.Postprocessing, error) {
recs, err := sto.Read(uploadID)
if err != nil {
if err == store.ErrNotFound {
return nil, ErrNotFound
}
return nil, err
}

if len(recs) != 1 {
if len(recs) == 0 {
return nil, ErrNotFound
}

if len(recs) > 1 {
return nil, fmt.Errorf("expected only one result for '%s', got %d", uploadID, len(recs))
}

Expand Down Expand Up @@ -231,7 +240,7 @@ func (pps *PostprocessingService) handleResumePPEvent(ctx context.Context, ev ev
for _, id := range ids {
if err := pps.resumePP(ctx, id); err != nil {
pps.log.Error().Str("uploadID", id).Err(err).Msg("cannot resume upload")
return fmt.Errorf("%w: cannot resume upload", errEvent)
return fmt.Errorf("cannot resume upload: %w", err)
}
}
return nil
Expand All @@ -240,7 +249,7 @@ func (pps *PostprocessingService) handleResumePPEvent(ctx context.Context, ev ev
func (pps *PostprocessingService) resumePP(ctx context.Context, uploadID string) error {
pp, err := pps.getPP(pps.store, uploadID)
if err != nil {
if err == store.ErrNotFound {
if err == ErrNotFound {
if err := events.Publish(ctx, pps.pub, events.RestartPostprocessing{
UploadID: uploadID,
Timestamp: utils.TSNow(),
Expand All @@ -249,7 +258,7 @@ func (pps *PostprocessingService) resumePP(ctx context.Context, uploadID string)
}
return nil
}
return fmt.Errorf("%w: cannot get upload", errEvent)
return fmt.Errorf("cannot get upload: %w", err)
}

return events.Publish(ctx, pps.pub, pp.CurrentStep())
Expand Down
41 changes: 40 additions & 1 deletion services/storage-users/pkg/command/uploads.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package command

import (
"context"
"encoding/json"
"fmt"
"os"
Expand All @@ -13,11 +14,14 @@ import (
"github.com/urfave/cli/v2"

userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/fs/registry"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
"github.com/owncloud/ocis/v2/services/storage-users/pkg/config"
"github.com/owncloud/ocis/v2/services/storage-users/pkg/config/parser"
"github.com/owncloud/ocis/v2/services/storage-users/pkg/event"
"github.com/owncloud/ocis/v2/services/storage-users/pkg/revaconfig"
)

Expand Down Expand Up @@ -69,7 +73,7 @@ func ListUploads(cfg *config.Config) *cli.Command {
fmt.Println("Incomplete uploads:")
for _, u := range uploads {
ref := u.Reference()
fmt.Printf(" - %s (Space: %s, Name: %s, Size: %d/%d, Expires: %s, Processing: %t)\n", ref.GetResourceId().GetSpaceId(), u.ID(), u.Filename(), u.Offset(), u.Size(), u.Expires(), u.IsProcessing())
fmt.Printf(" - %s (Space: %s, Name: %s, Size: %d/%d, Expires: %s, Processing: %t)\n", u.ID(), ref.GetResourceId().GetSpaceId(), u.Filename(), u.Offset(), u.Size(), u.Expires(), u.IsProcessing())
}
return nil
},
Expand Down Expand Up @@ -101,6 +105,10 @@ func ListUploadSessions(cfg *config.Config) *cli.Command {
Name: "json",
Usage: "output as json",
},
&cli.BoolFlag{
Name: "restart",
Usage: "send restart event for all listed sessions",
},
},
Before: func(c *cli.Context) error {
return configlog.ReturnFatal(parser.ParseConfig(cfg))
Expand All @@ -124,6 +132,15 @@ func ListUploadSessions(cfg *config.Config) *cli.Command {
os.Exit(1)
}

var stream events.Stream
if c.Bool("restart") {
stream, err = event.NewStream(cfg)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to create event stream: %v\n", err)
os.Exit(1)
}
}

var b strings.Builder
filter := storage.UploadSessionFilter{}
if c.IsSet("processing") {
Expand Down Expand Up @@ -200,6 +217,17 @@ func ListUploadSessions(cfg *config.Config) *cli.Command {
fmt.Println(err)
}
fmt.Println(string(j))

if c.Bool("restart") {
if err := events.Publish(context.Background(), stream, events.ResumePostprocessing{
UploadID: u.ID(),
Timestamp: utils.TSNow(),
}); err != nil {
fmt.Fprintf(os.Stderr, "Failed to send restart event for upload session '%s'\n", u.ID())
// if publishing fails there is no need to try publishing other events - they will fail too.
os.Exit(1)
}
}
}
} else {

Expand All @@ -223,6 +251,17 @@ func ListUploadSessions(cfg *config.Config) *cli.Command {
u.Expires().Format(time.RFC3339),
strconv.FormatBool(u.IsProcessing()),
})

if c.Bool("restart") {
if err := events.Publish(context.Background(), stream, events.ResumePostprocessing{
UploadID: u.ID(),
Timestamp: utils.TSNow(),
}); err != nil {
fmt.Fprintf(os.Stderr, "Failed to send restart event for upload session '%s'\n", u.ID())
// if publishing fails there is no need to try publishing other events - they will fail too.
os.Exit(1)
}
}
}
table.Render()
}
Expand Down

0 comments on commit 5f96680

Please sign in to comment.