Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

always select next before making calls #8578

Merged
merged 3 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/unreleased/next-before-making-calls.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bugfix: always select next before making calls

We now select the next client more often to spread out load

https://github.com/owncloud/ocis/pull/8578
10 changes: 10 additions & 0 deletions services/clientlog/pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ func (cl *ClientlogService) processEvent(event events.Event) {
return
}

gwc, err = cl.gatewaySelector.Next()
if err != nil {
cl.log.Error().Err(err).Interface("event", event).Msg("error getting gateway client")
return
}
var (
users []string
evType string
Expand Down Expand Up @@ -129,6 +134,11 @@ func (cl *ClientlogService) processEvent(event events.Event) {
InitiatorID: event.InitiatorID,
}

gwc, err = cl.gatewaySelector.Next()
if err != nil {
cl.log.Error().Err(err).Interface("event", event).Msg("error getting gateway client")
return
}
users, err = utils.GetSpaceMembers(ctx, e.ID.GetSpaceId(), gwc, utils.ViewerRole)
break
}
Expand Down
10 changes: 9 additions & 1 deletion services/graph/pkg/service/v0/api_drives_drive_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ func (s DrivesDriveItemService) UnmountShare(ctx context.Context, resourceID sto
}

// Find all accepted shares for this resource
gatewayClient, err = s.gatewaySelector.Next()
if err != nil {
return err
}
receivedSharesResponse, err := gatewayClient.ListReceivedShares(ctx, &collaboration.ListReceivedSharesRequest{
Filters: []*collaboration.Filter{
{
Expand Down Expand Up @@ -188,6 +192,10 @@ func (s DrivesDriveItemService) MountShare(ctx context.Context, resourceID stora
UpdateMask: updateMask,
}

gatewayClient, err = s.gatewaySelector.Next()
if err != nil {
return libregraph.DriveItem{}, err
}
updateReceivedShareResponse, err := gatewayClient.UpdateReceivedShare(ctx, updateReceivedShareRequest)
switch errCode := errorcode.FromCS3Status(updateReceivedShareResponse.GetStatus(), err); {
case errCode == nil:
Expand All @@ -213,7 +221,7 @@ func (s DrivesDriveItemService) MountShare(ctx context.Context, resourceID stora
items, err := cs3ReceivedSharesToDriveItems(ctx, &s.logger, gatewayClient, s.identityCache, acceptedShares)
switch {
case err != nil:
return libregraph.DriveItem{}, nil
return libregraph.DriveItem{}, err
case len(items) != 1:
return libregraph.DriveItem{}, errorcode.New(errorcode.GeneralException, "failed to convert accepted shares into drive-item")
}
Expand Down
12 changes: 12 additions & 0 deletions services/storage-users/pkg/task/trash_bin.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ func PurgeTrashBin(serviceAccountID string, deleteBefore time.Time, spaceType Sp
return err
}

gatewayClient, err = gatewaySelector.Next()
if err != nil {
return err
}
listStorageSpacesResponse, err := gatewayClient.ListStorageSpaces(ctx, &apiProvider.ListStorageSpacesRequest{
Filters: []*apiProvider.ListStorageSpacesRequest_Filter{
{
Expand All @@ -49,6 +53,10 @@ func PurgeTrashBin(serviceAccountID string, deleteBefore time.Time, spaceType Sp
ResourceId: storageSpace.GetRoot(),
}

gatewayClient, err = gatewaySelector.Next()
if err != nil {
return err
}
listRecycleResponse, err := gatewayClient.ListRecycle(ctx, &apiProvider.ListRecycleRequest{Ref: storageSpaceReference})
if err != nil {
return err
Expand All @@ -60,6 +68,10 @@ func PurgeTrashBin(serviceAccountID string, deleteBefore time.Time, spaceType Sp
continue
}

gatewayClient, err = gatewaySelector.Next()
if err != nil {
return err
}
purgeRecycleResponse, err := gatewayClient.PurgeRecycle(ctx, &apiProvider.PurgeRecycleRequest{
Ref: storageSpaceReference,
Key: recycleItem.Key,
Expand Down
25 changes: 19 additions & 6 deletions services/userlog/pkg/service/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1"
storageprovider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/owncloud/ocis/v2/ocis-pkg/l10n"
Expand Down Expand Up @@ -51,7 +52,7 @@ type OC10Notification struct {
// Converter is responsible for converting eventhistory events to OC10Notifications
type Converter struct {
locale string
gwc gateway.GatewayAPIClient
gatewaySelector pool.Selectable[gateway.GatewayAPIClient]
serviceName string
translationPath string
defaultLanguage string
Expand All @@ -64,10 +65,10 @@ type Converter struct {
}

// NewConverter returns a new Converter
func NewConverter(ctx context.Context, loc string, gwc gateway.GatewayAPIClient, name, translationPath, defaultLanguage string) *Converter {
func NewConverter(ctx context.Context, loc string, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], name, translationPath, defaultLanguage string) *Converter {
return &Converter{
locale: loc,
gwc: gwc,
gatewaySelector: gatewaySelector,
serviceName: name,
translationPath: translationPath,
defaultLanguage: defaultLanguage,
Expand Down Expand Up @@ -319,7 +320,11 @@ func (c *Converter) getSpace(ctx context.Context, spaceID string) (*storageprovi
if space, ok := c.spaces[spaceID]; ok {
return space, nil
}
space, err := utils.GetSpace(ctx, spaceID, c.gwc)
gwc, err := c.gatewaySelector.Next()
if err != nil {
return nil, err
}
space, err := utils.GetSpace(ctx, spaceID, gwc)
if err == nil {
c.spaces[spaceID] = space
}
Expand All @@ -330,7 +335,11 @@ func (c *Converter) getResource(ctx context.Context, resourceID *storageprovider
if r, ok := c.resources[resourceID.GetOpaqueId()]; ok {
return r, nil
}
resource, err := utils.GetResourceByID(ctx, resourceID, c.gwc)
gwc, err := c.gatewaySelector.Next()
if err != nil {
return nil, err
}
resource, err := utils.GetResourceByID(ctx, resourceID, gwc)
if err == nil {
c.resources[resourceID.GetOpaqueId()] = resource
}
Expand All @@ -341,7 +350,11 @@ func (c *Converter) getUser(_ context.Context, userID *user.UserId) (*user.User,
if u, ok := c.users[userID.GetOpaqueId()]; ok {
return u, nil
}
usr, err := utils.GetUser(userID, c.gwc)
gwc, err := c.gatewaySelector.Next()
if err != nil {
return nil, err
}
usr, err := utils.GetUser(userID, gwc)
if err == nil {
c.users[userID.GetOpaqueId()] = usr
}
Expand Down
3 changes: 1 addition & 2 deletions services/userlog/pkg/service/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,14 @@ func (ul *UserlogService) HandleGetEvents(w http.ResponseWriter, r *http.Request
w.WriteHeader(http.StatusInternalServerError)
return
}

ctx, err = utils.GetServiceUserContext(ul.cfg.ServiceAccount.ServiceAccountID, gwc, ul.cfg.ServiceAccount.ServiceAccountSecret)
if err != nil {
ul.log.Error().Err(err).Msg("cant get service account")
w.WriteHeader(http.StatusInternalServerError)
return
}

conv := NewConverter(ctx, r.Header.Get(HeaderAcceptLanguage), gwc, ul.cfg.Service.Name, ul.cfg.TranslationPath, ul.cfg.DefaultLanguage)
conv := NewConverter(ctx, r.Header.Get(HeaderAcceptLanguage), ul.gatewaySelector, ul.cfg.Service.Name, ul.cfg.TranslationPath, ul.cfg.DefaultLanguage)

var outdatedEvents []string
resp := GetEventResponseOC10{}
Expand Down
11 changes: 8 additions & 3 deletions services/userlog/pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ func (ul *UserlogService) processEvent(event events.Event) {
return
}

gwc, err = ul.gatewaySelector.Next()
if err != nil {
ul.log.Error().Err(err).Msg("cannot get gateway client")
return
}
switch e := event.Event.(type) {
default:
err = errors.New("unhandled event")
Expand Down Expand Up @@ -196,7 +201,7 @@ func (ul *UserlogService) processEvent(event events.Event) {

// IV) send sses
if !ul.cfg.DisableSSE {
if err := ul.sendSSE(ctx, users, event, gwc); err != nil {
if err := ul.sendSSE(ctx, users, event, ul.gatewaySelector); err != nil {
ul.log.Error().Err(err).Interface("userid", users).Str("eventid", event.ID).Msg("cannot create sse event")
}
}
Expand Down Expand Up @@ -337,7 +342,7 @@ func (ul *UserlogService) addEventToUser(userid string, event events.Event) erro
})
}

func (ul *UserlogService) sendSSE(ctx context.Context, userIDs []string, event events.Event, gwc gateway.GatewayAPIClient) error {
func (ul *UserlogService) sendSSE(ctx context.Context, userIDs []string, event events.Event, gatewaySelector pool.Selectable[gateway.GatewayAPIClient]) error {
m := make(map[string]events.SendSSE)

for _, userid := range userIDs {
Expand All @@ -348,7 +353,7 @@ func (ul *UserlogService) sendSSE(ctx context.Context, userIDs []string, event e
continue
}

ev, err := NewConverter(ctx, loc, gwc, ul.cfg.Service.Name, ul.cfg.TranslationPath, ul.cfg.DefaultLanguage).ConvertEvent(event.ID, event.Event)
ev, err := NewConverter(ctx, loc, gatewaySelector, ul.cfg.Service.Name, ul.cfg.TranslationPath, ul.cfg.DefaultLanguage).ConvertEvent(event.ID, event.Event)
if err != nil {
if utils.IsErrNotFound(err) || utils.IsErrPermissionDenied(err) {
// the resource was not found, we assume it is deleted
Expand Down