Skip to content

Commit

Permalink
always select next before making calls
Browse files Browse the repository at this point in the history
Signed-off-by: Jörn Friedrich Dreyer <[email protected]>
  • Loading branch information
butonic committed Mar 5, 2024
1 parent fa57da8 commit 44d86ce
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 12 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/next-before-making-calls.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bugfix: always select next before making calls

We now select the next client more often to spread out load

https://github.com/owncloud/ocis/pull/8578
12 changes: 11 additions & 1 deletion services/clientlog/pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (cl *ClientlogService) Run() error {
func (cl *ClientlogService) processEvent(event events.Event) {
gwc, err := cl.gatewaySelector.Next()
if err != nil {
cl.log.Error().Err(err).Interface("event", event).Msg("error getting gatway client")
cl.log.Error().Err(err).Interface("event", event).Msg("error getting gateway client")
return
}

Expand All @@ -88,6 +88,11 @@ func (cl *ClientlogService) processEvent(event events.Event) {
return
}

gwc, err = cl.gatewaySelector.Next()
if err != nil {
cl.log.Error().Err(err).Interface("event", event).Msg("error getting gateway client")
return
}
var (
users []string
evType string
Expand All @@ -109,6 +114,11 @@ func (cl *ClientlogService) processEvent(event events.Event) {
ItemID: storagespace.FormatResourceID(*info.GetId()),
}

gwc, err := cl.gatewaySelector.Next()
if err != nil {
cl.log.Error().Err(err).Interface("event", event).Msg("error getting gateway client")
return
}
users, err = utils.GetSpaceMembers(ctx, info.GetSpace().GetId().GetOpaqueId(), gwc, utils.ViewerRole)
}

Expand Down
8 changes: 8 additions & 0 deletions services/graph/pkg/service/v0/api_drives_drive_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ func (s DrivesDriveItemService) UnmountShare(ctx context.Context, resourceID sto
}

// Find all accepted shares for this resource
gatewayClient, err = s.gatewaySelector.Next()
if err != nil {
return err
}
receivedSharesResponse, err := gatewayClient.ListReceivedShares(ctx, &collaboration.ListReceivedSharesRequest{
Filters: []*collaboration.Filter{
{
Expand Down Expand Up @@ -190,6 +194,10 @@ func (s DrivesDriveItemService) MountShare(ctx context.Context, resourceID stora
UpdateMask: updateMask,
}

gatewayClient, err = s.gatewaySelector.Next()
if err != nil {
return libregraph.DriveItem{}, err
}
updateReceivedShareResponse, err := gatewayClient.UpdateReceivedShare(ctx, updateReceivedShareRequest)
switch errCode := errorcode.FromCS3Status(updateReceivedShareResponse.GetStatus(), err); {
case errCode == nil:
Expand Down
12 changes: 12 additions & 0 deletions services/storage-users/pkg/task/trash_bin.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ func PurgeTrashBin(serviceAccountID string, deleteBefore time.Time, spaceType Sp
return err
}

gatewayClient, err = gatewaySelector.Next()
if err != nil {
return err
}
listStorageSpacesResponse, err := gatewayClient.ListStorageSpaces(ctx, &apiProvider.ListStorageSpacesRequest{
Filters: []*apiProvider.ListStorageSpacesRequest_Filter{
{
Expand All @@ -49,6 +53,10 @@ func PurgeTrashBin(serviceAccountID string, deleteBefore time.Time, spaceType Sp
ResourceId: storageSpace.GetRoot(),
}

gatewayClient, err = gatewaySelector.Next()
if err != nil {
return err
}
listRecycleResponse, err := gatewayClient.ListRecycle(ctx, &apiProvider.ListRecycleRequest{Ref: storageSpaceReference})
if err != nil {
return err
Expand All @@ -60,6 +68,10 @@ func PurgeTrashBin(serviceAccountID string, deleteBefore time.Time, spaceType Sp
continue
}

gatewayClient, err = gatewaySelector.Next()
if err != nil {
return err
}
purgeRecycleResponse, err := gatewayClient.PurgeRecycle(ctx, &apiProvider.PurgeRecycleRequest{
Ref: storageSpaceReference,
Key: recycleItem.Key,
Expand Down
25 changes: 19 additions & 6 deletions services/userlog/pkg/service/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1"
storageprovider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/leonelquinteros/gotext"
Expand Down Expand Up @@ -52,7 +53,7 @@ type OC10Notification struct {
// Converter is responsible for converting eventhistory events to OC10Notifications
type Converter struct {
locale string
gwc gateway.GatewayAPIClient
gatewaySelector pool.Selectable[gateway.GatewayAPIClient]
serviceName string
translationPath string
defaultLanguage string
Expand All @@ -65,10 +66,10 @@ type Converter struct {
}

// NewConverter returns a new Converter
func NewConverter(ctx context.Context, loc string, gwc gateway.GatewayAPIClient, name, translationPath, defaultLanguage string) *Converter {
func NewConverter(ctx context.Context, loc string, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], name, translationPath, defaultLanguage string) *Converter {
return &Converter{
locale: loc,
gwc: gwc,
gatewaySelector: gatewaySelector,
serviceName: name,
translationPath: translationPath,
defaultLanguage: defaultLanguage,
Expand Down Expand Up @@ -320,7 +321,11 @@ func (c *Converter) getSpace(ctx context.Context, spaceID string) (*storageprovi
if space, ok := c.spaces[spaceID]; ok {
return space, nil
}
space, err := utils.GetSpace(ctx, spaceID, c.gwc)
gwc, err := c.gatewaySelector.Next()
if err != nil {
return nil, err
}
space, err := utils.GetSpace(ctx, spaceID, gwc)
if err == nil {
c.spaces[spaceID] = space
}
Expand All @@ -331,7 +336,11 @@ func (c *Converter) getResource(ctx context.Context, resourceID *storageprovider
if r, ok := c.resources[resourceID.GetOpaqueId()]; ok {
return r, nil
}
resource, err := utils.GetResourceByID(ctx, resourceID, c.gwc)
gwc, err := c.gatewaySelector.Next()
if err != nil {
return nil, err
}
resource, err := utils.GetResourceByID(ctx, resourceID, gwc)
if err == nil {
c.resources[resourceID.GetOpaqueId()] = resource
}
Expand All @@ -342,7 +351,11 @@ func (c *Converter) getUser(ctx context.Context, userID *user.UserId) (*user.Use
if u, ok := c.users[userID.GetOpaqueId()]; ok {
return u, nil
}
usr, err := utils.GetUser(userID, c.gwc)
gwc, err := c.gatewaySelector.Next()
if err != nil {
return nil, err
}
usr, err := utils.GetUser(userID, gwc)
if err == nil {
c.users[userID.GetOpaqueId()] = usr
}
Expand Down
3 changes: 1 addition & 2 deletions services/userlog/pkg/service/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,14 @@ func (ul *UserlogService) HandleGetEvents(w http.ResponseWriter, r *http.Request
w.WriteHeader(http.StatusInternalServerError)
return
}

ctx, err = utils.GetServiceUserContext(ul.cfg.ServiceAccount.ServiceAccountID, gwc, ul.cfg.ServiceAccount.ServiceAccountSecret)
if err != nil {
ul.log.Error().Err(err).Msg("cant get service account")
w.WriteHeader(http.StatusInternalServerError)
return
}

conv := NewConverter(ctx, r.Header.Get(HeaderAcceptLanguage), gwc, ul.cfg.Service.Name, ul.cfg.TranslationPath, ul.cfg.DefaultLanguage)
conv := NewConverter(ctx, r.Header.Get(HeaderAcceptLanguage), ul.gatewaySelector, ul.cfg.Service.Name, ul.cfg.TranslationPath, ul.cfg.DefaultLanguage)

var outdatedEvents []string
resp := GetEventResponseOC10{}
Expand Down
11 changes: 8 additions & 3 deletions services/userlog/pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ func (ul *UserlogService) processEvent(event events.Event) {
return
}

gwc, err = ul.gatewaySelector.Next()
if err != nil {
ul.log.Error().Err(err).Msg("cannot get gateway client")
return
}
switch e := event.Event.(type) {
default:
err = errors.New("unhandled event")
Expand Down Expand Up @@ -198,7 +203,7 @@ func (ul *UserlogService) processEvent(event events.Event) {

// IV) send sses
if !ul.cfg.DisableSSE {
if err := ul.sendSSE(ctx, users, event, gwc); err != nil {
if err := ul.sendSSE(ctx, users, event, ul.gatewaySelector); err != nil {
ul.log.Error().Err(err).Interface("userid", users).Str("eventid", event.ID).Msg("cannot create sse event")
}
}
Expand Down Expand Up @@ -339,7 +344,7 @@ func (ul *UserlogService) addEventToUser(userid string, event events.Event) erro
})
}

func (ul *UserlogService) sendSSE(ctx context.Context, userIDs []string, event events.Event, gwc gateway.GatewayAPIClient) error {
func (ul *UserlogService) sendSSE(ctx context.Context, userIDs []string, event events.Event, gatewaySelector pool.Selectable[gateway.GatewayAPIClient]) error {
m := make(map[string]events.SendSSE)

for _, userid := range userIDs {
Expand All @@ -350,7 +355,7 @@ func (ul *UserlogService) sendSSE(ctx context.Context, userIDs []string, event e
continue
}

ev, err := NewConverter(ctx, loc, gwc, ul.cfg.Service.Name, ul.cfg.TranslationPath, ul.cfg.DefaultLanguage).ConvertEvent(event.ID, event.Event)
ev, err := NewConverter(ctx, loc, gatewaySelector, ul.cfg.Service.Name, ul.cfg.TranslationPath, ul.cfg.DefaultLanguage).ConvertEvent(event.ID, event.Event)
if err != nil {
if utils.IsErrNotFound(err) || utils.IsErrPermissionDenied(err) {
// the resource was not found, we assume it is deleted
Expand Down

0 comments on commit 44d86ce

Please sign in to comment.