Skip to content

Commit

Permalink
feat(storage-users): assimilate clean command into sessions
Browse files Browse the repository at this point in the history
Signed-off-by: jkoberg <[email protected]>
  • Loading branch information
kobergj committed May 2, 2024
1 parent 1fad9ac commit 4941620
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 114 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/assimilate-clean-into-sessions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: Assimilate `clean` into `sessions` command

We deprecated `ocis storage-user uploads clean` and added the same logic to `ocis storage-users uploads session --clean`

https://github.com/owncloud/ocis/pull/9041
269 changes: 155 additions & 114 deletions services/storage-users/pkg/command/uploads.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,22 @@ import (
"github.com/owncloud/ocis/v2/services/storage-users/pkg/revaconfig"
)

// Session contains the information of an upload session
type Session struct {
ID string `json:"id"`
Space string `json:"space"`
Filename string `json:"filename"`
Offset int64 `json:"offset"`
Size int64 `json:"size"`
Executant userpb.UserId `json:"executant"`
SpaceOwner *userpb.UserId `json:"spaceowner,omitempty"`
Expires time.Time `json:"expires"`
Processing bool `json:"processing"`
ScanDate time.Time `json:"virus_scan_date"`
ScanResult string `json:"virus_scan_result"`
}

// Uploads is the entry point for the uploads command
func Uploads(cfg *config.Config) *cli.Command {
return &cli.Command{

Expand All @@ -47,6 +63,8 @@ func ListUploads(cfg *config.Config) *cli.Command {
return configlog.ReturnFatal(parser.ParseConfig(cfg))
},
Action: func(c *cli.Context) error {
fmt.Println("Warning: This command is deprecated, use 'uploads sessions' instead")

f, ok := registry.NewFuncs[cfg.Driver]
if !ok {
fmt.Fprintf(os.Stderr, "Unknown filesystem driver '%s'\n", cfg.Driver)
Expand Down Expand Up @@ -109,6 +127,10 @@ func ListUploadSessions(cfg *config.Config) *cli.Command {
Name: "restart",
Usage: "send restart event for all listed sessions",
},
&cli.BoolFlag{
Name: "clean",
Usage: "remove uploads",
},
},
Before: func(c *cli.Context) error {
return configlog.ReturnFatal(parser.ParseConfig(cfg))
Expand Down Expand Up @@ -141,138 +163,91 @@ func ListUploadSessions(cfg *config.Config) *cli.Command {
}
}

var b strings.Builder
filter := storage.UploadSessionFilter{}
if c.IsSet("processing") {
processingValue := c.Bool("processing")
filter.Processing = &processingValue
if !processingValue {
b.WriteString("Not ")
}
if b.Len() == 0 {
b.WriteString("Processing ")
} else {
b.WriteString("processing ")
}
}
if c.IsSet("expired") {
expiredValue := c.Bool("expired")
filter.Expired = &expiredValue
if !expiredValue {
if b.Len() == 0 {
b.WriteString("Not ")
} else {
b.WriteString(", not ")
}
}
if b.Len() == 0 {
b.WriteString("Expired ")
} else {
b.WriteString("expired ")
}
}
if b.Len() == 0 {
b.WriteString("Sessions")
} else {
b.WriteString("sessions")
}
if c.IsSet("id") {
idValue := c.String("id")
filter.ID = &idValue
b.WriteString(" with id '" + idValue + "'")
}
b.WriteString(":")
filter := buildFilter(c)
uploads, err := managingFS.ListUploadSessions(c.Context, filter)
if err != nil {
return err
}

var table *tw.Table
if c.Bool("json") {
for _, u := range uploads {
ref := u.Reference()
sr, sd := u.ScanData()
s := struct {
ID string `json:"id"`
Space string `json:"space"`
Filename string `json:"filename"`
Offset int64 `json:"offset"`
Size int64 `json:"size"`
Executant userpb.UserId `json:"executant"`
SpaceOwner *userpb.UserId `json:"spaceowner,omitempty"`
Expires time.Time `json:"expires"`
Processing bool `json:"processing"`
ScanDate time.Time `json:"virus_scan_date"`
ScanResult string `json:"virus_scan_result"`
}{
Space: ref.GetResourceId().GetSpaceId(),
ID: u.ID(),
Filename: u.Filename(),
Offset: u.Offset(),
Size: u.Size(),
Executant: u.Executant(),
SpaceOwner: u.SpaceOwner(),
Expires: u.Expires(),
Processing: u.IsProcessing(),
ScanDate: sd,
ScanResult: sr,
}
j, err := json.Marshal(s)
if err != nil {
fmt.Println(err)
}
fmt.Println(string(j))

if c.Bool("restart") {
if err := events.Publish(context.Background(), stream, events.ResumePostprocessing{
UploadID: u.ID(),
Timestamp: utils.TSNow(),
}); err != nil {
fmt.Fprintf(os.Stderr, "Failed to send restart event for upload session '%s'\n", u.ID())
// if publishing fails there is no need to try publishing other events - they will fail too.
os.Exit(1)
}
}
}
} else {
var (
table *tw.Table
raw []Session
)

// Print what the user requested
fmt.Println(b.String())
if !c.Bool("json") {
fmt.Println(buildInfo(filter))

// start a table
table = tw.NewWriter(os.Stdout)
table.SetHeader([]string{"Space", "Upload Id", "Name", "Offset", "Size", "Executant", "Owner", "Expires", "Processing", "Scan Date", "Scan Result"})
table.SetAutoFormatHeaders(false)
}

for _, u := range uploads {
sr, sd := u.ScanData()
for _, u := range uploads {
ref := u.Reference()
sr, sd := u.ScanData()

session := Session{
Space: ref.GetResourceId().GetSpaceId(),
ID: u.ID(),
Filename: u.Filename(),
Offset: u.Offset(),
Size: u.Size(),
Executant: u.Executant(),
SpaceOwner: u.SpaceOwner(),
Expires: u.Expires(),
Processing: u.IsProcessing(),
ScanDate: sd,
ScanResult: sr,
}

if c.Bool("json") {
raw = append(raw, session)
} else {
table.Append([]string{
u.Reference().ResourceId.GetSpaceId(),
u.ID(),
u.Filename(),
strconv.FormatInt(u.Offset(), 10),
strconv.FormatInt(u.Size(), 10),
u.Executant().OpaqueId,
u.SpaceOwner().GetOpaqueId(),
u.Expires().Format(time.RFC3339),
strconv.FormatBool(u.IsProcessing()),
sd.Format(time.RFC3339),
sr,
session.Space,
session.ID,
session.Filename,
strconv.FormatInt(session.Offset, 10),
strconv.FormatInt(session.Size, 10),
session.Executant.OpaqueId,
session.SpaceOwner.GetOpaqueId(),
session.Expires.Format(time.RFC3339),
strconv.FormatBool(session.Processing),
session.ScanDate.Format(time.RFC3339),
session.ScanResult,
})
}

if c.Bool("restart") {
if err := events.Publish(context.Background(), stream, events.ResumePostprocessing{
UploadID: u.ID(),
Timestamp: utils.TSNow(),
}); err != nil {
fmt.Fprintf(os.Stderr, "Failed to send restart event for upload session '%s'\n", u.ID())
// if publishing fails there is no need to try publishing other events - they will fail too.
os.Exit(1)
}
}

if c.Bool("restart") {
if err := events.Publish(context.Background(), stream, events.ResumePostprocessing{
UploadID: u.ID(),
Timestamp: utils.TSNow(),
}); err != nil {
fmt.Fprintf(os.Stderr, "Failed to send restart event for upload session '%s'\n", u.ID())
// if publishing fails there is no need to try publishing other events - they will fail too.
os.Exit(1)
}
if c.Bool("clean") {
if err := u.Purge(c.Context); err != nil {
fmt.Fprintf(os.Stderr, "Failed to clean upload session '%s'\n", u.ID())
}
}

}

if !c.Bool("json") {
table.Render()
return nil
}

j, err := json.Marshal(raw)
if err != nil {
fmt.Println(err)
return err
}
fmt.Println(string(j))
return nil
},
}
Expand All @@ -287,6 +262,7 @@ func PurgeExpiredUploads(cfg *config.Config) *cli.Command {
return configlog.ReturnFatal(parser.ParseConfig(cfg))
},
Action: func(c *cli.Context) error {
fmt.Println("Warning: This command is deprecated, use 'uploads sessions --clean' instead")
f, ok := registry.NewFuncs[cfg.Driver]
if !ok {
fmt.Fprintf(os.Stderr, "Unknown filesystem driver '%s'\n", cfg.Driver)
Expand Down Expand Up @@ -333,3 +309,68 @@ func PurgeExpiredUploads(cfg *config.Config) *cli.Command {
},
}
}

func buildFilter(c *cli.Context) storage.UploadSessionFilter {
filter := storage.UploadSessionFilter{}
if c.IsSet("processing") {
processingValue := c.Bool("processing")
filter.Processing = &processingValue
}
if c.IsSet("expired") {
expiredValue := c.Bool("expired")
filter.Expired = &expiredValue
}
if c.IsSet("id") {
idValue := c.String("id")
filter.ID = &idValue
}
return filter
}

func buildInfo(filter storage.UploadSessionFilter) string {
var b strings.Builder
if filter.Processing != nil {
if !*filter.Processing {
b.WriteString("Not ")
}
if b.Len() == 0 {
b.WriteString("Processing")
} else {
b.WriteString("processing")
}
}

if filter.Expired != nil {
if b.Len() != 0 {
b.WriteString(", ")
}
if !*filter.Expired {
if b.Len() == 0 {
b.WriteString("Not ")
} else {
b.WriteString("not ")
}
}
if b.Len() == 0 {
b.WriteString("Expired")
} else {
b.WriteString("expired")
}
}

if b.Len() == 0 {
b.WriteString("Session")
} else {
b.WriteString(" session")
}

if filter.ID != nil {
b.WriteString(" with id '" + *filter.ID + "'")
} else {
// to make `session` plural
b.WriteString("s")
}

b.WriteString(":")
return b.String()
}
60 changes: 60 additions & 0 deletions services/storage-users/pkg/command/uploads_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package command

import (
"testing"

"github.com/cs3org/reva/v2/pkg/storage"
"github.com/test-go/testify/require"
)

func TestBuildInfo(t *testing.T) {
testCases := []struct {
alias string
filter storage.UploadSessionFilter
expectedInfo string
}{
{
alias: "empty filter",
filter: storage.UploadSessionFilter{},
expectedInfo: "Sessions:",
},
{
alias: "processing",
filter: storage.UploadSessionFilter{Processing: boolPtr(true)},
expectedInfo: "Processing sessions:",
},
{
alias: "processing and not expired",
filter: storage.UploadSessionFilter{Processing: boolPtr(true), Expired: boolPtr(false)},
expectedInfo: "Processing, not expired sessions:",
},
{
alias: "processing and expired",
filter: storage.UploadSessionFilter{Processing: boolPtr(true), Expired: boolPtr(true)},
expectedInfo: "Processing, expired sessions:",
},
{
alias: "with id",
filter: storage.UploadSessionFilter{ID: strPtr("123")},
expectedInfo: "Session with id '123':",
},
}

for _, tc := range testCases {
alias := tc.alias
filter := tc.filter
expectedInfo := tc.expectedInfo

t.Run(alias, func(t *testing.T) {
require.Equal(t, expectedInfo, buildInfo(filter))
})
}
}

func boolPtr(b bool) *bool {
return &b
}

func strPtr(s string) *string {
return &s
}

0 comments on commit 4941620

Please sign in to comment.