informer_manager.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. /*
  2. Copyright © 2025 ESO Maintainer Team
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. https://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package externalsecret
  14. import (
  15. "context"
  16. "fmt"
  17. "sync"
  18. "github.com/go-logr/logr"
  19. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  20. "k8s.io/apimachinery/pkg/fields"
  21. "k8s.io/apimachinery/pkg/runtime/schema"
  22. "k8s.io/apimachinery/pkg/types"
  23. "k8s.io/client-go/util/workqueue"
  24. ctrl "sigs.k8s.io/controller-runtime"
  25. runtimecache "sigs.k8s.io/controller-runtime/pkg/cache"
  26. "sigs.k8s.io/controller-runtime/pkg/client"
  27. "sigs.k8s.io/controller-runtime/pkg/reconcile"
  28. "sigs.k8s.io/controller-runtime/pkg/source"
  29. esv1 "github.com/external-secrets/external-secrets/apis/externalsecrets/v1"
  30. )
// InformerManager manages the lifecycle of informers for generic target resources.
// It handles dynamic registration, tracking, and cleanup of informers.
//
// Informers are keyed by GroupVersionKind and shared by every ExternalSecret
// that registers for the same GVK.
type InformerManager interface {
	// EnsureInformer ensures an informer exists for the given GVK and registers the ExternalSecret as using it.
	// Returns true if a new informer was created, false if it already existed.
	EnsureInformer(ctx context.Context, gvk schema.GroupVersionKind, es types.NamespacedName) (bool, error)

	// ReleaseInformer unregisters the ExternalSecret from using this GVK.
	// If no more ExternalSecrets use this GVK, the informer is stopped and removed.
	ReleaseInformer(ctx context.Context, gvk schema.GroupVersionKind, es types.NamespacedName) error

	// IsManaged returns true if the manager is currently managing an informer for the GVK.
	IsManaged(gvk schema.GroupVersionKind) bool

	// GetInformer returns the informer for a GVK if it exists.
	// The boolean result is false when no informer is tracked for the GVK.
	GetInformer(gvk schema.GroupVersionKind) (runtimecache.Informer, bool)

	// Source returns a source.TypedSource that can be used with WatchesRawSource.
	Source() source.TypedSource[reconcile.Request]

	// SetQueue binds the reconcile queue to the informer manager.
	// Event handlers of informers created afterwards add their requests to this queue.
	SetQueue(queue workqueue.TypedRateLimitingInterface[ctrl.Request]) error
}
// informerEntry tracks an informer and the ExternalSecrets using it.
type informerEntry struct {
	// informer is the informer obtained from the cache for the entry's GVK.
	informer runtimecache.Informer
	// externalSecrets is the set of ExternalSecrets currently using the GVK. Once this set is
	// empty, the informer is removed and the entry is dropped to free up resources. It is a set
	// rather than a plain counter so that repeated EnsureInformer calls for the same
	// ExternalSecret (one per reconcile) do not inflate the number of users.
	externalSecrets map[types.NamespacedName]struct{}
}
// DefaultInformerManager implements InformerManager using controller-runtime's cache.
type DefaultInformerManager struct {
	// cache supplies informers per GVK and is asked to remove them once unused.
	cache runtimecache.Cache
	// client is handed to the event handlers, which use it to list ExternalSecrets.
	client client.Client
	log    logr.Logger

	// mu guards informers and queue.
	mu sync.RWMutex
	// informers holds one entry per tracked GVK.
	informers map[string]*informerEntry // key: GVK string
	// queue is the reconcile queue event handlers add requests to; it is nil until SetQueue is called.
	queue workqueue.TypedRateLimitingInterface[ctrl.Request]
	// managerContext is the context given to NewInformerManager; event handlers use it for their List calls.
	managerContext context.Context
}
  67. // NewInformerManager creates a new InformerManager.
  68. func NewInformerManager(ctx context.Context, cache runtimecache.Cache, client client.Client, log logr.Logger) InformerManager {
  69. return &DefaultInformerManager{
  70. managerContext: ctx,
  71. cache: cache,
  72. client: client,
  73. log: log,
  74. informers: make(map[string]*informerEntry),
  75. }
  76. }
  77. // EnsureInformer ensures an informer exists for the given GVK and registers the ExternalSecret.
  78. func (m *DefaultInformerManager) EnsureInformer(ctx context.Context, gvk schema.GroupVersionKind, es types.NamespacedName) (bool, error) {
  79. m.mu.Lock()
  80. defer m.mu.Unlock()
  81. key := gvk.String()
  82. // check if we have this gvk in the list of informers already
  83. if entry, exists := m.informers[key]; exists {
  84. // register this ExternalSecret as using this informer (deduplicate);
  85. entry.externalSecrets[es] = struct{}{}
  86. m.log.Info("registered ExternalSecret with existing informer",
  87. "gvk", key,
  88. "externalSecret", es,
  89. "totalUsers", len(entry.externalSecrets))
  90. return false, nil
  91. }
  92. if m.queue == nil {
  93. return false, fmt.Errorf("queue not initialized, call SetQueue first")
  94. }
  95. // Get or create informer for this GVK
  96. informer, err := m.cache.GetInformerForKind(ctx, gvk)
  97. if err != nil {
  98. return false, fmt.Errorf("failed to get informer for %s: %w", key, err)
  99. }
  100. // Add event handler to the informer that enqueues reconcile requests
  101. _, err = informer.AddEventHandler(&enqueueHandler{
  102. managerContext: m.managerContext,
  103. gvk: gvk,
  104. client: m.client,
  105. queue: m.queue,
  106. log: m.log,
  107. })
  108. if err != nil {
  109. return false, fmt.Errorf("failed to add event handler for %s: %w", key, err)
  110. }
  111. // Store the informer with this ExternalSecret as the first user
  112. m.informers[key] = &informerEntry{
  113. informer: informer,
  114. externalSecrets: map[types.NamespacedName]struct{}{es: {}},
  115. }
  116. m.log.Info("registered informer for generic target",
  117. "group", gvk.Group,
  118. "version", gvk.Version,
  119. "kind", gvk.Kind,
  120. "externalSecret", es)
  121. return true, nil
  122. }
// enqueueHandler is an event handler that enqueues reconcile requests for ExternalSecrets
// that target the changed resource.
type enqueueHandler struct {
	// managerContext is the context used for the List calls made while handling an event.
	managerContext context.Context
	// gvk is the kind of resource this handler receives events for; it is part of the index lookup value.
	gvk schema.GroupVersionKind
	// client is used to list the ExternalSecrets that target a changed resource.
	client client.Client
	// queue receives one reconcile request per matching ExternalSecret.
	queue workqueue.TypedRateLimitingInterface[ctrl.Request]
	log   logr.Logger
}
  132. func (h *enqueueHandler) OnAdd(obj interface{}, _ bool) {
  133. h.enqueue(obj)
  134. }
  135. func (h *enqueueHandler) OnUpdate(_, newObj interface{}) {
  136. h.enqueue(newObj)
  137. }
  138. func (h *enqueueHandler) OnDelete(obj interface{}) {
  139. h.enqueue(obj)
  140. }
  141. func (h *enqueueHandler) enqueue(obj interface{}) {
  142. // Extract metadata
  143. meta, ok := obj.(metav1.Object)
  144. if !ok {
  145. h.log.Error(nil, "unexpected object type", "type", fmt.Sprintf("%T", obj))
  146. return
  147. }
  148. // Only process resources with the managed label
  149. labels := meta.GetLabels()
  150. if labels == nil {
  151. return
  152. }
  153. value, hasLabel := labels[esv1.LabelManaged]
  154. if !hasLabel || value != esv1.LabelManagedValue {
  155. return
  156. }
  157. // Find ExternalSecrets that target this resource
  158. externalSecretsList := &esv1.ExternalSecretList{}
  159. indexValue := fmt.Sprintf("%s/%s/%s/%s", h.gvk.Group, h.gvk.Version, h.gvk.Kind, meta.GetName())
  160. listOps := &client.ListOptions{
  161. FieldSelector: fields.OneTermEqualSelector(indexESTargetResourceField, indexValue),
  162. Namespace: meta.GetNamespace(),
  163. }
  164. if err := h.client.List(h.managerContext, externalSecretsList, listOps); err != nil {
  165. h.log.Error(err, "failed to list ExternalSecrets for resource",
  166. "gvk", h.gvk.String(),
  167. "name", meta.GetName(),
  168. "namespace", meta.GetNamespace())
  169. return
  170. }
  171. // Enqueue reconcile requests for each ExternalSecret
  172. for i := range externalSecretsList.Items {
  173. req := ctrl.Request{
  174. NamespacedName: types.NamespacedName{
  175. Name: externalSecretsList.Items[i].GetName(),
  176. Namespace: externalSecretsList.Items[i].GetNamespace(),
  177. },
  178. }
  179. h.queue.Add(req)
  180. h.log.V(1).Info("enqueued reconcile request due to resource change",
  181. "externalSecret", req.NamespacedName,
  182. "targetGVK", h.gvk.String(),
  183. "targetResource", meta.GetName())
  184. }
  185. }
  186. // ReleaseInformer unregisters the ExternalSecret from using this GVK.
  187. func (m *DefaultInformerManager) ReleaseInformer(ctx context.Context, gvk schema.GroupVersionKind, es types.NamespacedName) error {
  188. m.mu.Lock()
  189. defer m.mu.Unlock()
  190. key := gvk.String()
  191. entry, exists := m.informers[key]
  192. if !exists {
  193. // Already removed or never existed; can happen if we had a bad start, failed to watch, or during other errors.
  194. // In that case, there is nothing else to do really.
  195. m.log.Info("informer not found for release",
  196. "gvk", key,
  197. "externalSecret", es)
  198. return nil
  199. }
  200. // remove the ES from the list of ESs using this GVK
  201. delete(entry.externalSecrets, es)
  202. m.log.Info("unregistered ExternalSecret from informer",
  203. "gvk", key,
  204. "externalSecret", es,
  205. "remainingUsers", len(entry.externalSecrets))
  206. // if no more ExternalSecrets are using this informer, remove it
  207. if len(entry.externalSecrets) == 0 {
  208. partial := &metav1.PartialObjectMetadata{}
  209. partial.SetGroupVersionKind(gvk)
  210. if err := m.cache.RemoveInformer(ctx, partial); err != nil {
  211. m.log.Error(err, "failed to remove informer, will clean up tracking anyway",
  212. "gvk", key)
  213. }
  214. delete(m.informers, key)
  215. m.log.Info("removed informer for generic target (no more users)",
  216. "group", gvk.Group,
  217. "version", gvk.Version,
  218. "kind", gvk.Kind)
  219. }
  220. return nil
  221. }
  222. // IsManaged returns true if the manager is currently managing an informer for the GVK.
  223. func (m *DefaultInformerManager) IsManaged(gvk schema.GroupVersionKind) bool {
  224. m.mu.RLock()
  225. defer m.mu.RUnlock()
  226. _, exists := m.informers[gvk.String()]
  227. return exists
  228. }
  229. // GetInformer returns the informer for a GVK if it exists.
  230. func (m *DefaultInformerManager) GetInformer(gvk schema.GroupVersionKind) (runtimecache.Informer, bool) {
  231. m.mu.RLock()
  232. defer m.mu.RUnlock()
  233. entry, exists := m.informers[gvk.String()]
  234. if !exists {
  235. return nil, false
  236. }
  237. return entry.informer, true
  238. }
  239. // Source returns a source.TypedSource that binds the reconcile queue to this manager.
  240. func (m *DefaultInformerManager) Source() source.TypedSource[reconcile.Request] {
  241. return source.Func(func(_ context.Context, queue workqueue.TypedRateLimitingInterface[ctrl.Request]) error {
  242. // This dynamically binds the given queue to the informer manager
  243. // From this point on, the queue will receive events for all registered informers
  244. return m.SetQueue(queue)
  245. })
  246. }
  247. // SetQueue binds the reconcile queue to the informer manager.
  248. func (m *DefaultInformerManager) SetQueue(queue workqueue.TypedRateLimitingInterface[ctrl.Request]) error {
  249. m.mu.Lock()
  250. defer m.mu.Unlock()
  251. if m.queue != nil {
  252. return fmt.Errorf("queue already set")
  253. }
  254. m.queue = queue
  255. m.log.Info("reconcile queue bound to informer manager")
  256. return nil
  257. }