metrics.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386
  1. /*
  2. Copyright © The ESO Authors
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. https://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package v2
  14. import (
  15. "bufio"
  16. "context"
  17. "fmt"
  18. "io"
  19. "net/http"
  20. "regexp"
  21. "sort"
  22. "strconv"
  23. "strings"
  24. "time"
  25. corev1 "k8s.io/api/core/v1"
  26. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  27. "k8s.io/client-go/kubernetes"
  28. "k8s.io/client-go/rest"
  29. "k8s.io/client-go/tools/portforward"
  30. "k8s.io/client-go/transport/spdy"
  31. . "github.com/onsi/ginkgo/v2"
  32. . "github.com/onsi/gomega"
  33. )
  // MetricSample is a single sample line parsed from a Prometheus text-format
  // metrics response.
  type MetricSample struct {
  	// Name is the metric name that precedes the optional label set.
  	Name string
  	// Labels maps label names to their values for this sample.
  	Labels map[string]string
  	// Value is the sample's numeric value.
  	Value float64
  }

  // MetricsMap groups parsed samples by metric name; each name maps to every
  // sample recorded under it.
  type MetricsMap map[string][]MetricSample
  40. func ScrapeControllerMetrics(ctx context.Context, config *rest.Config, clientset kubernetes.Interface, namespace string) (MetricsMap, error) {
  41. podName, err := findPod(ctx, clientset, namespace, "app.kubernetes.io/name=external-secrets")
  42. if err != nil {
  43. return nil, err
  44. }
  45. return scrapePodMetrics(ctx, config, clientset, namespace, podName, 8080)
  46. }
  47. func ScrapeProviderMetrics(ctx context.Context, config *rest.Config, clientset kubernetes.Interface, namespace, providerName string) (MetricsMap, error) {
  48. labelSelector := fmt.Sprintf("app.kubernetes.io/name=external-secrets-provider-%s", providerName)
  49. podName, err := findPod(ctx, clientset, namespace, labelSelector)
  50. if err != nil {
  51. return nil, err
  52. }
  53. return scrapePodMetrics(ctx, config, clientset, namespace, podName, 8081)
  54. }
  55. func GetMetricValue(metrics MetricsMap, metricName string, matchLabels map[string]string) (float64, bool) {
  56. samples, exists := metrics[metricName]
  57. if !exists {
  58. return 0, false
  59. }
  60. for _, sample := range samples {
  61. if labelsMatch(sample.Labels, matchLabels) {
  62. return sample.Value, true
  63. }
  64. }
  65. return 0, false
  66. }
  67. func SumMetricValues(metrics MetricsMap, metricName string, matchLabels map[string]string) float64 {
  68. samples, exists := metrics[metricName]
  69. if !exists {
  70. return 0
  71. }
  72. var total float64
  73. for _, sample := range samples {
  74. if labelsMatch(sample.Labels, matchLabels) {
  75. total += sample.Value
  76. }
  77. }
  78. return total
  79. }
  80. func CountMetricSamples(metrics MetricsMap, metricName string, matchLabels map[string]string) int {
  81. samples, exists := metrics[metricName]
  82. if !exists {
  83. return 0
  84. }
  85. count := 0
  86. for _, sample := range samples {
  87. if labelsMatch(sample.Labels, matchLabels) {
  88. count++
  89. }
  90. }
  91. return count
  92. }
  93. func ExpectMetricExists(metrics MetricsMap, metricName string) {
  94. _, exists := metrics[metricName]
  95. if !exists {
  96. availableMetrics := []string{}
  97. for name := range metrics {
  98. availableMetrics = append(availableMetrics, name)
  99. }
  100. sort.Strings(availableMetrics)
  101. GinkgoWriter.Printf("Available metrics: %v\n", availableMetrics)
  102. }
  103. Expect(exists).To(BeTrue(), "metric %s should exist", metricName)
  104. }
  105. func ExpectMetricValue(metrics MetricsMap, metricName string, matchLabels map[string]string, expectedValue float64) {
  106. value, found := GetMetricValue(metrics, metricName, matchLabels)
  107. Expect(found).To(BeTrue(), "metric %s with labels %v should exist", metricName, matchLabels)
  108. Expect(value).To(Equal(expectedValue), "metric %s value mismatch", metricName)
  109. }
  110. func ExpectMetricGreaterThan(metrics MetricsMap, metricName string, matchLabels map[string]string, threshold float64) {
  111. value, found := GetMetricValue(metrics, metricName, matchLabels)
  112. Expect(found).To(BeTrue(), "metric %s with labels %v should exist", metricName, matchLabels)
  113. Expect(value).To(BeNumerically(">", threshold), "metric %s should be greater than %f", metricName, threshold)
  114. }
  115. func WaitForMetric(ctx context.Context, scraper func() (MetricsMap, error), metricName string, matchLabels map[string]string, minValue float64, timeout time.Duration) error {
  116. ticker := time.NewTicker(time.Second)
  117. defer ticker.Stop()
  118. timer := time.NewTimer(timeout)
  119. defer timer.Stop()
  120. for {
  121. select {
  122. case <-ctx.Done():
  123. return fmt.Errorf("waiting for metric %s canceled: %w", metricName, ctx.Err())
  124. case <-timer.C:
  125. return fmt.Errorf("timeout waiting for metric %s with labels %v to reach %f", metricName, matchLabels, minValue)
  126. default:
  127. }
  128. metrics, err := scraper()
  129. if err == nil {
  130. value, found := GetMetricValue(metrics, metricName, matchLabels)
  131. if found && value >= minValue {
  132. return nil
  133. }
  134. }
  135. select {
  136. case <-ctx.Done():
  137. return fmt.Errorf("waiting for metric %s canceled: %w", metricName, ctx.Err())
  138. case <-timer.C:
  139. return fmt.Errorf("timeout waiting for metric %s with labels %v to reach %f", metricName, matchLabels, minValue)
  140. case <-ticker.C:
  141. }
  142. }
  143. }
  144. func scrapePodMetrics(ctx context.Context, config *rest.Config, clientset kubernetes.Interface, namespace, podName string, podPort int) (MetricsMap, error) {
  145. address, cleanup, err := setupPortForward(ctx, config, clientset, namespace, podName, podPort)
  146. if err != nil {
  147. return nil, fmt.Errorf("failed to setup port forward: %w", err)
  148. }
  149. defer cleanup()
  150. body, err := waitForMetricsEndpoint(ctx, address, 10*time.Second)
  151. if err != nil {
  152. return nil, err
  153. }
  154. return parsePrometheusMetrics(body)
  155. }
  156. func findPod(ctx context.Context, clientset kubernetes.Interface, namespace, labelSelector string) (string, error) {
  157. pods, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
  158. LabelSelector: labelSelector,
  159. })
  160. if err != nil {
  161. return "", fmt.Errorf("failed to list pods: %w", err)
  162. }
  163. for _, pod := range pods.Items {
  164. if pod.Status.Phase == corev1.PodRunning {
  165. return pod.Name, nil
  166. }
  167. }
  168. return "", fmt.Errorf("no running pod found for selector %s", labelSelector)
  169. }
  170. func setupPortForward(ctx context.Context, config *rest.Config, clientset kubernetes.Interface, namespace, podName string, podPort int) (string, func(), error) {
  171. pod, err := clientset.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
  172. if err != nil {
  173. return "", nil, fmt.Errorf("failed to get pod: %w", err)
  174. }
  175. if pod.Status.Phase != corev1.PodRunning {
  176. return "", nil, fmt.Errorf("pod %s is not running: %s", podName, pod.Status.Phase)
  177. }
  178. transport, upgrader, err := spdy.RoundTripperFor(config)
  179. if err != nil {
  180. return "", nil, fmt.Errorf("failed to create round tripper: %w", err)
  181. }
  182. url := clientset.CoreV1().RESTClient().Post().
  183. Resource("pods").
  184. Namespace(namespace).
  185. Name(podName).
  186. SubResource("portforward").
  187. URL()
  188. dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, http.MethodPost, url)
  189. stopChan := make(chan struct{}, 1)
  190. readyChan := make(chan struct{}, 1)
  191. ports := []string{fmt.Sprintf("0:%d", podPort)}
  192. pf, err := portforward.New(dialer, ports, stopChan, readyChan, GinkgoWriter, GinkgoWriter)
  193. if err != nil {
  194. return "", nil, fmt.Errorf("failed to create port forwarder: %w", err)
  195. }
  196. errChan := make(chan error, 1)
  197. go func() {
  198. if forwardErr := pf.ForwardPorts(); forwardErr != nil {
  199. errChan <- forwardErr
  200. }
  201. }()
  202. select {
  203. case <-readyChan:
  204. forwardedPorts, portErr := pf.GetPorts()
  205. if portErr != nil {
  206. close(stopChan)
  207. return "", nil, fmt.Errorf("failed to get forwarded ports: %w", portErr)
  208. }
  209. if len(forwardedPorts) == 0 {
  210. close(stopChan)
  211. return "", nil, fmt.Errorf("no ports were forwarded")
  212. }
  213. cleanup := func() {
  214. close(stopChan)
  215. }
  216. return fmt.Sprintf("localhost:%d", forwardedPorts[0].Local), cleanup, nil
  217. case err = <-errChan:
  218. close(stopChan)
  219. return "", nil, fmt.Errorf("port forward failed: %w", err)
  220. case <-ctx.Done():
  221. close(stopChan)
  222. return "", nil, fmt.Errorf("port forward canceled: %w", ctx.Err())
  223. case <-time.After(30 * time.Second):
  224. close(stopChan)
  225. return "", nil, fmt.Errorf("timeout waiting for port forward to be ready")
  226. }
  227. }
  // scrapeMetrics performs one GET of http://<address>/metrics with a 10 second
  // client timeout (and ctx) and returns the response body. Any status other
  // than 200 OK is reported as an error.
  func scrapeMetrics(ctx context.Context, address string) (string, error) {
  	endpoint := "http://" + address + "/metrics"
  	req, reqErr := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
  	if reqErr != nil {
  		return "", fmt.Errorf("failed to create request: %w", reqErr)
  	}

  	client := http.Client{Timeout: 10 * time.Second}
  	resp, doErr := client.Do(req)
  	if doErr != nil {
  		return "", fmt.Errorf("failed to scrape metrics: %w", doErr)
  	}
  	defer resp.Body.Close()

  	if code := resp.StatusCode; code != http.StatusOK {
  		return "", fmt.Errorf("unexpected status code: %d", code)
  	}

  	raw, readErr := io.ReadAll(resp.Body)
  	if readErr != nil {
  		return "", fmt.Errorf("failed to read response body: %w", readErr)
  	}
  	return string(raw), nil
  }
  247. func waitForMetricsEndpoint(ctx context.Context, address string, timeout time.Duration) (string, error) {
  248. ticker := time.NewTicker(250 * time.Millisecond)
  249. defer ticker.Stop()
  250. timer := time.NewTimer(timeout)
  251. defer timer.Stop()
  252. var lastErr error
  253. for {
  254. body, err := scrapeMetrics(ctx, address)
  255. if err == nil {
  256. return body, nil
  257. }
  258. lastErr = err
  259. select {
  260. case <-ctx.Done():
  261. return "", fmt.Errorf("waiting for metrics endpoint canceled: %w", ctx.Err())
  262. case <-timer.C:
  263. return "", fmt.Errorf("timed out waiting for metrics endpoint: %w", lastErr)
  264. case <-ticker.C:
  265. }
  266. }
  267. }
  268. func parsePrometheusMetrics(body string) (MetricsMap, error) {
  269. metrics := make(MetricsMap)
  270. metricRegex := regexp.MustCompile(`^([a-zA-Z_:][a-zA-Z0-9_:]*?)(?:\{([^}]*)\})?\s+([^\s]+)`)
  271. scanner := bufio.NewScanner(strings.NewReader(body))
  272. scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
  273. for scanner.Scan() {
  274. line := scanner.Text()
  275. if strings.HasPrefix(line, "#") || strings.TrimSpace(line) == "" {
  276. continue
  277. }
  278. matches := metricRegex.FindStringSubmatch(line)
  279. if len(matches) != 4 {
  280. continue
  281. }
  282. value, err := strconv.ParseFloat(matches[3], 64)
  283. if err != nil {
  284. continue
  285. }
  286. sample := MetricSample{
  287. Name: matches[1],
  288. Labels: parseLabels(matches[2]),
  289. Value: value,
  290. }
  291. metrics[sample.Name] = append(metrics[sample.Name], sample)
  292. }
  293. if err := scanner.Err(); err != nil {
  294. return nil, fmt.Errorf("failed to scan metrics response: %w", err)
  295. }
  296. return metrics, nil
  297. }
  // promLabelPair matches one name="value" pair inside a label set; the value may
  // contain backslash escapes such as \" and \\. Compiled once at package scope
  // because parseLabels runs for every sample line.
  var promLabelPair = regexp.MustCompile(`([a-zA-Z_][a-zA-Z0-9_]*)="((?:[^"\\]|\\.)*)"`)

  // parseLabels parses the inside of a label set, e.g. `method="GET",code="200"`,
  // into a map of label name to value. Escape sequences in values are decoded
  // with strconv.Unquote; a value that Unquote rejects is kept verbatim. Text
  // that is not a name="value" pair is ignored. An empty input yields an empty,
  // non-nil map.
  func parseLabels(labelsStr string) map[string]string {
  	labels := make(map[string]string)
  	if labelsStr == "" {
  		return labels
  	}
  	for _, pair := range promLabelPair.FindAllStringSubmatch(labelsStr, -1) {
  		name, raw := pair[1], pair[2]
  		value, err := strconv.Unquote(`"` + raw + `"`)
  		if err != nil {
  			// Keep the undecoded text rather than dropping the label.
  			value = raw
  		}
  		labels[name] = value
  	}
  	return labels
  }
  // labelsMatch reports whether sampleLabels contains every name/value pair in
  // matchLabels. A name absent from sampleLabels compares as the empty string,
  // so {"x": ""} matches samples without an x label. A nil or empty matchLabels
  // matches any sample.
  func labelsMatch(sampleLabels, matchLabels map[string]string) bool {
  	for name, want := range matchLabels {
  		if got := sampleLabels[name]; got != want {
  			return false
  		}
  	}
  	return true
  }