pool.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384
  1. /*
  2. Licensed under the Apache License, Version 2.0 (the "License");
  3. you may not use this file except in compliance with the License.
  4. You may obtain a copy of the License at
  5. http://www.apache.org/licenses/LICENSE-2.0
  6. Unless required by applicable law or agreed to in writing, software
  7. distributed under the License is distributed on an "AS IS" BASIS,
  8. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  9. See the License for the specific language governing permissions and
  10. limitations under the License.
  11. */
  12. package grpc
import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/go-logr/logr"
	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
	ctrl "sigs.k8s.io/controller-runtime"

	v2 "github.com/external-secrets/external-secrets/providers/v2/common"
)
  26. // ConnectionPool manages a pool of gRPC connections to providers.
  27. // It handles connection reuse, health checking, and graceful shutdown.
  28. type ConnectionPool struct {
  29. mu sync.RWMutex
  30. connections map[string]*pooledConnection
  31. maxIdle time.Duration
  32. maxLifetime time.Duration
  33. healthCheck time.Duration
  34. log logr.Logger
  35. }
  36. // pooledConnection wraps a gRPC connection with metadata for pooling.
  37. type pooledConnection struct {
  38. conn *grpc.ClientConn
  39. client v2.Provider
  40. created time.Time
  41. lastUsed time.Time
  42. references int32 // Number of active users
  43. mu sync.Mutex
  44. }
  45. // PoolConfig configures the connection pool.
  46. type PoolConfig struct {
  47. // MaxIdleTime is how long a connection can be idle before being closed
  48. MaxIdleTime time.Duration
  49. // MaxLifetime is the maximum lifetime of a connection
  50. MaxLifetime time.Duration
  51. // HealthCheckInterval is how often to check connection health
  52. HealthCheckInterval time.Duration
  53. }
  54. // DefaultPoolConfig returns sensible defaults for connection pooling.
  55. func DefaultPoolConfig() PoolConfig {
  56. return PoolConfig{
  57. MaxIdleTime: 5 * time.Minute,
  58. MaxLifetime: 30 * time.Minute,
  59. HealthCheckInterval: 30 * time.Second,
  60. }
  61. }
  62. // NewConnectionPool creates a new connection pool with the given configuration.
  63. func NewConnectionPool(cfg PoolConfig) *ConnectionPool {
  64. pool := &ConnectionPool{
  65. connections: make(map[string]*pooledConnection),
  66. maxIdle: cfg.MaxIdleTime,
  67. maxLifetime: cfg.MaxLifetime,
  68. healthCheck: cfg.HealthCheckInterval,
  69. log: ctrl.Log.WithName("grpc-pool"),
  70. }
  71. pool.log.Info("connection pool initialized",
  72. "maxIdleTime", cfg.MaxIdleTime.String(),
  73. "maxLifetime", cfg.MaxLifetime.String(),
  74. "healthCheckInterval", cfg.HealthCheckInterval.String())
  75. // Start background goroutine for cleanup and health checks
  76. go pool.maintenance()
  77. return pool
  78. }
  79. // Get retrieves or creates a connection to the specified provider address.
  80. // The caller must call Release() when done with the connection.
  81. func (p *ConnectionPool) Get(ctx context.Context, address string, tlsConfig *TLSConfig) (v2.Provider, error) {
  82. p.mu.Lock()
  83. defer p.mu.Unlock()
  84. key := p.connectionKey(address, tlsConfig)
  85. p.log.V(1).Info("getting connection from pool",
  86. "address", address,
  87. "key", key)
  88. // Check if we have a valid cached connection
  89. if pooled, exists := p.connections[key]; exists {
  90. pooled.mu.Lock()
  91. defer pooled.mu.Unlock()
  92. p.log.V(1).Info("found cached connection",
  93. "address", address,
  94. "state", pooled.conn.GetState().String(),
  95. "references", pooled.references,
  96. "age", time.Since(pooled.created).String(),
  97. "idleTime", time.Since(pooled.lastUsed).String())
  98. // Check if connection is still valid
  99. if p.isConnectionValid(pooled) {
  100. pooled.references++
  101. pooled.lastUsed = time.Now()
  102. p.log.Info("reusing cached connection",
  103. "address", address,
  104. "references", pooled.references)
  105. // Record cache hit
  106. poolMetrics.RecordHit(address, tlsConfig != nil)
  107. return pooled.client, nil
  108. }
  109. // Connection is invalid, clean it up
  110. p.log.Info("cached connection invalid, cleaning up",
  111. "address", address,
  112. "state", pooled.conn.GetState().String())
  113. pooled.conn.Close()
  114. delete(p.connections, key)
  115. }
  116. // Create new connection
  117. p.log.Info("creating new connection",
  118. "address", address,
  119. "tlsEnabled", tlsConfig != nil)
  120. // Record cache miss
  121. poolMetrics.RecordMiss(address, tlsConfig != nil)
  122. providerClient, err := NewClient(address, tlsConfig)
  123. if err != nil {
  124. p.log.Error(err, "failed to create new connection", "address", address)
  125. // Record connection error
  126. poolMetrics.RecordConnectionError(address, tlsConfig != nil)
  127. return nil, fmt.Errorf("failed to create new connection: %w", err)
  128. }
  129. // Extract the underlying connection for pooling
  130. grpcClient, ok := providerClient.(*grpcProviderClient)
  131. if !ok {
  132. return nil, fmt.Errorf("unexpected client type")
  133. }
  134. pooled := &pooledConnection{
  135. conn: grpcClient.conn,
  136. client: providerClient,
  137. created: time.Now(),
  138. lastUsed: time.Now(),
  139. references: 1,
  140. }
  141. p.connections[key] = pooled
  142. p.log.Info("new connection added to pool",
  143. "address", address,
  144. "state", grpcClient.conn.GetState().String(),
  145. "target", grpcClient.conn.Target())
  146. return providerClient, nil
  147. }
  148. // Release marks a connection as no longer in use.
  149. // This should be called in a defer after Get().
  150. func (p *ConnectionPool) Release(address string, tlsConfig *TLSConfig) {
  151. p.mu.Lock()
  152. defer p.mu.Unlock()
  153. key := p.connectionKey(address, tlsConfig)
  154. if pooled, exists := p.connections[key]; exists {
  155. pooled.mu.Lock()
  156. defer pooled.mu.Unlock()
  157. if pooled.references > 0 {
  158. pooled.references--
  159. p.log.V(1).Info("released connection",
  160. "address", address,
  161. "remainingReferences", pooled.references)
  162. }
  163. }
  164. }
  165. // Close shuts down the connection pool and closes all connections.
  166. func (p *ConnectionPool) Close() error {
  167. p.mu.Lock()
  168. defer p.mu.Unlock()
  169. for _, pooled := range p.connections {
  170. pooled.mu.Lock()
  171. if pooled.conn != nil {
  172. pooled.conn.Close()
  173. }
  174. pooled.mu.Unlock()
  175. }
  176. p.connections = make(map[string]*pooledConnection)
  177. return nil
  178. }
  179. // maintenance runs periodic cleanup and health checks.
  180. func (p *ConnectionPool) maintenance() {
  181. ticker := time.NewTicker(p.healthCheck)
  182. defer ticker.Stop()
  183. for range ticker.C {
  184. p.cleanupIdleConnections()
  185. p.checkConnectionHealth()
  186. p.updatePoolMetrics()
  187. }
  188. }
  189. // cleanupIdleConnections removes connections that have been idle too long.
  190. func (p *ConnectionPool) cleanupIdleConnections() {
  191. p.mu.Lock()
  192. defer p.mu.Unlock()
  193. now := time.Now()
  194. toRemove := make([]string, 0)
  195. evictions := make(map[string]string) // key -> reason
  196. for key, pooled := range p.connections {
  197. pooled.mu.Lock()
  198. // Skip connections that are in use
  199. if pooled.references > 0 {
  200. pooled.mu.Unlock()
  201. continue
  202. }
  203. // Check if connection is too old or idle too long
  204. idleTooLong := now.Sub(pooled.lastUsed) > p.maxIdle
  205. tooOld := now.Sub(pooled.created) > p.maxLifetime
  206. if idleTooLong {
  207. pooled.conn.Close()
  208. toRemove = append(toRemove, key)
  209. evictions[key] = "idle_timeout"
  210. } else if tooOld {
  211. pooled.conn.Close()
  212. toRemove = append(toRemove, key)
  213. evictions[key] = "max_lifetime"
  214. }
  215. pooled.mu.Unlock()
  216. }
  217. for _, key := range toRemove {
  218. address, tlsEnabled := p.parseConnectionKey(key)
  219. poolMetrics.RecordEviction(address, tlsEnabled, evictions[key])
  220. delete(p.connections, key)
  221. }
  222. }
  223. // checkConnectionHealth verifies that pooled connections are still healthy.
  224. func (p *ConnectionPool) checkConnectionHealth() {
  225. p.mu.Lock()
  226. defer p.mu.Unlock()
  227. toRemove := make([]string, 0)
  228. for key, pooled := range p.connections {
  229. pooled.mu.Lock()
  230. // Check connection state
  231. state := pooled.conn.GetState()
  232. if state == connectivity.TransientFailure || state == connectivity.Shutdown {
  233. pooled.conn.Close()
  234. toRemove = append(toRemove, key)
  235. }
  236. pooled.mu.Unlock()
  237. }
  238. for _, key := range toRemove {
  239. address, tlsEnabled := p.parseConnectionKey(key)
  240. poolMetrics.RecordEviction(address, tlsEnabled, "health_check")
  241. delete(p.connections, key)
  242. }
  243. }
  244. // isConnectionValid checks if a pooled connection is still usable.
  245. func (p *ConnectionPool) isConnectionValid(pooled *pooledConnection) bool {
  246. // Check age
  247. if time.Since(pooled.created) > p.maxLifetime {
  248. return false
  249. }
  250. // Check connection state
  251. state := pooled.conn.GetState()
  252. if state == connectivity.Shutdown || state == connectivity.TransientFailure {
  253. return false
  254. }
  255. return true
  256. }
  257. // connectionKey generates a unique key for caching connections.
  258. func (p *ConnectionPool) connectionKey(address string, tlsConfig *TLSConfig) string {
  259. if tlsConfig != nil {
  260. sum := sha256.Sum256(append(append(append([]byte{}, tlsConfig.CACert...), tlsConfig.ClientCert...), tlsConfig.ClientKey...))
  261. return fmt.Sprintf("%s|%s-tls", address, hex.EncodeToString(sum[:8]))
  262. }
  263. return fmt.Sprintf("%s-insecure", address)
  264. }
  265. // parseConnectionKey extracts address and TLS status from a connection key.
  266. func (p *ConnectionPool) parseConnectionKey(key string) (address string, tlsEnabled bool) {
  267. if len(key) > 4 && key[len(key)-4:] == "-tls" {
  268. trimmed := key[:len(key)-4]
  269. for i := 0; i < len(trimmed); i++ {
  270. if trimmed[i] == '|' {
  271. return trimmed[:i], true
  272. }
  273. }
  274. return trimmed, true
  275. }
  276. if len(key) > 9 && key[len(key)-9:] == "-insecure" {
  277. return key[:len(key)-9], false
  278. }
  279. return key, false
  280. }
  281. // updatePoolMetrics updates pool state metrics.
  282. func (p *ConnectionPool) updatePoolMetrics() {
  283. p.mu.RLock()
  284. defer p.mu.RUnlock()
  285. // Track stats per address/TLS combination
  286. stats := make(map[string]struct {
  287. active int
  288. idle int
  289. total int
  290. })
  291. now := time.Now()
  292. for key, pooled := range p.connections {
  293. pooled.mu.Lock()
  294. address, tlsEnabled := p.parseConnectionKey(key)
  295. statKey := key
  296. s := stats[statKey]
  297. s.total++
  298. if pooled.references > 0 {
  299. s.active++
  300. } else {
  301. s.idle++
  302. }
  303. stats[statKey] = s
  304. // Record connection age and idle time
  305. poolMetrics.RecordConnectionAge(address, tlsEnabled, now.Sub(pooled.created))
  306. if pooled.references == 0 {
  307. poolMetrics.RecordConnectionIdle(address, tlsEnabled, now.Sub(pooled.lastUsed))
  308. }
  309. pooled.mu.Unlock()
  310. }
  311. // Update gauges
  312. for key, s := range stats {
  313. address, tlsEnabled := p.parseConnectionKey(key)
  314. poolMetrics.UpdatePoolState(address, tlsEnabled, s.active, s.idle, s.total)
  315. }
  316. }