瀏覽代碼

Fix grpc error handling and provider toolchains

Moritz Johner 2 月之前
父節點
當前提交
d12714b662

+ 1 - 2
providers/v2/aws/Dockerfile

@@ -1,6 +1,6 @@
 # Multi-stage build for AWS Provider
 # Generated by providers/v2/hack/generate-provider-main.go. DO NOT EDIT.
-FROM golang:1.25-alpine AS builder
+FROM golang:1.26.2-alpine AS builder
 
 WORKDIR /workspace
 COPY apis/ apis/
@@ -23,4 +23,3 @@ COPY --from=builder /workspace/providers/v2/aws/provider-aws .
 USER 65532:65532
 
 ENTRYPOINT ["/provider-aws"]
-

+ 3 - 2
providers/v2/common/grpc/metrics.go

@@ -17,6 +17,7 @@ limitations under the License.
 package grpc
 
 import (
+	"errors"
 	"fmt"
 	"strconv"
 	"time"
@@ -269,7 +270,8 @@ func RegisterMetrics(registry prometheus.Registerer) error {
 	for _, collector := range collectors {
 		if err := registry.Register(collector); err != nil {
 			// Check if already registered
-			if _, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			var alreadyRegistered prometheus.AlreadyRegisteredError
+			if errors.As(err, &alreadyRegistered) {
 				continue
 			}
 			return fmt.Errorf("failed to register metric: %w", err)
@@ -288,4 +290,3 @@ func GetPoolMetrics() PoolMetrics {
 func GetClientMetrics() ClientMetrics {
 	return clientMetrics
 }
-

+ 42 - 0
providers/v2/common/grpc/metrics_test.go

@@ -0,0 +1,42 @@
+/*
+Copyright © The ESO Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    https://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package grpc
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+// wrappedAlreadyRegisteredRegisterer is a stateless test double exposing
+// Register, MustRegister and Unregister. Its Register method always fails with
+// a prometheus.AlreadyRegisteredError that is wrapped inside another error.
+type wrappedAlreadyRegisteredRegisterer struct{}
+
+// Register ignores the collector and returns a zero-value
+// prometheus.AlreadyRegisteredError wrapped with %w.
+func (wrappedAlreadyRegisteredRegisterer) Register(_ prometheus.Collector) error {
+	alreadyRegistered := prometheus.AlreadyRegisteredError{}
+	return fmt.Errorf("wrapped: %w", alreadyRegistered)
+}
+
+// MustRegister is a no-op.
+func (wrappedAlreadyRegisteredRegisterer) MustRegister(_ ...prometheus.Collector) {}
+
+// Unregister does nothing and always reports false.
+func (wrappedAlreadyRegisteredRegisterer) Unregister(_ prometheus.Collector) bool {
+	return false
+}
+
+// TestRegisterMetrics_AllowsWrappedAlreadyRegisteredError checks that
+// RegisterMetrics returns nil when the registerer's Register method fails with
+// an AlreadyRegisteredError that has been wrapped by another error.
+func TestRegisterMetrics_AllowsWrappedAlreadyRegisteredError(t *testing.T) {
+	registerer := wrappedAlreadyRegisteredRegisterer{}
+
+	err := RegisterMetrics(registerer)
+	if err == nil {
+		return
+	}
+	t.Fatalf("expected wrapped already registered error to be ignored, got %v", err)
+}

+ 14 - 5
providers/v2/common/grpc/pool.go

@@ -133,7 +133,7 @@ func (p *ConnectionPool) Get(ctx context.Context, address string, tlsConfig *TLS
 		p.log.Info("cached connection invalid, cleaning up",
 			"address", address,
 			"state", pooled.conn.GetState().String())
-		pooled.conn.Close()
+		p.closeConn(pooled.conn, "failed to close invalid cached connection", "address", address)
 		delete(p.connections, key)
 	}
 
@@ -206,7 +206,7 @@ func (p *ConnectionPool) Close() error {
 	for _, pooled := range p.connections {
 		pooled.mu.Lock()
 		if pooled.conn != nil {
-			pooled.conn.Close()
+			p.closeConn(pooled.conn, "failed to close pooled connection during pool shutdown")
 		}
 		pooled.mu.Unlock()
 	}
@@ -251,11 +251,11 @@ func (p *ConnectionPool) cleanupIdleConnections() {
 		tooOld := now.Sub(pooled.created) > p.maxLifetime
 
 		if idleTooLong {
-			pooled.conn.Close()
+			p.closeConn(pooled.conn, "failed to close idle pooled connection", "connectionKey", key)
 			toRemove = append(toRemove, key)
 			evictions[key] = "idle_timeout"
 		} else if tooOld {
-			pooled.conn.Close()
+			p.closeConn(pooled.conn, "failed to close expired pooled connection", "connectionKey", key)
 			toRemove = append(toRemove, key)
 			evictions[key] = "max_lifetime"
 		}
@@ -283,7 +283,7 @@ func (p *ConnectionPool) checkConnectionHealth() {
 		// Check connection state
 		state := pooled.conn.GetState()
 		if state == connectivity.TransientFailure || state == connectivity.Shutdown {
-			pooled.conn.Close()
+			p.closeConn(pooled.conn, "failed to close unhealthy pooled connection", "connectionKey", key, "state", state.String())
 			toRemove = append(toRemove, key)
 		}
 
@@ -313,6 +313,15 @@ func (p *ConnectionPool) isConnectionValid(pooled *pooledConnection) bool {
 	return true
 }
 
+// closeConn closes conn and, when Close reports an error, logs that error via
+// p.log with msg and the given key/value pairs. A nil conn is ignored. The
+// close error is only logged; nothing is returned to the caller.
+func (p *ConnectionPool) closeConn(conn *grpc.ClientConn, msg string, keysAndValues ...any) {
+	if conn == nil {
+		return
+	}
+	closeErr := conn.Close()
+	if closeErr == nil {
+		return
+	}
+	p.log.Error(closeErr, msg, keysAndValues...)
+}
+
 // connectionKey generates a unique key for caching connections.
 func (p *ConnectionPool) connectionKey(address string, tlsConfig *TLSConfig) string {
 	if tlsConfig != nil {

+ 19 - 5
providers/v2/common/grpc/retry.go

@@ -18,6 +18,7 @@ package grpc
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"math"
 	"math/rand"
@@ -169,6 +170,9 @@ func (cb *CircuitBreaker) onSuccess() {
 	case StateClosed:
 		cb.failures = 0
 
+	case StateOpen:
+		// Requests should not reach onSuccess while the circuit is open; leave the state unchanged.
+
 	case StateHalfOpen:
 		// Success in half-open state means we can close the circuit
 		cb.state = StateClosed
@@ -188,6 +192,9 @@ func (cb *CircuitBreaker) onFailure() {
 			cb.state = StateOpen
 		}
 
+	case StateOpen:
+		// Already open: no state transition. Failure timing is presumably still recorded outside this switch (NOTE(review): confirm).
+
 	case StateHalfOpen:
 		// Failure in half-open state means we go back to open
 		cb.state = StateOpen
@@ -255,22 +262,29 @@ func isRetryable(err error) bool {
 			codes.Aborted:
 			return true
 
-		case codes.InvalidArgument,
+		case codes.OK,
+			codes.Canceled,
+			codes.InvalidArgument,
 			codes.NotFound,
 			codes.AlreadyExists,
 			codes.PermissionDenied,
-			codes.Unauthenticated:
+			codes.Unauthenticated,
+			codes.FailedPrecondition,
+			codes.OutOfRange,
+			codes.Unimplemented:
 			return false
 
-		default:
-			// For unknown codes, retry to be safe
+		case codes.Unknown,
+			codes.Internal,
+			codes.DataLoss:
+			// Retry transient or ambiguous failures.
 			return true
 		}
 	}
 
 	// For non-gRPC errors, retry network-related issues
 	// Context errors should not be retried
-	if err == context.Canceled || err == context.DeadlineExceeded {
+	if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
 		return false
 	}
 

+ 11 - 0
providers/v2/common/grpc/retry_test.go

@@ -18,6 +18,7 @@ package grpc
 
 import (
 	"context"
+	"fmt"
 	"testing"
 	"time"
 
@@ -185,6 +186,16 @@ func TestIsRetryable(t *testing.T) {
 			err:       context.Canceled,
 			retryable: false,
 		},
+		{
+			name:      "Wrapped context canceled is not retryable",
+			err:       fmt.Errorf("wrapped: %w", context.Canceled),
+			retryable: false,
+		},
+		{
+			name:      "Wrapped context deadline exceeded is not retryable",
+			err:       fmt.Errorf("wrapped: %w", context.DeadlineExceeded),
+			retryable: false,
+		},
 	}
 
 	for _, tt := range tests {

+ 1 - 2
providers/v2/fake/Dockerfile

@@ -1,6 +1,6 @@
 # Multi-stage build for Fake Provider
 # Generated by providers/v2/hack/generate-provider-main.go. DO NOT EDIT.
-FROM golang:1.25-alpine AS builder
+FROM golang:1.26.2-alpine AS builder
 
 WORKDIR /workspace
 COPY apis/ apis/
@@ -23,4 +23,3 @@ COPY --from=builder /workspace/providers/v2/fake/provider-fake .
 USER 65532:65532
 
 ENTRYPOINT ["/provider-fake"]
-

+ 1 - 2
providers/v2/hack/templates/Dockerfile.tmpl

@@ -1,6 +1,6 @@
 # Multi-stage build for {{.Provider.Provider.DisplayName}}
 # Generated by providers/v2/hack/generate-provider-main.go. DO NOT EDIT.
-FROM golang:1.25-alpine AS builder
+FROM golang:1.26.2-alpine AS builder
 
 WORKDIR /workspace
 COPY apis/ apis/
@@ -23,4 +23,3 @@ COPY --from=builder /workspace/providers/v2/{{.Provider.Provider.Name}}/provider
 USER 65532:65532
 
 ENTRYPOINT ["/provider-{{.Provider.Provider.Name}}"]
-