
snyk: add rate limit handling to audit_logs and issues data streams #16184

Open
efd6 wants to merge 16 commits into elastic:main from efd6:e24453-snyk-again

Conversation

@efd6 efd6 (Contributor) commented Dec 2, 2025

Proposed commit message

snyk: add rate limit handling to audit_logs and issues data streams

This adds rate limit handling for Snyk API calls in the audit_logs and issues
data streams, based on the X-Ratelimit response headers, to prevent quota
exhaustion and improve collection reliability.

Checklist

  • I have reviewed tips for building integrations and this pull request is aligned with them.
  • I have verified that all data streams collect metrics or logs.
  • I have added an entry to my package's changelog.yml file.
  • I have verified that Kibana version constraints are current according to guidelines.
  • I have verified that any added dashboard complies with Kibana's Dashboard good practices

Author's Checklist

  • [ ]

How to test this PR locally

The system tests exercise the rate limit handling path; when they are run with -v (for example, elastic-package test system -v), this is visible in the pattern of event publications in the issues data stream.

To see the rate limit values and the extracted headers, make the following changes to the CEL programs:

Details
diff --git a/packages/snyk/data_stream/audit_logs/agent/stream/cel.yml.hbs b/packages/snyk/data_stream/audit_logs/agent/stream/cel.yml.hbs
index 2a3e949a55..c3270bfd5b 100644
--- a/packages/snyk/data_stream/audit_logs/agent/stream/cel.yml.hbs
+++ b/packages/snyk/data_stream/audit_logs/agent/stream/cel.yml.hbs
@@ -124,14 +124,14 @@ program: |-
                 // Threading rate-limit results obtained here through to the final
                 // result is not tenable, so we do not do any work to allow them
                 // to be interpreted by the input and just drop them.
-                rate_limit(
+                debug("RATE_LIMIT_1", rate_limit(
                   headers,
                   "X-Ratelimit",
                   false,
                   true,
                   duration(string(headers[?"X-Ratelimit-Reset"][0].orValue("1")) + "s"),
                   0
-                )
+                ))
               ).drop(["rate", "next"])
             )
           ).as(resp, (resp.StatusCode == 200) ?
@@ -198,14 +198,14 @@ program: |-
           resp.with(
             resp.Header.as(headers,
               // Calculate and apply rate limits.
-              rate_limit(
+              debug("RATE_LIMIT_2", rate_limit(
                 headers,
                 "X-Ratelimit",
                 false,
                 true,
                 duration(string(headers[?"X-Ratelimit-Reset"][0].orValue("1")) + "s"),
                 0
-              )
+              ))
             ).as(rate_headers,
               {
                // Rate limit side-effects have already been applied to the
diff --git a/packages/snyk/data_stream/issues/agent/stream/cel.yml.hbs b/packages/snyk/data_stream/issues/agent/stream/cel.yml.hbs
index dfeccaa246..3f1d0ff8c3 100644
--- a/packages/snyk/data_stream/issues/agent/stream/cel.yml.hbs
+++ b/packages/snyk/data_stream/issues/agent/stream/cel.yml.hbs
@@ -134,14 +134,14 @@ program: |-
         resp.with(
           resp.Header.as(headers,
             // Calculate and apply rate limits.
-            rate_limit(
+            debug("RATE_LIMIT_1", rate_limit(
               headers,
               "X-Ratelimit",
               false,
               true,
               duration(string(headers[?"X-Ratelimit-Reset"][0].orValue("1")) + "s"),
               0
-            )
+            ))
           ).as(rate_headers,
               {
                // Rate limit side-effects have already been applied to the
@@ -195,14 +195,14 @@ program: |-
                                     // Threading rate-limit results obtained here through to the final
                                     // result is not tenable, so we do not do any work to allow them
                                     // to be interpreted by the input and just drop them.
-                                    rate_limit(
+                                    debug("RATE_LIMIT_2", rate_limit(
                                       headers,
                                       "X-Ratelimit",
                                       false,
                                       true,
                                       duration(string(headers[?"X-Ratelimit-Reset"][0].orValue("1")) + "s"),
                                       0
-                                    )
+                                    ))
                                   ).drop(["rate", "next"])
                                 )
                               ).as(resp, (resp.StatusCode == 200) ?

Then examine the agent logs (either via docker logs or by collecting an agent diagnostic).
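
Note that the debug() wrapper used in the diff above is behavior-preserving: debug(tag, value) logs value under tag and evaluates to value, so the modified programs collect exactly as before. As a reminder of the shape (copied from the diff above, with explanatory comments added):

// debug returns its second argument unchanged, so the surrounding
// expression sees the same rate_limit result; the only effect is a
// log line tagged RATE_LIMIT_1 in the agent debug logs.
debug("RATE_LIMIT_1", rate_limit(
  headers,
  "X-Ratelimit",
  false,
  true,
  duration(string(headers[?"X-Ratelimit-Reset"][0].orValue("1")) + "s"),
  0
))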

The RATE_LIMIT_1 and RATE_LIMIT_2 tags will appear in the agent debug logs with the computed rate, burst, next, and reset fields from the rate limiter. Check that the values of these fields are sane relative to the HTTP headers, which are also present in the debug output message.

Related issues

Screenshots

@efd6 efd6 self-assigned this Dec 2, 2025
@efd6 efd6 added the enhancement, Integration:snyk, and Team:Security-Service Integrations labels Dec 2, 2025
@elastic-vault-github-plugin-prod

🚀 Benchmarks report

To see the full report, comment with /test benchmark fullreport.

@botelastic botelastic bot commented Jan 1, 2026

Hi! We just realized that we haven't looked into this PR in a while. We're sorry! We're labeling this issue as Stale to make it hit our filters and make sure we get back to it as soon as possible. In the meantime, it'd be extremely helpful if you could take a look at it as well and confirm its relevance. A simple comment with a nice emoji will be enough :+1:. Thank you for your contribution!

@botelastic botelastic bot added the Stalled label Jan 1, 2026
@efd6 efd6 removed the Stalled label Jan 18, 2026
@efd6 efd6 marked this pull request as ready for review January 18, 2026 22:18
@efd6 efd6 requested a review from a team as a code owner January 18, 2026 22:18
@elasticmachine

Pinging @elastic/security-service-integrations (Team:Security-Service Integrations)

@chrisberkhout chrisberkhout (Contributor) left a comment

What do you think about testing this? I think the system test could be modified to exercise the new logic, but that's probably not worthwhile.

I think it would be nice if the commit message or at least the PR's "How to test this PR locally" section had some advice about how to manually exercise it.

I'm thinking I'd run the system test's stream config outside of a container, with extra response headers, then modify the CEL program to run in miko/mito and add some debug() calls to inspect the debugging value.

Comment on lines 96 to 104
rate_headers.with(
  {
    // Work around inf detection in input.
    // If the headers are missing or rate_limit failed, rate and
    // next may be missing. So use optional types.
    ?"rate": (rate_headers.?rate == optional.of(double("Infinity"))) ? optional.of("inf") : optional.none(),
    ?"next": (rate_headers.?next == optional.of(double("Infinity"))) ? optional.of("inf") : optional.none(),
  }
)
Contributor
This part will also only affect the debugging value, right?

I think it'd be better to combine it with the following by adding the {"rate_limit": ...} wrapper, so we have one block for side effects and one block for a debugging value.
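
For illustration, the combined shape might look roughly like the sketch below (it reuses the field names from the snippet above and is not the code that landed in the PR):

// Sketch only: the inf work-around sits inside the map that carries the
// debugging value, keyed under "rate_limit", instead of in a separate block.
{
  "rate_limit": rate_headers.with(
    {
      // rate and next may be missing if the headers were absent or
      // rate_limit failed, so optional types are used here.
      ?"rate": (rate_headers.?rate == optional.of(double("Infinity"))) ? optional.of("inf") : optional.none(),
      ?"next": (rate_headers.?next == optional.of(double("Infinity"))) ? optional.of("inf") : optional.none(),
    }
  ),
}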

Contributor Author

Not quite, but you have identified a nest of bugs that are addressed in 94b456a.

@efd6 efd6 force-pushed the e24453-snyk-again branch from 486b884 to 14e09a1 Compare February 16, 2026 23:44
@efd6 efd6 (Contributor Author) commented Feb 17, 2026

@chrisberkhout there is significant work to do to get this to merge due to conflicts. I'll ping you when it's ready.

@efd6 efd6 force-pushed the e24453-snyk-again branch from 14e09a1 to 94b456a Compare February 17, 2026 02:43
efd6 added 3 commits February 17, 2026 13:50
* add rate-limit headers
This should not have been converted to a timestamp.
@efd6 efd6 (Contributor Author) commented Feb 17, 2026

/test

@efd6 efd6 (Contributor Author) commented Feb 17, 2026

/test

@efd6 efd6 requested a review from chrisberkhout February 17, 2026 06:37
@chrisberkhout chrisberkhout (Contributor) left a comment

Looks good.

I put a few comments that don't need any response.

One thing left is that the "We are doing this work for the side-effects of the rate_limit call" comments are a bit misleading in the cases where the values are added to the response and returned in the final result.

Maybe some could be changed to:

We are doing this work only for the side-effects of the rate_limit call.

and the others could be removed or changed to something like:

The rate_limit call has side effects as well as generating values for the final result.

Comment on lines 135 to 145
@@ -144,26 +143,29 @@ program: |-
0
)
).as(rate_headers,
Contributor

Could use resp.Header directly rather than as headers.
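
For illustration, the suggested simplification would look roughly like this (a sketch assembled from the snippets in the diff above, not the exact change that landed):

// before: bind resp.Header to headers only to pass it to rate_limit
resp.Header.as(headers,
  rate_limit(
    headers,
    "X-Ratelimit",
    false,
    true,
    duration(string(headers[?"X-Ratelimit-Reset"][0].orValue("1")) + "s"),
    0
  )
)
// after: use resp.Header directly, dropping the binding
rate_limit(
  resp.Header,
  "X-Ratelimit",
  false,
  true,
  duration(string(resp.Header[?"X-Ratelimit-Reset"][0].orValue("1")) + "s"),
  0
)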

Contributor Author

Much clearer. Done

Comment on lines 156 to 157
?"rate": (rate_headers.?rate == optional.of(double("Infinity"))) ? optional.of("inf") : optional.none(),
?"next": (rate_headers.?next == optional.of(double("Infinity"))) ? optional.of("inf") : optional.none(),
Contributor

Just thinking...

Looks like this checks for an Infinity valued double and switches it to the string "inf", which in the input will be switched back to rate.Inf, which is the finite math.MaxFloat64 rather than infinite.

Seems like it could have been made simpler, especially in x/time/rate.

But given that's the way it is, maybe at some point rate_limit should handle avoiding infinite rates?

Contributor Author

Yeah, this is unfortunate history. It's a conflict between encoding/json, mito/lib, x/time/rate, and cel-go. The origin of the "inf" is from JSON serialisation of math.Inf(1) that results from divisions. We could conceivably condition the return values from the rate limit extensions so that if either of these ends up being infinite, it gets replaced with rate.Inf. I think this would be backwards compatible; the relevant text in the documentation is 'The map returned by the policy functions should have "rate" and "next" fields with type rate.Limit or string with the value "inf", a "burst" field with type int and a "reset" field with type time.Time in the UTC location. The semantics of "rate" and "burst" are described in the documentation for the golang.org/x/time/rate package.' This would remain true, but the second option would never happen.

Do you want to file an issue in mito?

Comment on lines 195 to 197
// Threading rate-limit results obtained here through to the final
// result is not tenable, so we do not do any work to allow them
// to be interpreted by the input and just drop them.
Contributor

Just thinking about the existing structure of the CEL program...

Doing multiple requests per eval means you have to reduce the results with complicated logic and accept compromises regarding the feedback that can go back to the input. This is a case of that.

I think the work list approach like we have now in o365 is easier to reason about and can provide better feedback and error handling. The downside is the overhead of extra evals, but I think it's usually worth it. Or maybe I'm missing something?

Contributor Author

I think this would be worthwhile. I also think it's work for another PR.

@efd6 efd6 requested a review from chrisberkhout February 17, 2026 21:26
@elasticmachine

💚 Build Succeeded

History

cc @efd6


Labels

enhancement, Integration:snyk, Team:Security-Service Integrations

Projects

None yet

Development

Successfully merging this pull request may close these issues.

snyk: enhance snyk integration to properly handle rate limits

3 participants