metrics.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. /*
  2. Copyright © The ESO Authors
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. https://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package v2
  14. import (
  15. "bufio"
  16. "context"
  17. "fmt"
  18. "io"
  19. "net/http"
  20. "regexp"
  21. "strconv"
  22. "strings"
  23. "time"
  24. . "github.com/onsi/ginkgo/v2"
  25. . "github.com/onsi/gomega"
  26. corev1 "k8s.io/api/core/v1"
  27. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  28. "k8s.io/client-go/kubernetes"
  29. "k8s.io/client-go/rest"
  30. "k8s.io/client-go/tools/portforward"
  31. "k8s.io/client-go/transport/spdy"
  32. )
  33. type MetricSample struct {
  34. Name string
  35. Labels map[string]string
  36. Value float64
  37. }
  38. type MetricsMap map[string][]MetricSample
  39. func ScrapeControllerMetrics(ctx context.Context, config *rest.Config, clientset kubernetes.Interface, namespace string) (MetricsMap, error) {
  40. podName, err := findPod(ctx, clientset, namespace, "app.kubernetes.io/name=external-secrets")
  41. if err != nil {
  42. return nil, err
  43. }
  44. return scrapePodMetrics(ctx, config, clientset, namespace, podName, 8080)
  45. }
  46. func ScrapeProviderMetrics(ctx context.Context, config *rest.Config, clientset kubernetes.Interface, namespace, providerName string) (MetricsMap, error) {
  47. labelSelector := fmt.Sprintf("app.kubernetes.io/name=external-secrets-provider-%s", providerName)
  48. podName, err := findPod(ctx, clientset, namespace, labelSelector)
  49. if err != nil {
  50. return nil, err
  51. }
  52. return scrapePodMetrics(ctx, config, clientset, namespace, podName, 8081)
  53. }
  54. func GetMetricValue(metrics MetricsMap, metricName string, matchLabels map[string]string) (float64, bool) {
  55. samples, exists := metrics[metricName]
  56. if !exists {
  57. return 0, false
  58. }
  59. for _, sample := range samples {
  60. if labelsMatch(sample.Labels, matchLabels) {
  61. return sample.Value, true
  62. }
  63. }
  64. return 0, false
  65. }
  66. func ExpectMetricExists(metrics MetricsMap, metricName string) {
  67. _, exists := metrics[metricName]
  68. if !exists {
  69. availableMetrics := []string{}
  70. for name := range metrics {
  71. availableMetrics = append(availableMetrics, name)
  72. }
  73. GinkgoWriter.Printf("Available metrics: %v\n", availableMetrics)
  74. }
  75. Expect(exists).To(BeTrue(), "metric %s should exist", metricName)
  76. }
  77. func ExpectMetricValue(metrics MetricsMap, metricName string, matchLabels map[string]string, expectedValue float64) {
  78. value, found := GetMetricValue(metrics, metricName, matchLabels)
  79. Expect(found).To(BeTrue(), "metric %s with labels %v should exist", metricName, matchLabels)
  80. Expect(value).To(Equal(expectedValue), "metric %s value mismatch", metricName)
  81. }
  82. func ExpectMetricGreaterThan(metrics MetricsMap, metricName string, matchLabels map[string]string, threshold float64) {
  83. value, found := GetMetricValue(metrics, metricName, matchLabels)
  84. Expect(found).To(BeTrue(), "metric %s with labels %v should exist", metricName, matchLabels)
  85. Expect(value).To(BeNumerically(">", threshold), "metric %s should be greater than %f", metricName, threshold)
  86. }
  87. func WaitForMetric(ctx context.Context, scraper func() (MetricsMap, error), metricName string, matchLabels map[string]string, minValue float64, timeout time.Duration) error {
  88. deadline := time.Now().Add(timeout)
  89. for time.Now().Before(deadline) {
  90. metrics, err := scraper()
  91. if err == nil {
  92. value, found := GetMetricValue(metrics, metricName, matchLabels)
  93. if found && value >= minValue {
  94. return nil
  95. }
  96. }
  97. time.Sleep(time.Second)
  98. }
  99. return fmt.Errorf("timeout waiting for metric %s with labels %v to reach %f", metricName, matchLabels, minValue)
  100. }
  101. func scrapePodMetrics(ctx context.Context, config *rest.Config, clientset kubernetes.Interface, namespace, podName string, podPort int) (MetricsMap, error) {
  102. address, cleanup, err := setupPortForward(ctx, config, clientset, namespace, podName, podPort)
  103. if err != nil {
  104. return nil, fmt.Errorf("failed to setup port forward: %w", err)
  105. }
  106. defer cleanup()
  107. time.Sleep(time.Second)
  108. body, err := scrapeMetrics(ctx, address)
  109. if err != nil {
  110. return nil, err
  111. }
  112. return parsePrometheusMetrics(body), nil
  113. }
  114. func findPod(ctx context.Context, clientset kubernetes.Interface, namespace, labelSelector string) (string, error) {
  115. pods, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
  116. LabelSelector: labelSelector,
  117. })
  118. if err != nil {
  119. return "", fmt.Errorf("failed to list pods: %w", err)
  120. }
  121. for _, pod := range pods.Items {
  122. if pod.Status.Phase == corev1.PodRunning {
  123. return pod.Name, nil
  124. }
  125. }
  126. return "", fmt.Errorf("no running pod found for selector %s", labelSelector)
  127. }
  128. func setupPortForward(ctx context.Context, config *rest.Config, clientset kubernetes.Interface, namespace, podName string, podPort int) (string, func(), error) {
  129. pod, err := clientset.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
  130. if err != nil {
  131. return "", nil, fmt.Errorf("failed to get pod: %w", err)
  132. }
  133. if pod.Status.Phase != corev1.PodRunning {
  134. return "", nil, fmt.Errorf("pod %s is not running: %s", podName, pod.Status.Phase)
  135. }
  136. transport, upgrader, err := spdy.RoundTripperFor(config)
  137. if err != nil {
  138. return "", nil, fmt.Errorf("failed to create round tripper: %w", err)
  139. }
  140. url := clientset.CoreV1().RESTClient().Post().
  141. Resource("pods").
  142. Namespace(namespace).
  143. Name(podName).
  144. SubResource("portforward").
  145. URL()
  146. dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, http.MethodPost, url)
  147. stopChan := make(chan struct{}, 1)
  148. readyChan := make(chan struct{}, 1)
  149. ports := []string{fmt.Sprintf("0:%d", podPort)}
  150. pf, err := portforward.New(dialer, ports, stopChan, readyChan, GinkgoWriter, GinkgoWriter)
  151. if err != nil {
  152. return "", nil, fmt.Errorf("failed to create port forwarder: %w", err)
  153. }
  154. errChan := make(chan error, 1)
  155. go func() {
  156. if forwardErr := pf.ForwardPorts(); forwardErr != nil {
  157. errChan <- forwardErr
  158. }
  159. }()
  160. select {
  161. case <-readyChan:
  162. forwardedPorts, portErr := pf.GetPorts()
  163. if portErr != nil {
  164. close(stopChan)
  165. return "", nil, fmt.Errorf("failed to get forwarded ports: %w", portErr)
  166. }
  167. if len(forwardedPorts) == 0 {
  168. close(stopChan)
  169. return "", nil, fmt.Errorf("no ports were forwarded")
  170. }
  171. cleanup := func() {
  172. close(stopChan)
  173. }
  174. return fmt.Sprintf("localhost:%d", forwardedPorts[0].Local), cleanup, nil
  175. case err = <-errChan:
  176. close(stopChan)
  177. return "", nil, fmt.Errorf("port forward failed: %w", err)
  178. case <-time.After(30 * time.Second):
  179. close(stopChan)
  180. return "", nil, fmt.Errorf("timeout waiting for port forward to be ready")
  181. }
  182. }
  183. func scrapeMetrics(ctx context.Context, address string) (string, error) {
  184. req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("http://%s/metrics", address), nil)
  185. if err != nil {
  186. return "", fmt.Errorf("failed to create request: %w", err)
  187. }
  188. resp, err := (&http.Client{Timeout: 10 * time.Second}).Do(req)
  189. if err != nil {
  190. return "", fmt.Errorf("failed to scrape metrics: %w", err)
  191. }
  192. defer resp.Body.Close()
  193. if resp.StatusCode != http.StatusOK {
  194. return "", fmt.Errorf("unexpected status code: %d", resp.StatusCode)
  195. }
  196. body, err := io.ReadAll(resp.Body)
  197. if err != nil {
  198. return "", fmt.Errorf("failed to read response body: %w", err)
  199. }
  200. return string(body), nil
  201. }
  202. func parsePrometheusMetrics(body string) MetricsMap {
  203. metrics := make(MetricsMap)
  204. metricRegex := regexp.MustCompile(`^([a-zA-Z_:][a-zA-Z0-9_:]*?)(?:\{([^}]*)\})?\s+([^\s]+)`)
  205. scanner := bufio.NewScanner(strings.NewReader(body))
  206. for scanner.Scan() {
  207. line := scanner.Text()
  208. if strings.HasPrefix(line, "#") || strings.TrimSpace(line) == "" {
  209. continue
  210. }
  211. matches := metricRegex.FindStringSubmatch(line)
  212. if len(matches) != 4 {
  213. continue
  214. }
  215. value, err := strconv.ParseFloat(matches[3], 64)
  216. if err != nil {
  217. continue
  218. }
  219. sample := MetricSample{
  220. Name: matches[1],
  221. Labels: parseLabels(matches[2]),
  222. Value: value,
  223. }
  224. metrics[sample.Name] = append(metrics[sample.Name], sample)
  225. }
  226. return metrics
  227. }
  228. func parseLabels(labelsStr string) map[string]string {
  229. labels := make(map[string]string)
  230. if labelsStr == "" {
  231. return labels
  232. }
  233. labelRegex := regexp.MustCompile(`([a-zA-Z_][a-zA-Z0-9_]*)="([^"]*)"`)
  234. matches := labelRegex.FindAllStringSubmatch(labelsStr, -1)
  235. for _, match := range matches {
  236. if len(match) == 3 {
  237. labels[match[1]] = match[2]
  238. }
  239. }
  240. return labels
  241. }
  242. func labelsMatch(sampleLabels, matchLabels map[string]string) bool {
  243. for key, value := range matchLabels {
  244. if sampleLabels[key] != value {
  245. return false
  246. }
  247. }
  248. return true
  249. }