feat(NSC): implement NodePort Health Check
NodePort Health Check has long been part of the Kubernetes API, but
kube-router has never implemented it before now. It is a port assigned
by the kube-controller-manager for LoadBalancer services that have a
traffic policy of `externalTrafficPolicy=Local`.

When it is set, the Kubernetes networking implementation is expected to
open the port and serve HTTP responses that tell parties outside the
cluster whether the node has a local endpoint for the service: it
should return a 200 status when the node contains a local endpoint and
a 503 status when it does not.

This allows applications outside the cluster to choose their endpoint
in a way that preserves the client's source IP. For more details see:
https://kubernetes.io/docs/tutorials/services/source-ip/#source-ip-for-services-with-type-loadbalancer
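
(For illustration only, not part of this commit: an external load
balancer's probe against the new endpoint would look roughly like the
sketch below. The node IP 192.0.2.10 and health check port 30500 are
hypothetical placeholder values; only the /healthz path comes from this
change.)

    package main

    import (
        "fmt"
        "net/http"
    )

    func main() {
        // spec.healthCheckNodePort is assigned by kube-controller-manager
        // for LoadBalancer services with externalTrafficPolicy=Local; the
        // handler added by this commit listens on that port at /healthz.
        resp, err := http.Get("http://192.0.2.10:30500/healthz")
        if err != nil {
            fmt.Println("node unreachable, skip it:", err)
            return
        }
        defer resp.Body.Close()

        // 200: the node has at least one local, non-terminating endpoint.
        // 503: no local endpoint; the load balancer should pick another node.
        fmt.Println("health check status:", resp.StatusCode)
    }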
aauren committed Mar 1, 2024
1 parent 7aec8d0 commit 9a136c1
Showing 3 changed files with 210 additions and 11 deletions.
29 changes: 18 additions & 11 deletions pkg/controllers/proxy/network_services_controller.go
@@ -154,6 +154,8 @@ type NetworkServicesController struct {

     hpc                *hairpinController
     hpEndpointReceiver chan string
+
+    nphc *nodePortHealthCheckController
 }

 type ipvsCalls interface {
@@ -200,6 +202,7 @@ type serviceInfo struct {
     intTrafficPolicy    *v1.ServiceInternalTrafficPolicy
     extTrafficPolicy    *v1.ServiceExternalTrafficPolicy
     flags               schedFlags
+    healthCheckNodePort int
 }

 // IPVS scheduler flags
@@ -315,6 +318,7 @@ func (nsc *NetworkServicesController) Run(healthChan chan<- *healthcheck.Control
     nsc.mu.Lock()
     nsc.readyForUpdates = false
     nsc.mu.Unlock()
+    nsc.nphc.StopAll()
     klog.Info("Shutting down network services controller")
     return

@@ -923,17 +927,18 @@ func (nsc *NetworkServicesController) buildServicesInfo() serviceInfoMap {
     extClusterPolicyDefault := v1.ServiceExternalTrafficPolicyCluster
     for _, port := range svc.Spec.Ports {
         svcInfo := serviceInfo{
-            clusterIP:        net.ParseIP(svc.Spec.ClusterIP),
-            clusterIPs:       make([]string, len(svc.Spec.ClusterIPs)),
-            port:             int(port.Port),
-            targetPort:       port.TargetPort.String(),
-            protocol:         strings.ToLower(string(port.Protocol)),
-            nodePort:         int(port.NodePort),
-            name:             svc.ObjectMeta.Name,
-            namespace:        svc.ObjectMeta.Namespace,
-            externalIPs:      make([]string, len(svc.Spec.ExternalIPs)),
-            intTrafficPolicy: &intClusterPolicyDefault,
-            extTrafficPolicy: &extClusterPolicyDefault,
+            clusterIP:           net.ParseIP(svc.Spec.ClusterIP),
+            clusterIPs:          make([]string, len(svc.Spec.ClusterIPs)),
+            port:                int(port.Port),
+            targetPort:          port.TargetPort.String(),
+            protocol:            strings.ToLower(string(port.Protocol)),
+            nodePort:            int(port.NodePort),
+            name:                svc.ObjectMeta.Name,
+            namespace:           svc.ObjectMeta.Namespace,
+            externalIPs:         make([]string, len(svc.Spec.ExternalIPs)),
+            intTrafficPolicy:    &intClusterPolicyDefault,
+            extTrafficPolicy:    &extClusterPolicyDefault,
+            healthCheckNodePort: int(svc.Spec.HealthCheckNodePort),
         }
         dsrMethod, ok := svc.ObjectMeta.Annotations[svcDSRAnnotation]
         if ok {
@@ -2070,5 +2075,7 @@ func NewNetworkServicesController(clientset kubernetes.Interface,
     nsc.hpEndpointReceiver = make(chan string)
     nsc.hpc = NewHairpinController(&nsc, nsc.hpEndpointReceiver)

+    nsc.nphc = NewNodePortHealthCheck()
+
     return &nsc, nil
 }
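
The healthCheckNodePort value captured above comes directly from the
Service spec. As a hedged sketch (the values are illustrative, not
taken from this commit), a Service that would get a health check server
from kube-router looks like this:

    package main

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
    )

    func main() {
        svc := v1.Service{
            Spec: v1.ServiceSpec{
                Type:                  v1.ServiceTypeLoadBalancer,
                ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyLocal,
                // Normally left unset by the user; kube-controller-manager
                // allocates it from the NodePort range. 30500 is illustrative.
                HealthCheckNodePort: 30500,
            },
        }
        // buildServicesInfo copies this into serviceInfo.healthCheckNodePort.
        fmt.Println(svc.Spec.HealthCheckNodePort)
    }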
185 changes: 185 additions & 0 deletions pkg/controllers/proxy/nodeport_healthcheck.go
@@ -0,0 +1,185 @@
package proxy

import (
    "context"
    "fmt"
    "net/http"
    "strconv"
    "sync"
    "time"

    "k8s.io/klog/v2"
)

type nodePortHealthCheckController struct {
    nphcServicesInfo
    activeNPHC map[int](chan<- struct{})
    wg         *sync.WaitGroup
    stopCh     chan struct{}
}

type serviceHealthCheck struct {
    serviceID string
    nodePort  int
}

type nphcServicesInfo struct {
    serviceInfoMap   serviceInfoMap
    endpointsInfoMap endpointSliceInfoMap
}

type nphcHandler struct {
    svcHC *serviceHealthCheck
    nphc  *nodePortHealthCheckController
}

func (nphc *nodePortHealthCheckController) UpdateServicesInfo(serviceInfoMap serviceInfoMap,
    endpointsInfoMap endpointSliceInfoMap) error {
    klog.V(1).Info("Running UpdateServicesInfo for NodePort health check")
    nphc.serviceInfoMap = serviceInfoMap
    nphc.endpointsInfoMap = endpointsInfoMap

    newActiveServices := make(map[int]bool)

    for svcID, svc := range serviceInfoMap {
        if svc.healthCheckNodePort != 0 {
            newActiveServices[svc.healthCheckNodePort] = true
            svcHC := serviceHealthCheck{
                serviceID: svcID,
                nodePort:  svc.healthCheckNodePort,
            }
            if nphc.healthCheckExists(svcHC) {
                continue
            }
            err := nphc.addHealthCheck(svcHC)
            if err != nil {
                return err
            }
        }
    }

    for np := range nphc.activeNPHC {
        if !newActiveServices[np] {
            err := nphc.stopHealthCheck(np)
            if err != nil {
                klog.Errorf("error stopping the NodePort healthcheck on NodePort %d: %v", np, err)
            }
        }
    }

    klog.V(1).Info("Finished UpdateServicesInfo for NodePort health check")
    return nil
}

func (nphc *nodePortHealthCheckController) healthCheckExists(svcHC serviceHealthCheck) bool {
    if _, ok := nphc.activeNPHC[svcHC.nodePort]; ok {
        return true
    }
    return false
}

func (nphc *nodePortHealthCheckController) addHealthCheck(svcHC serviceHealthCheck) error {
    klog.V(1).Infof("Adding NodePort health check for port: %d with svcid: %s", svcHC.nodePort, svcHC.serviceID)
    if nphc.healthCheckExists(svcHC) {
        return fmt.Errorf("unable to add healthcheck for NodePort %d as it is already taken", svcHC.nodePort)
    }
    closingChan := make(chan struct{})
    nphc.activeNPHC[svcHC.nodePort] = closingChan

    nphc.wg.Add(1)
    go func(nphc *nodePortHealthCheckController, svcHC serviceHealthCheck, closingChan <-chan struct{}) {
        defer nphc.wg.Done()
        mux := http.NewServeMux()
        srv := &http.Server{
            Addr:              ":" + strconv.Itoa(svcHC.nodePort),
            Handler:           mux,
            ReadHeaderTimeout: 5 * time.Second,
        }

        npHandler := nphcHandler{
            svcHC: &svcHC,
            nphc:  nphc,
        }
        mux.HandleFunc("/healthz", npHandler.Handler)

        nphc.wg.Add(1)
        go func(svcHC serviceHealthCheck) {
            defer nphc.wg.Done()
            klog.Infof("starting NodePort health controller on NodePort: %d", svcHC.nodePort)
            if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
                // cannot panic, because this probably is an intentional close
                klog.Errorf("could not start NodePort health controller on NodePort %d: %s", svcHC.nodePort, err)
            }
        }(svcHC)

        // block until we receive a shut down signal on either our private channel or the global channel
        select {
        case <-closingChan:
        case <-nphc.stopCh:
        }
        klog.Infof("shutting down NodePort health controller on NodePort: %d", svcHC.nodePort)
        if err := srv.Shutdown(context.Background()); err != nil {
            klog.Errorf("could not shutdown NodePort health controller on NodePort %d: %v", svcHC.nodePort, err)
        }

    }(nphc, svcHC, closingChan)

    return nil
}

func (nphc *nodePortHealthCheckController) stopHealthCheck(nodePort int) error {
    if _, ok := nphc.activeNPHC[nodePort]; !ok {
        return fmt.Errorf("no NodePort health check currently exists for NodePort: %d", nodePort)
    }

    svcStopCh := nphc.activeNPHC[nodePort]
    close(svcStopCh)

    delete(nphc.activeNPHC, nodePort)

    return nil
}

func (npHandler *nphcHandler) Handler(w http.ResponseWriter, r *http.Request) {
    eps := npHandler.nphc.endpointsInfoMap[npHandler.svcHC.serviceID]
    endpointsOnNode := hasActiveEndpoints(eps)

    var numActiveEndpoints int8
    for _, endpoint := range eps {
        if endpoint.isLocal && !endpoint.isTerminating {
            numActiveEndpoints++
        }
    }

    if endpointsOnNode && numActiveEndpoints > 0 {
        w.WriteHeader(http.StatusOK)
        _, err := w.Write([]byte(fmt.Sprintf("%d Service Endpoints found\n", numActiveEndpoints)))
        if err != nil {
            klog.Errorf("failed to write body: %s", err)
        }
    } else {
        w.WriteHeader(http.StatusServiceUnavailable)
        _, err := w.Write([]byte("No Service Endpoints Found\n"))
        if err != nil {
            klog.Errorf("Failed to write body: %s", err)
        }
    }
}

func (nphc *nodePortHealthCheckController) StopAll() {
    klog.Info("Stopping all NodePort health checks")
    close(nphc.stopCh)
    klog.Info("Waiting for all NodePort health checks to finish shutting down")
    nphc.wg.Wait()
    klog.Info("All NodePort health checks are completely shut down, all done!")
}

func NewNodePortHealthCheck() *nodePortHealthCheckController {
    nphc := nodePortHealthCheckController{
        activeNPHC: make(map[int]chan<- struct{}),
        wg:         &sync.WaitGroup{},
        stopCh:     make(chan struct{}),
    }

    return &nphc
}
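
The shutdown design above gives every health check server its own close
channel (stored in activeNPHC), while StopAll closes a controller-wide
stopCh that all server goroutines also select on. Below is a
self-contained sketch of that pattern, distilled from the controller;
all names and the port are local to the sketch, not part of the commit:

    package main

    import (
        "context"
        "fmt"
        "net/http"
        "strconv"
        "sync"
        "time"
    )

    func main() {
        var wg sync.WaitGroup
        stopAll := make(chan struct{}) // global stop, cf. nphc.stopCh
        private := make(chan struct{}) // per-port stop, cf. an activeNPHC entry
        port := 30500                  // hypothetical health check port

        wg.Add(1)
        go func() {
            defer wg.Done()
            mux := http.NewServeMux()
            mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
                w.WriteHeader(http.StatusOK) // the real handler inspects local endpoints
            })
            srv := &http.Server{
                Addr:              ":" + strconv.Itoa(port),
                Handler:           mux,
                ReadHeaderTimeout: 5 * time.Second,
            }

            wg.Add(1)
            go func() {
                defer wg.Done()
                if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
                    fmt.Println("serve error:", err)
                }
            }()

            // Either channel closing unblocks the select and shuts the server down.
            select {
            case <-private:
            case <-stopAll:
            }
            _ = srv.Shutdown(context.Background())
        }()

        // stopHealthCheck would do close(private); StopAll does this instead:
        close(stopAll)
        wg.Wait()
    }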
7 changes: 7 additions & 0 deletions pkg/controllers/proxy/service_endpoints_sync.go
@@ -61,6 +61,13 @@ func (nsc *NetworkServicesController) syncIpvsServices(serviceInfoMap serviceInf
             err.Error())
     }

+    klog.V(1).Info("Setting up NodePort Health Checks for LB services")
+    err = nsc.nphc.UpdateServicesInfo(serviceInfoMap, endpointsInfoMap)
+    if err != nil {
+        syncErrors = true
+        klog.Errorf("Error setting up NodePort Health Checks for LB Services: %v", err)
+    }
+
     klog.V(1).Info("Cleaning Up Stale VIPs from dummy interface")
     err = nsc.cleanupStaleVIPs(activeServiceEndpointMap)
     if err != nil {
