| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386 |
- /*
- Copyright © The ESO Authors
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- https://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package v2
- import (
- "bufio"
- "context"
- "fmt"
- "io"
- "net/http"
- "regexp"
- "sort"
- "strconv"
- "strings"
- "time"
- corev1 "k8s.io/api/core/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/rest"
- "k8s.io/client-go/tools/portforward"
- "k8s.io/client-go/transport/spdy"
- . "github.com/onsi/ginkgo/v2"
- . "github.com/onsi/gomega"
- )
// Sample types shared by the metric helpers in this file.
type (
	// MetricSample is one parsed sample: the metric name, its label set, and
	// the numeric value.
	MetricSample struct {
		Name   string
		Labels map[string]string
		Value  float64
	}

	// MetricsMap groups samples under their metric name.
	MetricsMap map[string][]MetricSample
)
- func ScrapeControllerMetrics(ctx context.Context, config *rest.Config, clientset kubernetes.Interface, namespace string) (MetricsMap, error) {
- podName, err := findPod(ctx, clientset, namespace, "app.kubernetes.io/name=external-secrets")
- if err != nil {
- return nil, err
- }
- return scrapePodMetrics(ctx, config, clientset, namespace, podName, 8080)
- }
- func ScrapeProviderMetrics(ctx context.Context, config *rest.Config, clientset kubernetes.Interface, namespace, providerName string) (MetricsMap, error) {
- labelSelector := fmt.Sprintf("app.kubernetes.io/name=external-secrets-provider-%s", providerName)
- podName, err := findPod(ctx, clientset, namespace, labelSelector)
- if err != nil {
- return nil, err
- }
- return scrapePodMetrics(ctx, config, clientset, namespace, podName, 8081)
- }
- func GetMetricValue(metrics MetricsMap, metricName string, matchLabels map[string]string) (float64, bool) {
- samples, exists := metrics[metricName]
- if !exists {
- return 0, false
- }
- for _, sample := range samples {
- if labelsMatch(sample.Labels, matchLabels) {
- return sample.Value, true
- }
- }
- return 0, false
- }
- func SumMetricValues(metrics MetricsMap, metricName string, matchLabels map[string]string) float64 {
- samples, exists := metrics[metricName]
- if !exists {
- return 0
- }
- var total float64
- for _, sample := range samples {
- if labelsMatch(sample.Labels, matchLabels) {
- total += sample.Value
- }
- }
- return total
- }
- func CountMetricSamples(metrics MetricsMap, metricName string, matchLabels map[string]string) int {
- samples, exists := metrics[metricName]
- if !exists {
- return 0
- }
- count := 0
- for _, sample := range samples {
- if labelsMatch(sample.Labels, matchLabels) {
- count++
- }
- }
- return count
- }
- func ExpectMetricExists(metrics MetricsMap, metricName string) {
- _, exists := metrics[metricName]
- if !exists {
- availableMetrics := []string{}
- for name := range metrics {
- availableMetrics = append(availableMetrics, name)
- }
- sort.Strings(availableMetrics)
- GinkgoWriter.Printf("Available metrics: %v\n", availableMetrics)
- }
- Expect(exists).To(BeTrue(), "metric %s should exist", metricName)
- }
- func ExpectMetricValue(metrics MetricsMap, metricName string, matchLabels map[string]string, expectedValue float64) {
- value, found := GetMetricValue(metrics, metricName, matchLabels)
- Expect(found).To(BeTrue(), "metric %s with labels %v should exist", metricName, matchLabels)
- Expect(value).To(Equal(expectedValue), "metric %s value mismatch", metricName)
- }
- func ExpectMetricGreaterThan(metrics MetricsMap, metricName string, matchLabels map[string]string, threshold float64) {
- value, found := GetMetricValue(metrics, metricName, matchLabels)
- Expect(found).To(BeTrue(), "metric %s with labels %v should exist", metricName, matchLabels)
- Expect(value).To(BeNumerically(">", threshold), "metric %s should be greater than %f", metricName, threshold)
- }
- func WaitForMetric(ctx context.Context, scraper func() (MetricsMap, error), metricName string, matchLabels map[string]string, minValue float64, timeout time.Duration) error {
- ticker := time.NewTicker(time.Second)
- defer ticker.Stop()
- timer := time.NewTimer(timeout)
- defer timer.Stop()
- for {
- select {
- case <-ctx.Done():
- return fmt.Errorf("waiting for metric %s canceled: %w", metricName, ctx.Err())
- case <-timer.C:
- return fmt.Errorf("timeout waiting for metric %s with labels %v to reach %f", metricName, matchLabels, minValue)
- default:
- }
- metrics, err := scraper()
- if err == nil {
- value, found := GetMetricValue(metrics, metricName, matchLabels)
- if found && value >= minValue {
- return nil
- }
- }
- select {
- case <-ctx.Done():
- return fmt.Errorf("waiting for metric %s canceled: %w", metricName, ctx.Err())
- case <-timer.C:
- return fmt.Errorf("timeout waiting for metric %s with labels %v to reach %f", metricName, matchLabels, minValue)
- case <-ticker.C:
- }
- }
- }
- func scrapePodMetrics(ctx context.Context, config *rest.Config, clientset kubernetes.Interface, namespace, podName string, podPort int) (MetricsMap, error) {
- address, cleanup, err := setupPortForward(ctx, config, clientset, namespace, podName, podPort)
- if err != nil {
- return nil, fmt.Errorf("failed to setup port forward: %w", err)
- }
- defer cleanup()
- body, err := waitForMetricsEndpoint(ctx, address, 10*time.Second)
- if err != nil {
- return nil, err
- }
- return parsePrometheusMetrics(body)
- }
- func findPod(ctx context.Context, clientset kubernetes.Interface, namespace, labelSelector string) (string, error) {
- pods, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
- LabelSelector: labelSelector,
- })
- if err != nil {
- return "", fmt.Errorf("failed to list pods: %w", err)
- }
- for _, pod := range pods.Items {
- if pod.Status.Phase == corev1.PodRunning {
- return pod.Name, nil
- }
- }
- return "", fmt.Errorf("no running pod found for selector %s", labelSelector)
- }
- func setupPortForward(ctx context.Context, config *rest.Config, clientset kubernetes.Interface, namespace, podName string, podPort int) (string, func(), error) {
- pod, err := clientset.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
- if err != nil {
- return "", nil, fmt.Errorf("failed to get pod: %w", err)
- }
- if pod.Status.Phase != corev1.PodRunning {
- return "", nil, fmt.Errorf("pod %s is not running: %s", podName, pod.Status.Phase)
- }
- transport, upgrader, err := spdy.RoundTripperFor(config)
- if err != nil {
- return "", nil, fmt.Errorf("failed to create round tripper: %w", err)
- }
- url := clientset.CoreV1().RESTClient().Post().
- Resource("pods").
- Namespace(namespace).
- Name(podName).
- SubResource("portforward").
- URL()
- dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, http.MethodPost, url)
- stopChan := make(chan struct{}, 1)
- readyChan := make(chan struct{}, 1)
- ports := []string{fmt.Sprintf("0:%d", podPort)}
- pf, err := portforward.New(dialer, ports, stopChan, readyChan, GinkgoWriter, GinkgoWriter)
- if err != nil {
- return "", nil, fmt.Errorf("failed to create port forwarder: %w", err)
- }
- errChan := make(chan error, 1)
- go func() {
- if forwardErr := pf.ForwardPorts(); forwardErr != nil {
- errChan <- forwardErr
- }
- }()
- select {
- case <-readyChan:
- forwardedPorts, portErr := pf.GetPorts()
- if portErr != nil {
- close(stopChan)
- return "", nil, fmt.Errorf("failed to get forwarded ports: %w", portErr)
- }
- if len(forwardedPorts) == 0 {
- close(stopChan)
- return "", nil, fmt.Errorf("no ports were forwarded")
- }
- cleanup := func() {
- close(stopChan)
- }
- return fmt.Sprintf("localhost:%d", forwardedPorts[0].Local), cleanup, nil
- case err = <-errChan:
- close(stopChan)
- return "", nil, fmt.Errorf("port forward failed: %w", err)
- case <-ctx.Done():
- close(stopChan)
- return "", nil, fmt.Errorf("port forward canceled: %w", ctx.Err())
- case <-time.After(30 * time.Second):
- close(stopChan)
- return "", nil, fmt.Errorf("timeout waiting for port forward to be ready")
- }
- }
// scrapeMetrics performs one GET of http://<address>/metrics, bound to ctx and
// a 10 second client timeout, and returns the response body as a string. Any
// status other than 200 OK is reported as an error.
func scrapeMetrics(ctx context.Context, address string) (string, error) {
	endpoint := fmt.Sprintf("http://%s/metrics", address)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return "", fmt.Errorf("failed to create request: %w", err)
	}

	client := &http.Client{Timeout: 10 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return "", fmt.Errorf("failed to scrape metrics: %w", err)
	}
	defer resp.Body.Close()

	if code := resp.StatusCode; code != http.StatusOK {
		return "", fmt.Errorf("unexpected status code: %d", code)
	}

	payload, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", fmt.Errorf("failed to read response body: %w", err)
	}
	return string(payload), nil
}
- func waitForMetricsEndpoint(ctx context.Context, address string, timeout time.Duration) (string, error) {
- ticker := time.NewTicker(250 * time.Millisecond)
- defer ticker.Stop()
- timer := time.NewTimer(timeout)
- defer timer.Stop()
- var lastErr error
- for {
- body, err := scrapeMetrics(ctx, address)
- if err == nil {
- return body, nil
- }
- lastErr = err
- select {
- case <-ctx.Done():
- return "", fmt.Errorf("waiting for metrics endpoint canceled: %w", ctx.Err())
- case <-timer.C:
- return "", fmt.Errorf("timed out waiting for metrics endpoint: %w", lastErr)
- case <-ticker.C:
- }
- }
- }
- func parsePrometheusMetrics(body string) (MetricsMap, error) {
- metrics := make(MetricsMap)
- metricRegex := regexp.MustCompile(`^([a-zA-Z_:][a-zA-Z0-9_:]*?)(?:\{([^}]*)\})?\s+([^\s]+)`)
- scanner := bufio.NewScanner(strings.NewReader(body))
- scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
- for scanner.Scan() {
- line := scanner.Text()
- if strings.HasPrefix(line, "#") || strings.TrimSpace(line) == "" {
- continue
- }
- matches := metricRegex.FindStringSubmatch(line)
- if len(matches) != 4 {
- continue
- }
- value, err := strconv.ParseFloat(matches[3], 64)
- if err != nil {
- continue
- }
- sample := MetricSample{
- Name: matches[1],
- Labels: parseLabels(matches[2]),
- Value: value,
- }
- metrics[sample.Name] = append(metrics[sample.Name], sample)
- }
- if err := scanner.Err(); err != nil {
- return nil, fmt.Errorf("failed to scan metrics response: %w", err)
- }
- return metrics, nil
- }
// labelPairRegex matches one name="value" pair, where value may contain
// backslash-escaped characters (including escaped double quotes). It is
// compiled once at package scope because parseLabels runs for every sample
// line.
var labelPairRegex = regexp.MustCompile(`([a-zA-Z_][a-zA-Z0-9_]*)="((?:[^"\\]|\\.)*)"`)

// parseLabels turns the text between the braces of a sample line into a
// name-to-value map. Escape sequences in values are decoded with
// strconv.Unquote; a value that cannot be decoded is kept in its raw form.
// The returned map is never nil; an empty or unparsable input gives an empty
// map.
func parseLabels(labelsStr string) map[string]string {
	labels := make(map[string]string)
	for _, pair := range labelPairRegex.FindAllStringSubmatch(labelsStr, -1) {
		name, raw := pair[1], pair[2]
		value, err := strconv.Unquote(`"` + raw + `"`)
		if err != nil {
			// Not a valid Go-style escape sequence; keep the text as written.
			value = raw
		}
		labels[name] = value
	}
	return labels
}
// labelsMatch reports whether every key/value pair in matchLabels is also
// present in sampleLabels. Extra labels on the sample are ignored, and an
// empty or nil matchLabels matches any sample. A key missing from sampleLabels
// reads as "", so it matches an expected value of "".
func labelsMatch(sampleLabels, matchLabels map[string]string) bool {
	for wantKey, wantValue := range matchLabels {
		if actual := sampleLabels[wantKey]; actual != wantValue {
			return false
		}
	}
	return true
}
|