Files
signoz/tests/integration/fixtures/gatewayutils.py
Karan Balani ddecf05d9f fix: omit unset limit values in gateway update api payload (#10388)
* fix: limit value size and count to pointers with omitempty

* fix: openapi specs backend

* fix: openapi specs frontend

* chore: add go tests for limits validations

* fix: linting issues

* test: remove go test and add gateway integration tests with mocked gateway for all gateway apis

* feat: add gateway in integration ci src matrix

* chore: divide tests into multiple files for keys and limits and utilities

* fix: creation ingestion key returns 201, check for actual values in tests

* fix: creation ingestion key returns 201, check for actual values in tests

* fix: create ingestion key gateway api mock status code as 201
2026-02-23 16:08:40 +00:00

49 lines
1.5 KiB
Python

import json
from typing import Optional
import requests
from wiremock.client import WireMockMatchers
from fixtures import types
# Fixed identifiers exported for the gateway tests.
# NOTE(review): presumably the ingestion-key and limit IDs that the mocked
# gateway responses use — confirm against the test modules that import them.
TEST_KEY_ID = "test-key-id-001"
TEST_LIMIT_ID = "test-limit-id-001"
def common_gateway_headers():
    """Build the header matchers expected on requests forwarded to the gateway.

    Every header is matched with ``WireMockMatchers.EQUAL_TO`` against a fixed
    value. A new mapping is built on each call, so callers may mutate the
    result freely.
    """
    equal_to = WireMockMatchers.EQUAL_TO
    # Header name -> exact value the gateway mock should see.
    expected_values = {
        "X-Signoz-Cloud-Api-Key": "secret-key",
        "X-Consumer-Username": "lid:00000000-0000-0000-0000-000000000000",
        "X-Consumer-Groups": "ns:default",
    }
    return {
        header: {equal_to: value} for header, value in expected_values.items()
    }
def get_gateway_requests(signoz: types.SigNoz, method: str, url: str) -> list:
    """Return captured requests from the WireMock gateway journal.

    Posts ``{"method": method, "url": url}`` to the gateway's
    ``/__admin/requests/find`` admin endpoint (resolved through the ``"8080"``
    host config) and returns the ``requests`` entry of the JSON response.

    Returns an empty list when no requests match, when the admin API is
    unreachable (connection failure or the 5 second timeout elapsing), or
    when the response body cannot be decoded as JSON.
    """
    # Resolved outside the ``try`` so a misconfigured fixture (e.g. a missing
    # "8080" host config) still fails loudly instead of looking like
    # "no requests captured".
    find_url = signoz.gateway.host_configs["8080"].get("/__admin/requests/find")
    try:
        response = requests.post(
            find_url,
            json={"method": method, "url": url},
            timeout=5,
        )
        return response.json().get("requests", [])
    except (requests.RequestException, ValueError):
        # RequestException covers connection errors and timeouts; ValueError
        # covers JSON decode failures on ``requests`` versions whose decode
        # error is not a RequestException subclass.
        return []
def get_latest_gateway_request_body(
    signoz: types.SigNoz, method: str, url: str
) -> Optional[dict]:
    """Return the decoded JSON body of the newest matching gateway request.

    Looks up the captured requests for ``method`` and ``url`` via
    ``get_gateway_requests`` and parses the ``body`` field of the first entry.
    WireMock returns requests in reverse chronological order, so the first
    entry is the latest one. Returns ``None`` when nothing matched.
    """
    captured = get_gateway_requests(signoz, method, url)
    if captured:
        latest = captured[0]
        return json.loads(latest["body"])
    return None