From a537831b6e038af71432676826cac94b8d9269b8 Mon Sep 17 00:00:00 2001 From: Guillaume LEGRAIN Date: Mon, 10 Jun 2024 10:53:07 +0200 Subject: [PATCH] feat: implement a new rate limit on the vault API to avoid 429 error --- pkg/config/config.go | 2 ++ pkg/k8smutator/k8smutator.go | 2 +- pkg/vault/handle_token.go | 51 +++++++++++++++++++++++++++--------- pkg/vault/vault.go | 20 +++++++------- 4 files changed, 52 insertions(+), 23 deletions(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index 8fc166c..84b3004 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -26,6 +26,7 @@ type Config struct { SyncTTLSecond int `yaml:"syncTTLSecond" envconfig:"sync_ttl_second"` InjectorLabel string `yaml:"injectorLabel" envconfig:"injector_label"` DefaultEngine string `yaml:"defaultEngine" envconfig:"default_engine"` + VaultRateLimit int `yaml:"vaultRateLimit" envconfig:"vault_rate_limit"` } func NewConfig(configFile string) (*Config, error) { @@ -45,6 +46,7 @@ func NewConfig(configFile string) (*Config, error) { SyncTTLSecond: 300, InjectorLabel: "vault-db-injector", DefaultEngine: "databases", + VaultRateLimit: 50, } if configFile != "" { data, err := os.ReadFile(configFile) diff --git a/pkg/k8smutator/k8smutator.go b/pkg/k8smutator/k8smutator.go index e565f63..3ae26b7 100644 --- a/pkg/k8smutator/k8smutator.go +++ b/pkg/k8smutator/k8smutator.go @@ -91,7 +91,7 @@ func handlePodConfiguration(ctx context.Context, cfg *config.Config, dbConfs *[] logger.Errorf("Their is an issue with the db Configuration") return nil, "db-role not found", nil, err } - vaultConn := vault.NewConnector(cfg.VaultAddress, cfg.VaultAuthPath, cfg.KubeRole, vaultDbPath, dbConf.Role, tok) + vaultConn := vault.NewConnector(cfg.VaultAddress, cfg.VaultAuthPath, cfg.KubeRole, vaultDbPath, dbConf.Role, tok, cfg.VaultRateLimit) if err := vaultConn.Login(ctx); err != nil { return nil, dbConf.Role, nil, errors.Newf("cannot authenticate vault role: %s", err.Error()) } diff --git a/pkg/vault/handle_token.go b/pkg/vault/handle_token.go index 49fbeb9..4c46c21 100644 --- a/pkg/vault/handle_token.go +++ b/pkg/vault/handle_token.go @@ -10,6 +10,7 @@ import ( "github.com/numberly/vault-db-injector/pkg/config" "github.com/numberly/vault-db-injector/pkg/k8s" promInjector "github.com/numberly/vault-db-injector/pkg/prometheus" + "golang.org/x/time/rate" ) type KeyInformation struct { @@ -119,24 +120,35 @@ func (c *Connector) ListKeyInformations(ctx context.Context, path, prefix string var wg sync.WaitGroup keyInformationsChan := make(chan *KeyInformation, len(keys)) + // Create a rate limiter + rateLimit := rate.Limit(c.VaultRateLimit) // requests per second + limiter := rate.NewLimiter(rateLimit, 1) + for _, k := range keys { wg.Add(1) go func(k interface{}) { defer wg.Done() + + // Wait for the rate limiter + if err := limiter.Wait(ctx); err != nil { + c.Log.Errorf("Rate limiter error: %v", err) + return + } + podName := strings.TrimSuffix(k.(string), "/") // Utiliser le préfixe pour lire les données dataPath := fmt.Sprintf("%s/data/%s/%s", path, prefix, podName) podSecret, err := c.client.Logical().ReadWithContext(ctx, dataPath) if err != nil { - c.Log.Errorf("Error while trying to recover data informations for : %s: %v", podName, err) + c.Log.Errorf("Error while trying to recover data informations for: %s: %v", podName, err) return } if podSecret == nil || podSecret.Data == nil || podSecret.Data["data"] == nil { status, err := c.DeleteData(ctx, podName, path, podName, "", prefix) if err != nil { - c.Log.Errorf("Data for %s can't be deleted : %s with error : %s", podName, status, err.Error()) + c.Log.Errorf("Data for %s can't be deleted: %s with error: %s", podName, status, err.Error()) } return } @@ -189,61 +201,73 @@ func (c *Connector) HandleTokens(ctx context.Context, cfg *config.Config, keysIn return false } - // Créer une map pour une recherche rapide des podsInformations + // Create a map for quick lookup of pod information podInfoMap := make(map[string]k8s.PodInformations) for _, pi := range podsInformations { for _, uuid := range pi.PodNameUUIDs { podInfoMap[uuid] = pi } - } var KubePolicies []string KubePolicies = append(KubePolicies, c.authRole) _, err = c.CreateOrphanToken(ctx, "1h", KubePolicies) if err != nil { - c.Log.Errorf("Can't create orphan ticket : %v", err) + c.Log.Errorf("Can't create orphan ticket: %v", err) c.Log.Error("Token renew has been cancelled") return false } + + // Create a rate limiter + rateLimit := rate.Limit(cfg.VaultRateLimit) // requests per second + limiter := rate.NewLimiter(rateLimit, 1) + var wg sync.WaitGroup var isOk bool = true + for _, ki := range keysInformations { wg.Add(1) go func(ki *KeyInformation) { defer wg.Done() + // Wait for the rate limiter + if err := limiter.Wait(ctx); err != nil { + c.Log.Errorf("Rate limiter error: %v", err) + isOk = false + return + } + if _, found := podInfoMap[ki.PodNameUID]; found { err := c.RenewToken(ctx, ki.TokenId, ki.PodNameUID, ki.Namespace, SyncTTLSecond) if err != nil { - c.Log.Errorf("Can't renew Token with pod UUID : %s", ki.PodNameUID) + c.Log.Errorf("Can't renew Token with pod UUID: %s", ki.PodNameUID) isOk = false return } - err = c.RenewLease(ctx, ki.LeaseId, 86400*5, ki.PodNameUID, ki.Namespace) // Renew for 1week + err = c.RenewLease(ctx, ki.LeaseId, 86400*5, ki.PodNameUID, ki.Namespace) // Renew for 1 week if err != nil { - c.Log.Errorf("Can't renew Lease with pod UUID : %s", ki.PodNameUID) + c.Log.Errorf("Can't renew Lease with pod UUID: %s", ki.PodNameUID) isOk = false return } } else { leaseTooYoung, err := c.isLeaseTooYoung(ctx, ki.LeaseId) if err != nil { - c.Log.Debug("error while trying to retrieve lease age, lease will be cleaned") + c.Log.Debug("Error while trying to retrieve lease age, lease will be cleaned") } if leaseTooYoung { - c.Log.Infof("This lease : %s is too young to be cleaned up.", ki.LeaseId) + c.Log.Infof("This lease: %s is too young to be cleaned up.", ki.LeaseId) return } err = c.RevokeOrphanToken(ctx, ki.TokenId, ki.PodNameUID, ki.Namespace) if err != nil { - c.Log.Errorf("Can't revok Token with UUID : %s", ki.PodNameUID) + c.Log.Errorf("Can't revoke Token with UUID: %s", ki.PodNameUID) isOk = false return } status, err := c.DeleteData(ctx, ki.PodNameUID, secretName, ki.PodNameUID, ki.Namespace, prefix) if err != nil { - c.Log.Errorf("Data for %s can't be deleted : %s with error : %s", ki.PodNameUID, status, err.Error()) + c.Log.Errorf("Data for %s can't be deleted: %s with error: %s", ki.PodNameUID, status, err.Error()) isOk = false return } @@ -252,10 +276,11 @@ func (c *Connector) HandleTokens(ctx context.Context, cfg *config.Config, keysIn promInjector.RenewLeaseCount.DeleteLabelValues(ki.PodNameUID, ki.Namespace) promInjector.RenewTokenCount.DeleteLabelValues(ki.PodNameUID, ki.Namespace) promInjector.DataDeletedCount.DeleteLabelValues(ki.PodNameUID, ki.Namespace) - c.Log.Infof("Token has been revoked and data deleted : %s", status) + c.Log.Infof("Token has been revoked and data deleted: %s", status) } }(ki) } + wg.Wait() c.RevokeSelfToken(ctx, c.client.Token(), "", "") c.SetToken(c.K8sSaVaultToken) diff --git a/pkg/vault/vault.go b/pkg/vault/vault.go index b4aa8bc..f854857 100644 --- a/pkg/vault/vault.go +++ b/pkg/vault/vault.go @@ -28,6 +28,7 @@ type Connector struct { client *vault.Client RenewalInterval time.Duration Log logger.Logger + VaultRateLimit int } func (c *Connector) GetToken() string { @@ -46,15 +47,16 @@ type DbCreds struct { DbTokenId string } -func NewConnector(address string, authPath string, authRole string, dbMountPath string, dbRole string, token string) *Connector { +func NewConnector(address string, authPath string, authRole string, dbMountPath string, dbRole string, token string, VaultRateLimit int) *Connector { return &Connector{ - address: address, - authPath: authPath, - dbRole: dbRole, - dbMountPath: dbMountPath, - k8sSaToken: token, - authRole: authRole, - Log: logger.GetLogger(), + address: address, + authPath: authPath, + dbRole: dbRole, + dbMountPath: dbMountPath, + k8sSaToken: token, + authRole: authRole, + Log: logger.GetLogger(), + VaultRateLimit: VaultRateLimit, } } @@ -67,7 +69,7 @@ func ConnectToVault(ctx context.Context, cfg *config.Config) (*Connector, error) return nil, errors.Newf("cannot get ServiceAccount token: %s", err.Error()) } // Configure vault connection using serviceAccount token - vaultConn := NewConnector(cfg.VaultAddress, cfg.VaultAuthPath, cfg.KubeRole, "random", "random", tok) + vaultConn := NewConnector(cfg.VaultAddress, cfg.VaultAuthPath, cfg.KubeRole, "random", "random", tok, cfg.VaultRateLimit) if err := vaultConn.Login(ctx); // Assuming Login is modified to accept a context err != nil { promInjector.ConnectVaultError.WithLabelValues().Inc()