| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532 |
- /*
- Copyright © The ESO Authors
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- https://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- // Package clientmanager provides a Manager for provider clients
- package clientmanager
- import (
- "context"
- "errors"
- "fmt"
- "regexp"
- "strings"
- "sync"
- "github.com/go-logr/logr"
- v1 "k8s.io/api/core/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/labels"
- "k8s.io/apimachinery/pkg/types"
- ctrl "sigs.k8s.io/controller-runtime"
- "sigs.k8s.io/controller-runtime/pkg/client"
- esv1 "github.com/external-secrets/external-secrets/apis/externalsecrets/v1"
- esv1alpha1 "github.com/external-secrets/external-secrets/apis/externalsecrets/v1alpha1"
- adapterstore "github.com/external-secrets/external-secrets/providers/v2/adapter/store"
- "github.com/external-secrets/external-secrets/providers/v2/common/grpc"
- )
const (
	// Format strings for errors produced while resolving and validating stores.
	errGetClusterSecretStore      = "could not get ClusterSecretStore %q, %w"
	errGetSecretStore             = "could not get SecretStore %q, %w"
	errSecretStoreNotReady        = "%s %q is not ready"
	errClusterStoreMismatch       = "using cluster store %q is not allowed from namespace %q: denied by spec.condition"
	errClusterProviderStoreDenied = "using ClusterProviderStore %q is not allowed from namespace %q: denied by spec.conditions"

	// Provider labels attached to client manager cache metrics
	// (namespaced vs. cluster-scoped providers).
	providerMetricsLabel        = "provider"
	clusterProviderMetricsLabel = "cluster-provider"

	// Reasons reported when a cached client is invalidated.
	cacheInvalidationGeneration = "generation_change"
	cacheInvalidationMismatch   = "store_mismatch"

	// providerType values of cache keys that Close does not close directly,
	// because those clients are treated as managed by the connection pool.
	v2ProviderStoreCacheKey     = "v2-provider-store"
	v2ClusterProviderStoreCache = "v2-cluster-provider-store"

	// runtimeRefCacheKeyType prefixes the providerType of cache keys built by
	// runtimeRefStoreKey.
	runtimeRefCacheKeyType = "runtime-ref"
)
var (
	// globalV2ConnectionPool is a singleton connection pool for v2 gRPC providers.
	// It persists across all reconciles and Manager instances to enable connection reuse.
	// Initialized once on first use and shared globally.
	globalV2ConnectionPool *grpc.ConnectionPool
	// globalV2ConnectionPoolOnce guards the single call to initGlobalV2ConnectionPool
	// made from getGlobalV2ConnectionPool.
	globalV2ConnectionPoolOnce sync.Once
	// globalV2ConnectionPoolLog is the logger assigned by initGlobalV2ConnectionPool.
	globalV2ConnectionPoolLog logr.Logger
)
- // initGlobalV2ConnectionPool initializes the global connection pool for v2 providers.
- // This is called once on first use via sync.Once.
- func initGlobalV2ConnectionPool() {
- globalV2ConnectionPoolLog = ctrl.Log.WithName("v2-connection-pool")
- poolConfig := grpc.DefaultPoolConfig()
- globalV2ConnectionPool = grpc.NewConnectionPool(poolConfig)
- globalV2ConnectionPoolLog.Info("global v2 connection pool initialized",
- "maxIdleTime", poolConfig.MaxIdleTime.String(),
- "maxLifetime", poolConfig.MaxLifetime.String(),
- "healthCheckInterval", poolConfig.HealthCheckInterval.String())
- }
- // getGlobalV2ConnectionPool returns the global connection pool, initializing it if needed.
- func getGlobalV2ConnectionPool() *grpc.ConnectionPool {
- globalV2ConnectionPoolOnce.Do(initGlobalV2ConnectionPool)
- return globalV2ConnectionPool
- }
// v2PooledConnection tracks connection info needed to release connections back to the pool.
// Manager.Close passes both fields to the pool's Release call.
type v2PooledConnection struct {
	// address is the provider address the connection was obtained for.
	address string
	// tlsConfig is the TLS configuration handed to Release together with address.
	tlsConfig *grpc.TLSConfig
}
// Manager stores instances of provider clients.
// At any given time we must have no more than one instance
// of a client (due to limitations in GCP / see mutexlock there).
// If the controller requests another instance of a given client
// we will close the old client first and then construct a new one.
type Manager struct {
	// log is the logger named "clientmanager" set up by NewManager.
	log logr.Logger
	// client is the Kubernetes client used to fetch stores, namespaces and
	// runtime classes.
	client client.Client
	// controllerClass is compared against a store's spec.controller to decide
	// whether this instance may use the store (see ShouldProcessStore).
	controllerClass string
	// enableFloodgate makes Get reject stores whose Ready condition is not true.
	enableFloodgate bool
	// clientMap caches created clients, keyed by clientKey.
	clientMap map[clientKey]*clientVal
	// Track v2 provider connections for release back to pool
	v2PooledConnections []v2PooledConnection
}
// clientKey identifies a cached client in Manager.clientMap.
type clientKey struct {
	// providerType is the provider's Go type name for keys built by storeKey,
	// or runtimeRefCacheKeyType + ":" + store kind for keys built by
	// runtimeRefStoreKey.
	providerType string
	// For v2 providers, store the provider name and namespace.
	// runtimeRefStoreKey fills these with the store's name and namespace.
	v2ProviderName      string
	v2ProviderNamespace string
	// runtimeSourceNamespace is the source namespace passed to
	// runtimeRefStoreKey; it stays empty for keys built by storeKey.
	runtimeSourceNamespace string
}
// clientVal is a cache entry: a provider client together with the store it
// was created for.
type clientVal struct {
	// client is the cached provider client.
	client esv1.SecretsClient
	// store is compared (generation, GVK, name, namespace) with the requested
	// store to decide whether the cached client can be reused.
	store esv1.GenericStore
	// For v2 providers, store the generation for cache invalidation
	v2ProviderGeneration int64
}
- func providerMetricsLabelForScope(isClusterScoped bool) string {
- if isClusterScoped {
- return clusterProviderMetricsLabel
- }
- return providerMetricsLabel
- }
- func providerMetricsLabelForKey(key clientKey) string {
- if key.v2ProviderName == "" {
- return "unknown"
- }
- if key.v2ProviderNamespace == "" {
- return clusterProviderMetricsLabel
- }
- return providerMetricsLabel
- }
- // NewManager constructs a new manager with defaults.
- func NewManager(ctrlClient client.Client, controllerClass string, enableFloodgate bool) *Manager {
- log := ctrl.Log.WithName("clientmanager")
- return &Manager{
- log: log,
- client: ctrlClient,
- controllerClass: controllerClass,
- enableFloodgate: enableFloodgate,
- clientMap: make(map[clientKey]*clientVal),
- }
- }
- // GetFromStore returns a provider client from the given store.
- // Do not close the client returned from this func, instead close
- // the manager once you're done with reconciling the external secret.
- func (m *Manager) GetFromStore(ctx context.Context, store esv1.GenericStore, namespace string) (esv1.SecretsClient, error) {
- if store.GetSpec().RuntimeRef != nil {
- return m.getRuntimeRefClient(ctx, store, namespace)
- }
- storeProvider, err := esv1.GetProvider(store)
- if err != nil {
- return nil, err
- }
- secretClient := m.getStoredClient(ctx, storeProvider, store)
- if secretClient != nil {
- return secretClient, nil
- }
- m.log.V(1).Info("creating new client",
- "provider", fmt.Sprintf("%T", storeProvider),
- "store", fmt.Sprintf("%s/%s", store.GetNamespace(), store.GetName()))
- // secret client is created only if we are going to refresh
- // this skip an unnecessary check/request in the case we are not going to do anything
- secretClient, err = storeProvider.NewClient(ctx, store, m.client, namespace)
- if err != nil {
- return nil, err
- }
- idx := storeKey(storeProvider)
- m.clientMap[idx] = &clientVal{
- client: secretClient,
- store: store,
- }
- return secretClient, nil
- }
- func (m *Manager) getRuntimeRefClient(ctx context.Context, store esv1.GenericStore, namespace string) (esv1.SecretsClient, error) {
- runtimeRef := store.GetSpec().RuntimeRef
- runtimeKind := runtimeRef.Kind
- if runtimeKind == "" {
- runtimeKind = runtimeRefKindClusterProviderClass
- }
- if runtimeKind != runtimeRefKindClusterProviderClass {
- return nil, fmt.Errorf("unsupported runtimeRef kind %q", runtimeKind)
- }
- cacheKey := runtimeRefStoreKey(store, namespace)
- if cached := m.getStoredRuntimeRefClient(ctx, cacheKey, store); cached != nil {
- return cached, nil
- }
- var runtimeClass esv1alpha1.ClusterProviderClass
- if err := m.client.Get(ctx, types.NamespacedName{Name: runtimeRef.Name}, &runtimeClass); err != nil {
- return nil, fmt.Errorf("failed to get %s %q: %w", runtimeRefKindClusterProviderClass, runtimeRef.Name, err)
- }
- compatStore, err := buildCompatibilityStore(store)
- if err != nil {
- return nil, fmt.Errorf("failed to build compatibility store for %s %q: %w", store.GetKind(), store.GetName(), err)
- }
- tlsSecretNamespace := grpc.ResolveTLSSecretNamespace(runtimeClass.Spec.Address, "", "", "")
- tlsConfig, err := grpc.LoadClientTLSConfig(ctx, m.client, runtimeClass.Spec.Address, tlsSecretNamespace)
- if err != nil {
- return nil, fmt.Errorf("failed to load TLS config for %s %q: %w", runtimeRefKindClusterProviderClass, runtimeRef.Name, err)
- }
- pool := getGlobalV2ConnectionPool()
- grpcClient, err := pool.Get(ctx, runtimeClass.Spec.Address, tlsConfig)
- if err != nil {
- return nil, fmt.Errorf("failed to get gRPC client from pool for %s %q: %w", runtimeRefKindClusterProviderClass, runtimeRef.Name, err)
- }
- compatibilityClient := adapterstore.NewCompatibilityClientWithCloser(grpcClient, compatStore, namespace, func(context.Context) error {
- pool.Release(runtimeClass.Spec.Address, tlsConfig)
- return nil
- })
- m.clientMap[cacheKey] = &clientVal{
- client: compatibilityClient,
- store: store,
- }
- return compatibilityClient, nil
- }
- func runtimeRefStoreKey(store esv1.GenericStore, sourceNamespace string) clientKey {
- return clientKey{
- providerType: runtimeRefCacheKeyType + ":" + store.GetKind(),
- v2ProviderName: store.GetName(),
- v2ProviderNamespace: store.GetNamespace(),
- runtimeSourceNamespace: sourceNamespace,
- }
- }
- func (m *Manager) getStoredRuntimeRefClient(ctx context.Context, key clientKey, store esv1.GenericStore) esv1.SecretsClient {
- val, ok := m.clientMap[key]
- if !ok {
- return nil
- }
- valGVK, err := m.client.GroupVersionKindFor(val.store)
- if err != nil {
- return nil
- }
- storeGVK, err := m.client.GroupVersionKindFor(store)
- if err != nil {
- return nil
- }
- if val.store.GetObjectMeta().Generation == store.GetGeneration() &&
- valGVK == storeGVK &&
- val.store.GetName() == store.GetName() &&
- val.store.GetNamespace() == store.GetNamespace() {
- clientManagerMetrics.RecordCacheHit(providerMetricsLabelForKey(key))
- return val.client
- }
- _ = val.client.Close(ctx)
- delete(m.clientMap, key)
- reason := cacheInvalidationMismatch
- if val.store.GetObjectMeta().Generation != store.GetGeneration() {
- reason = cacheInvalidationGeneration
- }
- clientManagerMetrics.RecordCacheInvalidation(providerMetricsLabelForKey(key), reason)
- return nil
- }
- // Get returns a provider client from the given storeRef or sourceRef.secretStoreRef
- // while sourceRef.SecretStoreRef takes precedence over storeRef.
- // Do not close the client returned from this func, instead close
- // the manager once you're done with recinciling the external secret.
- func (m *Manager) Get(ctx context.Context, storeRef esv1.SecretStoreRef, namespace string, sourceRef *esv1.StoreGeneratorSourceRef) (esv1.SecretsClient, error) {
- if sourceRef != nil && sourceRef.SecretStoreRef != nil {
- storeRef = *sourceRef.SecretStoreRef
- }
- if storeRef.Kind == esv1.ProviderStoreKindStr {
- return m.getV2ProviderStoreClient(ctx, storeRef.Name, namespace)
- }
- if storeRef.Kind == esv1.ClusterProviderStoreKindStr {
- return m.getV2ClusterProviderStoreClient(ctx, storeRef.Name, namespace)
- }
- store, err := m.getStore(ctx, &storeRef, namespace)
- if err != nil {
- return nil, err
- }
- // check if store should be handled by this controller instance
- if !ShouldProcessStore(store, m.controllerClass) {
- return nil, errors.New("can not reference unmanaged store")
- }
- // when using ClusterSecretStore, validate the ClusterSecretStore namespace conditions
- shouldProcess, err := m.shouldProcessSecret(store, namespace)
- if err != nil {
- return nil, err
- }
- if !shouldProcess {
- return nil, fmt.Errorf(errClusterStoreMismatch, store.GetName(), namespace)
- }
- if m.enableFloodgate {
- err := assertStoreIsUsable(store)
- if err != nil {
- return nil, err
- }
- }
- return m.GetFromStore(ctx, store, namespace)
- }
- // returns a previously stored client from the cache if store and store-version match
- // if a client exists for the same provider which points to a different store or store version
- // it will be cleaned up.
- func (m *Manager) getStoredClient(ctx context.Context, storeProvider esv1.ProviderInterface, store esv1.GenericStore) esv1.SecretsClient {
- idx := storeKey(storeProvider)
- val, ok := m.clientMap[idx]
- if !ok {
- return nil
- }
- valGVK, err := m.client.GroupVersionKindFor(val.store)
- if err != nil {
- return nil
- }
- storeGVK, err := m.client.GroupVersionKindFor(store)
- if err != nil {
- return nil
- }
- storeName := fmt.Sprintf("%s/%s", store.GetNamespace(), store.GetName())
- // return client if it points to the very same store
- if val.store.GetObjectMeta().Generation == store.GetGeneration() &&
- valGVK == storeGVK &&
- val.store.GetName() == store.GetName() &&
- val.store.GetNamespace() == store.GetNamespace() {
- m.log.V(1).Info("reusing stored client",
- "provider", fmt.Sprintf("%T", storeProvider),
- "store", storeName)
- // Record cache hit
- clientManagerMetrics.RecordCacheHit(providerMetricsLabelForKey(idx))
- return val.client
- }
- m.log.V(1).Info("cleaning up client",
- "provider", fmt.Sprintf("%T", storeProvider),
- "store", storeName)
- // if we have a client, but it points to a different store
- // we must clean it up
- _ = val.client.Close(ctx)
- delete(m.clientMap, idx)
- // Record cache invalidation
- providerType := providerMetricsLabelForKey(idx)
- reason := cacheInvalidationMismatch
- if idx.v2ProviderName != "" {
- if val.store.GetObjectMeta().Generation != store.GetGeneration() {
- reason = cacheInvalidationGeneration
- }
- }
- clientManagerMetrics.RecordCacheInvalidation(providerType, reason)
- return nil
- }
- func storeKey(storeProvider esv1.ProviderInterface) clientKey {
- return clientKey{
- providerType: fmt.Sprintf("%T", storeProvider),
- }
- }
- // getStore fetches the (Cluster)SecretStore from the kube-apiserver
- // and returns a GenericStore representing it.
- func (m *Manager) getStore(ctx context.Context, storeRef *esv1.SecretStoreRef, namespace string) (esv1.GenericStore, error) {
- ref := types.NamespacedName{
- Name: storeRef.Name,
- }
- if storeRef.Kind == esv1.ClusterSecretStoreKind {
- var store esv1.ClusterSecretStore
- err := m.client.Get(ctx, ref, &store)
- if err != nil {
- return nil, fmt.Errorf(errGetClusterSecretStore, ref.Name, err)
- }
- return &store, nil
- }
- ref.Namespace = namespace
- var store esv1.SecretStore
- err := m.client.Get(ctx, ref, &store)
- if err != nil {
- return nil, fmt.Errorf(errGetSecretStore, ref.Name, err)
- }
- return &store, nil
- }
- // Close cleans up all clients.
- // For v1 providers, it closes the clients directly.
- // For v2 providers, it releases connections back to the pool for reuse.
- func (m *Manager) Close(ctx context.Context) error {
- var errs []string
- // Release v2 pooled connections back to the pool
- pool := getGlobalV2ConnectionPool()
- for _, pooledConn := range m.v2PooledConnections {
- pool.Release(pooledConn.address, pooledConn.tlsConfig)
- m.log.V(1).Info("released v2 connection back to pool",
- "address", pooledConn.address)
- }
- m.v2PooledConnections = nil
- // Close v1 provider clients (they don't use the pool)
- for key, val := range m.clientMap {
- // Only close v1 clients; v2 clients are managed by the pool
- if key.providerType != v2ProviderStoreCacheKey &&
- key.providerType != v2ClusterProviderStoreCache {
- err := val.client.Close(ctx)
- if err != nil {
- errs = append(errs, err.Error())
- }
- }
- delete(m.clientMap, key)
- }
- if len(errs) != 0 {
- return fmt.Errorf("errors while closing clients: %s", strings.Join(errs, ", "))
- }
- return nil
- }
- // validateNamespaceConditions checks if a namespace matches the given conditions.
- // Returns true if the namespace is allowed, false if denied.
- func (m *Manager) validateNamespaceConditions(conditions []esv1.ClusterSecretStoreCondition, ns string) (bool, error) {
- if len(conditions) == 0 {
- return true, nil
- }
- namespace := v1.Namespace{}
- if err := m.client.Get(context.Background(), client.ObjectKey{Name: ns}, &namespace); err != nil {
- return false, fmt.Errorf("failed to get a namespace %q: %w", ns, err)
- }
- nsLabels := labels.Set(namespace.GetLabels())
- for _, condition := range conditions {
- var labelSelectors []*metav1.LabelSelector
- if condition.NamespaceSelector != nil {
- labelSelectors = append(labelSelectors, condition.NamespaceSelector)
- }
- for _, n := range condition.Namespaces {
- labelSelectors = append(labelSelectors, &metav1.LabelSelector{
- MatchLabels: map[string]string{
- "kubernetes.io/metadata.name": n,
- },
- })
- }
- for _, ls := range labelSelectors {
- selector, err := metav1.LabelSelectorAsSelector(ls)
- if err != nil {
- return false, fmt.Errorf("failed to convert label selector into selector %v: %w", ls, err)
- }
- if selector.Matches(nsLabels) {
- return true, nil
- }
- }
- for _, reg := range condition.NamespaceRegexes {
- match, err := regexp.MatchString(reg, ns)
- if err != nil {
- // Should not happen since store validation already verified the regexes.
- return false, fmt.Errorf("failed to compile regex %v: %w", reg, err)
- }
- if match {
- return true, nil
- }
- }
- }
- return false, nil
- }
- // shouldProcessSecret validates if a secret should be processed based on namespace conditions.
- // This is a wrapper around validateNamespaceConditions for backward compatibility with GenericStore.
- func (m *Manager) shouldProcessSecret(store esv1.GenericStore, ns string) (bool, error) {
- // Only check conditions for cluster-scoped resources.
- if store.GetKind() != esv1.ClusterSecretStoreKind {
- return true, nil
- }
- return m.validateNamespaceConditions(store.GetSpec().Conditions, ns)
- }
- // assertStoreIsUsable asserts that the store is ready to use.
- func assertStoreIsUsable(store esv1.GenericStore) error {
- if store == nil {
- return nil
- }
- condition := GetSecretStoreCondition(store.GetStatus(), esv1.SecretStoreReady)
- if condition == nil || condition.Status != v1.ConditionTrue {
- return fmt.Errorf(errSecretStoreNotReady, store.GetKind(), store.GetName())
- }
- return nil
- }
- // ShouldProcessStore returns true if the store should be processed.
- func ShouldProcessStore(store esv1.GenericStore, class string) bool {
- if store == nil || store.GetSpec().Controller == "" || store.GetSpec().Controller == class {
- return true
- }
- return false
- }
- // GetSecretStoreCondition returns the condition with the provided type.
- func GetSecretStoreCondition(status esv1.SecretStoreStatus, condType esv1.SecretStoreConditionType) *esv1.SecretStoreStatusCondition {
- for i := range status.Conditions {
- c := status.Conditions[i]
- if c.Type == condType {
- return &c
- }
- }
- return nil
- }
|