Skip to content

Commit

Permalink
Merge pull request #53 from tommasopozzetti/spread-vips
Browse files Browse the repository at this point in the history
Add annotation to allow spreading VIPs across keepalived instances
  • Loading branch information
David-Igou committed Mar 3, 2021
2 parents f46eca7 + 4ff6f76 commit da6d67b
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 26 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ Additionally, the fields can be edited manually via `oc edit Network.config.open
If the Keepalived pods are deployed on nodes which are in the same network (same broadcast domain, to be precise) as other keepalived processes, it's necessary to ensure that there is no collision between the router IDs in use.
For this purpose it is possible to provide a `blacklistRouterIDs` field with a list of black-listed IDs that will not be used.

## Spreading VIPs across nodes to maximize load balancing

If a service contains multiple externalIPs or LoadBalancer IPs, it is possible to instruct keepalived-operator to maximize the spread of such VIPs across the nodes in the KeepalivedGroup by specifying the `keepalived-operator.redhat-cop.io/spreadvips: "true"` annotation on the service. This option ensures that different VIPs for the same service are always owned by different nodes (or, if the number of nodes in the group is less than the number of VIPs, that the VIPs are assigned so as to maximize the spread), to avoid creating a traffic bottleneck. However, in order to achieve this, keepalived-operator will create a separate VRRP instance per VIP of that service, which could exhaust the 255 router IDs available in a KeepalivedGroup faster.
## OpenShift RHV, vSphere, OSP and bare metal IPI instructions
When IPI is used for RHV, vSphere, OSP or bare metal platforms, three keepalived VIPs are deployed. To make sure that keepalived-operator can work in these environments, we need to discover and blacklist the corresponding VRRP router IDs.
Expand Down
8 changes: 8 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ rules:
- list
- patch
- watch
- apiGroups:
- ""
resources:
- pods
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
Expand Down
66 changes: 52 additions & 14 deletions config/templates/keepalived-template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,22 @@
- name: keepalived
image: {{ .KeepalivedGroup.Spec.Image }}
command:
- /usr/sbin/keepalived
- -l
- -D
- -n
- /bin/bash
args:
- -f
- /etc/keepalived.d/keepalived.conf
- -p
- /etc/keepalived.pid/keepalived.pid
- -c
- >
exec /usr/sbin/keepalived
--log-console
--log-detail
--dont-fork
--config-id=${POD_NAME}
--use-file=/etc/keepalived.d/keepalived.conf
--pid=/etc/keepalived.pid/keepalived.pid
env:
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
volumeMounts:
- mountPath: /lib/modules
name: lib-modules
Expand Down Expand Up @@ -134,30 +141,61 @@
{{- end }}

{{ $root:=. }}
{{ $verbatim_key:="keepalived-operator.redhat-cop.io/verbatimconfig"}}
{{ range .Services }}
{{ $namespacedName:=printf "%s/%s" .ObjectMeta.Namespace .ObjectMeta.Name }}
{{ $verbatim_key:="keepalived-operator.redhat-cop.io/verbatimconfig"}}
{{ $spread_key:="keepalived-operator.redhat-cop.io/spreadvips" }}
{{ range $service := .Services }}
{{ $namespacedName:=printf "%s/%s" $service.ObjectMeta.Namespace $service.ObjectMeta.Name }}
{{- if eq (index $service.GetAnnotations $spread_key) "true"}}
{{- range $i, $ip := (mergeStringSlices $service.Status.LoadBalancer.Ingress $service.Spec.ExternalIPs) }}
{{- $namespacedNameForIP := printf "%s/%s" $namespacedName $ip }}
{{- $owner := index $root.KeepalivedPods (modulus $i (len $root.KeepalivedPods)) }}
vrrp_instance {{ $namespacedNameForIP }} {
@{{ $owner.ObjectMeta.Name }} state MASTER
@^{{ $owner.ObjectMeta.Name }} state BACKUP
@{{ $owner.ObjectMeta.Name }} priority 200
@^{{ $owner.ObjectMeta.Name }} priority 100
interface {{ $root.KeepalivedGroup.Spec.Interface }}

virtual_router_id {{ index $root.KeepalivedGroup.Status.RouterIDs $namespacedNameForIP }}

virtual_ipaddress {
{{ $ip }}
}

{{- if eq $service.Spec.ExternalTrafficPolicy "Local" }}
track_script {
{{ $namespacedName }}
}
{{- end }}

{{ range $key , $value := (parseJson (index $service.GetAnnotations $verbatim_key)) }}
{{ $key }} {{ $value }}
{{ end }}
}
{{- end }}
{{- else }}
vrrp_instance {{ $namespacedName }} {
interface {{ $root.KeepalivedGroup.Spec.Interface }}

virtual_router_id {{ index $root.KeepalivedGroup.Status.RouterIDs $namespacedName }}

virtual_ipaddress {
{{ range mergeStringSlices .Status.LoadBalancer.Ingress .Spec.ExternalIPs }}
{{ range mergeStringSlices $service.Status.LoadBalancer.Ingress $service.Spec.ExternalIPs }}
{{ . }}
{{ end }}
}

{{- if eq .Spec.ExternalTrafficPolicy "Local" }}
{{- if eq $service.Spec.ExternalTrafficPolicy "Local" }}
track_script {
{{ $namespacedName }}
}
{{- end }}

{{ range $key , $value := (parseJson (index .GetAnnotations $verbatim_key)) }}
{{ range $key , $value := (parseJson (index $service.GetAnnotations $verbatim_key)) }}
{{ $key }} {{ $value }}
{{ end }}
}
{{- end }}
{{ end }}
{{ if eq .Misc.supportsPodMonitor "true" }}
- apiVersion: monitoring.coreos.com/v1
Expand Down
127 changes: 115 additions & 12 deletions controllers/keepalivedgroup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"os"
"sort"
"strings"
"text/template"

Expand All @@ -35,12 +37,14 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
Expand All @@ -52,6 +56,8 @@ const (
imageNameEnv = "KEEPALIVED_OPERATOR_IMAGE_NAME"
keepalivedGroupAnnotation = "keepalived-operator.redhat-cop.io/keepalivedgroup"
keepalivedGroupVerbatimConfigAnnotation = "keepalived-operator.redhat-cop.io/verbatimconfig"
keepalivedSpreadVIPsAnnotation = "keepalived-operator.redhat-cop.io/spreadvips"
keepalivedGroupLabel = "keepalivedGroup"
podMonitorAPIVersion = "monitoring.coreos.com/v1"
podMonitorKind = "PodMonitor"
)
Expand Down Expand Up @@ -94,6 +100,7 @@ func (r *KeepalivedGroupReconciler) setSupportForPodMonitorAvailable() {
// +kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups="",resources=configmaps/finalizers,verbs=update
// +kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch
// +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch
// +kubebuilder:rbac:groups="apps",resources=daemonsets;daemonsets/finalizers,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups="monitoring.coreos.com",resources=podmonitors,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups="monitoring.coreos.com",resources=podmonitors/finalizers,verbs=update
Expand Down Expand Up @@ -138,6 +145,7 @@ func (r *KeepalivedGroupReconciler) Reconcile(context context.Context, req ctrl.
return reconcile.Result{}, nil
}

pods, err := r.getKeepalivedPods(instance)
services, err := r.getReferencingServices(instance)
if err != nil {
log.Error(err, "unable to get referencing services from", "instance", instance)
Expand All @@ -148,7 +156,7 @@ func (r *KeepalivedGroupReconciler) Reconcile(context context.Context, req ctrl.
log.Error(err, "unable assign router ids to", "instance", instance, "from services", services)
return r.ManageError(context, instance, err)
}
objs, err := r.processTemplate(instance, services)
objs, err := r.processTemplate(instance, services, pods)
if err != nil {
log.Error(err, "unable process keepalived template from", "instance", instance, "and from services", services)
return r.ManageError(context, instance, err)
Expand Down Expand Up @@ -181,7 +189,7 @@ func (r *KeepalivedGroupReconciler) Reconcile(context context.Context, req ctrl.
}

func (r *KeepalivedGroupReconciler) assignRouterIDs(instance *redhatcopv1alpha1.KeepalivedGroup, services []corev1.Service) (bool, error) {
assignedServices := []string{}
assignedInstances := []string{}
assignedIDs := []int{}
if len(instance.Spec.BlacklistRouterIDs) > 0 {
assignedIDs = append(assignedIDs, instance.Spec.BlacklistRouterIDs...)
Expand All @@ -195,16 +203,14 @@ func (r *KeepalivedGroupReconciler) assignRouterIDs(instance *redhatcopv1alpha1.
}
}
for key := range instance.Status.RouterIDs {
assignedServices = append(assignedServices, key)
assignedInstances = append(assignedInstances, key)
}
lbServices := []string{}
for _, service := range services {
lbServices = append(lbServices, apis.GetKeyShort(&service))
}
assignedServicesSet := strset.New(assignedServices...)
lbServicesSet := strset.New(lbServices...)
toBeRemovedSet := strset.Difference(assignedServicesSet, lbServicesSet)
toBeAddedSet := strset.Difference(lbServicesSet, assignedServicesSet)
vrrpInstances := servicesToVRRPInstances(services)

assignedInstancesSet := strset.New(assignedInstances...)
vrrpInstancesSet := strset.New(vrrpInstances...)
toBeRemovedSet := strset.Difference(assignedInstancesSet, vrrpInstancesSet)
toBeAddedSet := strset.Difference(vrrpInstancesSet, assignedInstancesSet)

for _, value := range toBeRemovedSet.List() {
delete(instance.Status.RouterIDs, value)
Expand Down Expand Up @@ -246,18 +252,50 @@ func findNextAvailableID(ids []int) (int, error) {
return 0, errors.New("cannot allocate more than 255 ids in one keepalived group")
}

func (r *KeepalivedGroupReconciler) processTemplate(instance *redhatcopv1alpha1.KeepalivedGroup, services []corev1.Service) (*[]unstructured.Unstructured, error) {
// servicesToVRRPInstances lists the VRRP instance names required by services.
//
// A service annotated with keepalivedSpreadVIPsAnnotation set to "true"
// contributes one name per VIP, formatted as "<service key>/<ip>", first for
// every LoadBalancer ingress entry and then for every external IP. Any other
// service contributes a single name equal to its service key.
//
// The returned slice is never nil.
func servicesToVRRPInstances(services []corev1.Service) []string {
	names := []string{}
	for i := range services {
		svc := services[i]
		key := apis.GetKeyShort(&svc)

		// A missing annotation yields "", so a plain comparison covers both the
		// "absent" and the "present but not true" cases.
		if svc.GetAnnotations()[keepalivedSpreadVIPsAnnotation] != "true" {
			names = append(names, key)
			continue
		}

		for _, ingress := range svc.Status.LoadBalancer.Ingress {
			names = append(names, key+"/"+ingress.IP)
		}
		for _, externalIP := range svc.Spec.ExternalIPs {
			names = append(names, key+"/"+externalIP)
		}
	}
	return names
}

func (r *KeepalivedGroupReconciler) processTemplate(instance *redhatcopv1alpha1.KeepalivedGroup, services []corev1.Service, pods []corev1.Pod) (*[]unstructured.Unstructured, error) {
// sort services and pods to ensure deterministic template output
sort.SliceStable(services, func(i, j int) bool {
if services[i].GetNamespace() == services[j].GetNamespace() {
return services[i].GetName() < services[j].GetName()
}
return services[i].GetNamespace() < services[j].GetNamespace()
})
sort.SliceStable(pods, func(i, j int) bool {
return pods[i].GetName() < pods[j].GetName()
})

imagename, ok := os.LookupEnv(imageNameEnv)
if !ok {
imagename = "quay.io/redhat-cop/keepalived-operator:latest"
}
objs, err := util.ProcessTemplateArray(struct {
KeepalivedGroup *redhatcopv1alpha1.KeepalivedGroup
Services []corev1.Service
KeepalivedPods []corev1.Pod
Misc map[string]string
}{
instance,
services,
pods,
map[string]string{
"image": imagename,
"supportsPodMonitor": r.supportsPodMonitors,
Expand All @@ -270,6 +308,16 @@ func (r *KeepalivedGroupReconciler) processTemplate(instance *redhatcopv1alpha1.
return &objs, nil
}

// getKeepalivedPods lists the pods in the namespace of instance whose
// keepalivedGroupLabel label equals the name of instance.
//
// On failure the error is logged and returned together with a nil slice.
func (r *KeepalivedGroupReconciler) getKeepalivedPods(instance *redhatcopv1alpha1.KeepalivedGroup) ([]corev1.Pod, error) {
	selector := labels.SelectorFromSet(labels.Set{keepalivedGroupLabel: instance.GetName()})
	opts := &client.ListOptions{
		Namespace:     instance.GetNamespace(),
		LabelSelector: selector,
	}

	var found corev1.PodList
	if err := r.GetClient().List(context.TODO(), &found, opts); err != nil {
		r.Log.Error(err, "unable to get list of keepalived pods")
		return nil, err
	}
	return found.Items, nil
}

func (r *KeepalivedGroupReconciler) getReferencingServices(instance *redhatcopv1alpha1.KeepalivedGroup) ([]corev1.Service, error) {
serviceList := &corev1.ServiceList{}
err := r.GetClient().List(context.TODO(), serviceList, &client.ListOptions{})
Expand Down Expand Up @@ -336,6 +384,7 @@ func (r *KeepalivedGroupReconciler) initializeTemplate() (*template.Template, er
}
return strset.Union(strset.New(s1...), strset.New(s2...)).List()
},
"modulus": func(a, b int) int { return a % b },
}).Parse(string(text))
if err != nil {
r.Log.Error(err, "Error parsing template", "template", string(text))
Expand Down Expand Up @@ -411,6 +460,56 @@ func getNamespacedName(namespaced string) (types.NamespacedName, error) {
}, nil
}

// requestsForKeepalivedPodChange maps a change on a keepalived pod to a
// reconcile request for the KeepalivedGroup that owns it.
//
// The group name is read from the pod's keepalivedGroupLabel label and the
// request targets that name in the pod's own namespace. If obj is not a
// *corev1.Pod, or the pod does not carry the label, the problem is logged and
// no request is returned.
func (r *KeepalivedGroupReconciler) requestsForKeepalivedPodChange(obj client.Object) []reconcile.Request {
	pod, ok := obj.(*corev1.Pod)
	if !ok {
		// Report the type of obj: after a failed assertion pod is a nil
		// *corev1.Pod, so formatting pod would always print the expected type
		// instead of the one actually received.
		r.Log.Error(fmt.Errorf("expected a Pod, got %T", obj), "could not process pod change")
		return nil
	}

	keepalivedGroup, ok := pod.GetLabels()[keepalivedGroupLabel]
	if !ok {
		r.Log.Error(fmt.Errorf("could not extract keepalivedGroup from keepalived pod %s in namespace %s", pod.GetName(), pod.GetNamespace()), "could not process pod change")
		return nil
	}

	return []reconcile.Request{{
		NamespacedName: types.NamespacedName{
			Namespace: pod.GetNamespace(),
			Name:      keepalivedGroup,
		},
	}}
}

// PodChange is a predicate that filters Pod events so that KeepalivedGroup
// reconciles are only issued for the creation and deletion of keepalived pods,
// i.e. pods that carry the keepalivedGroupLabel label.
type PodChange struct {
// Any event type that PodChange does not override falls through to the
// embedded predicate.Funcs.
predicate.Funcs
}

// Update rejects every pod update event; only creations and deletions of
// keepalived pods are of interest to this predicate.
func (PodChange) Update(_ event.UpdateEvent) bool {
	return false
}

// Create accepts a creation event only when its object is a *corev1.Pod that
// carries the keepalivedGroupLabel label (whatever its value).
func (PodChange) Create(e event.CreateEvent) bool {
	pod, isPod := e.Object.(*corev1.Pod)
	if !isPod {
		return false
	}
	_, isKeepalivedPod := pod.GetLabels()[keepalivedGroupLabel]
	return isKeepalivedPod
}

// Delete accepts a deletion event only when its object is a *corev1.Pod that
// carries the keepalivedGroupLabel label (whatever its value).
func (PodChange) Delete(e event.DeleteEvent) bool {
	pod, isPod := e.Object.(*corev1.Pod)
	if !isPod {
		return false
	}
	_, isKeepalivedPod := pod.GetLabels()[keepalivedGroupLabel]
	return isKeepalivedPod
}

// SetupWithManager sets up the controller with the Manager.
func (r *KeepalivedGroupReconciler) SetupWithManager(mgr ctrl.Manager) error {
r.setSupportForPodMonitorAvailable()
Expand Down Expand Up @@ -469,5 +568,9 @@ func (r *KeepalivedGroupReconciler) SetupWithManager(mgr ctrl.Manager) error {
}}, &enqueueRequestForReferredKeepAlivedGroup{
Client: mgr.GetClient(),
}, builder.WithPredicates(isAnnotatedService)).
Watches(&source.Kind{Type: &corev1.Pod{}},
handler.EnqueueRequestsFromMapFunc(r.requestsForKeepalivedPodChange),
builder.WithPredicates(PodChange{}),
).
Complete(r)
}

0 comments on commit da6d67b

Please sign in to comment.