Browse Source

Use locks for GCP PushSecrets (#2678)

* Use locks for GCP PushSecrets

Signed-off-by: shuheiktgw <s-kitagawa@mercari.com>

* Share locks among providers

Signed-off-by: shuheiktgw <s-kitagawa@mercari.com>

---------

Signed-off-by: shuheiktgw <s-kitagawa@mercari.com>
Shuhei Kitagawa 2 years ago
parent
commit
150e3dfde1

+ 7 - 0
pkg/controllers/pushsecret/pushsecret_controller.go

@@ -16,6 +16,7 @@ package pushsecret
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"strings"
 	"time"
@@ -36,6 +37,7 @@ import (
 	ctrlmetrics "github.com/external-secrets/external-secrets/pkg/controllers/metrics"
 	"github.com/external-secrets/external-secrets/pkg/controllers/pushsecret/psmetrics"
 	"github.com/external-secrets/external-secrets/pkg/controllers/secretstore"
+	"github.com/external-secrets/external-secrets/pkg/provider/util/locks"
 )
 
 const (
@@ -146,6 +148,11 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
 	}
 	syncedSecrets, err := r.PushSecretToProviders(ctx, secretStores, ps, secret, mgr)
 	if err != nil {
+		if errors.Is(err, locks.ErrConflict) {
+			log.Info("retry to acquire lock to update the secret later", "error", err)
+			return ctrl.Result{Requeue: true}, nil
+		}
+
 		msg := fmt.Sprintf(errFailedSetSecret, err)
 		cond := NewPushSecretCondition(esapi.PushSecretReady, v1.ConditionFalse, esapi.ReasonErrored, msg)
 		ps = SetPushSecretCondition(ps, *cond)

+ 14 - 2
pkg/provider/gcp/secretmanager/client.go

@@ -38,6 +38,7 @@ import (
 	"github.com/external-secrets/external-secrets/pkg/constants"
 	"github.com/external-secrets/external-secrets/pkg/find"
 	"github.com/external-secrets/external-secrets/pkg/metrics"
+	"github.com/external-secrets/external-secrets/pkg/provider/util/locks"
 	"github.com/external-secrets/external-secrets/pkg/utils"
 )
 
@@ -66,6 +67,8 @@ const (
 
 	managedByKey   = "managed-by"
 	managedByValue = "external-secrets"
+
+	providerName = "GCPSecretManager"
 )
 
 type Client struct {
@@ -111,6 +114,7 @@ func (c *Client) DeleteSecret(ctx context.Context, remoteRef esv1beta1.PushRemot
 
 	deleteSecretVersionReq := &secretmanagerpb.DeleteSecretRequest{
 		Name: fmt.Sprintf("projects/%s/secrets/%s", c.store.ProjectID, remoteRef.GetRemoteKey()),
+		Etag: gcpSecret.Etag,
 	}
 	err = c.smClient.DeleteSecret(ctx, deleteSecretVersionReq)
 	metrics.ObserveAPICall(constants.ProviderGCPSM, constants.CallGCPSMDeleteSecret, err)
@@ -127,8 +131,9 @@ func parseError(err error) error {
 
 // PushSecret pushes a kubernetes secret key into gcp provider Secret.
 func (c *Client) PushSecret(ctx context.Context, payload []byte, metadata *apiextensionsv1.JSON, remoteRef esv1beta1.PushRemoteRef) error {
+	secretName := fmt.Sprintf("projects/%s/secrets/%s", c.store.ProjectID, remoteRef.GetRemoteKey())
 	gcpSecret, err := c.smClient.GetSecret(ctx, &secretmanagerpb.GetSecretRequest{
-		Name: fmt.Sprintf("projects/%s/secrets/%s", c.store.ProjectID, remoteRef.GetRemoteKey()),
+		Name: secretName,
 	})
 	metrics.ObserveAPICall(constants.ProviderGCPSM, constants.CallGCPSMGetSecret, err)
 
@@ -171,6 +176,7 @@ func (c *Client) PushSecret(ctx context.Context, payload []byte, metadata *apiex
 		_, err = c.smClient.UpdateSecret(ctx, &secretmanagerpb.UpdateSecretRequest{
 			Secret: &secretmanagerpb.Secret{
 				Name:        gcpSecret.Name,
+				Etag:        gcpSecret.Etag,
 				Labels:      labels,
 				Annotations: annotations,
 			},
@@ -184,8 +190,14 @@ func (c *Client) PushSecret(ctx context.Context, payload []byte, metadata *apiex
 		}
 	}
 
+	unlock, err := locks.TryLock(providerName, secretName)
+	if err != nil {
+		return err
+	}
+	defer unlock()
+
 	gcpVersion, err := c.smClient.AccessSecretVersion(ctx, &secretmanagerpb.AccessSecretVersionRequest{
-		Name: fmt.Sprintf("projects/%s/secrets/%s/versions/latest", c.store.ProjectID, remoteRef.GetRemoteKey()),
+		Name: fmt.Sprintf("%s/versions/latest", secretName),
 	})
 	metrics.ObserveAPICall(constants.ProviderGCPSM, constants.CallGCPSMAccessSecretVersion, err)
 

+ 53 - 0
pkg/provider/util/locks/secret_locks.go

@@ -0,0 +1,53 @@
+/*
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+	http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package locks
+
+import (
+	"errors"
+	"fmt"
+	"sync"
+)
+
+var (
+	ErrConflict = errors.New("unable to access secret since it is locked")
+
+	sharedLocks = &secretLocks{}
+)
+
+func TryLock(providerName, secretName string) (func(), error) {
+	key := fmt.Sprintf("%s#%s", providerName, secretName)
+	unlockFunc, ok := sharedLocks.tryLock(key)
+	if !ok {
+		return nil, fmt.Errorf(
+			"failed to acquire lock: provider: %s, secret: %s: %w",
+			providerName,
+			secretName,
+			ErrConflict,
+		)
+	}
+
+	return unlockFunc, nil
+}
+
+// secretLocks is a collection of per-key locks for secrets, used to prevent lost updates.
+type secretLocks struct {
+	locks sync.Map
+}
+
+// tryLock attempts to acquire the lock for the given key without blocking; it returns the mutex's Unlock function and whether the lock was acquired.
+func (s *secretLocks) tryLock(key string) (func(), bool) {
+	lock, _ := s.locks.LoadOrStore(key, &sync.Mutex{})
+	mu, _ := lock.(*sync.Mutex)
+	return mu.Unlock, mu.TryLock()
+}

+ 90 - 0
pkg/provider/util/locks/secret_locks_test.go

@@ -0,0 +1,90 @@
+/*
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+	http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package locks
+
+import (
+	"strings"
+	"testing"
+)
+
+func TestTryLock(t *testing.T) {
+	t.Parallel()
+
+	providerName := "test-provider"
+	secretName := "test-secret"
+
+	tests := []struct {
+		desc       string
+		preprocess func() chan error
+		expected   string
+	}{
+		{
+			desc: "No conflict occurs and hold lock successfully",
+			preprocess: func() chan error {
+				ch := make(chan error)
+				go func() {
+					ch <- nil
+				}()
+				return ch
+			},
+			expected: "",
+		},
+		{
+			desc: "Conflict occurs and cannot hold lock",
+			preprocess: func() chan error {
+				ch := make(chan error)
+				go func() {
+					_, err := TryLock(providerName, secretName)
+					ch <- err
+				}()
+				return ch
+			},
+			expected: "failed to acquire lock: provider: test-provider, secret: test-secret: unable to access secret since it is locked",
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.desc, func(t *testing.T) {
+			// Evacuate the sharedLocks temporarily
+			tmp := sharedLocks
+			sharedLocks = &secretLocks{}
+			defer func() {
+				sharedLocks = tmp
+			}()
+
+			ch := tc.preprocess()
+
+			err := <-ch
+			if err != nil {
+				t.Fatalf("preprocessing failed: %v", err)
+			}
+
+			_, got := TryLock(providerName, secretName)
+			if got != nil {
+				if tc.expected == "" {
+					t.Fatalf("received an unexpected error: %v", got)
+				}
+
+				if !strings.Contains(got.Error(), tc.expected) {
+					t.Fatalf("error %q is supposed to contain %q", got, tc.expected)
+				}
+				return
+			}
+
+			if tc.expected != "" {
+				t.Fatal("expected to receive an error but got nil")
+			}
+		})
+	}
+}