pool.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395
  1. /*
  2. Copyright © The ESO Authors
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. https://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package grpc
  14. import (
  15. "context"
  16. "crypto/sha256"
  17. "encoding/hex"
  18. "fmt"
  19. "sync"
  20. "time"
  21. "github.com/go-logr/logr"
  22. "google.golang.org/grpc"
  23. "google.golang.org/grpc/connectivity"
  24. ctrl "sigs.k8s.io/controller-runtime"
  25. v2 "github.com/external-secrets/external-secrets/providers/v2/common"
  26. )
  27. // ConnectionPool manages a pool of gRPC connections to providers.
  28. // It handles connection reuse, health checking, and graceful shutdown.
  29. type ConnectionPool struct {
  30. mu sync.RWMutex
  31. connections map[string]*pooledConnection
  32. maxIdle time.Duration
  33. maxLifetime time.Duration
  34. healthCheck time.Duration
  35. log logr.Logger
  36. }
  37. // pooledConnection wraps a gRPC connection with metadata for pooling.
  38. type pooledConnection struct {
  39. conn *grpc.ClientConn
  40. client v2.Provider
  41. created time.Time
  42. lastUsed time.Time
  43. references int32 // Number of active users
  44. mu sync.Mutex
  45. }
  // PoolConfig holds the tunables of a ConnectionPool.
  type PoolConfig struct {
  	// MaxIdleTime is the longest an unreferenced connection may stay unused
  	// before the pool closes it.
  	MaxIdleTime time.Duration

  	// MaxLifetime is the longest a connection may live, measured from creation.
  	MaxLifetime time.Duration

  	// HealthCheckInterval is the period of the background maintenance pass
  	// (idle cleanup, health check, metrics update).
  	HealthCheckInterval time.Duration
  }
  55. // DefaultPoolConfig returns sensible defaults for connection pooling.
  56. func DefaultPoolConfig() PoolConfig {
  57. return PoolConfig{
  58. MaxIdleTime: 5 * time.Minute,
  59. MaxLifetime: 30 * time.Minute,
  60. HealthCheckInterval: 30 * time.Second,
  61. }
  62. }
  63. // NewConnectionPool creates a new connection pool with the given configuration.
  64. func NewConnectionPool(cfg PoolConfig) *ConnectionPool {
  65. pool := &ConnectionPool{
  66. connections: make(map[string]*pooledConnection),
  67. maxIdle: cfg.MaxIdleTime,
  68. maxLifetime: cfg.MaxLifetime,
  69. healthCheck: cfg.HealthCheckInterval,
  70. log: ctrl.Log.WithName("grpc-pool"),
  71. }
  72. pool.log.Info("connection pool initialized",
  73. "maxIdleTime", cfg.MaxIdleTime.String(),
  74. "maxLifetime", cfg.MaxLifetime.String(),
  75. "healthCheckInterval", cfg.HealthCheckInterval.String())
  76. // Start background goroutine for cleanup and health checks
  77. go pool.maintenance()
  78. return pool
  79. }
  80. // Get retrieves or creates a connection to the specified provider address.
  81. // The caller must call Release() when done with the connection.
  82. func (p *ConnectionPool) Get(_ context.Context, address string, tlsConfig *TLSConfig) (v2.Provider, error) {
  83. p.mu.Lock()
  84. defer p.mu.Unlock()
  85. key := p.connectionKey(address, tlsConfig)
  86. p.log.V(1).Info("getting connection from pool",
  87. "address", address,
  88. "key", key)
  89. // Check if we have a valid cached connection
  90. if pooled, exists := p.connections[key]; exists {
  91. pooled.mu.Lock()
  92. defer pooled.mu.Unlock()
  93. p.log.V(1).Info("found cached connection",
  94. "address", address,
  95. "state", pooled.conn.GetState().String(),
  96. "references", pooled.references,
  97. "age", time.Since(pooled.created).String(),
  98. "idleTime", time.Since(pooled.lastUsed).String())
  99. // Check if connection is still valid
  100. if p.isConnectionValid(pooled) {
  101. pooled.references++
  102. pooled.lastUsed = time.Now()
  103. p.log.Info("reusing cached connection",
  104. "address", address,
  105. "references", pooled.references)
  106. // Record cache hit
  107. poolMetrics.RecordHit(address, tlsConfig != nil)
  108. return pooled.client, nil
  109. }
  110. // Connection is invalid, clean it up
  111. p.log.Info("cached connection invalid, cleaning up",
  112. "address", address,
  113. "state", pooled.conn.GetState().String())
  114. p.closeConn(pooled.conn, "failed to close invalid cached connection", "address", address)
  115. delete(p.connections, key)
  116. }
  117. // Create new connection
  118. p.log.Info("creating new connection",
  119. "address", address,
  120. "tlsEnabled", tlsConfig != nil)
  121. // Record cache miss
  122. poolMetrics.RecordMiss(address, tlsConfig != nil)
  123. providerClient, err := NewClient(address, tlsConfig)
  124. if err != nil {
  125. p.log.Error(err, "failed to create new connection", "address", address)
  126. // Record connection error
  127. poolMetrics.RecordConnectionError(address, tlsConfig != nil)
  128. return nil, fmt.Errorf("failed to create new connection: %w", err)
  129. }
  130. // Extract the underlying connection for pooling
  131. grpcClient, ok := providerClient.(*grpcProviderClient)
  132. if !ok {
  133. return nil, fmt.Errorf("unexpected client type")
  134. }
  135. pooled := &pooledConnection{
  136. conn: grpcClient.conn,
  137. client: providerClient,
  138. created: time.Now(),
  139. lastUsed: time.Now(),
  140. references: 1,
  141. }
  142. p.connections[key] = pooled
  143. p.log.Info("new connection added to pool",
  144. "address", address,
  145. "state", grpcClient.conn.GetState().String(),
  146. "target", grpcClient.conn.Target())
  147. return providerClient, nil
  148. }
  149. // Release marks a connection as no longer in use.
  150. // This should be called in a defer after Get().
  151. func (p *ConnectionPool) Release(address string, tlsConfig *TLSConfig) {
  152. p.mu.Lock()
  153. defer p.mu.Unlock()
  154. key := p.connectionKey(address, tlsConfig)
  155. if pooled, exists := p.connections[key]; exists {
  156. pooled.mu.Lock()
  157. defer pooled.mu.Unlock()
  158. if pooled.references > 0 {
  159. pooled.references--
  160. p.log.V(1).Info("released connection",
  161. "address", address,
  162. "remainingReferences", pooled.references)
  163. }
  164. }
  165. }
  166. // Close shuts down the connection pool and closes all connections.
  167. func (p *ConnectionPool) Close() error {
  168. p.mu.Lock()
  169. defer p.mu.Unlock()
  170. for _, pooled := range p.connections {
  171. pooled.mu.Lock()
  172. if pooled.conn != nil {
  173. p.closeConn(pooled.conn, "failed to close pooled connection during pool shutdown")
  174. }
  175. pooled.mu.Unlock()
  176. }
  177. p.connections = make(map[string]*pooledConnection)
  178. return nil
  179. }
  180. // maintenance runs periodic cleanup and health checks.
  181. func (p *ConnectionPool) maintenance() {
  182. ticker := time.NewTicker(p.healthCheck)
  183. defer ticker.Stop()
  184. for range ticker.C {
  185. p.cleanupIdleConnections()
  186. p.checkConnectionHealth()
  187. p.updatePoolMetrics()
  188. }
  189. }
  190. // cleanupIdleConnections removes connections that have been idle too long.
  191. func (p *ConnectionPool) cleanupIdleConnections() {
  192. p.mu.Lock()
  193. defer p.mu.Unlock()
  194. now := time.Now()
  195. toRemove := make([]string, 0)
  196. evictions := make(map[string]string) // key -> reason
  197. for key, pooled := range p.connections {
  198. pooled.mu.Lock()
  199. // Skip connections that are in use
  200. if pooled.references > 0 {
  201. pooled.mu.Unlock()
  202. continue
  203. }
  204. // Check if connection is too old or idle too long
  205. idleTooLong := now.Sub(pooled.lastUsed) > p.maxIdle
  206. tooOld := now.Sub(pooled.created) > p.maxLifetime
  207. if idleTooLong {
  208. p.closeConn(pooled.conn, "failed to close idle pooled connection", "connectionKey", key)
  209. toRemove = append(toRemove, key)
  210. evictions[key] = "idle_timeout"
  211. } else if tooOld {
  212. p.closeConn(pooled.conn, "failed to close expired pooled connection", "connectionKey", key)
  213. toRemove = append(toRemove, key)
  214. evictions[key] = "max_lifetime"
  215. }
  216. pooled.mu.Unlock()
  217. }
  218. for _, key := range toRemove {
  219. address, tlsEnabled := p.parseConnectionKey(key)
  220. poolMetrics.RecordEviction(address, tlsEnabled, evictions[key])
  221. delete(p.connections, key)
  222. }
  223. }
  224. // checkConnectionHealth verifies that pooled connections are still healthy.
  225. func (p *ConnectionPool) checkConnectionHealth() {
  226. p.mu.Lock()
  227. defer p.mu.Unlock()
  228. toRemove := make([]string, 0)
  229. for key, pooled := range p.connections {
  230. pooled.mu.Lock()
  231. // Check connection state
  232. state := pooled.conn.GetState()
  233. if state == connectivity.TransientFailure || state == connectivity.Shutdown {
  234. p.closeConn(pooled.conn, "failed to close unhealthy pooled connection", "connectionKey", key, "state", state.String())
  235. toRemove = append(toRemove, key)
  236. }
  237. pooled.mu.Unlock()
  238. }
  239. for _, key := range toRemove {
  240. address, tlsEnabled := p.parseConnectionKey(key)
  241. poolMetrics.RecordEviction(address, tlsEnabled, "health_check")
  242. delete(p.connections, key)
  243. }
  244. }
  245. // isConnectionValid checks if a pooled connection is still usable.
  246. func (p *ConnectionPool) isConnectionValid(pooled *pooledConnection) bool {
  247. // Check age
  248. if time.Since(pooled.created) > p.maxLifetime {
  249. return false
  250. }
  251. // Check connection state
  252. state := pooled.conn.GetState()
  253. if state == connectivity.Shutdown || state == connectivity.TransientFailure {
  254. return false
  255. }
  256. return true
  257. }
  258. func (p *ConnectionPool) closeConn(conn *grpc.ClientConn, msg string, keysAndValues ...any) {
  259. if conn == nil {
  260. return
  261. }
  262. if err := conn.Close(); err != nil {
  263. p.log.Error(err, msg, keysAndValues...)
  264. }
  265. }
  266. // connectionKey generates a unique key for caching connections.
  267. func (p *ConnectionPool) connectionKey(address string, tlsConfig *TLSConfig) string {
  268. if tlsConfig != nil {
  269. sum := sha256.Sum256(append(append(append([]byte{}, tlsConfig.CACert...), tlsConfig.ClientCert...), tlsConfig.ClientKey...))
  270. return fmt.Sprintf("%s|%s-tls", address, hex.EncodeToString(sum[:8]))
  271. }
  272. return fmt.Sprintf("%s-insecure", address)
  273. }
  274. // parseConnectionKey extracts address and TLS status from a connection key.
  275. func (p *ConnectionPool) parseConnectionKey(key string) (address string, tlsEnabled bool) {
  276. if len(key) > 4 && key[len(key)-4:] == "-tls" {
  277. trimmed := key[:len(key)-4]
  278. for i := 0; i < len(trimmed); i++ {
  279. if trimmed[i] == '|' {
  280. return trimmed[:i], true
  281. }
  282. }
  283. return trimmed, true
  284. }
  285. if len(key) > 9 && key[len(key)-9:] == "-insecure" {
  286. return key[:len(key)-9], false
  287. }
  288. return key, false
  289. }
  290. // updatePoolMetrics updates pool state metrics.
  291. func (p *ConnectionPool) updatePoolMetrics() {
  292. p.mu.RLock()
  293. defer p.mu.RUnlock()
  294. // Track stats per address/TLS combination
  295. stats := make(map[string]struct {
  296. active int
  297. idle int
  298. total int
  299. })
  300. now := time.Now()
  301. for key, pooled := range p.connections {
  302. pooled.mu.Lock()
  303. address, tlsEnabled := p.parseConnectionKey(key)
  304. statKey := key
  305. s := stats[statKey]
  306. s.total++
  307. if pooled.references > 0 {
  308. s.active++
  309. } else {
  310. s.idle++
  311. }
  312. stats[statKey] = s
  313. // Record connection age and idle time
  314. poolMetrics.RecordConnectionAge(address, tlsEnabled, now.Sub(pooled.created))
  315. if pooled.references == 0 {
  316. poolMetrics.RecordConnectionIdle(address, tlsEnabled, now.Sub(pooled.lastUsed))
  317. }
  318. pooled.mu.Unlock()
  319. }
  320. // Update gauges
  321. for key, s := range stats {
  322. address, tlsEnabled := p.parseConnectionKey(key)
  323. poolMetrics.UpdatePoolState(address, tlsEnabled, s.active, s.idle, s.total)
  324. }
  325. }