Skip to content

Commit

Permalink
feat!: support emitting errors from the bulk evaluator (#1338)
Browse files Browse the repository at this point in the history
Fixes #1328

### Improvement

flagd's core components are intended to be reused. This PR change
the`IStore` interface by allowing an error to be returned from `GetAll`.
This error is then propagated through `ResolveAllValues`. This change
enables custom `IStore` implementations to return errors and propagate
them through the resolver layer.

With this change, I have upgrade OFREP bulk evaluator and flagd RPC
`ResolveAll` with error propagation.

OFREP - Log warning with resolver error and return HTTP 500 with a
tracking reference
RPC - Log warning with resolver error and return an error with a
tracking reference

Signed-off-by: Kavindu Dodanduwa <[email protected]>
Co-authored-by: Todd Baert <[email protected]>
  • Loading branch information
Kavindu-Dodan and toddbaert committed Jun 27, 2024
1 parent f82c094 commit b9c099c
Show file tree
Hide file tree
Showing 15 changed files with 264 additions and 69 deletions.
2 changes: 1 addition & 1 deletion core/pkg/evaluator/fractional.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func NewFractional(logger *logger.Logger) *Fractional {
func (fe *Fractional) Evaluate(values, data any) any {
valueToDistribute, feDistributions, err := parseFractionalEvaluationData(values, data)
if err != nil {
fe.Logger.Error(fmt.Sprintf("parse fractional evaluation data: %v", err))
fe.Logger.Warn(fmt.Sprintf("parse fractional evaluation data: %v", err))
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion core/pkg/evaluator/ievaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,5 @@ type IResolver interface {
ResolveAllValues(
ctx context.Context,
reqID string,
context map[string]any) (values []AnyValue)
context map[string]any) (values []AnyValue, err error)
}
46 changes: 14 additions & 32 deletions core/pkg/evaluator/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,17 +156,22 @@ func NewResolver(store store.IStore, logger *logger.Logger, jsonEvalTracer trace
return Resolver{store: store, Logger: logger, tracer: jsonEvalTracer}
}

func (je *Resolver) ResolveAllValues(ctx context.Context, reqID string, context map[string]any) []AnyValue {
func (je *Resolver) ResolveAllValues(ctx context.Context, reqID string, context map[string]any) ([]AnyValue, error) {
_, span := je.tracer.Start(ctx, "resolveAll")
defer span.End()

var err error
allFlags, err := je.store.GetAll(ctx)
if err != nil {
return nil, fmt.Errorf("error retreiving flags from the store: %w", err)
}

values := []AnyValue{}
var value interface{}
var variant string
var reason string
var metadata map[string]interface{}
var err error
allFlags := je.store.GetAll(ctx)

for flagKey, flag := range allFlags {
if flag.State == Disabled {
// ignore evaluation of disabled flag
Expand All @@ -176,44 +181,21 @@ func (je *Resolver) ResolveAllValues(ctx context.Context, reqID string, context
defaultValue := flag.Variants[flag.DefaultVariant]
switch defaultValue.(type) {
case bool:
value, variant, reason, metadata, err = resolve[bool](
ctx,
reqID,
flagKey,
context,
je.evaluateVariant,
)
value, variant, reason, metadata, err = resolve[bool](ctx, reqID, flagKey, context, je.evaluateVariant)
case string:
value, variant, reason, metadata, err = resolve[string](
ctx,
reqID,
flagKey,
context,
je.evaluateVariant,
)
value, variant, reason, metadata, err = resolve[string](ctx, reqID, flagKey, context, je.evaluateVariant)
case float64:
value, variant, reason, metadata, err = resolve[float64](
ctx,
reqID,
flagKey,
context,
je.evaluateVariant,
)
value, variant, reason, metadata, err = resolve[float64](ctx, reqID, flagKey, context, je.evaluateVariant)
case map[string]any:
value, variant, reason, metadata, err = resolve[map[string]any](
ctx,
reqID,
flagKey,
context,
je.evaluateVariant,
)
value, variant, reason, metadata, err = resolve[map[string]any](ctx, reqID, flagKey, context, je.evaluateVariant)
}
if err != nil {
je.Logger.ErrorWithID(reqID, fmt.Sprintf("bulk evaluation: key: %s returned error: %s", flagKey, err.Error()))
}
values = append(values, NewAnyValue(value, variant, reason, flagKey, metadata, err))
}
return values

return values, nil
}

func (je *Resolver) ResolveBooleanValue(
Expand Down
21 changes: 17 additions & 4 deletions core/pkg/evaluator/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,11 @@ func TestResolveAllValues(t *testing.T) {
}
const reqID = "default"
for _, test := range tests {
vals := evaluator.ResolveAllValues(context.TODO(), reqID, test.context)
vals, err := evaluator.ResolveAllValues(context.TODO(), reqID, test.context)
if err != nil {
t.Error("error from resolver", err)
}

for _, val := range vals {
// disabled flag must be ignored from bulk evaluation
if val.FlagKey == DisabledFlag {
Expand Down Expand Up @@ -1234,21 +1238,30 @@ func TestFlagStateSafeForConcurrentReadWrites(t *testing.T) {
"Add_ResolveAllValues": {
dataSyncType: sync.ADD,
flagResolution: func(evaluator *evaluator.JSON) error {
evaluator.ResolveAllValues(context.TODO(), "", nil)
_, err := evaluator.ResolveAllValues(context.TODO(), "", nil)
if err != nil {
return err
}
return nil
},
},
"Update_ResolveAllValues": {
dataSyncType: sync.UPDATE,
flagResolution: func(evaluator *evaluator.JSON) error {
evaluator.ResolveAllValues(context.TODO(), "", nil)
_, err := evaluator.ResolveAllValues(context.TODO(), "", nil)
if err != nil {
return err
}
return nil
},
},
"Delete_ResolveAllValues": {
dataSyncType: sync.DELETE,
flagResolution: func(evaluator *evaluator.JSON) error {
evaluator.ResolveAllValues(context.TODO(), "", nil)
_, err := evaluator.ResolveAllValues(context.TODO(), "", nil)
if err != nil {
return err
}
return nil
},
},
Expand Down
147 changes: 145 additions & 2 deletions core/pkg/evaluator/mock/ievaluator.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion core/pkg/service/ofrep/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,20 @@ func ContextErrorResponseFrom(key string) EvaluationError {
}
}

func BulkEvaluationContextErrorFrom() BulkEvaluationError {
func BulkEvaluationContextError() BulkEvaluationError {
return BulkEvaluationError{
ErrorCode: model.InvalidContextCode,
ErrorDetails: "Provider context is not valid",
}
}

func BulkEvaluationContextErrorFrom(code string, details string) BulkEvaluationError {
return BulkEvaluationError{
ErrorCode: code,
ErrorDetails: details,
}
}

func EvaluationErrorResponseFrom(result evaluator.AnyValue) (int, EvaluationError) {
payload := EvaluationError{
Key: result.FlagKey,
Expand Down
13 changes: 9 additions & 4 deletions core/pkg/store/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

type IStore interface {
GetAll(ctx context.Context) map[string]model.Flag
GetAll(ctx context.Context) (map[string]model.Flag, error)
Get(ctx context.Context, key string) (model.Flag, bool)
SelectorForFlag(ctx context.Context, flag model.Flag) string
}
Expand Down Expand Up @@ -90,7 +90,7 @@ func (f *Flags) String() (string, error) {
}

// GetAll returns a copy of the store's state (copy in order to be concurrency safe)
func (f *Flags) GetAll(_ context.Context) map[string]model.Flag {
func (f *Flags) GetAll(_ context.Context) (map[string]model.Flag, error) {
f.mx.RLock()
defer f.mx.RUnlock()
state := make(map[string]model.Flag, len(f.Flags))
Expand All @@ -99,7 +99,7 @@ func (f *Flags) GetAll(_ context.Context) map[string]model.Flag {
state[key] = flag
}

return state
return state, nil
}

// Add new flags from source.
Expand Down Expand Up @@ -187,7 +187,12 @@ func (f *Flags) DeleteFlags(logger *logger.Logger, source string, flags map[stri

notifications := map[string]interface{}{}
if len(flags) == 0 {
allFlags := f.GetAll(ctx)
allFlags, err := f.GetAll(ctx)
if err != nil {
logger.Error(fmt.Sprintf("error while retrieving flags from the store: %v", err))
return notifications
}

for key, flag := range allFlags {
if flag.Source != source {
continue
Expand Down
8 changes: 7 additions & 1 deletion flagd/pkg/service/flag-evaluation/flag_evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,13 @@ func (s *OldFlagEvaluationService) ResolveAll(
if e := req.Msg.GetContext(); e != nil {
evalCtx = e.AsMap()
}
values := s.eval.ResolveAllValues(sCtx, reqID, evalCtx)

values, err := s.eval.ResolveAllValues(sCtx, reqID, evalCtx)
if err != nil {
s.logger.WarnWithID(reqID, fmt.Sprintf("error resolving all flags: %v", err))
return nil, fmt.Errorf("error resolving flags. Tracking ID: %s", reqID)
}

span.SetAttributes(attribute.Int("feature_flag.count", len(values)))
for _, value := range values {
// register the impression and reason for each flag evaluated
Expand Down
2 changes: 1 addition & 1 deletion flagd/pkg/service/flag-evaluation/flag_evaluator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func TestConnectService_ResolveAll(t *testing.T) {
t.Run(name, func(t *testing.T) {
eval := mock.NewMockIEvaluator(ctrl)
eval.EXPECT().ResolveAllValues(gomock.Any(), gomock.Any(), gomock.Any()).Return(
tt.evalRes,
tt.evalRes, nil,
).AnyTimes()
metrics, exp := getMetricReader()
s := NewOldFlagEvaluationService(
Expand Down
Loading

0 comments on commit b9c099c

Please sign in to comment.