client.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597
  1. /*
  2. Copyright © The ESO Authors
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. https://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package grpc
  14. import (
  15. "context"
  16. "fmt"
  17. "time"
  18. "github.com/go-logr/logr"
  19. "google.golang.org/grpc"
  20. "google.golang.org/grpc/connectivity"
  21. corev1 "k8s.io/api/core/v1"
  22. esv1 "github.com/external-secrets/external-secrets/apis/externalsecrets/v1"
  23. pb "github.com/external-secrets/external-secrets/proto/provider"
  24. v2 "github.com/external-secrets/external-secrets/providers/v2/common"
  25. )
  26. const (
  27. // defaultTimeout is the default timeout for gRPC calls.
  28. defaultTimeout = 30 * time.Second
  29. getAllSecretsTimeout = 2 * time.Minute
  30. readIdentityRequiredErr = "provider reference or compatibility store is required for read operations"
  31. )
  32. // grpcProviderClient implements the v2.Provider interface using gRPC.
  33. type grpcProviderClient struct {
  34. conn *grpc.ClientConn
  35. client pb.SecretStoreProviderClient
  36. log logr.Logger
  37. }
  38. // Ensure grpcProviderClient implements the Provider interface.
  39. var _ v2.Provider = &grpcProviderClient{}
  40. // GetSecret retrieves a single secret from the provider via gRPC.
  41. func (c *grpcProviderClient) GetSecret(
  42. ctx context.Context,
  43. ref esv1.ExternalSecretDataRemoteRef,
  44. providerRef *pb.ProviderReference,
  45. compatibilityStore *pb.CompatibilityStore,
  46. sourceNamespace string,
  47. ) ([]byte, error) {
  48. start := time.Now()
  49. var err error
  50. defer func() {
  51. clientMetrics.ObserveRequest("GetSecret", c.conn.Target(), err, time.Since(start))
  52. }()
  53. if validationErr := validateReadIdentity(providerRef, compatibilityStore); validationErr != nil {
  54. err = validationErr
  55. return nil, err
  56. }
  57. logFields := append([]any{
  58. "key", ref.Key,
  59. "version", ref.Version,
  60. "property", ref.Property,
  61. "connectionState", c.conn.GetState().String(),
  62. "providerRef", providerRef,
  63. "sourceNamespace", sourceNamespace,
  64. }, compatibilityStoreLogFields(compatibilityStore)...)
  65. c.log.V(1).Info("getting secret via gRPC", logFields...)
  66. // Check connection state before call
  67. state := c.conn.GetState()
  68. if state != connectivity.Ready && state != connectivity.Idle {
  69. c.log.Info("connection not ready, attempting to reconnect",
  70. "state", state.String(),
  71. "target", c.conn.Target())
  72. }
  73. // Create context with timeout
  74. ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
  75. defer cancel()
  76. // Convert v1 reference to protobuf message
  77. pbRef := &pb.ExternalSecretDataRemoteRef{
  78. Key: ref.Key,
  79. Version: ref.Version,
  80. Property: ref.Property,
  81. DecodingStrategy: string(ref.DecodingStrategy),
  82. MetadataPolicy: string(ref.MetadataPolicy),
  83. }
  84. // Make gRPC call with provider reference
  85. req := &pb.GetSecretRequest{
  86. RemoteRef: pbRef,
  87. ProviderRef: providerRef,
  88. CompatibilityStore: compatibilityStore,
  89. SourceNamespace: sourceNamespace,
  90. }
  91. c.log.V(1).Info("calling GetSecret RPC",
  92. "target", c.conn.Target(),
  93. "timeout", defaultTimeout.String())
  94. resp, err := c.client.GetSecret(ctx, req)
  95. if err != nil {
  96. c.log.Error(err, "GetSecret RPC failed",
  97. "key", ref.Key,
  98. "connectionState", c.conn.GetState().String(),
  99. "target", c.conn.Target())
  100. err = fmt.Errorf("failed to get secret via gRPC: %w", err)
  101. return nil, err
  102. }
  103. c.log.V(1).Info("GetSecret RPC succeeded",
  104. "key", ref.Key,
  105. "valueLength", len(resp.Value))
  106. return resp.Value, nil
  107. }
  108. // GetSecretMap retrieves multiple key/value pairs from a single provider object via gRPC.
  109. func (c *grpcProviderClient) GetSecretMap(
  110. ctx context.Context,
  111. ref esv1.ExternalSecretDataRemoteRef,
  112. providerRef *pb.ProviderReference,
  113. compatibilityStore *pb.CompatibilityStore,
  114. sourceNamespace string,
  115. ) (map[string][]byte, error) {
  116. start := time.Now()
  117. var err error
  118. defer func() {
  119. clientMetrics.ObserveRequest("GetSecretMap", c.conn.Target(), err, time.Since(start))
  120. }()
  121. if validationErr := validateReadIdentity(providerRef, compatibilityStore); validationErr != nil {
  122. err = validationErr
  123. return nil, err
  124. }
  125. logFields := append([]any{
  126. "key", ref.Key,
  127. "version", ref.Version,
  128. "property", ref.Property,
  129. "connectionState", c.conn.GetState().String(),
  130. "providerRef", providerRef,
  131. "sourceNamespace", sourceNamespace,
  132. }, compatibilityStoreLogFields(compatibilityStore)...)
  133. c.log.V(1).Info("getting secret map via gRPC", logFields...)
  134. ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
  135. defer cancel()
  136. pbRef := &pb.ExternalSecretDataRemoteRef{
  137. Key: ref.Key,
  138. Version: ref.Version,
  139. Property: ref.Property,
  140. DecodingStrategy: string(ref.DecodingStrategy),
  141. MetadataPolicy: string(ref.MetadataPolicy),
  142. }
  143. req := &pb.GetSecretMapRequest{
  144. RemoteRef: pbRef,
  145. ProviderRef: providerRef,
  146. CompatibilityStore: compatibilityStore,
  147. SourceNamespace: sourceNamespace,
  148. }
  149. c.log.V(1).Info("calling GetSecretMap RPC",
  150. "target", c.conn.Target())
  151. resp, err := c.client.GetSecretMap(ctx, req)
  152. if err != nil {
  153. c.log.Error(err, "GetSecretMap RPC failed",
  154. "connectionState", c.conn.GetState().String(),
  155. "target", c.conn.Target())
  156. err = fmt.Errorf("failed to get secret map via gRPC: %w", err)
  157. return nil, err
  158. }
  159. c.log.V(1).Info("GetSecretMap RPC succeeded",
  160. "secretCount", len(resp.Secrets))
  161. return resp.Secrets, nil
  162. }
  163. // Validate checks if the provider is properly configured via gRPC.
  164. func (c *grpcProviderClient) Validate(ctx context.Context, providerRef *pb.ProviderReference, compatibilityStore *pb.CompatibilityStore, sourceNamespace string) error {
  165. start := time.Now()
  166. var err error
  167. defer func() {
  168. clientMetrics.ObserveRequest("Validate", c.conn.Target(), err, time.Since(start))
  169. }()
  170. if validationErr := validateReadIdentity(providerRef, compatibilityStore); validationErr != nil {
  171. err = validationErr
  172. return err
  173. }
  174. c.log.Info("validating provider via gRPC",
  175. "target", c.conn.Target(),
  176. "connectionState", c.conn.GetState().String(),
  177. "providerRef", providerRef,
  178. "sourceNamespace", sourceNamespace,
  179. "compatibilityStore", compatibilityStore != nil)
  180. // Check connection state before call
  181. state := c.conn.GetState()
  182. c.log.V(1).Info("connection details",
  183. "state", state.String(),
  184. "target", c.conn.Target(),
  185. "authority", c.conn.GetMethodConfig("").WaitForReady)
  186. if state != connectivity.Ready && state != connectivity.Idle {
  187. c.log.Info("connection not in ready/idle state, will attempt to connect",
  188. "state", state.String(),
  189. "target", c.conn.Target())
  190. }
  191. // Create context with timeout
  192. ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
  193. defer cancel()
  194. // Make gRPC call with provider reference
  195. req := &pb.ValidateRequest{
  196. ProviderRef: providerRef,
  197. CompatibilityStore: compatibilityStore,
  198. SourceNamespace: sourceNamespace,
  199. }
  200. c.log.V(1).Info("calling Validate RPC",
  201. "target", c.conn.Target(),
  202. "timeout", defaultTimeout.String())
  203. resp, err := c.client.Validate(ctx, req)
  204. if err != nil {
  205. c.log.Error(err, "Validate RPC failed",
  206. "connectionState", c.conn.GetState().String(),
  207. "target", c.conn.Target(),
  208. "errorType", fmt.Sprintf("%T", err))
  209. err = fmt.Errorf("failed to validate provider via gRPC: %w", err)
  210. return err
  211. }
  212. c.log.V(1).Info("Validate RPC completed",
  213. "valid", resp.Valid,
  214. "error", resp.Error)
  215. // Check for error in response
  216. if !resp.Valid {
  217. if resp.Error != "" {
  218. c.log.Error(fmt.Errorf("provider validation failed"), "validation response",
  219. "message", resp.Error)
  220. err = fmt.Errorf("provider validation failed: %s", resp.Error)
  221. return err
  222. }
  223. c.log.Error(fmt.Errorf("provider validation failed"), "validation response",
  224. "message", "no error message provided")
  225. err = fmt.Errorf("provider validation failed without error message")
  226. return err
  227. }
  228. c.log.Info("provider validation succeeded")
  229. return nil
  230. }
  231. // GetAllSecrets retrieves multiple secrets based on find criteria via gRPC.
  232. func (c *grpcProviderClient) GetAllSecrets(
  233. ctx context.Context,
  234. find esv1.ExternalSecretFind,
  235. providerRef *pb.ProviderReference,
  236. compatibilityStore *pb.CompatibilityStore,
  237. sourceNamespace string,
  238. ) (map[string][]byte, error) {
  239. start := time.Now()
  240. var err error
  241. defer func() {
  242. clientMetrics.ObserveRequest("GetAllSecrets", c.conn.Target(), err, time.Since(start))
  243. }()
  244. if validationErr := validateReadIdentity(providerRef, compatibilityStore); validationErr != nil {
  245. err = validationErr
  246. return nil, err
  247. }
  248. logFields := append([]any{
  249. "tags", find.Tags,
  250. "connectionState", c.conn.GetState().String(),
  251. "providerRef", providerRef,
  252. "sourceNamespace", sourceNamespace,
  253. }, compatibilityStoreLogFields(compatibilityStore)...)
  254. c.log.V(1).Info("getting all secrets via gRPC", logFields...)
  255. // Create context with timeout
  256. ctx, cancel := context.WithTimeout(ctx, getAllSecretsTimeout)
  257. defer cancel()
  258. // Convert find criteria to protobuf
  259. pbFind := &pb.ExternalSecretFind{
  260. Tags: find.Tags,
  261. ConversionStrategy: string(find.ConversionStrategy),
  262. DecodingStrategy: string(find.DecodingStrategy),
  263. }
  264. if find.Path != nil {
  265. pbFind.Path = *find.Path
  266. }
  267. if find.Name != nil {
  268. pbFind.Name = &pb.FindName{
  269. Regexp: find.Name.RegExp,
  270. }
  271. }
  272. // Make gRPC call
  273. req := &pb.GetAllSecretsRequest{
  274. ProviderRef: providerRef,
  275. CompatibilityStore: compatibilityStore,
  276. Find: pbFind,
  277. SourceNamespace: sourceNamespace,
  278. }
  279. c.log.V(1).Info("calling GetAllSecrets RPC",
  280. "target", c.conn.Target(),
  281. "timeout", getAllSecretsTimeout.String())
  282. resp, err := c.client.GetAllSecrets(ctx, req)
  283. if err != nil {
  284. c.log.Error(err, "GetAllSecrets RPC failed",
  285. "connectionState", c.conn.GetState().String(),
  286. "target", c.conn.Target())
  287. err = fmt.Errorf("failed to get all secrets via gRPC: %w", err)
  288. return nil, err
  289. }
  290. c.log.V(1).Info("GetAllSecrets RPC succeeded",
  291. "secretCount", len(resp.Secrets))
  292. return resp.Secrets, nil
  293. }
  294. func validateReadIdentity(providerRef *pb.ProviderReference, compatibilityStore *pb.CompatibilityStore) error {
  295. if providerRef == nil && compatibilityStore == nil {
  296. return fmt.Errorf(readIdentityRequiredErr)
  297. }
  298. return nil
  299. }
  300. func compatibilityStoreLogFields(store *pb.CompatibilityStore) []any {
  301. if store == nil {
  302. return nil
  303. }
  304. return []any{
  305. "compatibilityStoreKind", store.GetStoreKind(),
  306. "compatibilityStoreName", store.GetStoreName(),
  307. "compatibilityStoreNamespace", store.GetStoreNamespace(),
  308. "compatibilityStoreUID", store.GetStoreUid(),
  309. "compatibilityStoreGeneration", store.GetStoreGeneration(),
  310. "compatibilityStoreSpecBytes", len(store.GetStoreSpecJson()),
  311. }
  312. }
  313. // PushSecret writes a secret to the provider via gRPC.
  314. func (c *grpcProviderClient) PushSecret(
  315. ctx context.Context,
  316. secret *corev1.Secret,
  317. pushSecretData *pb.PushSecretData,
  318. providerRef *pb.ProviderReference,
  319. compatibilityStore *pb.CompatibilityStore,
  320. sourceNamespace string,
  321. ) error {
  322. start := time.Now()
  323. var err error
  324. defer func() {
  325. clientMetrics.ObserveRequest("PushSecret", c.conn.Target(), err, time.Since(start))
  326. }()
  327. if validationErr := validateReadIdentity(providerRef, compatibilityStore); validationErr != nil {
  328. err = validationErr
  329. return err
  330. }
  331. logFields := append([]any{
  332. "remoteKey", pushSecretData.RemoteKey,
  333. "property", pushSecretData.Property,
  334. "connectionState", c.conn.GetState().String(),
  335. "providerRef", providerRef,
  336. "sourceNamespace", sourceNamespace,
  337. }, compatibilityStoreLogFields(compatibilityStore)...)
  338. c.log.V(1).Info("pushing secret via gRPC", logFields...)
  339. // Create context with timeout
  340. ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
  341. defer cancel()
  342. // Make gRPC call
  343. req := &pb.PushSecretRequest{
  344. ProviderRef: providerRef,
  345. CompatibilityStore: compatibilityStore,
  346. SecretData: secret.Data,
  347. PushSecretData: pushSecretData,
  348. SourceNamespace: sourceNamespace,
  349. SecretType: string(secret.Type),
  350. SecretLabels: secret.Labels,
  351. SecretAnnotations: secret.Annotations,
  352. }
  353. c.log.V(1).Info("calling PushSecret RPC",
  354. "target", c.conn.Target())
  355. _, err = c.client.PushSecret(ctx, req)
  356. if err != nil {
  357. c.log.Error(err, "PushSecret RPC failed",
  358. "connectionState", c.conn.GetState().String(),
  359. "target", c.conn.Target())
  360. err = fmt.Errorf("failed to push secret via gRPC: %w", err)
  361. return err
  362. }
  363. c.log.V(1).Info("PushSecret RPC succeeded",
  364. "remoteKey", pushSecretData.RemoteKey)
  365. return nil
  366. }
  367. // DeleteSecret deletes a secret from the provider via gRPC.
  368. func (c *grpcProviderClient) DeleteSecret(
  369. ctx context.Context,
  370. remoteRef *pb.PushSecretRemoteRef,
  371. providerRef *pb.ProviderReference,
  372. compatibilityStore *pb.CompatibilityStore,
  373. sourceNamespace string,
  374. ) error {
  375. start := time.Now()
  376. var err error
  377. defer func() {
  378. clientMetrics.ObserveRequest("DeleteSecret", c.conn.Target(), err, time.Since(start))
  379. }()
  380. if validationErr := validateReadIdentity(providerRef, compatibilityStore); validationErr != nil {
  381. err = validationErr
  382. return err
  383. }
  384. logFields := append([]any{
  385. "remoteKey", remoteRef.RemoteKey,
  386. "property", remoteRef.Property,
  387. "connectionState", c.conn.GetState().String(),
  388. "providerRef", providerRef,
  389. "sourceNamespace", sourceNamespace,
  390. }, compatibilityStoreLogFields(compatibilityStore)...)
  391. c.log.V(1).Info("deleting secret via gRPC", logFields...)
  392. // Create context with timeout
  393. ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
  394. defer cancel()
  395. // Make gRPC call
  396. req := &pb.DeleteSecretRequest{
  397. ProviderRef: providerRef,
  398. CompatibilityStore: compatibilityStore,
  399. RemoteRef: remoteRef,
  400. SourceNamespace: sourceNamespace,
  401. }
  402. c.log.V(1).Info("calling DeleteSecret RPC",
  403. "target", c.conn.Target())
  404. _, err = c.client.DeleteSecret(ctx, req)
  405. if err != nil {
  406. c.log.Error(err, "DeleteSecret RPC failed",
  407. "connectionState", c.conn.GetState().String(),
  408. "target", c.conn.Target())
  409. err = fmt.Errorf("failed to delete secret via gRPC: %w", err)
  410. return err
  411. }
  412. c.log.V(1).Info("DeleteSecret RPC succeeded",
  413. "remoteKey", remoteRef.RemoteKey)
  414. return nil
  415. }
  416. // SecretExists checks if a secret exists in the provider via gRPC.
  417. func (c *grpcProviderClient) SecretExists(
  418. ctx context.Context,
  419. remoteRef *pb.PushSecretRemoteRef,
  420. providerRef *pb.ProviderReference,
  421. compatibilityStore *pb.CompatibilityStore,
  422. sourceNamespace string,
  423. ) (bool, error) {
  424. start := time.Now()
  425. var err error
  426. defer func() {
  427. clientMetrics.ObserveRequest("SecretExists", c.conn.Target(), err, time.Since(start))
  428. }()
  429. if validationErr := validateReadIdentity(providerRef, compatibilityStore); validationErr != nil {
  430. err = validationErr
  431. return false, err
  432. }
  433. logFields := append([]any{
  434. "remoteKey", remoteRef.RemoteKey,
  435. "property", remoteRef.Property,
  436. "connectionState", c.conn.GetState().String(),
  437. "providerRef", providerRef,
  438. "sourceNamespace", sourceNamespace,
  439. }, compatibilityStoreLogFields(compatibilityStore)...)
  440. c.log.V(1).Info("checking if secret exists via gRPC", logFields...)
  441. // Create context with timeout
  442. ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
  443. defer cancel()
  444. // Make gRPC call
  445. req := &pb.SecretExistsRequest{
  446. ProviderRef: providerRef,
  447. CompatibilityStore: compatibilityStore,
  448. RemoteRef: remoteRef,
  449. SourceNamespace: sourceNamespace,
  450. }
  451. c.log.V(1).Info("calling SecretExists RPC",
  452. "target", c.conn.Target())
  453. resp, err := c.client.SecretExists(ctx, req)
  454. if err != nil {
  455. c.log.Error(err, "SecretExists RPC failed",
  456. "connectionState", c.conn.GetState().String(),
  457. "target", c.conn.Target())
  458. err = fmt.Errorf("failed to check if secret exists via gRPC: %w", err)
  459. return false, err
  460. }
  461. c.log.V(1).Info("SecretExists RPC succeeded",
  462. "remoteKey", remoteRef.RemoteKey,
  463. "exists", resp.Exists)
  464. return resp.Exists, nil
  465. }
  466. // Capabilities retrieves the capabilities of the provider via gRPC.
  467. func (c *grpcProviderClient) Capabilities(ctx context.Context, providerRef *pb.ProviderReference, sourceNamespace string) (pb.SecretStoreCapabilities, error) {
  468. start := time.Now()
  469. var err error
  470. defer func() {
  471. clientMetrics.ObserveRequest("Capabilities", c.conn.Target(), err, time.Since(start))
  472. }()
  473. c.log.V(1).Info("getting provider capabilities via gRPC",
  474. "target", c.conn.Target(),
  475. "connectionState", c.conn.GetState().String(),
  476. "providerRef", providerRef,
  477. "sourceNamespace", sourceNamespace)
  478. // Create context with timeout
  479. ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
  480. defer cancel()
  481. // Make gRPC call with provider reference
  482. req := &pb.CapabilitiesRequest{
  483. ProviderRef: providerRef,
  484. SourceNamespace: sourceNamespace,
  485. }
  486. c.log.V(1).Info("calling Capabilities RPC",
  487. "target", c.conn.Target())
  488. resp, err := c.client.Capabilities(ctx, req)
  489. if err != nil {
  490. c.log.Error(err, "Capabilities RPC failed",
  491. "connectionState", c.conn.GetState().String(),
  492. "target", c.conn.Target())
  493. err = fmt.Errorf("failed to get capabilities via gRPC: %w", err)
  494. return pb.SecretStoreCapabilities_READ_ONLY, err
  495. }
  496. c.log.V(1).Info("Capabilities RPC succeeded",
  497. "capabilities", resp.Capabilities)
  498. return resp.Capabilities, nil
  499. }
  500. // Close closes the gRPC connection.
  501. func (c *grpcProviderClient) Close(_ context.Context) error {
  502. if c.conn != nil {
  503. c.log.V(1).Info("closing gRPC connection",
  504. "target", c.conn.Target(),
  505. "state", c.conn.GetState().String())
  506. return c.conn.Close()
  507. }
  508. c.log.V(1).Info("no connection to close")
  509. return nil
  510. }