informer_manager.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. /*
  2. Copyright © 2025 ESO Maintainer Team
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. https://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package externalsecret
  14. import (
  15. "context"
  16. "fmt"
  17. "sync"
  18. "github.com/go-logr/logr"
  19. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  20. "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
  21. "k8s.io/apimachinery/pkg/fields"
  22. "k8s.io/apimachinery/pkg/runtime/schema"
  23. "k8s.io/apimachinery/pkg/types"
  24. "k8s.io/client-go/util/workqueue"
  25. ctrl "sigs.k8s.io/controller-runtime"
  26. runtimecache "sigs.k8s.io/controller-runtime/pkg/cache"
  27. "sigs.k8s.io/controller-runtime/pkg/client"
  28. "sigs.k8s.io/controller-runtime/pkg/reconcile"
  29. "sigs.k8s.io/controller-runtime/pkg/source"
  30. esv1 "github.com/external-secrets/external-secrets/apis/externalsecrets/v1"
  31. )
  32. // InformerManager manages the lifecycle of informers for generic target resources.
  33. // It handles dynamic registration, tracking, and cleanup of informers.
  34. type InformerManager interface {
  35. // EnsureInformer ensures an informer exists for the given GVK and registers the ExternalSecret as using it.
  36. // Returns true if a new informer was created, false if it already existed.
  37. EnsureInformer(ctx context.Context, gvk schema.GroupVersionKind, es types.NamespacedName) (bool, error)
  38. // ReleaseInformer unregisters the ExternalSecret from using this GVK.
  39. // If no more ExternalSecrets use this GVK, the informer is stopped and removed.
  40. ReleaseInformer(ctx context.Context, gvk schema.GroupVersionKind, es types.NamespacedName) error
  41. // IsManaged returns true if the manager is currently managing an informer for the GVK.
  42. IsManaged(gvk schema.GroupVersionKind) bool
  43. // GetInformer returns the informer for a GVK if it exists.
  44. GetInformer(gvk schema.GroupVersionKind) (runtimecache.Informer, bool)
  45. // Source returns a source.TypedSource that can be used with WatchesRawSource
  46. Source() source.TypedSource[reconcile.Request]
  47. // SetQueue binds the reconcile queue to the informer manager
  48. SetQueue(queue workqueue.TypedRateLimitingInterface[ctrl.Request]) error
  49. }
  50. // informerEntry tracks an informer and the ExternalSecrets using it.
  51. type informerEntry struct {
  52. informer runtimecache.Informer
  53. // externalSecrets tracks the external secrets using a GVK. Once this list is empty, we
  54. // stop the informer and deregister it to free up resources. It is a map instead of just a number to prevent
  55. // duplicated reconcile ensures to increase the number on each reconcile.
  56. externalSecrets map[types.NamespacedName]struct{}
  57. }
  58. // DefaultInformerManager implements InformerManager using controller-runtime's cache.
  59. type DefaultInformerManager struct {
  60. cache runtimecache.Cache
  61. client client.Client
  62. log logr.Logger
  63. mu sync.RWMutex
  64. informers map[string]*informerEntry // key: GVK string
  65. queue workqueue.TypedRateLimitingInterface[ctrl.Request]
  66. managerContext context.Context
  67. }
  68. // NewInformerManager creates a new InformerManager.
  69. func NewInformerManager(ctx context.Context, cache runtimecache.Cache, client client.Client, log logr.Logger) InformerManager {
  70. return &DefaultInformerManager{
  71. managerContext: ctx,
  72. cache: cache,
  73. client: client,
  74. log: log,
  75. informers: make(map[string]*informerEntry),
  76. }
  77. }
  78. // EnsureInformer ensures an informer exists for the given GVK and registers the ExternalSecret.
  79. func (m *DefaultInformerManager) EnsureInformer(ctx context.Context, gvk schema.GroupVersionKind, es types.NamespacedName) (bool, error) {
  80. m.mu.Lock()
  81. defer m.mu.Unlock()
  82. key := gvk.String()
  83. // check if we have this gvk in the list of informers already
  84. if entry, exists := m.informers[key]; exists {
  85. // register this ExternalSecret as using this informer (deduplicate);
  86. entry.externalSecrets[es] = struct{}{}
  87. m.log.Info("registered ExternalSecret with existing informer",
  88. "gvk", key,
  89. "externalSecret", es,
  90. "totalUsers", len(entry.externalSecrets))
  91. return false, nil
  92. }
  93. if m.queue == nil {
  94. return false, fmt.Errorf("queue not initialized, call SetQueue first")
  95. }
  96. obj := &unstructured.Unstructured{}
  97. obj.SetGroupVersionKind(gvk)
  98. informer, err := m.cache.GetInformer(ctx, obj)
  99. if err != nil {
  100. return false, fmt.Errorf("failed to get informer for %s: %w", key, err)
  101. }
  102. // Add event handler to the informer that enqueues reconcile requests
  103. _, err = informer.AddEventHandler(&enqueueHandler{
  104. managerContext: m.managerContext,
  105. gvk: gvk,
  106. client: m.client,
  107. queue: m.queue,
  108. log: m.log,
  109. })
  110. if err != nil {
  111. return false, fmt.Errorf("failed to add event handler for %s: %w", key, err)
  112. }
  113. // Store the informer with this ExternalSecret as the first user
  114. m.informers[key] = &informerEntry{
  115. informer: informer,
  116. externalSecrets: map[types.NamespacedName]struct{}{es: {}},
  117. }
  118. m.log.Info("registered informer for generic target",
  119. "group", gvk.Group,
  120. "version", gvk.Version,
  121. "kind", gvk.Kind,
  122. "externalSecret", es)
  123. return true, nil
  124. }
  125. // enqueueHandler is an event handler that enqueues reconcile requests for ExternalSecrets
  126. // that target the changed resource.
  127. type enqueueHandler struct {
  128. managerContext context.Context
  129. gvk schema.GroupVersionKind
  130. client client.Client
  131. queue workqueue.TypedRateLimitingInterface[ctrl.Request]
  132. log logr.Logger
  133. }
  134. func (h *enqueueHandler) OnAdd(obj interface{}, _ bool) {
  135. h.enqueue(obj)
  136. }
  137. func (h *enqueueHandler) OnUpdate(_, newObj interface{}) {
  138. h.enqueue(newObj)
  139. }
  140. func (h *enqueueHandler) OnDelete(obj interface{}) {
  141. h.enqueue(obj)
  142. }
  143. func (h *enqueueHandler) enqueue(obj interface{}) {
  144. // Extract metadata
  145. meta, ok := obj.(metav1.Object)
  146. if !ok {
  147. h.log.Error(nil, "unexpected object type", "type", fmt.Sprintf("%T", obj))
  148. return
  149. }
  150. // Only process resources with the managed label
  151. labels := meta.GetLabels()
  152. if labels == nil {
  153. return
  154. }
  155. value, hasLabel := labels[esv1.LabelManaged]
  156. if !hasLabel || value != esv1.LabelManagedValue {
  157. return
  158. }
  159. // Find ExternalSecrets that target this resource
  160. externalSecretsList := &esv1.ExternalSecretList{}
  161. indexValue := fmt.Sprintf("%s/%s/%s/%s", h.gvk.Group, h.gvk.Version, h.gvk.Kind, meta.GetName())
  162. listOps := &client.ListOptions{
  163. FieldSelector: fields.OneTermEqualSelector(indexESTargetResourceField, indexValue),
  164. Namespace: meta.GetNamespace(),
  165. }
  166. if err := h.client.List(h.managerContext, externalSecretsList, listOps); err != nil {
  167. h.log.Error(err, "failed to list ExternalSecrets for resource",
  168. "gvk", h.gvk.String(),
  169. "name", meta.GetName(),
  170. "namespace", meta.GetNamespace())
  171. return
  172. }
  173. // Enqueue reconcile requests for each ExternalSecret
  174. for i := range externalSecretsList.Items {
  175. req := ctrl.Request{
  176. NamespacedName: types.NamespacedName{
  177. Name: externalSecretsList.Items[i].GetName(),
  178. Namespace: externalSecretsList.Items[i].GetNamespace(),
  179. },
  180. }
  181. h.queue.Add(req)
  182. h.log.V(1).Info("enqueued reconcile request due to resource change",
  183. "externalSecret", req.NamespacedName,
  184. "targetGVK", h.gvk.String(),
  185. "targetResource", meta.GetName())
  186. }
  187. }
  188. // ReleaseInformer unregisters the ExternalSecret from using this GVK.
  189. func (m *DefaultInformerManager) ReleaseInformer(ctx context.Context, gvk schema.GroupVersionKind, es types.NamespacedName) error {
  190. m.mu.Lock()
  191. defer m.mu.Unlock()
  192. key := gvk.String()
  193. entry, exists := m.informers[key]
  194. if !exists {
  195. // Already removed or never existed; can happen if we had a bad start, failed to watch, or during other errors.
  196. // In that case, there is nothing else to do really.
  197. m.log.Info("informer not found for release",
  198. "gvk", key,
  199. "externalSecret", es)
  200. return nil
  201. }
  202. // remove the ES from the list of ESs using this GVK
  203. delete(entry.externalSecrets, es)
  204. m.log.Info("unregistered ExternalSecret from informer",
  205. "gvk", key,
  206. "externalSecret", es,
  207. "remainingUsers", len(entry.externalSecrets))
  208. // if no more ExternalSecrets are using this informer, remove it
  209. if len(entry.externalSecrets) == 0 {
  210. obj := &unstructured.Unstructured{}
  211. obj.SetGroupVersionKind(gvk)
  212. if err := m.cache.RemoveInformer(ctx, obj); err != nil {
  213. m.log.Error(err, "failed to remove informer, will clean up tracking anyway",
  214. "gvk", key)
  215. }
  216. delete(m.informers, key)
  217. m.log.Info("removed informer for generic target (no more users)",
  218. "group", gvk.Group,
  219. "version", gvk.Version,
  220. "kind", gvk.Kind)
  221. }
  222. return nil
  223. }
  224. // IsManaged returns true if the manager is currently managing an informer for the GVK.
  225. func (m *DefaultInformerManager) IsManaged(gvk schema.GroupVersionKind) bool {
  226. m.mu.RLock()
  227. defer m.mu.RUnlock()
  228. _, exists := m.informers[gvk.String()]
  229. return exists
  230. }
  231. // GetInformer returns the informer for a GVK if it exists.
  232. func (m *DefaultInformerManager) GetInformer(gvk schema.GroupVersionKind) (runtimecache.Informer, bool) {
  233. m.mu.RLock()
  234. defer m.mu.RUnlock()
  235. entry, exists := m.informers[gvk.String()]
  236. if !exists {
  237. return nil, false
  238. }
  239. return entry.informer, true
  240. }
  241. // Source returns a source.TypedSource that binds the reconcile queue to this manager.
  242. func (m *DefaultInformerManager) Source() source.TypedSource[reconcile.Request] {
  243. return source.Func(func(_ context.Context, queue workqueue.TypedRateLimitingInterface[ctrl.Request]) error {
  244. // This dynamically binds the given queue to the informer manager
  245. // From this point on, the queue will receive events for all registered informers
  246. return m.SetQueue(queue)
  247. })
  248. }
  249. // SetQueue binds the reconcile queue to the informer manager.
  250. func (m *DefaultInformerManager) SetQueue(queue workqueue.TypedRateLimitingInterface[ctrl.Request]) error {
  251. m.mu.Lock()
  252. defer m.mu.Unlock()
  253. if m.queue != nil {
  254. return fmt.Errorf("queue already set")
  255. }
  256. m.queue = queue
  257. m.log.Info("reconcile queue bound to informer manager")
  258. return nil
  259. }