| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312 |
- /*
- Copyright © 2025 ESO Maintainer Team
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- https://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package externalsecret
- import (
- "context"
- "fmt"
- "sync"
- "github.com/go-logr/logr"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
- "k8s.io/apimachinery/pkg/fields"
- "k8s.io/apimachinery/pkg/runtime/schema"
- "k8s.io/apimachinery/pkg/types"
- "k8s.io/client-go/util/workqueue"
- ctrl "sigs.k8s.io/controller-runtime"
- runtimecache "sigs.k8s.io/controller-runtime/pkg/cache"
- "sigs.k8s.io/controller-runtime/pkg/client"
- "sigs.k8s.io/controller-runtime/pkg/reconcile"
- "sigs.k8s.io/controller-runtime/pkg/source"
- esv1 "github.com/external-secrets/external-secrets/apis/externalsecrets/v1"
- )
- // InformerManager manages the lifecycle of informers for generic target resources.
- // It handles dynamic registration, tracking, and cleanup of informers.
- type InformerManager interface {
- // EnsureInformer ensures an informer exists for the given GVK and registers the ExternalSecret as using it.
- // Returns true if a new informer was created, false if it already existed.
- EnsureInformer(ctx context.Context, gvk schema.GroupVersionKind, es types.NamespacedName) (bool, error)
- // ReleaseInformer unregisters the ExternalSecret from using this GVK.
- // If no more ExternalSecrets use this GVK, the informer is stopped and removed.
- ReleaseInformer(ctx context.Context, gvk schema.GroupVersionKind, es types.NamespacedName) error
- // IsManaged returns true if the manager is currently managing an informer for the GVK.
- IsManaged(gvk schema.GroupVersionKind) bool
- // GetInformer returns the informer for a GVK if it exists.
- GetInformer(gvk schema.GroupVersionKind) (runtimecache.Informer, bool)
- // Source returns a source.TypedSource that can be used with WatchesRawSource
- Source() source.TypedSource[reconcile.Request]
- // SetQueue binds the reconcile queue to the informer manager
- SetQueue(queue workqueue.TypedRateLimitingInterface[ctrl.Request]) error
- }
- // informerEntry tracks an informer and the ExternalSecrets using it.
- type informerEntry struct {
- informer runtimecache.Informer
- // externalSecrets tracks the external secrets using a GVK. Once this list is empty, we
- // stop the informer and deregister it to free up resources. It is a map instead of just a number to prevent
- // duplicated reconcile ensures to increase the number on each reconcile.
- externalSecrets map[types.NamespacedName]struct{}
- }
- // DefaultInformerManager implements InformerManager using controller-runtime's cache.
- type DefaultInformerManager struct {
- cache runtimecache.Cache
- client client.Client
- log logr.Logger
- mu sync.RWMutex
- informers map[string]*informerEntry // key: GVK string
- queue workqueue.TypedRateLimitingInterface[ctrl.Request]
- managerContext context.Context
- }
- // NewInformerManager creates a new InformerManager.
- func NewInformerManager(ctx context.Context, cache runtimecache.Cache, client client.Client, log logr.Logger) InformerManager {
- return &DefaultInformerManager{
- managerContext: ctx,
- cache: cache,
- client: client,
- log: log,
- informers: make(map[string]*informerEntry),
- }
- }
- // EnsureInformer ensures an informer exists for the given GVK and registers the ExternalSecret.
- func (m *DefaultInformerManager) EnsureInformer(ctx context.Context, gvk schema.GroupVersionKind, es types.NamespacedName) (bool, error) {
- m.mu.Lock()
- defer m.mu.Unlock()
- key := gvk.String()
- // check if we have this gvk in the list of informers already
- if entry, exists := m.informers[key]; exists {
- // register this ExternalSecret as using this informer (deduplicate);
- entry.externalSecrets[es] = struct{}{}
- m.log.Info("registered ExternalSecret with existing informer",
- "gvk", key,
- "externalSecret", es,
- "totalUsers", len(entry.externalSecrets))
- return false, nil
- }
- if m.queue == nil {
- return false, fmt.Errorf("queue not initialized, call SetQueue first")
- }
- obj := &unstructured.Unstructured{}
- obj.SetGroupVersionKind(gvk)
- informer, err := m.cache.GetInformer(ctx, obj)
- if err != nil {
- return false, fmt.Errorf("failed to get informer for %s: %w", key, err)
- }
- // Add event handler to the informer that enqueues reconcile requests
- _, err = informer.AddEventHandler(&enqueueHandler{
- managerContext: m.managerContext,
- gvk: gvk,
- client: m.client,
- queue: m.queue,
- log: m.log,
- })
- if err != nil {
- return false, fmt.Errorf("failed to add event handler for %s: %w", key, err)
- }
- // Store the informer with this ExternalSecret as the first user
- m.informers[key] = &informerEntry{
- informer: informer,
- externalSecrets: map[types.NamespacedName]struct{}{es: {}},
- }
- m.log.Info("registered informer for generic target",
- "group", gvk.Group,
- "version", gvk.Version,
- "kind", gvk.Kind,
- "externalSecret", es)
- return true, nil
- }
- // enqueueHandler is an event handler that enqueues reconcile requests for ExternalSecrets
- // that target the changed resource.
- type enqueueHandler struct {
- managerContext context.Context
- gvk schema.GroupVersionKind
- client client.Client
- queue workqueue.TypedRateLimitingInterface[ctrl.Request]
- log logr.Logger
- }
- func (h *enqueueHandler) OnAdd(obj interface{}, _ bool) {
- h.enqueue(obj)
- }
- func (h *enqueueHandler) OnUpdate(_, newObj interface{}) {
- h.enqueue(newObj)
- }
- func (h *enqueueHandler) OnDelete(obj interface{}) {
- h.enqueue(obj)
- }
- func (h *enqueueHandler) enqueue(obj interface{}) {
- // Extract metadata
- meta, ok := obj.(metav1.Object)
- if !ok {
- h.log.Error(nil, "unexpected object type", "type", fmt.Sprintf("%T", obj))
- return
- }
- // Only process resources with the managed label
- labels := meta.GetLabels()
- if labels == nil {
- return
- }
- value, hasLabel := labels[esv1.LabelManaged]
- if !hasLabel || value != esv1.LabelManagedValue {
- return
- }
- // Find ExternalSecrets that target this resource
- externalSecretsList := &esv1.ExternalSecretList{}
- indexValue := fmt.Sprintf("%s/%s/%s/%s", h.gvk.Group, h.gvk.Version, h.gvk.Kind, meta.GetName())
- listOps := &client.ListOptions{
- FieldSelector: fields.OneTermEqualSelector(indexESTargetResourceField, indexValue),
- Namespace: meta.GetNamespace(),
- }
- if err := h.client.List(h.managerContext, externalSecretsList, listOps); err != nil {
- h.log.Error(err, "failed to list ExternalSecrets for resource",
- "gvk", h.gvk.String(),
- "name", meta.GetName(),
- "namespace", meta.GetNamespace())
- return
- }
- // Enqueue reconcile requests for each ExternalSecret
- for i := range externalSecretsList.Items {
- req := ctrl.Request{
- NamespacedName: types.NamespacedName{
- Name: externalSecretsList.Items[i].GetName(),
- Namespace: externalSecretsList.Items[i].GetNamespace(),
- },
- }
- h.queue.Add(req)
- h.log.V(1).Info("enqueued reconcile request due to resource change",
- "externalSecret", req.NamespacedName,
- "targetGVK", h.gvk.String(),
- "targetResource", meta.GetName())
- }
- }
- // ReleaseInformer unregisters the ExternalSecret from using this GVK.
- func (m *DefaultInformerManager) ReleaseInformer(ctx context.Context, gvk schema.GroupVersionKind, es types.NamespacedName) error {
- m.mu.Lock()
- defer m.mu.Unlock()
- key := gvk.String()
- entry, exists := m.informers[key]
- if !exists {
- // Already removed or never existed; can happen if we had a bad start, failed to watch, or during other errors.
- // In that case, there is nothing else to do really.
- m.log.Info("informer not found for release",
- "gvk", key,
- "externalSecret", es)
- return nil
- }
- // remove the ES from the list of ESs using this GVK
- delete(entry.externalSecrets, es)
- m.log.Info("unregistered ExternalSecret from informer",
- "gvk", key,
- "externalSecret", es,
- "remainingUsers", len(entry.externalSecrets))
- // if no more ExternalSecrets are using this informer, remove it
- if len(entry.externalSecrets) == 0 {
- obj := &unstructured.Unstructured{}
- obj.SetGroupVersionKind(gvk)
- if err := m.cache.RemoveInformer(ctx, obj); err != nil {
- m.log.Error(err, "failed to remove informer, will clean up tracking anyway",
- "gvk", key)
- }
- delete(m.informers, key)
- m.log.Info("removed informer for generic target (no more users)",
- "group", gvk.Group,
- "version", gvk.Version,
- "kind", gvk.Kind)
- }
- return nil
- }
- // IsManaged returns true if the manager is currently managing an informer for the GVK.
- func (m *DefaultInformerManager) IsManaged(gvk schema.GroupVersionKind) bool {
- m.mu.RLock()
- defer m.mu.RUnlock()
- _, exists := m.informers[gvk.String()]
- return exists
- }
- // GetInformer returns the informer for a GVK if it exists.
- func (m *DefaultInformerManager) GetInformer(gvk schema.GroupVersionKind) (runtimecache.Informer, bool) {
- m.mu.RLock()
- defer m.mu.RUnlock()
- entry, exists := m.informers[gvk.String()]
- if !exists {
- return nil, false
- }
- return entry.informer, true
- }
- // Source returns a source.TypedSource that binds the reconcile queue to this manager.
- func (m *DefaultInformerManager) Source() source.TypedSource[reconcile.Request] {
- return source.Func(func(_ context.Context, queue workqueue.TypedRateLimitingInterface[ctrl.Request]) error {
- // This dynamically binds the given queue to the informer manager
- // From this point on, the queue will receive events for all registered informers
- return m.SetQueue(queue)
- })
- }
- // SetQueue binds the reconcile queue to the informer manager.
- func (m *DefaultInformerManager) SetQueue(queue workqueue.TypedRateLimitingInterface[ctrl.Request]) error {
- m.mu.Lock()
- defer m.mu.Unlock()
- if m.queue != nil {
- return fmt.Errorf("queue already set")
- }
- m.queue = queue
- m.log.Info("reconcile queue bound to informer manager")
- return nil
- }
|