fix: failed placement change need to trigger work generator copy to binding #879

Merged 4 commits on Jul 17, 2024
8 changes: 5 additions & 3 deletions pkg/controllers/clusterresourcebindingwatcher/suite_test.go
@@ -19,7 +19,6 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/metrics/server"

@@ -44,10 +43,13 @@ func TestAPIs(t *testing.T) {
}

var _ = BeforeSuite(func() {
klog.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))

ctx, cancel = context.WithCancel(context.TODO())

By("Setup klog")
fs := flag.NewFlagSet("klog", flag.ContinueOnError)
klog.InitFlags(fs)
Expect(fs.Parse([]string{"--v", "5", "-add_dir_header", "true"})).Should(Succeed())

By("bootstrapping test environment")
testEnv = &envtest.Environment{
CRDDirectoryPaths: []string{filepath.Join("../../../", "config", "crd", "bases")},
12 changes: 9 additions & 3 deletions pkg/controllers/clusterresourcebindingwatcher/watcher.go
@@ -18,6 +18,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"

fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
"go.goms.io/fleet/pkg/utils"
"go.goms.io/fleet/pkg/utils/condition"
"go.goms.io/fleet/pkg/utils/controller"
)
@@ -99,7 +100,7 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
klog.ErrorS(err, "Failed to process update event")
return false
}
return areConditionsUpdated(oldBinding, newBinding)
return isBindingStatusUpdated(oldBinding, newBinding)
},
}

@@ -109,14 +110,19 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

func areConditionsUpdated(oldBinding, newBinding *fleetv1beta1.ClusterResourceBinding) bool {
func isBindingStatusUpdated(oldBinding, newBinding *fleetv1beta1.ClusterResourceBinding) bool {
for i := condition.RolloutStartedCondition; i < condition.TotalCondition; i++ {
oldCond := oldBinding.GetCondition(string(i.ResourceBindingConditionType()))
newCond := newBinding.GetCondition(string(i.ResourceBindingConditionType()))
// oldCond.ObservedGeneration will always be less than or equal to newCond.ObservedGeneration.
if !condition.EqualCondition(oldCond, newCond) {
klog.V(2).InfoS("The binding condition has changed, need to update the corresponding CRP", "oldBinding", klog.KObj(oldBinding), "newBinding", klog.KObj(newBinding))
return true
}
}
if !utils.IsFailedResourcePlacementsEqual(oldBinding.Status.FailedPlacements, newBinding.Status.FailedPlacements) {
klog.V(2).InfoS("The binding failed placement has changed, need to update the corresponding CRP", "oldBinding", klog.KObj(oldBinding), "newBinding", klog.KObj(newBinding))
return true
}
klog.V(5).InfoS("The binding status has not changed, no need to update the corresponding CRP", "oldBinding", klog.KObj(oldBinding), "newBinding", klog.KObj(newBinding))
return false
}
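
The new check above calls utils.IsFailedResourcePlacementsEqual, whose implementation is not part of this diff. Below is a minimal sketch of what such a comparison could look like; the function name isFailedResourcePlacementsEqualSketch and the exact fields it compares are assumptions for illustration, not the project's actual helper. The idea it shows is to treat the two lists as equal when they describe the same failures regardless of ordering, ignoring volatile fields such as LastTransitionTime so that timestamp-only refreshes do not re-trigger the CRP controller.

// Hypothetical sketch of a failed-placement comparison; the real helper is
// utils.IsFailedResourcePlacementsEqual in pkg/utils and may differ.
package utils

import (
	"fmt"
	"sort"

	fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
)

func isFailedResourcePlacementsEqualSketch(oldList, newList []fleetv1beta1.FailedResourcePlacement) bool {
	if len(oldList) != len(newList) {
		return false
	}
	// Build an order-independent key from the resource identity and the condition
	// type/status/reason; LastTransitionTime and Message are intentionally excluded.
	key := func(f fleetv1beta1.FailedResourcePlacement) string {
		id := f.ResourceIdentifier
		return fmt.Sprintf("%s/%s/%s/%s/%s|%s|%s|%s",
			id.Group, id.Version, id.Kind, id.Namespace, id.Name,
			f.Condition.Type, f.Condition.Status, f.Condition.Reason)
	}
	oldKeys := make([]string, 0, len(oldList))
	newKeys := make([]string, 0, len(newList))
	for i := range oldList {
		oldKeys = append(oldKeys, key(oldList[i]))
	}
	for i := range newList {
		newKeys = append(newKeys, key(newList[i]))
	}
	sort.Strings(oldKeys)
	sort.Strings(newKeys)
	for i := range oldKeys {
		if oldKeys[i] != newKeys[i] {
			return false
		}
	}
	return true
}

Leaving the timestamp out of the comparison mirrors the condition handling above, where a change only in LastTransitionTime is not supposed to enqueue the CRP (see the "last transition time" test below).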
@@ -64,12 +64,11 @@ var _ = Describe("Test ClusterResourceBinding Watcher - update metadata", Serial
})

AfterEach(func() {
crb.Name = testCRBName
By("Deleting the clusterResourceBinding")
Expect(k8sClient.Delete(ctx, crb)).Should(Succeed(), "failed to delete cluster resource binding")
})

It("Should not enqueue the clusterResourcePlacement name for reconciling, when clusterResourceBinding spec, status doesn't change", func() {
It("Should not enqueue the clusterResourcePlacement name for reconciling, when only meta data changed", func() {
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: testCRBName}, crb)).Should(Succeed(), "failed to get cluster resource binding")
labels := crb.GetLabels()
labels["test-key"] = "test-value"
@@ -79,12 +78,23 @@
By("Checking placement controller queue")
consistentlyCheckPlacementControllerQueueIsEmpty()
})

It("Should not enqueue the clusterResourcePlacement name for reconciling, when only spec changed", func() {
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: testCRBName}, crb)).Should(Succeed(), "failed to get cluster resource binding")
crb.Spec.State = fleetv1beta1.BindingStateBound
Expect(k8sClient.Update(ctx, crb)).Should(Succeed(), "failed to update cluster resource binding")

By("Checking placement controller queue")
consistentlyCheckPlacementControllerQueueIsEmpty()
})
})

// This container cannot be run in parallel with other ITs because it uses a shared fakePlacementController. These tests are also ordered.
var _ = Describe("Test ClusterResourceBinding Watcher - update status", Serial, Ordered, func() {
var crb *fleetv1beta1.ClusterResourceBinding
var currentTime metav1.Time
BeforeAll(func() {
currentTime = metav1.Now()
fakePlacementController.ResetQueue()
By("Creating a new clusterResourceBinding")
crb = clusterResourceBindingForTest()
@@ -123,30 +133,131 @@ var _ = Describe("Test ClusterResourceBinding Watcher - update status", Serial,
validateWhenUpdateClusterResourceBindingStatusWithCondition(fleetv1beta1.ResourceBindingAvailable, crb.Generation, metav1.ConditionFalse, testReason1)
})

It("Should enqueue the clusterResourcePlacement name for reconciling, when condition's observed generation changes", func() {
validateWhenUpdateClusterResourceBindingStatusWithCondition(fleetv1beta1.ResourceBindingRolloutStarted, crb.Generation+1, metav1.ConditionFalse, testReason1)
})

It("Should enqueue the clusterResourcePlacement name for reconciling, when condition's reason changes", func() {
validateWhenUpdateClusterResourceBindingStatusWithCondition(fleetv1beta1.ResourceBindingOverridden, crb.Generation, metav1.ConditionFalse, testReason2)
})

It("Should not enqueue the clusterResourcePlacement name for reconciling, when only condition's last transition time changes", func() {
It("Should enqueue the clusterResourcePlacement name for reconciling, when condition's observed generation changes", func() {
crb := &fleetv1beta1.ClusterResourceBinding{}
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: testCRBName}, crb)).Should(Succeed(), "failed to get cluster resource binding")
condition := metav1.Condition{
Type: string(fleetv1beta1.ResourceBindingOverridden),
ObservedGeneration: crb.Generation + 1,
Status: metav1.ConditionFalse,
Reason: testReason2,
LastTransitionTime: currentTime,
}
By(fmt.Sprintf("Updating the clusterResourceBinding status - %s, %d, %s, %s", fleetv1beta1.ResourceBindingOverridden, crb.Generation, metav1.ConditionFalse, testReason2))
crb.SetConditions(condition)
Expect(k8sClient.Status().Update(ctx, crb)).Should(Succeed(), "failed to update cluster resource binding status")

By("Checking placement controller queue")
eventuallyCheckPlacementControllerQueue(crb.GetLabels()[fleetv1beta1.CRPTrackingLabel])
fakePlacementController.ResetQueue()
})

It("Should not enqueue the clusterResourcePlacement name for reconciling, when only condition's last transition time changes", func() {
crb := &fleetv1beta1.ClusterResourceBinding{}
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: testCRBName}, crb)).Should(Succeed(), "failed to get cluster resource binding")
newTime := metav1.NewTime(currentTime.Add(10 * time.Second))
condition := metav1.Condition{
Type: string(fleetv1beta1.ResourceBindingOverridden),
ObservedGeneration: crb.Generation,
Status: metav1.ConditionFalse,
Reason: testReason2,
LastTransitionTime: metav1.Now(),
LastTransitionTime: newTime,
}
By(fmt.Sprintf("Updating the clusterResourceBinding status - %s, %d, %s, %s", fleetv1beta1.ResourceBindingOverridden, crb.Generation, metav1.ConditionFalse, testReason2))
crb.SetConditions(condition)
Expect(k8sClient.Status().Update(ctx, crb)).Should(Succeed(), "failed to update cluster resource binding status")

consistentlyCheckPlacementControllerQueueIsEmpty()
})

Context("Should enqueue the clusterResourcePlacement name for reconciling, when the failed placement list has changed", Serial, Ordered, func() {
It("Should enqueue the clusterResourcePlacement name for reconciling, when there are new failed placements", func() {
crb := &fleetv1beta1.ClusterResourceBinding{}
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: testCRBName}, crb)).Should(Succeed(), "failed to get cluster resource binding")
crb.Status.FailedPlacements = []fleetv1beta1.FailedResourcePlacement{
{
ResourceIdentifier: fleetv1beta1.ResourceIdentifier{
Group: "",
Version: "v1",
Kind: "Service",
Name: "svc-name",
Namespace: "svc-namespace",
},
Condition: metav1.Condition{
Type: fleetv1beta1.WorkConditionTypeAvailable,
Status: metav1.ConditionFalse,
Reason: "fakeFailedAvailableReason",
Message: "fakeFailedAvailableMessage",
LastTransitionTime: metav1.Now(),
},
},
{
ResourceIdentifier: fleetv1beta1.ResourceIdentifier{
Group: "",
Version: "v1",
Kind: "ConfigMap",
Name: "config-name",
Namespace: "config-namespace",
},
Condition: metav1.Condition{
Type: fleetv1beta1.WorkConditionTypeAvailable,
Status: metav1.ConditionFalse,
Reason: "fakeFailedAvailableReason",
Message: "fakeFailedAvailableMessage",
LastTransitionTime: metav1.Now(),
},
},
}
Expect(k8sClient.Status().Update(ctx, crb)).Should(Succeed(), "failed to update cluster resource binding status")

By("Checking placement controller queue")
eventuallyCheckPlacementControllerQueue(crb.GetLabels()[fleetv1beta1.CRPTrackingLabel])
fakePlacementController.ResetQueue()
})

It("Should enqueue the clusterResourcePlacement name for reconciling, when there are one less failed placements", func() {
crb := &fleetv1beta1.ClusterResourceBinding{}
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: testCRBName}, crb)).Should(Succeed(), "failed to get cluster resource binding")
crb.Status.FailedPlacements = []fleetv1beta1.FailedResourcePlacement{
{
ResourceIdentifier: fleetv1beta1.ResourceIdentifier{
Group: "",
Version: "v1",
Kind: "Service",
Name: "svc-name",
Namespace: "svc-namespace",
},
Condition: metav1.Condition{
Type: fleetv1beta1.WorkConditionTypeAvailable,
Status: metav1.ConditionFalse,
Reason: "fakeFailedAvailableReason",
Message: "fakeFailedAvailableMessage",
LastTransitionTime: metav1.Now(),
},
},
}
Expect(k8sClient.Status().Update(ctx, crb)).Should(Succeed(), "failed to update cluster resource binding status")

By("Checking placement controller queue")
eventuallyCheckPlacementControllerQueue(crb.GetLabels()[fleetv1beta1.CRPTrackingLabel])
fakePlacementController.ResetQueue()
})

It("Should enqueue the clusterResourcePlacement name for reconciling, when there are no more failed placements", func() {
crb := &fleetv1beta1.ClusterResourceBinding{}
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: testCRBName}, crb)).Should(Succeed(), "failed to get cluster resource binding")
crb.Status.FailedPlacements = []fleetv1beta1.FailedResourcePlacement{}
Expect(k8sClient.Status().Update(ctx, crb)).Should(Succeed(), "failed to update cluster resource binding status")

By("Checking placement controller queue")
eventuallyCheckPlacementControllerQueue(crb.GetLabels()[fleetv1beta1.CRPTrackingLabel])
fakePlacementController.ResetQueue()
})
})
})

func clusterResourceBindingForTest() *fleetv1beta1.ClusterResourceBinding {
6 changes: 2 additions & 4 deletions pkg/controllers/clusterresourceplacement/controller.go
@@ -241,10 +241,8 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1beta1.Cluster
}

klog.V(2).InfoS("Placement rollout has not finished yet and requeue the request", "clusterResourcePlacement", crpKObj, "status", crp.Status, "generation", crp.Generation)
// we need to requeue the request to update the status of the resources eg, failedManifests.
// The binding status won't be changed.
// TODO: once we move to populate the failedManifests from the binding, no need to requeue.
return ctrl.Result{RequeueAfter: 1 * time.Minute}, nil
// no need to requeue the request as the binding status will be changed but we add a long resync loop just in case.
return ctrl.Result{RequeueAfter: 5 * time.Minute}, nil
}

func (r *Reconciler) getOrCreateClusterSchedulingPolicySnapshot(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement, revisionHistoryLimit int) (*fleetv1beta1.ClusterSchedulingPolicySnapshot, error) {
7 changes: 5 additions & 2 deletions pkg/controllers/clusterresourceplacement/placement_status.go
@@ -109,7 +109,7 @@ func (r *Reconciler) setResourceConditions(ctx context.Context, crp *fleetv1beta
meta.RemoveStatusCondition(&rps.Conditions, string(i.ResourcePlacementConditionType()))
}
placementStatuses = append(placementStatuses, rps)
klog.V(2).InfoS("Populated the resource placement status for the scheduled cluster", "clusterResourcePlacement", klog.KObj(crp), "cluster", c.ClusterName)
klog.V(2).InfoS("Populated the resource placement status for the scheduled cluster", "clusterResourcePlacement", klog.KObj(crp), "cluster", c.ClusterName, "resourcePlacementStatus", rps)
}
isClusterScheduled := len(placementStatuses) > 0

@@ -227,7 +227,8 @@ func (r *Reconciler) buildClusterResourceBindings(ctx context.Context, crp *flee
// TotalCondition
//
// )
func (r *Reconciler) setResourcePlacementStatusPerCluster(crp *fleetv1beta1.ClusterResourcePlacement, latestResourceSnapshot *fleetv1beta1.ClusterResourceSnapshot, binding *fleetv1beta1.ClusterResourceBinding, status *fleetv1beta1.ResourcePlacementStatus) ([]metav1.ConditionStatus, error) {
func (r *Reconciler) setResourcePlacementStatusPerCluster(crp *fleetv1beta1.ClusterResourcePlacement, latestResourceSnapshot *fleetv1beta1.ClusterResourceSnapshot,
binding *fleetv1beta1.ClusterResourceBinding, status *fleetv1beta1.ResourcePlacementStatus) ([]metav1.ConditionStatus, error) {
if binding == nil {
meta.SetStatusCondition(&status.Conditions, condition.RolloutStartedCondition.UnknownResourceConditionPerCluster(crp.Generation))
return []metav1.ConditionStatus{metav1.ConditionUnknown}, nil
@@ -246,6 +247,7 @@ func (r *Reconciler) setResourcePlacementStatusPerCluster(crp *fleetv1beta1.Clus
if !condition.IsConditionStatusTrue(bindingCond, binding.Generation) &&
!condition.IsConditionStatusFalse(bindingCond, binding.Generation) {
meta.SetStatusCondition(&status.Conditions, i.UnknownResourceConditionPerCluster(crp.Generation))
klog.V(5).InfoS("Find an unknown condition", "bindingCond", bindingCond, "clusterResourceBinding", klog.KObj(binding), "clusterResourcePlacement", klog.KObj(crp))
res = append(res, metav1.ConditionUnknown)
break
}
@@ -294,5 +296,6 @@ func (r *Reconciler) setResourcePlacementStatusPerCluster(crp *fleetv1beta1.Clus
// At this point, either the generation is not the one in the binding spec or the status is true/unknown.
// It means the rollout controller has not handled the binding yet.
meta.SetStatusCondition(&status.Conditions, condition.RolloutStartedCondition.UnknownResourceConditionPerCluster(crp.Generation))
klog.V(5).InfoS("The staled binding rollout status is unknown", "clusterResourceBinding", klog.KObj(binding), "clusterResourcePlacement", klog.KObj(crp))
return []metav1.ConditionStatus{metav1.ConditionUnknown}, nil
}
36 changes: 23 additions & 13 deletions pkg/controllers/workgenerator/controller.go
@@ -798,7 +798,7 @@ func extractFailedResourcePlacementsFromWork(work *fleetv1b
}
failedManifest.Condition = *appliedCond
res = append(res, failedManifest)
break
continue // jump to the next manifest
}
availableCond = meta.FindStatusCondition(manifestCondition.Conditions, fleetv1beta1.WorkConditionTypeAvailable)
if availableCond != nil && availableCond.Status == metav1.ConditionFalse {
@@ -809,7 +809,7 @@ func extractFailedResourcePlacementsFromWork(work *fleetv1beta1.Work) []fleetv1b
"version", manifestCondition.Identifier.Version, "kind", manifestCondition.Identifier.Kind,
"envelopeType", envelopeType, "envelopObjName", envelopObjName, "envelopObjNamespace", envelopObjNamespace)
} else {
klog.V(2).InfoS("Find an unavailable enveloped manifest",
klog.V(2).InfoS("Find an unavailable manifest",
"manifestName", manifestCondition.Identifier.Name, "group", manifestCondition.Identifier.Group,
"version", manifestCondition.Identifier.Version, "kind", manifestCondition.Identifier.Kind)
}
@@ -874,19 +874,29 @@ func (r *Reconciler) SetupWithManager(mgr controllerruntime.Manager) error {
"Failed to process an update event for work object")
return
}
oldAppliedStatus := meta.FindStatusCondition(oldWork.Status.Conditions, fleetv1beta1.WorkConditionTypeApplied)
newAppliedStatus := meta.FindStatusCondition(newWork.Status.Conditions, fleetv1beta1.WorkConditionTypeApplied)
oldAvailableStatus := meta.FindStatusCondition(oldWork.Status.Conditions, fleetv1beta1.WorkConditionTypeAvailable)
newAvailableStatus := meta.FindStatusCondition(newWork.Status.Conditions, fleetv1beta1.WorkConditionTypeAvailable)

// we only need to handle the case the applied or available condition is changed between the
// new and old work objects. Otherwise, it won't affect the binding applied condition
if condition.EqualCondition(oldAppliedStatus, newAppliedStatus) && condition.EqualCondition(oldAvailableStatus, newAvailableStatus) {
klog.V(2).InfoS("The work applied or available condition didn't flip between true and false, no need to reconcile", "oldWork", klog.KObj(oldWork), "newWork", klog.KObj(newWork))
return
oldAppliedCondition := meta.FindStatusCondition(oldWork.Status.Conditions, fleetv1beta1.WorkConditionTypeApplied)
newAppliedCondition := meta.FindStatusCondition(newWork.Status.Conditions, fleetv1beta1.WorkConditionTypeApplied)
oldAvailableCondition := meta.FindStatusCondition(oldWork.Status.Conditions, fleetv1beta1.WorkConditionTypeAvailable)
newAvailableCondition := meta.FindStatusCondition(newWork.Status.Conditions, fleetv1beta1.WorkConditionTypeAvailable)

// we try to filter out events, we only need to handle the updated event if the applied or available condition flip between true and false
// or the failed placements are changed.
if condition.EqualCondition(oldAppliedCondition, newAppliedCondition) && condition.EqualCondition(oldAvailableCondition, newAvailableCondition) {
if condition.IsConditionStatusFalse(newAppliedCondition, newWork.Generation) || condition.IsConditionStatusFalse(newAvailableCondition, newWork.Generation) {
// we need to compare the failed placement if the work is not applied or available
oldFailedPlacements := extractFailedResourcePlacementsFromWork(oldWork)
newFailedPlacements := extractFailedResourcePlacementsFromWork(newWork)
if utils.IsFailedResourcePlacementsEqual(oldFailedPlacements, newFailedPlacements) {
klog.V(2).InfoS("The failed placement list didn't change on failed work, no need to reconcile", "oldWork", klog.KObj(oldWork), "newWork", klog.KObj(newWork))
return
}
} else {
klog.V(2).InfoS("The work applied or available condition stayed as true, no need to reconcile", "oldWork", klog.KObj(oldWork), "newWork", klog.KObj(newWork))
return
}
}
klog.V(2).InfoS("Received a work update event", "work", klog.KObj(newWork), "parentBindingName", parentBindingName)
// We need to update the binding status in this case
klog.V(2).InfoS("Received a work update event that we need to handle", "work", klog.KObj(newWork), "parentBindingName", parentBindingName)
queue.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: parentBindingName,
}})
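
The nested event filter added above is easier to follow when pulled out into a single predicate. The sketch below assumes extracting it into a helper preserves behavior; the name shouldEnqueueWorkUpdateSketch is hypothetical, and in the PR the logic stays inline in the UpdateFunc of SetupWithManager.

// Hypothetical standalone form of the work update-event filter shown in the diff above.
package workgenerator

import (
	"k8s.io/apimachinery/pkg/api/meta"

	fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
	"go.goms.io/fleet/pkg/utils"
	"go.goms.io/fleet/pkg/utils/condition"
)

// shouldEnqueueWorkUpdateSketch reports whether a work update needs to be copied back to the
// parent binding: either the applied/available condition changed, or the work stays failed but
// its failed placement list changed.
func shouldEnqueueWorkUpdateSketch(oldWork, newWork *fleetv1beta1.Work) bool {
	oldApplied := meta.FindStatusCondition(oldWork.Status.Conditions, fleetv1beta1.WorkConditionTypeApplied)
	newApplied := meta.FindStatusCondition(newWork.Status.Conditions, fleetv1beta1.WorkConditionTypeApplied)
	oldAvailable := meta.FindStatusCondition(oldWork.Status.Conditions, fleetv1beta1.WorkConditionTypeAvailable)
	newAvailable := meta.FindStatusCondition(newWork.Status.Conditions, fleetv1beta1.WorkConditionTypeAvailable)

	if !condition.EqualCondition(oldApplied, newApplied) || !condition.EqualCondition(oldAvailable, newAvailable) {
		// The applied or available condition changed, so the binding status must be refreshed.
		return true
	}
	if condition.IsConditionStatusFalse(newApplied, newWork.Generation) ||
		condition.IsConditionStatusFalse(newAvailable, newWork.Generation) {
		// Conditions are unchanged but the work is still failing; only a change in the
		// failed placement list warrants another reconcile.
		oldFailed := extractFailedResourcePlacementsFromWork(oldWork)
		newFailed := extractFailedResourcePlacementsFromWork(newWork)
		return !utils.IsFailedResourcePlacementsEqual(oldFailed, newFailed)
	}
	// Conditions stayed healthy and unchanged; nothing new for the binding to pick up.
	return false
}

The second branch is the behavior this PR adds: when the applied or available condition has not flipped but the work is still failing, a change in the extracted failed placements alone now enqueues the parent binding, which is what propagates the failed placement list from the work back to the binding and, through the binding watcher change above, to the ClusterResourcePlacement status.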