diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index a7e491855a..a94ec6cc32 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -308,6 +308,42 @@ type ByObject struct { // // Defaults to true. EnableWatchBookmarks *bool + + // SyncPeriod determines the minimum frequency at which watched resources are + // reconciled. A lower period will correct entropy more quickly, but reduce + // responsiveness to change if there are many watched resources. Change this + // value only if you know what you are doing. Defaults to 10 hours if unset. + // there will a 10 percent jitter between the SyncPeriod of all controllers + // so that all controllers will not send list requests simultaneously. + // + // This applies to all controllers. + // + // A period sync happens for two reasons: + // 1. To insure against a bug in the controller that causes an object to not + // be requeued, when it otherwise should be requeued. + // 2. To insure against an unknown bug in controller-runtime, or its dependencies, + // that causes an object to not be requeued, when it otherwise should be + // requeued, or to be removed from the queue, when it otherwise should not + // be removed. + // + // If you want + // 1. to insure against missed watch events, or + // 2. to poll services that cannot be watched, + // then we recommend that, instead of changing the default period, the + // controller requeue, with a constant duration `t`, whenever the controller + // is "done" with an object, and would otherwise not requeue it, i.e., we + // recommend the `Reconcile` function return `reconcile.Result{RequeueAfter: t}`, + // instead of `reconcile.Result{}`. + // + // SyncPeriod will locally trigger an artificial Update event with the same + // object in both ObjectOld and ObjectNew for everything that is in the + // cache. + // + // Predicates or Handlers that expect ObjectOld and ObjectNew to be different + // (such as GenerationChangedPredicate) will filter out this event, preventing + // it from triggering a reconciliation. + // SyncPeriod does not sync between the local cache and the server. + SyncPeriod *time.Duration } // Config describes all potential options for a given watch. @@ -343,6 +379,42 @@ type Config struct { // // Defaults to true. EnableWatchBookmarks *bool + + // SyncPeriod determines the minimum frequency at which watched resources are + // reconciled. A lower period will correct entropy more quickly, but reduce + // responsiveness to change if there are many watched resources. Change this + // value only if you know what you are doing. Defaults to 10 hours if unset. + // there will a 10 percent jitter between the SyncPeriod of all controllers + // so that all controllers will not send list requests simultaneously. + // + // This applies to all controllers. + // + // A period sync happens for two reasons: + // 1. To insure against a bug in the controller that causes an object to not + // be requeued, when it otherwise should be requeued. + // 2. To insure against an unknown bug in controller-runtime, or its dependencies, + // that causes an object to not be requeued, when it otherwise should be + // requeued, or to be removed from the queue, when it otherwise should not + // be removed. + // + // If you want + // 1. to insure against missed watch events, or + // 2. to poll services that cannot be watched, + // then we recommend that, instead of changing the default period, the + // controller requeue, with a constant duration `t`, whenever the controller + // is "done" with an object, and would otherwise not requeue it, i.e., we + // recommend the `Reconcile` function return `reconcile.Result{RequeueAfter: t}`, + // instead of `reconcile.Result{}`. + // + // SyncPeriod will locally trigger an artificial Update event with the same + // object in both ObjectOld and ObjectNew for everything that is in the + // cache. + // + // Predicates or Handlers that expect ObjectOld and ObjectNew to be different + // (such as GenerationChangedPredicate) will filter out this event, preventing + // it from triggering a reconciliation. + // SyncPeriod does not sync between the local cache and the server. + SyncPeriod *time.Duration } // NewCacheFunc - Function for creating a new cache from the options and a rest config. @@ -413,6 +485,7 @@ func optionDefaultsToConfig(opts *Options) Config { Transform: opts.DefaultTransform, UnsafeDisableDeepCopy: opts.DefaultUnsafeDisableDeepCopy, EnableWatchBookmarks: opts.DefaultEnableWatchBookmarks, + SyncPeriod: opts.SyncPeriod, } } @@ -423,6 +496,7 @@ func byObjectToConfig(byObject ByObject) Config { Transform: byObject.Transform, UnsafeDisableDeepCopy: byObject.UnsafeDisableDeepCopy, EnableWatchBookmarks: byObject.EnableWatchBookmarks, + SyncPeriod: byObject.SyncPeriod, } } @@ -436,7 +510,7 @@ func newCache(restConfig *rest.Config, opts Options) newCacheFunc { HTTPClient: opts.HTTPClient, Scheme: opts.Scheme, Mapper: opts.Mapper, - ResyncPeriod: *opts.SyncPeriod, + ResyncPeriod: ptr.Deref(config.SyncPeriod, defaultSyncPeriod), Namespace: namespace, Selector: internal.Selector{ Label: config.LabelSelector, @@ -534,6 +608,7 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) { byObject.Transform = defaultedConfig.Transform byObject.UnsafeDisableDeepCopy = defaultedConfig.UnsafeDisableDeepCopy byObject.EnableWatchBookmarks = defaultedConfig.EnableWatchBookmarks + byObject.SyncPeriod = defaultedConfig.SyncPeriod } opts.ByObject[obj] = byObject @@ -555,10 +630,6 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) { opts.DefaultNamespaces[namespace] = cfg } - // Default the resync period to 10 hours if unset - if opts.SyncPeriod == nil { - opts.SyncPeriod = &defaultSyncPeriod - } return opts, nil } @@ -578,6 +649,9 @@ func defaultConfig(toDefault, defaultFrom Config) Config { if toDefault.EnableWatchBookmarks == nil { toDefault.EnableWatchBookmarks = defaultFrom.EnableWatchBookmarks } + if toDefault.SyncPeriod == nil { + toDefault.SyncPeriod = defaultFrom.SyncPeriod + } return toDefault } diff --git a/pkg/cache/defaulting_test.go b/pkg/cache/defaulting_test.go index d9d0dcceb3..89a0334324 100644 --- a/pkg/cache/defaulting_test.go +++ b/pkg/cache/defaulting_test.go @@ -249,6 +249,30 @@ func TestDefaultOpts(t *testing.T) { return cmp.Diff(expected, o.ByObject[pod].EnableWatchBookmarks) }, }, + { + name: "ByObject.SyncPeriod gets defaulted from SyncPeriod", + in: Options{ + ByObject: map[client.Object]ByObject{pod: {}}, + SyncPeriod: ptr.To(5 * time.Minute), + }, + + verification: func(o Options) string { + expected := ptr.To(5 * time.Minute) + return cmp.Diff(expected, o.ByObject[pod].SyncPeriod) + }, + }, + { + name: "ByObject.SyncPeriod doesn't get defaulted when set", + in: Options{ + ByObject: map[client.Object]ByObject{pod: {SyncPeriod: ptr.To(1 * time.Minute)}}, + SyncPeriod: ptr.To(5 * time.Minute), + }, + + verification: func(o Options) string { + expected := ptr.To(1 * time.Minute) + return cmp.Diff(expected, o.ByObject[pod].SyncPeriod) + }, + }, { name: "DefaultNamespace label selector gets defaulted from DefaultLabelSelector", in: Options{ diff --git a/pkg/client/namespaced_client.go b/pkg/client/namespaced_client.go index cacba4a9c6..d4223eda26 100644 --- a/pkg/client/namespaced_client.go +++ b/pkg/client/namespaced_client.go @@ -213,7 +213,12 @@ func (n *namespacedClient) Get(ctx context.Context, key ObjectKey, obj Object, o // List implements client.Client. func (n *namespacedClient) List(ctx context.Context, obj ObjectList, opts ...ListOption) error { - if n.namespace != "" { + isNamespaceScoped, err := n.IsObjectNamespaced(obj) + if err != nil { + return fmt.Errorf("error finding the scope of the object: %w", err) + } + + if isNamespaceScoped && n.namespace != "" { opts = append(opts, InNamespace(n.namespace)) } return n.client.List(ctx, obj, opts...) diff --git a/pkg/client/namespaced_client_test.go b/pkg/client/namespaced_client_test.go index cf28289e72..a80d0d46f8 100644 --- a/pkg/client/namespaced_client_test.go +++ b/pkg/client/namespaced_client_test.go @@ -43,6 +43,7 @@ import ( var _ = Describe("NamespacedClient", func() { var dep *appsv1.Deployment + var nameSpace *corev1.Namespace var acDep *appsv1applyconfigurations.DeploymentApplyConfiguration var ns = "default" var count uint64 = 0 @@ -53,6 +54,8 @@ var _ = Describe("NamespacedClient", func() { err := rbacv1.AddToScheme(sch) Expect(err).ToNot(HaveOccurred()) + err = corev1.AddToScheme(sch) + Expect(err).ToNot(HaveOccurred()) err = appsv1.AddToScheme(sch) Expect(err).ToNot(HaveOccurred()) @@ -80,6 +83,12 @@ var _ = Describe("NamespacedClient", func() { }, }, } + nameSpace = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("namespace-%v", count), + Labels: map[string]string{"name": fmt.Sprintf("namespace-%v", count)}, + }, + } acDep = appsv1applyconfigurations.Deployment(dep.Name, ""). WithLabels(dep.Labels). WithSpec(appsv1applyconfigurations.DeploymentSpec(). @@ -131,10 +140,13 @@ var _ = Describe("NamespacedClient", func() { var err error dep, err = clientset.AppsV1().Deployments(ns).Create(ctx, dep, metav1.CreateOptions{}) Expect(err).NotTo(HaveOccurred()) + nameSpace, err = clientset.CoreV1().Namespaces().Create(ctx, nameSpace, metav1.CreateOptions{}) + Expect(err).NotTo(HaveOccurred()) }) AfterEach(func(ctx SpecContext) { deleteDeployment(ctx, dep, ns) + deleteNamespace(ctx, nameSpace) }) It("should successfully List objects when namespace is not specified with the object", func(ctx SpecContext) { @@ -146,6 +158,13 @@ var _ = Describe("NamespacedClient", func() { Expect(result.Items[0]).To(BeEquivalentTo(*dep)) }) + It("should successfully List objects when object is not namespaced scoped", func(ctx SpecContext) { + result := &corev1.NamespaceList{} + opts := &client.ListOptions{} + Expect(getClient().List(ctx, result, opts)).NotTo(HaveOccurred()) + Expect(result.Items).NotTo(BeEmpty()) + }) + It("should List objects from the namespace specified in the client", func(ctx SpecContext) { result := &appsv1.DeploymentList{} opts := client.InNamespace("non-default") diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index 98df84c56b..71363f0d17 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -124,8 +124,8 @@ type priorityqueue[T comparable] struct { get chan item[T] // waiters is the number of routines blocked in Get, we use it to determine - // if we can push items. - waiters atomic.Int64 + // if we can push items. Every manipulation has to be protected with the lock. + waiters int64 // Configurable for testing now func() time.Time @@ -269,7 +269,7 @@ func (w *priorityqueue[T]) spin() { } } - if w.waiters.Load() == 0 { + if w.waiters == 0 { // Have to keep iterating here to ensure we update metrics // for further items that became ready and set nextReady. return true @@ -277,7 +277,7 @@ func (w *priorityqueue[T]) spin() { w.metrics.get(item.Key, item.Priority) w.locked.Insert(item.Key) - w.waiters.Add(-1) + w.waiters-- delete(w.items, item.Key) toDelete = append(toDelete, item) w.becameReady.Delete(item.Key) @@ -316,7 +316,9 @@ func (w *priorityqueue[T]) GetWithPriority() (_ T, priority int, shutdown bool) return zero, 0, true } - w.waiters.Add(1) + w.lock.Lock() + w.waiters++ + w.lock.Unlock() w.notifyItemOrWaiterAdded() diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index d0cc51f7c5..5cade57e3c 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -378,7 +378,12 @@ var _ = Describe("Controllerworkqueue", func() { }() // Verify the go routine above is now waiting for an item. - Eventually(q.(*priorityqueue[string]).waiters.Load).Should(Equal(int64(1))) + Eventually(func() int64 { + q.(*priorityqueue[string]).lock.Lock() + defer q.(*priorityqueue[string]).lock.Unlock() + + return q.(*priorityqueue[string]).waiters + }).Should(Equal(int64(1))) Consistently(getUnblocked).ShouldNot(BeClosed()) // shut down diff --git a/pkg/envtest/envtest_test.go b/pkg/envtest/envtest_test.go index ce3e9a4d3f..806c9f43cc 100644 --- a/pkg/envtest/envtest_test.go +++ b/pkg/envtest/envtest_test.go @@ -963,4 +963,56 @@ var _ = Describe("Test", func() { Expect(env.WebhookInstallOptions.LocalServingCertDir).ShouldNot(BeADirectory()) }) }) + + Describe("Binary Path Handling", func() { + It("should respect pre-configured binary paths when not downloading", func() { + // Setup custom paths + customAPIServerPath := "/custom/path/to/kube-apiserver" + customEtcdPath := "/custom/path/to/etcd" + customKubectlPath := "/custom/path/to/kubectl" + + // Create an environment with pre-configured paths + testEnv := &Environment{} + testEnv.ControlPlane.GetAPIServer().Path = customAPIServerPath + testEnv.ControlPlane.Etcd = &Etcd{} + testEnv.ControlPlane.Etcd.Path = customEtcdPath + testEnv.ControlPlane.KubectlPath = customKubectlPath + + // Set BinaryAssetsDirectory to ensure it's not using defaults + testEnv.BinaryAssetsDirectory = "/should/not/be/used" + testEnv.DownloadBinaryAssets = false + + // Call configureBinaryPaths to test the path configuration logic + err := testEnv.configureBinaryPaths() + Expect(err).NotTo(HaveOccurred()) + + // Verify paths were preserved (not overwritten) + apiServer := testEnv.ControlPlane.GetAPIServer() + Expect(apiServer.Path).To(Equal(customAPIServerPath)) + Expect(testEnv.ControlPlane.Etcd.Path).To(Equal(customEtcdPath)) + Expect(testEnv.ControlPlane.KubectlPath).To(Equal(customKubectlPath)) + }) + + It("should auto-configure binary paths when not pre-configured", func() { + // Create an environment without pre-configured paths + testEnv := &Environment{} + testEnv.BinaryAssetsDirectory = "/test/assets" + testEnv.DownloadBinaryAssets = false + + // Call configureBinaryPaths + err := testEnv.configureBinaryPaths() + Expect(err).NotTo(HaveOccurred()) + + // Verify paths were set using BinPathFinder + apiServer := testEnv.ControlPlane.GetAPIServer() + Expect(apiServer.Path).NotTo(BeEmpty()) + Expect(testEnv.ControlPlane.Etcd.Path).NotTo(BeEmpty()) + Expect(testEnv.ControlPlane.KubectlPath).NotTo(BeEmpty()) + + // Verify the paths contain the binary names + Expect(apiServer.Path).To(ContainSubstring("kube-apiserver")) + Expect(testEnv.ControlPlane.Etcd.Path).To(ContainSubstring("etcd")) + Expect(testEnv.ControlPlane.KubectlPath).To(ContainSubstring("kubectl")) + }) + }) }) diff --git a/pkg/envtest/server.go b/pkg/envtest/server.go index 9bb81ed2ab..c9f19da977 100644 --- a/pkg/envtest/server.go +++ b/pkg/envtest/server.go @@ -109,7 +109,11 @@ var ( // Environment creates a Kubernetes test environment that will start / stop the Kubernetes control plane and // install extension APIs. type Environment struct { - // ControlPlane is the ControlPlane including the apiserver and etcd + // ControlPlane is the ControlPlane including the apiserver and etcd. + // Binary paths (APIServer.Path, Etcd.Path, KubectlPath) can be pre-configured in ControlPlane. + // If DownloadBinaryAssets is true, the downloaded paths will always be used. + // If DownloadBinaryAssets is false and paths are not pre-configured (default is empty), they will be + // automatically resolved using BinaryAssetsDirectory. ControlPlane controlplane.ControlPlane // Scheme is used to determine if conversion webhooks should be enabled @@ -211,6 +215,40 @@ func (te *Environment) Stop() error { return te.ControlPlane.Stop() } +// configureBinaryPaths configures the binary paths for the API server, etcd, and kubectl. +// If DownloadBinaryAssets is true, it downloads and uses those paths. +// If DownloadBinaryAssets is false, it only sets paths that are not already configured (empty). +func (te *Environment) configureBinaryPaths() error { + apiServer := te.ControlPlane.GetAPIServer() + + if te.ControlPlane.Etcd == nil { + te.ControlPlane.Etcd = &controlplane.Etcd{} + } + + if te.DownloadBinaryAssets { + apiServerPath, etcdPath, kubectlPath, err := downloadBinaryAssets(context.TODO(), + te.BinaryAssetsDirectory, te.DownloadBinaryAssetsVersion, te.DownloadBinaryAssetsIndexURL) + if err != nil { + return err + } + + apiServer.Path = apiServerPath + te.ControlPlane.Etcd.Path = etcdPath + te.ControlPlane.KubectlPath = kubectlPath + } else { + if apiServer.Path == "" { + apiServer.Path = process.BinPathFinder("kube-apiserver", te.BinaryAssetsDirectory) + } + if te.ControlPlane.Etcd.Path == "" { + te.ControlPlane.Etcd.Path = process.BinPathFinder("etcd", te.BinaryAssetsDirectory) + } + if te.ControlPlane.KubectlPath == "" { + te.ControlPlane.KubectlPath = process.BinPathFinder("kubectl", te.BinaryAssetsDirectory) + } + } + return nil +} + // Start starts a local Kubernetes server and updates te.ApiserverPort with the port it is listening on. func (te *Environment) Start() (*rest.Config, error) { if te.useExistingCluster() { @@ -229,10 +267,6 @@ func (te *Environment) Start() (*rest.Config, error) { } else { apiServer := te.ControlPlane.GetAPIServer() - if te.ControlPlane.Etcd == nil { - te.ControlPlane.Etcd = &controlplane.Etcd{} - } - if os.Getenv(envAttachOutput) == "true" { te.AttachControlPlaneOutput = true } @@ -243,6 +277,9 @@ func (te *Environment) Start() (*rest.Config, error) { if apiServer.Err == nil { apiServer.Err = os.Stderr } + if te.ControlPlane.Etcd == nil { + te.ControlPlane.Etcd = &controlplane.Etcd{} + } if te.ControlPlane.Etcd.Out == nil { te.ControlPlane.Etcd.Out = os.Stdout } @@ -251,20 +288,8 @@ func (te *Environment) Start() (*rest.Config, error) { } } - if te.DownloadBinaryAssets { - apiServerPath, etcdPath, kubectlPath, err := downloadBinaryAssets(context.TODO(), - te.BinaryAssetsDirectory, te.DownloadBinaryAssetsVersion, te.DownloadBinaryAssetsIndexURL) - if err != nil { - return nil, err - } - - apiServer.Path = apiServerPath - te.ControlPlane.Etcd.Path = etcdPath - te.ControlPlane.KubectlPath = kubectlPath - } else { - apiServer.Path = process.BinPathFinder("kube-apiserver", te.BinaryAssetsDirectory) - te.ControlPlane.Etcd.Path = process.BinPathFinder("etcd", te.BinaryAssetsDirectory) - te.ControlPlane.KubectlPath = process.BinPathFinder("kubectl", te.BinaryAssetsDirectory) + if err := te.configureBinaryPaths(); err != nil { + return nil, fmt.Errorf("failed to configure binary paths: %w", err) } if err := te.defaultTimeouts(); err != nil {