manager.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440
  1. /*
  2. Copyright © 2025 ESO Maintainer Team
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. https://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package clientmanager provides a Manager for provider clients
  14. package clientmanager
  15. import (
  16. "context"
  17. "errors"
  18. "fmt"
  19. "regexp"
  20. "strings"
  21. "sync"
  22. "github.com/go-logr/logr"
  23. v1 "k8s.io/api/core/v1"
  24. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  25. "k8s.io/apimachinery/pkg/labels"
  26. "k8s.io/apimachinery/pkg/types"
  27. ctrl "sigs.k8s.io/controller-runtime"
  28. "sigs.k8s.io/controller-runtime/pkg/client"
  29. esv1 "github.com/external-secrets/external-secrets/apis/externalsecrets/v1"
  30. "github.com/external-secrets/external-secrets/pkg/controllers/secretstore/storeutil"
  31. pb "github.com/external-secrets/external-secrets/proto/provider"
  32. adapterstore "github.com/external-secrets/external-secrets/providers/v2/adapter/store"
  33. "github.com/external-secrets/external-secrets/providers/v2/common/grpc"
  34. )
  35. const (
  36. errGetClusterSecretStore = "could not get ClusterSecretStore %q, %w"
  37. errGetSecretStore = "could not get SecretStore %q, %w"
  38. errClusterStoreMismatch = "using cluster store %q is not allowed from namespace %q: denied by spec.condition"
  39. )
  40. var (
  41. // globalV2ConnectionPool is a singleton connection pool for v2 gRPC providers.
  42. // It persists across all reconciles and Manager instances to enable connection reuse.
  43. // Initialized once on first use and shared globally.
  44. globalV2ConnectionPool *grpc.ConnectionPool
  45. globalV2ConnectionPoolOnce sync.Once
  46. globalV2ConnectionPoolLog logr.Logger
  47. )
  48. // initGlobalV2ConnectionPool initializes the global connection pool for v2 providers.
  49. // This is called once on first use via sync.Once.
  50. func initGlobalV2ConnectionPool() {
  51. globalV2ConnectionPoolLog = ctrl.Log.WithName("v2-connection-pool")
  52. poolConfig := grpc.DefaultPoolConfig()
  53. globalV2ConnectionPool = grpc.NewConnectionPool(poolConfig)
  54. globalV2ConnectionPoolLog.Info("global v2 connection pool initialized",
  55. "maxIdleTime", poolConfig.MaxIdleTime.String(),
  56. "maxLifetime", poolConfig.MaxLifetime.String(),
  57. "healthCheckInterval", poolConfig.HealthCheckInterval.String())
  58. }
  59. // getGlobalV2ConnectionPool returns the global connection pool, initializing it if needed.
  60. func getGlobalV2ConnectionPool() *grpc.ConnectionPool {
  61. globalV2ConnectionPoolOnce.Do(initGlobalV2ConnectionPool)
  62. return globalV2ConnectionPool
  63. }
  64. // v2PooledConnection tracks connection info needed to release connections back to the pool.
  65. type v2PooledConnection struct {
  66. address string
  67. tlsConfig *grpc.TLSConfig
  68. }
  69. // Manager stores instances of provider clients
  70. // At any given time we must have no more than one instance
  71. // of a client (due to limitations in GCP / see mutexlock there)
  72. // If the controller requests another instance of a given client
  73. // we will close the old client first and then construct a new one.
  74. type Manager struct {
  75. log logr.Logger
  76. client client.Client
  77. controllerClass string
  78. enableFloodgate bool
  79. // store clients by provider type
  80. clientMap map[clientKey]*clientVal
  81. // Track v2 provider connections for release back to pool
  82. v2PooledConnections []v2PooledConnection
  83. }
  84. type clientKey struct {
  85. providerType string
  86. // For v2 providers, store the provider name and namespace
  87. v2ProviderName string
  88. v2ProviderNamespace string
  89. }
  90. type clientVal struct {
  91. client esv1.SecretsClient
  92. store esv1.GenericStore
  93. // For v2 providers, store the generation for cache invalidation
  94. v2ProviderGeneration int64
  95. }
  96. // NewManager constructs a new manager with defaults.
  97. func NewManager(ctrlClient client.Client, controllerClass string, enableFloodgate bool) *Manager {
  98. log := ctrl.Log.WithName("clientmanager")
  99. return &Manager{
  100. log: log,
  101. client: ctrlClient,
  102. controllerClass: controllerClass,
  103. enableFloodgate: enableFloodgate,
  104. clientMap: make(map[clientKey]*clientVal),
  105. }
  106. }
  107. // GetFromStore returns a provider client from the given store.
  108. // Do not close the client returned from this func, instead close
  109. // the manager once you're done with reconciling the external secret.
  110. func (m *Manager) GetFromStore(ctx context.Context, store esv1.GenericStore, namespace string) (esv1.SecretsClient, error) {
  111. storeProvider, err := esv1.GetProvider(store)
  112. if err != nil {
  113. return nil, err
  114. }
  115. secretClient := m.getStoredClient(ctx, storeProvider, store)
  116. if secretClient != nil {
  117. return secretClient, nil
  118. }
  119. m.log.V(1).Info("creating new client",
  120. "provider", fmt.Sprintf("%T", storeProvider),
  121. "store", fmt.Sprintf("%s/%s", store.GetNamespace(), store.GetName()))
  122. // secret client is created only if we are going to refresh
  123. // this skip an unnecessary check/request in the case we are not going to do anything
  124. secretClient, err = storeProvider.NewClient(ctx, store, m.client, namespace)
  125. if err != nil {
  126. return nil, err
  127. }
  128. idx := storeKey(storeProvider)
  129. m.clientMap[idx] = &clientVal{
  130. client: secretClient,
  131. store: store,
  132. }
  133. return secretClient, nil
  134. }
  135. // Get returns a provider client from the given storeRef or sourceRef.secretStoreRef
  136. // while sourceRef.SecretStoreRef takes precedence over storeRef.
  137. // Do not close the client returned from this func, instead close
  138. // the manager once you're done with recinciling the external secret.
  139. func (m *Manager) Get(ctx context.Context, storeRef esv1.SecretStoreRef, namespace string, sourceRef *esv1.StoreGeneratorSourceRef) (esv1.SecretsClient, error) {
  140. if storeRef.Kind == "Provider" {
  141. return m.getV2ProviderClient(ctx, storeRef.Name, namespace)
  142. }
  143. if sourceRef != nil && sourceRef.SecretStoreRef != nil {
  144. storeRef = *sourceRef.SecretStoreRef
  145. }
  146. store, err := m.getStore(ctx, &storeRef, namespace)
  147. if err != nil {
  148. return nil, err
  149. }
  150. // check if store should be handled by this controller instance
  151. if !storeutil.ShouldProcessStore(store, m.controllerClass) {
  152. return nil, errors.New("can not reference unmanaged store")
  153. }
  154. // when using ClusterSecretStore, validate the ClusterSecretStore namespace conditions
  155. shouldProcess, err := m.shouldProcessSecret(store, namespace)
  156. if err != nil {
  157. return nil, err
  158. }
  159. if !shouldProcess {
  160. return nil, fmt.Errorf(errClusterStoreMismatch, store.GetName(), namespace)
  161. }
  162. if m.enableFloodgate {
  163. err := storeutil.AssertStoreIsUsable(store)
  164. if err != nil {
  165. return nil, err
  166. }
  167. }
  168. return m.GetFromStore(ctx, store, namespace)
  169. }
  170. // getV2ProviderClient creates or retrieves a cached gRPC client for a v2 Provider.
  171. // It uses the global connection pool to enable connection reuse across reconciles.
  172. func (m *Manager) getV2ProviderClient(ctx context.Context, providerName, namespace string) (esv1.SecretsClient, error) {
  173. // Fetch the Provider resource
  174. var provider esv1.Provider
  175. providerKey := types.NamespacedName{
  176. Name: providerName,
  177. Namespace: namespace,
  178. }
  179. if err := m.client.Get(ctx, providerKey, &provider); err != nil {
  180. return nil, fmt.Errorf("failed to get Provider %q: %w", providerName, err)
  181. }
  182. // Create cache key
  183. cacheKey := clientKey{
  184. providerType: "v2-provider",
  185. v2ProviderName: providerName,
  186. v2ProviderNamespace: namespace,
  187. }
  188. // Check if we have a cached client
  189. if cached, ok := m.clientMap[cacheKey]; ok {
  190. // Validate cache is still valid (check generation)
  191. if cached.v2ProviderGeneration == provider.Generation {
  192. m.log.V(1).Info("reusing cached v2 provider client",
  193. "provider", providerName,
  194. "namespace", namespace,
  195. "generation", provider.Generation)
  196. return cached.client, nil
  197. }
  198. // Cache is stale, release old pooled connection
  199. m.log.V(1).Info("provider generation changed, invalidating cache",
  200. "provider", providerName,
  201. "namespace", namespace,
  202. "oldGeneration", cached.v2ProviderGeneration,
  203. "newGeneration", provider.Generation)
  204. delete(m.clientMap, cacheKey)
  205. }
  206. m.log.V(1).Info("getting v2 provider client from pool",
  207. "provider", providerName,
  208. "namespace", namespace,
  209. "address", provider.Spec.Config.Address)
  210. // Get provider address
  211. address := provider.Spec.Config.Address
  212. if address == "" {
  213. return nil, fmt.Errorf("provider address is required in Provider %q", providerName)
  214. }
  215. // Load TLS configuration
  216. // TODO: use namespace of controller
  217. tlsConfig, err := grpc.LoadClientTLSConfig(ctx, m.client, provider.Spec.Config.Address, "external-secrets-system")
  218. if err != nil {
  219. return nil, fmt.Errorf("failed to load TLS config for Provider %q: %w", providerName, err)
  220. }
  221. // Get connection from global pool (enables connection reuse across reconciles)
  222. pool := getGlobalV2ConnectionPool()
  223. grpcClient, err := pool.Get(ctx, address, tlsConfig)
  224. if err != nil {
  225. return nil, fmt.Errorf("failed to get gRPC client from pool for Provider %q: %w", providerName, err)
  226. }
  227. // Track this connection for release when Manager closes
  228. m.v2PooledConnections = append(m.v2PooledConnections, v2PooledConnection{
  229. address: address,
  230. tlsConfig: tlsConfig,
  231. })
  232. // Convert ProviderReference to protobuf format
  233. providerRef := &pb.ProviderReference{
  234. ApiVersion: provider.Spec.Config.ProviderRef.APIVersion,
  235. Kind: provider.Spec.Config.ProviderRef.Kind,
  236. Name: provider.Spec.Config.ProviderRef.Name,
  237. Namespace: provider.Spec.Config.ProviderRef.Namespace,
  238. }
  239. // Wrap with V2ClientWrapper
  240. wrappedClient := adapterstore.NewClient(grpcClient, providerRef, namespace)
  241. // Cache the client for this Manager instance
  242. m.clientMap[cacheKey] = &clientVal{
  243. client: wrappedClient,
  244. store: nil, // v2 providers don't use GenericStore
  245. v2ProviderGeneration: provider.Generation,
  246. }
  247. m.log.Info("v2 provider client obtained from pool",
  248. "provider", providerName,
  249. "namespace", namespace,
  250. "address", address)
  251. return wrappedClient, nil
  252. }
  253. // returns a previously stored client from the cache if store and store-version match
  254. // if a client exists for the same provider which points to a different store or store version
  255. // it will be cleaned up.
  256. func (m *Manager) getStoredClient(ctx context.Context, storeProvider esv1.ProviderInterface, store esv1.GenericStore) esv1.SecretsClient {
  257. idx := storeKey(storeProvider)
  258. val, ok := m.clientMap[idx]
  259. if !ok {
  260. return nil
  261. }
  262. valGVK, err := m.client.GroupVersionKindFor(val.store)
  263. if err != nil {
  264. return nil
  265. }
  266. storeGVK, err := m.client.GroupVersionKindFor(store)
  267. if err != nil {
  268. return nil
  269. }
  270. storeName := fmt.Sprintf("%s/%s", store.GetNamespace(), store.GetName())
  271. // return client if it points to the very same store
  272. if val.store.GetObjectMeta().Generation == store.GetGeneration() &&
  273. valGVK == storeGVK &&
  274. val.store.GetName() == store.GetName() &&
  275. val.store.GetNamespace() == store.GetNamespace() {
  276. m.log.V(1).Info("reusing stored client",
  277. "provider", fmt.Sprintf("%T", storeProvider),
  278. "store", storeName)
  279. return val.client
  280. }
  281. m.log.V(1).Info("cleaning up client",
  282. "provider", fmt.Sprintf("%T", storeProvider),
  283. "store", storeName)
  284. // if we have a client, but it points to a different store
  285. // we must clean it up
  286. _ = val.client.Close(ctx)
  287. delete(m.clientMap, idx)
  288. return nil
  289. }
  290. func storeKey(storeProvider esv1.ProviderInterface) clientKey {
  291. return clientKey{
  292. providerType: fmt.Sprintf("%T", storeProvider),
  293. }
  294. }
  295. // getStore fetches the (Cluster)SecretStore from the kube-apiserver
  296. // and returns a GenericStore representing it.
  297. func (m *Manager) getStore(ctx context.Context, storeRef *esv1.SecretStoreRef, namespace string) (esv1.GenericStore, error) {
  298. ref := types.NamespacedName{
  299. Name: storeRef.Name,
  300. }
  301. if storeRef.Kind == esv1.ClusterSecretStoreKind {
  302. var store esv1.ClusterSecretStore
  303. err := m.client.Get(ctx, ref, &store)
  304. if err != nil {
  305. return nil, fmt.Errorf(errGetClusterSecretStore, ref.Name, err)
  306. }
  307. return &store, nil
  308. }
  309. ref.Namespace = namespace
  310. var store esv1.SecretStore
  311. err := m.client.Get(ctx, ref, &store)
  312. if err != nil {
  313. return nil, fmt.Errorf(errGetSecretStore, ref.Name, err)
  314. }
  315. return &store, nil
  316. }
  317. // Close cleans up all clients.
  318. // For v1 providers, it closes the clients directly.
  319. // For v2 providers, it releases connections back to the pool for reuse.
  320. func (m *Manager) Close(ctx context.Context) error {
  321. var errs []string
  322. // Release v2 pooled connections back to the pool
  323. pool := getGlobalV2ConnectionPool()
  324. for _, pooledConn := range m.v2PooledConnections {
  325. pool.Release(pooledConn.address, pooledConn.tlsConfig)
  326. m.log.V(1).Info("released v2 connection back to pool",
  327. "address", pooledConn.address)
  328. }
  329. m.v2PooledConnections = nil
  330. // Close v1 provider clients (they don't use the pool)
  331. for key, val := range m.clientMap {
  332. // Only close v1 clients; v2 clients are managed by the pool
  333. if key.providerType != "v2-provider" && key.providerType != "v2-cluster-provider" {
  334. err := val.client.Close(ctx)
  335. if err != nil {
  336. errs = append(errs, err.Error())
  337. }
  338. }
  339. delete(m.clientMap, key)
  340. }
  341. if len(errs) != 0 {
  342. return fmt.Errorf("errors while closing clients: %s", strings.Join(errs, ", "))
  343. }
  344. return nil
  345. }
  346. func (m *Manager) shouldProcessSecret(store esv1.GenericStore, ns string) (bool, error) {
  347. if store.GetKind() != esv1.ClusterSecretStoreKind {
  348. return true, nil
  349. }
  350. if len(store.GetSpec().Conditions) == 0 {
  351. return true, nil
  352. }
  353. namespace := v1.Namespace{}
  354. if err := m.client.Get(context.Background(), client.ObjectKey{Name: ns}, &namespace); err != nil {
  355. return false, fmt.Errorf("failed to get a namespace %q: %w", ns, err)
  356. }
  357. nsLabels := labels.Set(namespace.GetLabels())
  358. for _, condition := range store.GetSpec().Conditions {
  359. var labelSelectors []*metav1.LabelSelector
  360. if condition.NamespaceSelector != nil {
  361. labelSelectors = append(labelSelectors, condition.NamespaceSelector)
  362. }
  363. for _, n := range condition.Namespaces {
  364. labelSelectors = append(labelSelectors, &metav1.LabelSelector{
  365. MatchLabels: map[string]string{
  366. "kubernetes.io/metadata.name": n,
  367. },
  368. })
  369. }
  370. for _, ls := range labelSelectors {
  371. selector, err := metav1.LabelSelectorAsSelector(ls)
  372. if err != nil {
  373. return false, fmt.Errorf("failed to convert label selector into selector %v: %w", ls, err)
  374. }
  375. if selector.Matches(nsLabels) {
  376. return true, nil
  377. }
  378. }
  379. for _, reg := range condition.NamespaceRegexes {
  380. match, err := regexp.MatchString(reg, ns)
  381. if err != nil {
  382. // Should not happen since store validation already verified the regexes.
  383. return false, fmt.Errorf("failed to compile regex %v: %w", reg, err)
  384. }
  385. if match {
  386. return true, nil
  387. }
  388. }
  389. }
  390. return false, nil
  391. }