mirror of
https://github.com/SigNoz/signoz.git
synced 2026-03-19 03:02:16 +00:00
Compare commits
34 Commits
fix/order-
...
nv/6204
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
36e8d2e6c6 | ||
|
|
1e4191c7bf | ||
|
|
edffb359c1 | ||
|
|
22cc7509fc | ||
|
|
dee8e53b6b | ||
|
|
285a31afa6 | ||
|
|
8cfa9dc038 | ||
|
|
b986b0a5f4 | ||
|
|
28e0f2f7ad | ||
|
|
cd458f0205 | ||
|
|
ee2916e6c6 | ||
|
|
1c1d069263 | ||
|
|
7dc46db2e3 | ||
|
|
bfcd423a45 | ||
|
|
323b1163e5 | ||
|
|
673379a46c | ||
|
|
37f490c705 | ||
|
|
324e34092e | ||
|
|
2a2c365950 | ||
|
|
14065d39a6 | ||
|
|
bf2133f1ab | ||
|
|
d7d907f687 | ||
|
|
76b4549504 | ||
|
|
968a5089ff | ||
|
|
c082bc3d76 | ||
|
|
59e0dcc865 | ||
|
|
89840189ef | ||
|
|
b64a07db02 | ||
|
|
38d971b3c9 | ||
|
|
f8b266ce05 | ||
|
|
20f7562cbc | ||
|
|
29713964ce | ||
|
|
afb252b4f9 | ||
|
|
c808b4d759 |
@@ -1,65 +0,0 @@
|
||||
package cloudintegration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
citypes "github.com/SigNoz/signoz/pkg/types/cloudintegrationtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/dashboardtypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
type Module interface {
|
||||
CreateAccount(ctx context.Context, account *citypes.Account) error
|
||||
|
||||
// GetAccount returns cloud integration account
|
||||
GetAccount(ctx context.Context, orgID, accountID valuer.UUID) (*citypes.Account, error)
|
||||
|
||||
// ListAccounts lists accounts where agent is connected
|
||||
ListAccounts(ctx context.Context, orgID valuer.UUID) ([]*citypes.Account, error)
|
||||
|
||||
// UpdateAccount updates the cloud integration account for a specific organization.
|
||||
UpdateAccount(ctx context.Context, account *citypes.Account) error
|
||||
|
||||
// DisconnectAccount soft deletes/removes a cloud integration account.
|
||||
DisconnectAccount(ctx context.Context, orgID, accountID valuer.UUID) error
|
||||
|
||||
// GetConnectionArtifact returns cloud provider specific connection information,
|
||||
// client side handles how this information is shown
|
||||
GetConnectionArtifact(ctx context.Context, account *citypes.Account, req *citypes.ConnectionArtifactRequest) (*citypes.ConnectionArtifact, error)
|
||||
|
||||
// ListServicesMetadata returns the list of services metadata for a cloud provider attached with the integrationID.
|
||||
// This just returns a summary of the service and not the whole service definition
|
||||
ListServicesMetadata(ctx context.Context, orgID valuer.UUID, integrationID *valuer.UUID) ([]*citypes.ServiceMetadata, error)
|
||||
|
||||
// GetService returns service definition details for a serviceID. This returns config and
|
||||
// other details required to show in service details page on web client.
|
||||
GetService(ctx context.Context, orgID valuer.UUID, integrationID *valuer.UUID, serviceID string) (*citypes.Service, error)
|
||||
|
||||
// UpdateService updates cloud integration service
|
||||
UpdateService(ctx context.Context, orgID valuer.UUID, service *citypes.CloudIntegrationService) error
|
||||
|
||||
// AgentCheckIn is called by agent to heartbeat and get latest config in response.
|
||||
AgentCheckIn(ctx context.Context, orgID valuer.UUID, req *citypes.AgentCheckInRequest) (*citypes.AgentCheckInResponse, error)
|
||||
|
||||
// GetDashboardByID returns dashboard JSON for a given dashboard id.
|
||||
// this only returns the dashboard when the service (embedded in dashboard id) is enabled
|
||||
// in the org for any cloud integration account
|
||||
GetDashboardByID(ctx context.Context, orgID valuer.UUID, id string) (*dashboardtypes.Dashboard, error)
|
||||
|
||||
// ListDashboards returns list of dashboards across all connected cloud integration accounts
|
||||
// for enabled services in the org. This list gets added to dashboard list page
|
||||
ListDashboards(ctx context.Context, orgID valuer.UUID) ([]*dashboardtypes.Dashboard, error)
|
||||
}
|
||||
|
||||
type Handler interface {
|
||||
GetConnectionArtifact(http.ResponseWriter, *http.Request)
|
||||
ListAccounts(http.ResponseWriter, *http.Request)
|
||||
GetAccount(http.ResponseWriter, *http.Request)
|
||||
UpdateAccount(http.ResponseWriter, *http.Request)
|
||||
DisconnectAccount(http.ResponseWriter, *http.Request)
|
||||
ListServicesMetadata(http.ResponseWriter, *http.Request)
|
||||
GetService(http.ResponseWriter, *http.Request)
|
||||
UpdateService(http.ResponseWriter, *http.Request)
|
||||
AgentCheckIn(http.ResponseWriter, *http.Request)
|
||||
}
|
||||
@@ -69,6 +69,7 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
|
||||
key string // deterministic join of label values
|
||||
}
|
||||
seriesMap := map[sKey]*qbtypes.TimeSeries{}
|
||||
var keyOrder []sKey // preserves ClickHouse row-arrival order
|
||||
|
||||
stepMs := uint64(step.Duration.Milliseconds())
|
||||
|
||||
@@ -219,6 +220,7 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
|
||||
if !ok {
|
||||
series = &qbtypes.TimeSeries{Labels: lblObjs}
|
||||
seriesMap[key] = series
|
||||
keyOrder = append(keyOrder, key)
|
||||
}
|
||||
series.Values = append(series.Values, &qbtypes.TimeSeriesValue{
|
||||
Timestamp: ts,
|
||||
@@ -250,8 +252,8 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
|
||||
Alias: "__result_" + strconv.Itoa(i),
|
||||
}
|
||||
}
|
||||
for k, s := range seriesMap {
|
||||
buckets[k.agg].Series = append(buckets[k.agg].Series, s)
|
||||
for _, k := range keyOrder {
|
||||
buckets[k.agg].Series = append(buckets[k.agg].Series, seriesMap[k])
|
||||
}
|
||||
|
||||
var nonEmpty []*qbtypes.AggregationBucket
|
||||
|
||||
@@ -185,22 +185,6 @@ func postProcessMetricQuery(
|
||||
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
|
||||
req *qbtypes.QueryRangeRequest,
|
||||
) *qbtypes.Result {
|
||||
|
||||
config := query.Aggregations[0]
|
||||
spaceAggOrderBy := fmt.Sprintf("%s(%s)", config.SpaceAggregation.StringValue(), config.MetricName)
|
||||
timeAggOrderBy := fmt.Sprintf("%s(%s)", config.TimeAggregation.StringValue(), config.MetricName)
|
||||
timeSpaceAggOrderBy := fmt.Sprintf("%s(%s(%s))", config.SpaceAggregation.StringValue(), config.TimeAggregation.StringValue(), config.MetricName)
|
||||
|
||||
for idx := range query.Order {
|
||||
if query.Order[idx].Key.Name == spaceAggOrderBy ||
|
||||
query.Order[idx].Key.Name == timeAggOrderBy ||
|
||||
query.Order[idx].Key.Name == timeSpaceAggOrderBy {
|
||||
query.Order[idx].Key.Name = qbtypes.DefaultOrderByKey
|
||||
}
|
||||
}
|
||||
|
||||
result = q.applySeriesLimit(result, query.Limit, query.Order)
|
||||
|
||||
if len(query.Functions) > 0 {
|
||||
step := query.StepInterval.Duration.Milliseconds()
|
||||
functions := q.prepareFillZeroArgsWithStep(query.Functions, req, step)
|
||||
|
||||
@@ -115,6 +115,7 @@ func (r *Repo) GetLatestVersion(
|
||||
func (r *Repo) insertConfig(
|
||||
ctx context.Context, orgId valuer.UUID, userId valuer.UUID, c *opamptypes.AgentConfigVersion, elements []string,
|
||||
) error {
|
||||
|
||||
if c.ElementType.StringValue() == "" {
|
||||
return errors.NewInvalidInputf(CodeElementTypeRequired, "element type is required for creating agent config version")
|
||||
}
|
||||
@@ -228,25 +229,6 @@ func (r *Repo) updateDeployStatus(ctx context.Context,
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetDeployStatusByHash returns the DeployStatus for the given config hash
|
||||
// (stored with orgId prefix). Returns DeployStatusUnknown when no matching row exists.
|
||||
func (r *Repo) GetDeployStatusByHash(ctx context.Context, orgId valuer.UUID, configHash string) (opamptypes.DeployStatus, error) {
|
||||
var version opamptypes.AgentConfigVersion
|
||||
err := r.store.BunDB().NewSelect().
|
||||
Model(&version).
|
||||
ColumnExpr("deploy_status").
|
||||
Where("hash = ?", configHash).
|
||||
Where("org_id = ?", orgId).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return opamptypes.DeployStatusUnknown, nil
|
||||
}
|
||||
return opamptypes.DeployStatusUnknown, errors.WrapInternalf(err, errors.CodeInternal, "failed to query deploy status by hash")
|
||||
}
|
||||
return version.DeployStatus, nil
|
||||
}
|
||||
|
||||
func (r *Repo) updateDeployStatusByHash(
|
||||
ctx context.Context, orgId valuer.UUID, confighash string, status string, result string,
|
||||
) error {
|
||||
|
||||
@@ -180,12 +180,6 @@ func (m *Manager) ReportConfigDeploymentStatus(
|
||||
}
|
||||
}
|
||||
|
||||
// Implements model.AgentConfigProvider
|
||||
func (m *Manager) GetDeployStatusByHash(ctx context.Context, orgId valuer.UUID, configHash string) (opamptypes.DeployStatus, error) {
|
||||
return m.Repo.GetDeployStatusByHash(ctx, orgId, configHash)
|
||||
}
|
||||
|
||||
|
||||
func GetLatestVersion(
|
||||
ctx context.Context, orgId valuer.UUID, elementType opamptypes.ElementType,
|
||||
) (*opamptypes.AgentConfigVersion, error) {
|
||||
|
||||
@@ -131,32 +131,38 @@ func (ic *LogParsingPipelineController) ValidatePipelines(ctx context.Context,
|
||||
return err
|
||||
}
|
||||
|
||||
func (ic *LogParsingPipelineController) getNormalizePipeline() pipelinetypes.GettablePipeline {
|
||||
return pipelinetypes.GettablePipeline{
|
||||
StoreablePipeline: pipelinetypes.StoreablePipeline{
|
||||
Name: "Default Pipeline - PreProcessing Body",
|
||||
Alias: "NormalizeBodyDefault",
|
||||
Enabled: true,
|
||||
},
|
||||
Filter: &v3.FilterSet{
|
||||
Items: []v3.FilterItem{
|
||||
{
|
||||
Key: v3.AttributeKey{
|
||||
Key: "body",
|
||||
func (ic *LogParsingPipelineController) getDefaultPipelines() ([]pipelinetypes.GettablePipeline, error) {
|
||||
defaultPipelines := []pipelinetypes.GettablePipeline{}
|
||||
if querybuilder.BodyJSONQueryEnabled {
|
||||
preprocessingPipeline := pipelinetypes.GettablePipeline{
|
||||
StoreablePipeline: pipelinetypes.StoreablePipeline{
|
||||
Name: "Default Pipeline - PreProcessing Body",
|
||||
Alias: "NormalizeBodyDefault",
|
||||
Enabled: true,
|
||||
},
|
||||
Filter: &v3.FilterSet{
|
||||
Items: []v3.FilterItem{
|
||||
{
|
||||
Key: v3.AttributeKey{
|
||||
Key: "body",
|
||||
},
|
||||
Operator: v3.FilterOperatorExists,
|
||||
},
|
||||
Operator: v3.FilterOperatorExists,
|
||||
},
|
||||
},
|
||||
},
|
||||
Config: []pipelinetypes.PipelineOperator{
|
||||
{
|
||||
ID: uuid.NewString(),
|
||||
Type: "normalize",
|
||||
Enabled: true,
|
||||
If: "body != nil",
|
||||
Config: []pipelinetypes.PipelineOperator{
|
||||
{
|
||||
ID: uuid.NewString(),
|
||||
Type: "normalize",
|
||||
Enabled: true,
|
||||
If: "body != nil",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
defaultPipelines = append(defaultPipelines, preprocessingPipeline)
|
||||
}
|
||||
return defaultPipelines, nil
|
||||
}
|
||||
|
||||
// Returns effective list of pipelines including user created
|
||||
@@ -289,10 +295,12 @@ func (pc *LogParsingPipelineController) RecommendAgentConfig(
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
if querybuilder.BodyJSONQueryEnabled {
|
||||
// add default normalize pipeline at the beginning
|
||||
pipelinesResp.Pipelines = append([]pipelinetypes.GettablePipeline{pc.getNormalizePipeline()}, pipelinesResp.Pipelines...)
|
||||
// recommend default pipelines along with user created pipelines
|
||||
defaultPipelines, err := pc.getDefaultPipelines()
|
||||
if err != nil {
|
||||
return nil, "", model.InternalError(fmt.Errorf("failed to get default pipelines: %w", err))
|
||||
}
|
||||
pipelinesResp.Pipelines = append(pipelinesResp.Pipelines, defaultPipelines...)
|
||||
|
||||
updatedConf, err := GenerateCollectorConfigWithPipelines(currentConfYaml, pipelinesResp.Pipelines)
|
||||
if err != nil {
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
package opamp
|
||||
|
||||
import (
|
||||
"github.com/SigNoz/signoz/pkg/query-service/app/opamp/model"
|
||||
)
|
||||
import "github.com/SigNoz/signoz/pkg/query-service/app/opamp/model"
|
||||
|
||||
// Interface for a source of otel collector config recommendations.
|
||||
type AgentConfigProvider interface {
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"log"
|
||||
"net"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/opamptypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/google/uuid"
|
||||
"github.com/knadh/koanf"
|
||||
@@ -128,11 +127,6 @@ func (ta *MockAgentConfigProvider) HasReportedDeploymentStatus(orgID valuer.UUID
|
||||
return exists
|
||||
}
|
||||
|
||||
// AgentConfigProvider interface
|
||||
func (ta *MockAgentConfigProvider) GetDeployStatusByHash(_ context.Context, _ valuer.UUID, _ string) (opamptypes.DeployStatus, error) {
|
||||
return opamptypes.DeployStatusUnknown, nil
|
||||
}
|
||||
|
||||
// AgentConfigProvider interface
|
||||
func (ta *MockAgentConfigProvider) SubscribeToConfigUpdates(callback func()) func() {
|
||||
subscriberId := uuid.NewString()
|
||||
|
||||
@@ -111,99 +111,90 @@ func ExtractLbFlag(agentDescr *protobufs.AgentDescription) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// agentDescriptionChanged returns true when the agent sends updated properties
|
||||
// (e.g. capability flag, version) mid-connection, signalling the server to
|
||||
// recompute and push a new RemoteConfig.
|
||||
//
|
||||
// On reconnect this always returns false: handleFirstStatus pre-copies
|
||||
// AgentDescription into agent.Status so no diff is detected, avoiding a
|
||||
// redundant config recompute.
|
||||
func (agent *Agent) agentDescriptionChanged(newStatus *protobufs.AgentToServer) bool {
|
||||
// nil AgentDescription means no change per OpAMP protocol.
|
||||
if newStatus.AgentDescription == nil {
|
||||
return false
|
||||
func (agent *Agent) updateAgentDescription(newStatus *protobufs.AgentToServer) (agentDescrChanged bool) {
|
||||
prevStatus := agent.Status
|
||||
|
||||
if agent.Status == nil {
|
||||
// First time this Agent reports a status, remember it.
|
||||
agent.Status = newStatus
|
||||
agentDescrChanged = true
|
||||
} else {
|
||||
// Not a new Agent. Update the Status.
|
||||
agent.Status.SequenceNum = newStatus.SequenceNum
|
||||
|
||||
// Check what's changed in the AgentDescription.
|
||||
if newStatus.AgentDescription != nil {
|
||||
// If the AgentDescription field is set it means the Agent tells us
|
||||
// something is changed in the field since the last status report
|
||||
// (or this is the first report).
|
||||
// Make full comparison of previous and new descriptions to see if it
|
||||
// really is different.
|
||||
if prevStatus != nil && proto.Equal(prevStatus.AgentDescription, newStatus.AgentDescription) {
|
||||
// Agent description didn't change.
|
||||
agentDescrChanged = false
|
||||
} else {
|
||||
// Yes, the description is different, update it.
|
||||
agent.Status.AgentDescription = newStatus.AgentDescription
|
||||
agentDescrChanged = true
|
||||
}
|
||||
} else {
|
||||
// AgentDescription field is not set, which means description didn't change.
|
||||
agentDescrChanged = false
|
||||
}
|
||||
|
||||
// Update remote config status if it is included and is different from what we have.
|
||||
if newStatus.RemoteConfigStatus != nil &&
|
||||
!proto.Equal(agent.Status.RemoteConfigStatus, newStatus.RemoteConfigStatus) {
|
||||
agent.Status.RemoteConfigStatus = newStatus.RemoteConfigStatus
|
||||
|
||||
// todo: need to address multiple agent scenario here
|
||||
// for now, the first response will be sent back to the UI
|
||||
if agent.Status.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED {
|
||||
onConfigSuccess(agent.OrgID, agent.AgentID, string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash))
|
||||
}
|
||||
|
||||
if agent.Status.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED {
|
||||
onConfigFailure(agent.OrgID, agent.AgentID, string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash), agent.Status.RemoteConfigStatus.ErrorMessage)
|
||||
}
|
||||
}
|
||||
}
|
||||
if proto.Equal(agent.Status.AgentDescription, newStatus.AgentDescription) {
|
||||
return false
|
||||
|
||||
if agentDescrChanged {
|
||||
agent.CanLB = ExtractLbFlag(newStatus.AgentDescription)
|
||||
}
|
||||
|
||||
return agentDescrChanged
|
||||
}
|
||||
|
||||
func (agent *Agent) updateHealth(newStatus *protobufs.AgentToServer) {
|
||||
if newStatus.Health == nil {
|
||||
return
|
||||
}
|
||||
|
||||
agent.Status.Health = newStatus.Health
|
||||
|
||||
if agent.Status != nil && agent.Status.Health != nil && agent.Status.Health.Healthy {
|
||||
agent.TimeAuditable.UpdatedAt = time.Unix(0, int64(agent.Status.Health.StartTimeUnixNano)).UTC()
|
||||
}
|
||||
agent.CanLB = ExtractLbFlag(newStatus.AgentDescription)
|
||||
return true
|
||||
}
|
||||
|
||||
// updateRemoteConfigStatus updates the stored RemoteConfigStatus and notifies
|
||||
// subscribers if the status has changed relative to what we have stored.
|
||||
func (agent *Agent) updateRemoteConfigStatus(newStatus *protobufs.AgentToServer) {
|
||||
if newStatus.RemoteConfigStatus == nil ||
|
||||
proto.Equal(agent.Status.RemoteConfigStatus, newStatus.RemoteConfigStatus) {
|
||||
return
|
||||
}
|
||||
|
||||
// todo: need to address multiple agent scenario here
|
||||
// for now, the first response will be sent back to the UI
|
||||
hash := string(newStatus.RemoteConfigStatus.LastRemoteConfigHash)
|
||||
switch newStatus.RemoteConfigStatus.Status {
|
||||
case protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED:
|
||||
onConfigSuccess(agent.OrgID, agent.AgentID, hash)
|
||||
case protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED:
|
||||
onConfigFailure(agent.OrgID, agent.AgentID, hash, newStatus.RemoteConfigStatus.ErrorMessage)
|
||||
// Update remote config status if it is included and is different from what we have.
|
||||
if newStatus.RemoteConfigStatus != nil {
|
||||
agent.Status.RemoteConfigStatus = newStatus.RemoteConfigStatus
|
||||
}
|
||||
}
|
||||
|
||||
// handleFirstStatus initializes agent.Status on the first message received from
|
||||
// this agent instance. It is a no-op for all subsequent messages.
|
||||
func (agent *Agent) handleFirstStatus(newStatus *protobufs.AgentToServer, configProvider AgentConfigProvider) {
|
||||
if agent.Status != nil {
|
||||
return
|
||||
func (agent *Agent) updateStatusField(newStatus *protobufs.AgentToServer) (agentDescrChanged bool) {
|
||||
if agent.Status == nil {
|
||||
// First time this Agent reports a status, remember it.
|
||||
agent.Status = newStatus
|
||||
agentDescrChanged = true
|
||||
}
|
||||
|
||||
// Initialize with a clean slate.
|
||||
agent.Status = &protobufs.AgentToServer{
|
||||
RemoteConfigStatus: &protobufs.RemoteConfigStatus{
|
||||
Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET,
|
||||
},
|
||||
}
|
||||
|
||||
if newStatus.RemoteConfigStatus == nil ||
|
||||
newStatus.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET {
|
||||
// Agent just started fresh — no prior deployment to reconcile with the DB.
|
||||
return
|
||||
}
|
||||
|
||||
// Since the server's connection is restarted;
|
||||
// copy the agent description; so no change is detected by agentDescriptionChanged
|
||||
agent.Status.AgentDescription = newStatus.AgentDescription
|
||||
|
||||
// Server reconnected while the agent was already running.
|
||||
// Reconcile deployment status with DB; DB is the source of truth.
|
||||
// If DB says in_progress but agent now reports APPLIED/FAILED,
|
||||
// updateRemoteConfigStatus will detect the transition and notify subscribers.
|
||||
rawHash := string(newStatus.RemoteConfigStatus.LastRemoteConfigHash)
|
||||
deployStatus, err := configProvider.GetDeployStatusByHash(context.Background(), agent.OrgID, agent.OrgID.String()+rawHash)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
agent.Status.RemoteConfigStatus.Status = opamptypes.DeployStatusToProtoStatus[deployStatus]
|
||||
|
||||
// If the deployment is still in-flight, rehydrate the subscriber so that
|
||||
// updateRemoteConfigStatus can fire onConfigSuccess/onConfigFailure when
|
||||
// the agent next reports a terminal status.
|
||||
if deployStatus != opamptypes.Deployed && deployStatus != opamptypes.DeployFailed {
|
||||
ListenToConfigUpdate(agent.OrgID, agent.AgentID, rawHash, configProvider.ReportConfigDeploymentStatus)
|
||||
}
|
||||
}
|
||||
|
||||
func (agent *Agent) updateStatusField(newStatus *protobufs.AgentToServer, configProvider AgentConfigProvider) bool {
|
||||
agent.handleFirstStatus(newStatus, configProvider)
|
||||
agentDescrChanged := agent.agentDescriptionChanged(newStatus)
|
||||
// record healthy timestamp
|
||||
if newStatus.Health != nil && newStatus.Health.Healthy {
|
||||
agent.TimeAuditable.UpdatedAt = time.Unix(0, int64(newStatus.Health.StartTimeUnixNano)).UTC()
|
||||
}
|
||||
// notify subscribers first; this will update the status in the DB
|
||||
agentDescrChanged = agent.updateAgentDescription(newStatus) || agentDescrChanged
|
||||
agent.updateRemoteConfigStatus(newStatus)
|
||||
// update local reference in last.
|
||||
agent.Status = newStatus
|
||||
agent.updateHealth(newStatus)
|
||||
return agentDescrChanged
|
||||
}
|
||||
|
||||
@@ -246,7 +237,7 @@ func (agent *Agent) processStatusUpdate(
|
||||
// current status is not up-to-date.
|
||||
lostPreviousUpdate := (agent.Status == nil) || (agent.Status != nil && agent.Status.SequenceNum+1 != newStatus.SequenceNum)
|
||||
|
||||
agentDescrChanged := agent.updateStatusField(newStatus, configProvider)
|
||||
agentDescrChanged := agent.updateStatusField(newStatus)
|
||||
|
||||
// Check if any fields were omitted in the status report.
|
||||
effectiveConfigOmitted := newStatus.EffectiveConfig == nil &&
|
||||
|
||||
@@ -1,11 +1,6 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/opamptypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
import "github.com/SigNoz/signoz/pkg/valuer"
|
||||
|
||||
// Interface for source of otel collector config recommendations.
|
||||
type AgentConfigProvider interface {
|
||||
@@ -25,10 +20,4 @@ type AgentConfigProvider interface {
|
||||
configId string,
|
||||
err error,
|
||||
)
|
||||
|
||||
// GetDeployStatusByHash returns the DeployStatus for the given config hash
|
||||
// (with orgId prefix as stored in the DB). Returns DeployStatusUnknown when
|
||||
// no matching row exists. Used by the agent's first-connect handler to
|
||||
// determine whether the reported RemoteConfigStatus resolves a pending deployment.
|
||||
GetDeployStatusByHash(ctx context.Context, orgId valuer.UUID, configHash string) (opamptypes.DeployStatus, error)
|
||||
}
|
||||
|
||||
@@ -66,7 +66,6 @@ func ListenToConfigUpdate(orgId valuer.UUID, agentId string, hash string, ss OnC
|
||||
defer coordinator.mutex.Unlock()
|
||||
|
||||
key := getSubscriberKey(orgId, hash)
|
||||
|
||||
if subs, ok := coordinator.subscribers[key]; ok {
|
||||
subs = append(subs, ss)
|
||||
coordinator.subscribers[key] = subs
|
||||
|
||||
@@ -51,7 +51,7 @@ func TestStatementBuilder(t *testing.T) {
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'service.name') AS `service.name`, max(value) AS per_series_value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'service.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name`, ts",
|
||||
Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'service.name') AS `service.name`, max(value) AS per_series_value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'service.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __spatial_aggregation_cte GROUP BY `service.name` ORDER BY avg(value) DESC LIMIT 10) ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
|
||||
Args: []any{"signoz_calls_total", uint64(1747785600000), uint64(1747983420000), "cartservice", "cumulative", 0},
|
||||
},
|
||||
expectedErr: nil,
|
||||
@@ -84,7 +84,7 @@ func TestStatementBuilder(t *testing.T) {
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'service.name') AS `service.name`, sum(value)/86400 AS value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'service.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name`, ts",
|
||||
Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'service.name') AS `service.name`, sum(value)/86400 AS value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'service.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __spatial_aggregation_cte GROUP BY `service.name` ORDER BY avg(value) DESC LIMIT 10) ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
|
||||
Args: []any{"signoz_calls_total", uint64(1747872000000), uint64(1747983420000), "cartservice", "delta"},
|
||||
},
|
||||
expectedErr: nil,
|
||||
@@ -117,7 +117,7 @@ func TestStatementBuilder(t *testing.T) {
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'service.name') AS `service.name`, sum(value)/86400 AS per_series_value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'service.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `service.name`, avg(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name`, ts",
|
||||
Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'service.name') AS `service.name`, sum(value)/86400 AS per_series_value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'service.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `service.name`, avg(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __spatial_aggregation_cte GROUP BY `service.name` ORDER BY avg(value) DESC LIMIT 10) ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
|
||||
Args: []any{"signoz_calls_total", uint64(1747872000000), uint64(1747983420000), "cartservice", "delta", 0},
|
||||
},
|
||||
expectedErr: nil,
|
||||
@@ -150,7 +150,7 @@ func TestStatementBuilder(t *testing.T) {
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'host.name') AS `host.name`, avg(value) AS per_series_value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'host.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `host.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `host.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `host.name`, ts",
|
||||
Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(86400)) AS ts, JSONExtractString(labels, 'host.name') AS `host.name`, avg(value) AS per_series_value FROM signoz_meter.distributed_samples AS points WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? AND JSONExtractString(labels, 'host.name') = ? AND LOWER(temporality) LIKE LOWER(?) GROUP BY fingerprint, ts, `host.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `host.name`) SELECT * FROM __spatial_aggregation_cte WHERE (`host.name`) IN (SELECT `host.name` FROM __spatial_aggregation_cte GROUP BY `host.name` ORDER BY avg(value) DESC LIMIT 10) ORDER BY avg(value) OVER (PARTITION BY `host.name`) DESC, `host.name`, ts ASC",
|
||||
Args: []any{"system.memory.usage", uint64(1747872000000), uint64(1747983420000), "big-data-node-1", "unspecified", 0},
|
||||
},
|
||||
expectedErr: nil,
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"strings"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/flagger"
|
||||
@@ -546,6 +547,16 @@ func (b *MetricQueryStatementBuilder) BuildFinalSelect(
|
||||
) (*qbtypes.Statement, error) {
|
||||
metricType := query.Aggregations[0].Type
|
||||
spaceAgg := query.Aggregations[0].SpaceAggregation
|
||||
finalCTE := "__spatial_aggregation_cte"
|
||||
if metricType == metrictypes.HistogramType {
|
||||
histogramCTE, histogramCTEArgs, err := b.buildHistogramCTE(query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cteFragments = append(cteFragments, histogramCTE)
|
||||
cteArgs = append(cteArgs, histogramCTEArgs)
|
||||
finalCTE = "__histogram_cte"
|
||||
}
|
||||
|
||||
combined := querybuilder.CombineCTEs(cteFragments)
|
||||
|
||||
@@ -555,60 +566,104 @@ func (b *MetricQueryStatementBuilder) BuildFinalSelect(
|
||||
}
|
||||
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
sb.Select("*")
|
||||
sb.From(finalCTE)
|
||||
if query.Having != nil && query.Having.Expression != "" {
|
||||
rewriter := querybuilder.NewHavingExpressionRewriter()
|
||||
rewrittenExpr := rewriter.RewriteForMetrics(query.Having.Expression, query.Aggregations)
|
||||
sb.Where(rewrittenExpr)
|
||||
}
|
||||
|
||||
if metricType == metrictypes.HistogramType && spaceAgg.IsPercentile() {
|
||||
quantile := query.Aggregations[0].SpaceAggregation.Percentile()
|
||||
sb.Select("ts")
|
||||
for _, g := range query.GroupBy {
|
||||
sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
||||
groupByKeys := querybuilder.GroupByKeys(query.GroupBy)
|
||||
hasOrder := len(query.Order) > 0
|
||||
hasLimit := query.Limit > 0
|
||||
hasGroupBy := len(groupByKeys) > 0
|
||||
|
||||
isMetricAggOrderByKey := func(key string, config qbtypes.MetricAggregation) bool {
|
||||
spaceAggOrderBy := fmt.Sprintf("%s(%s)", config.SpaceAggregation.StringValue(), config.MetricName)
|
||||
timeAggOrderBy := fmt.Sprintf("%s(%s)", config.TimeAggregation.StringValue(), config.MetricName)
|
||||
timeSpaceAggOrderBy := fmt.Sprintf("%s(%s(%s))", config.SpaceAggregation.StringValue(), config.TimeAggregation.StringValue(), config.MetricName)
|
||||
return key == spaceAggOrderBy || key == timeAggOrderBy || key == timeSpaceAggOrderBy
|
||||
}
|
||||
|
||||
if !hasGroupBy {
|
||||
// do nothing, limits and orders don't mean anything
|
||||
} else if hasOrder && hasLimit {
|
||||
labelSelectorSubQueryBuilder := sqlbuilder.NewSelectBuilder()
|
||||
labelSelectorSubQueryBuilder.Select(groupByKeys...)
|
||||
labelSelectorSubQueryBuilder.From(finalCTE)
|
||||
labelSelectorOrderClauses := []string{}
|
||||
orderedKeys := map[string]struct{}{} // this will be used to add the remaining keys as tie breakers in the end
|
||||
for _, o := range query.Order {
|
||||
key := o.Key.Name
|
||||
var clause string
|
||||
if isMetricAggOrderByKey(key, query.Aggregations[0]) {
|
||||
clause = fmt.Sprintf("avg(value) %s", o.Direction.StringValue())
|
||||
} else {
|
||||
clause = fmt.Sprintf("`%s` %s", key, o.Direction.StringValue())
|
||||
orderedKeys[fmt.Sprintf("`%s`", key)] = struct{}{}
|
||||
}
|
||||
labelSelectorOrderClauses = append(labelSelectorOrderClauses, clause)
|
||||
}
|
||||
sb.SelectMore(fmt.Sprintf(
|
||||
"histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) AS value",
|
||||
quantile,
|
||||
))
|
||||
sb.From("__spatial_aggregation_cte")
|
||||
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
|
||||
sb.GroupBy("ts")
|
||||
if query.Having != nil && query.Having.Expression != "" {
|
||||
rewriter := querybuilder.NewHavingExpressionRewriter()
|
||||
rewrittenExpr := rewriter.RewriteForMetrics(query.Having.Expression, query.Aggregations)
|
||||
sb.Having(rewrittenExpr)
|
||||
for _, gk := range groupByKeys { // keys that haven't been added via order by keys will be added at the end as tie breakers
|
||||
if _, ok := orderedKeys[gk]; !ok {
|
||||
labelSelectorOrderClauses = append(labelSelectorOrderClauses, fmt.Sprintf("%s ASC", gk))
|
||||
}
|
||||
}
|
||||
} else if metricType == metrictypes.HistogramType && spaceAgg == metrictypes.SpaceAggregationCount && query.Aggregations[0].ComparisonSpaceAggregationParam != nil {
|
||||
sb.Select("ts")
|
||||
labelSelectorSubQueryBuilder.GroupBy(groupByKeys...)
|
||||
labelSelectorSubQueryBuilder.OrderBy(labelSelectorOrderClauses...)
|
||||
labelSelectorSubQuery, _ := labelSelectorSubQueryBuilder.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
labelSelectorSubQuery = fmt.Sprintf("%s LIMIT %d", labelSelectorSubQuery, query.Limit)
|
||||
|
||||
for _, g := range query.GroupBy {
|
||||
sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
||||
sb.Where(fmt.Sprintf("(%s) IN (%s)", strings.Join(groupByKeys, ", "), labelSelectorSubQuery))
|
||||
for _, o := range query.Order {
|
||||
key := o.Key.Name
|
||||
var clause string
|
||||
if isMetricAggOrderByKey(key, query.Aggregations[0]) {
|
||||
clause = fmt.Sprintf("avg(value) OVER (PARTITION BY %s) %s", strings.Join(groupByKeys, ", "), o.Direction.StringValue())
|
||||
} else {
|
||||
clause = fmt.Sprintf("`%s` %s", key, o.Direction.StringValue())
|
||||
}
|
||||
sb.OrderBy(clause)
|
||||
}
|
||||
|
||||
aggQuery, err := AggregationQueryForHistogramCountWithParams(query.Aggregations[0].ComparisonSpaceAggregationParam)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if hasOrder {
|
||||
// order by without limit: apply order by clauses directly
|
||||
for _, o := range query.Order {
|
||||
key := o.Key.Name
|
||||
if isMetricAggOrderByKey(key, query.Aggregations[0]) {
|
||||
sb.OrderBy(fmt.Sprintf("avg(value) OVER (PARTITION BY %s) %s", strings.Join(groupByKeys, ", "), o.Direction.StringValue()))
|
||||
continue
|
||||
}
|
||||
sb.OrderBy(fmt.Sprintf("`%s` %s", o.Key.Name, o.Direction.StringValue()))
|
||||
}
|
||||
sb.SelectMore(aggQuery)
|
||||
} else if hasLimit {
|
||||
labelSelectorSubQueryBuilder := sqlbuilder.NewSelectBuilder()
|
||||
labelSelectorSubQueryBuilder.Select(groupByKeys...)
|
||||
labelSelectorSubQueryBuilder.From(finalCTE)
|
||||
labelSelectorSubQueryBuilder.GroupBy(groupByKeys...)
|
||||
labelSelectorSubQueryBuilder.OrderBy("avg(value) DESC")
|
||||
labelSelectorSubQuery, _ := labelSelectorSubQueryBuilder.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
labelSelectorSubQuery = fmt.Sprintf("%s LIMIT %d", labelSelectorSubQuery, query.Limit)
|
||||
|
||||
sb.From("__spatial_aggregation_cte")
|
||||
|
||||
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
|
||||
sb.GroupBy("ts")
|
||||
|
||||
if query.Having != nil && query.Having.Expression != "" {
|
||||
rewriter := querybuilder.NewHavingExpressionRewriter()
|
||||
rewrittenExpr := rewriter.RewriteForMetrics(query.Having.Expression, query.Aggregations)
|
||||
sb.Having(rewrittenExpr)
|
||||
}
|
||||
sb.Where(fmt.Sprintf("(%s) IN (%s)", strings.Join(groupByKeys, ", "), labelSelectorSubQuery))
|
||||
sb.OrderBy(fmt.Sprintf("avg(value) OVER (PARTITION BY %s) DESC", strings.Join(groupByKeys, ", ")))
|
||||
} else {
|
||||
// for count aggregation on histograms with no params, the exact result of spatial aggregation can be sent forward
|
||||
sb.Select("*")
|
||||
sb.From("__spatial_aggregation_cte")
|
||||
if query.Having != nil && query.Having.Expression != "" {
|
||||
rewriter := querybuilder.NewHavingExpressionRewriter()
|
||||
rewrittenExpr := rewriter.RewriteForMetrics(query.Having.Expression, query.Aggregations)
|
||||
sb.Where(rewrittenExpr)
|
||||
// grouping without order by or limit: sort by avg(value) DESC with labels as tiebreakers
|
||||
sb.OrderBy(fmt.Sprintf("avg(value) OVER (PARTITION BY %s) DESC", strings.Join(groupByKeys, ", ")))
|
||||
}
|
||||
|
||||
// add any group-by keys not already in the order-by as tiebreakers
|
||||
orderKeySet := make(map[string]struct{})
|
||||
for _, o := range query.Order {
|
||||
orderKeySet[fmt.Sprintf("`%s`", o.Key.Name)] = struct{}{}
|
||||
}
|
||||
for _, g := range groupByKeys {
|
||||
if _, exists := orderKeySet[g]; !exists {
|
||||
sb.OrderBy(g)
|
||||
}
|
||||
}
|
||||
sb.OrderBy(querybuilder.GroupByKeys(query.GroupBy)...)
|
||||
sb.OrderBy("ts")
|
||||
|
||||
sb.OrderBy("ts ASC")
|
||||
if metricType == metrictypes.HistogramType && spaceAgg == metrictypes.SpaceAggregationCount && query.Aggregations[0].ComparisonSpaceAggregationParam == nil {
|
||||
sb.OrderBy("toFloat64(le)")
|
||||
}
|
||||
@@ -616,3 +671,45 @@ func (b *MetricQueryStatementBuilder) BuildFinalSelect(
|
||||
q, a := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
return &qbtypes.Statement{Query: combined + q, Args: append(args, a...)}, nil
|
||||
}
|
||||
|
||||
func (b *MetricQueryStatementBuilder) buildHistogramCTE(
|
||||
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
|
||||
) (string, []any, error) {
|
||||
spaceAgg := query.Aggregations[0].SpaceAggregation
|
||||
histogramCTEQueryBuilder := sqlbuilder.NewSelectBuilder()
|
||||
if spaceAgg.IsPercentile() {
|
||||
histogramCTEQueryBuilder.Select("ts")
|
||||
for _, g := range query.GroupBy {
|
||||
histogramCTEQueryBuilder.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
||||
}
|
||||
quantile := spaceAgg.Percentile()
|
||||
histogramCTEQueryBuilder.SelectMore(fmt.Sprintf(
|
||||
"histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) AS value",
|
||||
quantile,
|
||||
))
|
||||
histogramCTEQueryBuilder.From("__spatial_aggregation_cte")
|
||||
histogramCTEQueryBuilder.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
|
||||
histogramCTEQueryBuilder.GroupBy("ts")
|
||||
} else if spaceAgg == metrictypes.SpaceAggregationCount && query.Aggregations[0].ComparisonSpaceAggregationParam != nil {
|
||||
histogramCTEQueryBuilder.Select("ts")
|
||||
for _, g := range query.GroupBy {
|
||||
histogramCTEQueryBuilder.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
|
||||
}
|
||||
aggQuery, err := AggregationQueryForHistogramCountWithParams(query.Aggregations[0].ComparisonSpaceAggregationParam)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
histogramCTEQueryBuilder.SelectMore(aggQuery)
|
||||
histogramCTEQueryBuilder.From("__spatial_aggregation_cte")
|
||||
histogramCTEQueryBuilder.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
|
||||
histogramCTEQueryBuilder.GroupBy("ts")
|
||||
} else {
|
||||
// for count aggregation on histograms with no params, the exact result of spatial aggregation can be sent forward
|
||||
histogramCTEQueryBuilder.Select("*")
|
||||
histogramCTEQueryBuilder.From("__spatial_aggregation_cte")
|
||||
}
|
||||
histogramQueryCTE, histogramQueryCTEArgs := histogramCTEQueryBuilder.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
histogramCTE := fmt.Sprintf("__histogram_cte AS (%s)", histogramQueryCTE)
|
||||
|
||||
return histogramCTE, histogramQueryCTEArgs, nil
|
||||
}
|
||||
|
||||
@@ -15,16 +15,17 @@ import (
|
||||
)
|
||||
|
||||
func TestStatementBuilder(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
requestType qbtypes.RequestType
|
||||
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]
|
||||
expected qbtypes.Statement
|
||||
expectedErr error
|
||||
}{
|
||||
type baseQuery struct {
|
||||
name string
|
||||
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]
|
||||
orderKey string
|
||||
args []any
|
||||
cte string
|
||||
}
|
||||
|
||||
bases := []baseQuery{
|
||||
{
|
||||
name: "test_cumulative_rate_sum",
|
||||
requestType: qbtypes.RequestTypeTimeSeries,
|
||||
name: "cumulative_rate_sum",
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
|
||||
@@ -40,24 +41,16 @@ func TestStatementBuilder(t *testing.T) {
|
||||
Filter: &qbtypes.Filter{
|
||||
Expression: "service.name = 'cartservice'",
|
||||
},
|
||||
Limit: 10,
|
||||
GroupBy: []qbtypes.GroupByKey{
|
||||
{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
},
|
||||
},
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name`, ts",
|
||||
Args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "cartservice", "signoz_calls_total", uint64(1747947360000), uint64(1747983420000), 0},
|
||||
},
|
||||
expectedErr: nil,
|
||||
orderKey: "service.name",
|
||||
args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "cartservice", "signoz_calls_total", uint64(1747947360000), uint64(1747983420000), 0},
|
||||
cte: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`)",
|
||||
},
|
||||
{
|
||||
name: "test_cumulative_rate_sum_with_mat_column",
|
||||
requestType: qbtypes.RequestTypeTimeSeries,
|
||||
name: "cumulative_rate_sum_with_mat_column",
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
|
||||
@@ -73,24 +66,16 @@ func TestStatementBuilder(t *testing.T) {
|
||||
Filter: &qbtypes.Filter{
|
||||
Expression: "materialized.key.name REGEXP 'cartservice' OR service.name = 'cartservice'",
|
||||
},
|
||||
Limit: 10,
|
||||
GroupBy: []qbtypes.GroupByKey{
|
||||
{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
},
|
||||
},
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND (match(JSONExtractString(labels, 'materialized.key.name'), ?) OR JSONExtractString(labels, 'service.name') = ?) GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name`, ts",
|
||||
Args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "cartservice", "cartservice", "signoz_calls_total", uint64(1747947360000), uint64(1747983420000), 0},
|
||||
},
|
||||
expectedErr: nil,
|
||||
orderKey: "service.name",
|
||||
args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "cartservice", "cartservice", "signoz_calls_total", uint64(1747947360000), uint64(1747983420000), 0},
|
||||
cte: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND (match(JSONExtractString(labels, 'materialized.key.name'), ?) OR JSONExtractString(labels, 'service.name') = ?) GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`)",
|
||||
},
|
||||
{
|
||||
name: "test_delta_rate_sum",
|
||||
requestType: qbtypes.RequestTypeTimeSeries,
|
||||
name: "delta_rate_sum",
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
|
||||
@@ -106,24 +91,16 @@ func TestStatementBuilder(t *testing.T) {
|
||||
Filter: &qbtypes.Filter{
|
||||
Expression: "service.name = 'cartservice'",
|
||||
},
|
||||
Limit: 10,
|
||||
GroupBy: []qbtypes.GroupByKey{
|
||||
{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
},
|
||||
},
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ts, `service.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name`, ts",
|
||||
Args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "delta", false, "cartservice", "signoz_calls_total", uint64(1747947390000), uint64(1747983420000)},
|
||||
},
|
||||
expectedErr: nil,
|
||||
orderKey: "service.name",
|
||||
args: []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983420000), "delta", false, "cartservice", "signoz_calls_total", uint64(1747947390000), uint64(1747983420000)},
|
||||
cte: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ts, `service.name`)",
|
||||
},
|
||||
{
|
||||
name: "test_histogram_percentile1",
|
||||
requestType: qbtypes.RequestTypeTimeSeries,
|
||||
name: "histogram_percentile1",
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
|
||||
@@ -139,24 +116,38 @@ func TestStatementBuilder(t *testing.T) {
|
||||
Filter: &qbtypes.Filter{
|
||||
Expression: "service.name = 'cartservice'",
|
||||
},
|
||||
Limit: 10,
|
||||
GroupBy: []qbtypes.GroupByKey{
|
||||
{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
},
|
||||
},
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`, `le`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ts, `service.name`, `le`) SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts ORDER BY `service.name`, ts",
|
||||
Args: []any{"signoz_latency", uint64(1747936800000), uint64(1747983420000), "delta", false, "cartservice", "signoz_latency", uint64(1747947390000), uint64(1747983420000)},
|
||||
},
|
||||
expectedErr: nil,
|
||||
orderKey: "service.name",
|
||||
args: []any{"signoz_latency", uint64(1747936800000), uint64(1747983420000), "delta", false, "cartservice", "signoz_latency", uint64(1747947390000), uint64(1747983420000)},
|
||||
cte: "WITH __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY fingerprint, `service.name`, `le`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ts, `service.name`, `le`), __histogram_cte AS (SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts)",
|
||||
},
|
||||
{
|
||||
name: "test_gauge_avg_sum",
|
||||
requestType: qbtypes.RequestTypeTimeSeries,
|
||||
name: "histogram_percentile2",
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
|
||||
Aggregations: []qbtypes.MetricAggregation{
|
||||
{
|
||||
MetricName: "http_server_duration_bucket",
|
||||
Type: metrictypes.HistogramType,
|
||||
Temporality: metrictypes.Cumulative,
|
||||
TimeAggregation: metrictypes.TimeAggregationRate,
|
||||
SpaceAggregation: metrictypes.SpaceAggregationPercentile95,
|
||||
},
|
||||
},
|
||||
GroupBy: []qbtypes.GroupByKey{
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "service.name"}},
|
||||
},
|
||||
},
|
||||
orderKey: "service.name",
|
||||
args: []any{"http_server_duration_bucket", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "http_server_duration_bucket", uint64(1747947360000), uint64(1747983420000), 0},
|
||||
cte: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, `le`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY fingerprint, `service.name`, `le`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name`, `le` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, `le`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`, `le`), __histogram_cte AS (SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts)",
|
||||
},
|
||||
{
|
||||
name: "gauge_avg_sum",
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
|
||||
@@ -172,53 +163,83 @@ func TestStatementBuilder(t *testing.T) {
|
||||
Filter: &qbtypes.Filter{
|
||||
Expression: "host.name = 'big-data-node-1'",
|
||||
},
|
||||
Limit: 10,
|
||||
GroupBy: []qbtypes.GroupByKey{
|
||||
{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "host.name",
|
||||
},
|
||||
},
|
||||
{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "host.name"}},
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `host.name`, avg(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'host.name') AS `host.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'host.name') = ? GROUP BY fingerprint, `host.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `host.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `host.name`) SELECT * FROM __spatial_aggregation_cte ORDER BY `host.name`, ts",
|
||||
Args: []any{"system.memory.usage", uint64(1747936800000), uint64(1747983420000), "unspecified", false, "big-data-node-1", "system.memory.usage", uint64(1747947390000), uint64(1747983420000), 0},
|
||||
},
|
||||
expectedErr: nil,
|
||||
},
|
||||
{
|
||||
name: "test_histogram_percentile2",
|
||||
requestType: qbtypes.RequestTypeTimeSeries,
|
||||
query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
|
||||
Signal: telemetrytypes.SignalMetrics,
|
||||
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
|
||||
Aggregations: []qbtypes.MetricAggregation{
|
||||
{
|
||||
MetricName: "http_server_duration_bucket",
|
||||
Type: metrictypes.HistogramType,
|
||||
Temporality: metrictypes.Cumulative,
|
||||
TimeAggregation: metrictypes.TimeAggregationRate,
|
||||
SpaceAggregation: metrictypes.SpaceAggregationPercentile95,
|
||||
},
|
||||
},
|
||||
Limit: 10,
|
||||
GroupBy: []qbtypes.GroupByKey{
|
||||
{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH __temporal_aggregation_cte AS (SELECT ts, `service.name`, `le`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY fingerprint, `service.name`, `le`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `service.name`, `le` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, `le`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `service.name`, `le`) SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY `service.name`, ts ORDER BY `service.name`, ts",
|
||||
Args: []any{"http_server_duration_bucket", uint64(1747936800000), uint64(1747983420000), "cumulative", false, "http_server_duration_bucket", uint64(1747947360000), uint64(1747983420000), 0},
|
||||
},
|
||||
expectedErr: nil,
|
||||
orderKey: "host.name",
|
||||
args: []any{"system.memory.usage", uint64(1747936800000), uint64(1747983420000), "unspecified", false, "big-data-node-1", "system.memory.usage", uint64(1747947390000), uint64(1747983420000), 0},
|
||||
cte: "WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `host.name`, avg(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'host.name') AS `host.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'host.name') = ? GROUP BY fingerprint, `host.name`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `host.name` ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `host.name`)",
|
||||
},
|
||||
}
|
||||
|
||||
type variant struct {
|
||||
name string
|
||||
limit int
|
||||
hasOrder bool
|
||||
}
|
||||
|
||||
variants := []variant{
|
||||
{"with_limits", 10, false},
|
||||
{"without_limits", 0, false},
|
||||
{"with_order_by", 0, true},
|
||||
{"with_order_by_and_limits", 10, true},
|
||||
}
|
||||
|
||||
sumMetricsFinalSelects := map[string]string{
|
||||
"with_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __spatial_aggregation_cte GROUP BY `service.name` ORDER BY avg(value) DESC LIMIT 10) ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
|
||||
"without_limits": " SELECT * FROM __spatial_aggregation_cte ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
|
||||
"with_order_by": " SELECT * FROM __spatial_aggregation_cte ORDER BY `service.name` asc, ts ASC",
|
||||
"with_order_by_and_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __spatial_aggregation_cte GROUP BY `service.name` ORDER BY `service.name` asc LIMIT 10) ORDER BY `service.name` asc, ts ASC",
|
||||
}
|
||||
|
||||
histogramMetricsFinalSelects := map[string]string{
|
||||
"with_limits": " SELECT * FROM __histogram_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __histogram_cte GROUP BY `service.name` ORDER BY avg(value) DESC LIMIT 10) ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
|
||||
"without_limits": " SELECT * FROM __histogram_cte ORDER BY avg(value) OVER (PARTITION BY `service.name`) DESC, `service.name`, ts ASC",
|
||||
"with_order_by": " SELECT * FROM __histogram_cte ORDER BY `service.name` asc, ts ASC",
|
||||
"with_order_by_and_limits": " SELECT * FROM __histogram_cte WHERE (`service.name`) IN (SELECT `service.name` FROM __histogram_cte GROUP BY `service.name` ORDER BY `service.name` asc LIMIT 10) ORDER BY `service.name` asc, ts ASC",
|
||||
}
|
||||
|
||||
// expectedFinalSelects maps "base/variant" to the final SELECT portion after the CTE.
|
||||
// The full expected query is: base.cte + expectedFinalSelects[name]
|
||||
expectedFinalSelects := map[string]string{
|
||||
// cumulative_rate_sum
|
||||
"cumulative_rate_sum/with_limits": sumMetricsFinalSelects["with_limits"],
|
||||
"cumulative_rate_sum/without_limits": sumMetricsFinalSelects["without_limits"],
|
||||
"cumulative_rate_sum/with_order_by": sumMetricsFinalSelects["with_order_by"],
|
||||
"cumulative_rate_sum/with_order_by_and_limits": sumMetricsFinalSelects["with_order_by_and_limits"],
|
||||
|
||||
// cumulative_rate_sum_with_mat_column
|
||||
"cumulative_rate_sum_with_mat_column/with_limits": sumMetricsFinalSelects["with_limits"],
|
||||
"cumulative_rate_sum_with_mat_column/without_limits": sumMetricsFinalSelects["without_limits"],
|
||||
"cumulative_rate_sum_with_mat_column/with_order_by": sumMetricsFinalSelects["with_order_by"],
|
||||
"cumulative_rate_sum_with_mat_column/with_order_by_and_limits": sumMetricsFinalSelects["with_order_by_and_limits"],
|
||||
|
||||
// delta_rate_sum
|
||||
"delta_rate_sum/with_limits": sumMetricsFinalSelects["with_limits"],
|
||||
"delta_rate_sum/without_limits": sumMetricsFinalSelects["without_limits"],
|
||||
"delta_rate_sum/with_order_by": sumMetricsFinalSelects["with_order_by"],
|
||||
"delta_rate_sum/with_order_by_and_limits": sumMetricsFinalSelects["with_order_by_and_limits"],
|
||||
|
||||
// histogram_percentile1
|
||||
"histogram_percentile1/with_limits": histogramMetricsFinalSelects["with_limits"],
|
||||
"histogram_percentile1/without_limits": histogramMetricsFinalSelects["without_limits"],
|
||||
"histogram_percentile1/with_order_by": histogramMetricsFinalSelects["with_order_by"],
|
||||
"histogram_percentile1/with_order_by_and_limits": histogramMetricsFinalSelects["with_order_by_and_limits"],
|
||||
|
||||
// histogram_percentile2
|
||||
"histogram_percentile2/with_limits": histogramMetricsFinalSelects["with_limits"],
|
||||
"histogram_percentile2/without_limits": histogramMetricsFinalSelects["without_limits"],
|
||||
"histogram_percentile2/with_order_by": histogramMetricsFinalSelects["with_order_by"],
|
||||
"histogram_percentile2/with_order_by_and_limits": histogramMetricsFinalSelects["with_order_by_and_limits"],
|
||||
|
||||
// gauge_avg_sum
|
||||
"gauge_avg_sum/with_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`host.name`) IN (SELECT `host.name` FROM __spatial_aggregation_cte GROUP BY `host.name` ORDER BY avg(value) DESC LIMIT 10) ORDER BY avg(value) OVER (PARTITION BY `host.name`) DESC, `host.name`, ts ASC",
|
||||
"gauge_avg_sum/without_limits": " SELECT * FROM __spatial_aggregation_cte ORDER BY avg(value) OVER (PARTITION BY `host.name`) DESC, `host.name`, ts ASC",
|
||||
"gauge_avg_sum/with_order_by": " SELECT * FROM __spatial_aggregation_cte ORDER BY `host.name` asc, ts ASC",
|
||||
"gauge_avg_sum/with_order_by_and_limits": " SELECT * FROM __spatial_aggregation_cte WHERE (`host.name`) IN (SELECT `host.name` FROM __spatial_aggregation_cte GROUP BY `host.name` ORDER BY `host.name` asc LIMIT 10) ORDER BY `host.name` asc, ts ASC",
|
||||
}
|
||||
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
@@ -227,15 +248,13 @@ func TestStatementBuilder(t *testing.T) {
|
||||
t.Fatalf("failed to load field keys: %v", err)
|
||||
}
|
||||
mockMetadataStore.KeysMap = keys
|
||||
// NOTE: LoadFieldKeysFromJSON doesn't set Materialized field
|
||||
// for keys, so we have to set it manually here for testing
|
||||
if _, ok := mockMetadataStore.KeysMap["materialized.key.name"]; ok {
|
||||
if len(mockMetadataStore.KeysMap["materialized.key.name"]) > 0 {
|
||||
mockMetadataStore.KeysMap["materialized.key.name"][0].Materialized = true
|
||||
}
|
||||
}
|
||||
|
||||
flagger, err := flagger.New(context.Background(), instrumentationtest.New().ToProviderSettings(), flagger.Config{}, flagger.MustNewRegistry())
|
||||
fl, err := flagger.New(context.Background(), instrumentationtest.New().ToProviderSettings(), flagger.Config{}, flagger.MustNewRegistry())
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create flagger: %v", err)
|
||||
}
|
||||
@@ -245,23 +264,30 @@ func TestStatementBuilder(t *testing.T) {
|
||||
mockMetadataStore,
|
||||
fm,
|
||||
cb,
|
||||
flagger,
|
||||
fl,
|
||||
)
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
for _, b := range bases {
|
||||
for _, v := range variants {
|
||||
name := b.name + "/" + v.name
|
||||
t.Run(name, func(t *testing.T) {
|
||||
q := b.query
|
||||
q.Limit = v.limit
|
||||
if v.hasOrder {
|
||||
q.Order = []qbtypes.OrderBy{
|
||||
{
|
||||
Key: qbtypes.OrderByKey{TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: b.orderKey}},
|
||||
Direction: qbtypes.OrderDirectionAsc,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
q, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, c.requestType, c.query, nil)
|
||||
result, err := statementBuilder.Build(context.Background(), 1747947419000, 1747983448000, qbtypes.RequestTypeTimeSeries, q, nil)
|
||||
|
||||
if c.expectedErr != nil {
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), c.expectedErr.Error())
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, c.expected.Query, q.Query)
|
||||
require.Equal(t, c.expected.Args, q.Args)
|
||||
require.Equal(t, c.expected.Warnings, q.Warnings)
|
||||
}
|
||||
})
|
||||
require.Equal(t, b.cte+expectedFinalSelects[name], result.Query)
|
||||
require.Equal(t, b.args, result.Args)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,43 +0,0 @@
|
||||
package cloudintegrationtypes
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
type Account struct {
|
||||
types.Identifiable
|
||||
types.TimeAuditable
|
||||
ProviderAccountId *string `json:"providerAccountID,omitempty"`
|
||||
Provider CloudProviderType `json:"provider"`
|
||||
RemovedAt *time.Time `json:"removedAt,omitempty"`
|
||||
AgentReport *AgentReport `json:"agentReport,omitempty"`
|
||||
OrgID valuer.UUID `json:"orgID"`
|
||||
Config *AccountConfig `json:"config,omitempty"`
|
||||
}
|
||||
|
||||
// AgentReport represents heartbeats sent by the agent.
|
||||
type AgentReport struct {
|
||||
TimestampMillis int64 `json:"timestampMillis"`
|
||||
Data map[string]any `json:"data"`
|
||||
}
|
||||
|
||||
type GettableAccounts struct {
|
||||
Accounts []*Account `json:"accounts"`
|
||||
}
|
||||
|
||||
type GettableAccount = Account
|
||||
|
||||
type UpdatableAccount struct {
|
||||
Config *AccountConfig `json:"config"`
|
||||
}
|
||||
|
||||
type AccountConfig struct {
|
||||
AWS *AWSAccountConfig `json:"aws,omitempty"`
|
||||
}
|
||||
|
||||
type AWSAccountConfig struct {
|
||||
Regions []string `json:"regions"`
|
||||
}
|
||||
@@ -1,80 +0,0 @@
|
||||
package cloudintegrationtypes
|
||||
|
||||
import (
|
||||
"database/sql/driver"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/uptrace/bun"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrCodeCloudIntegrationNotFound = errors.MustNewCode("cloud_integration_not_found")
|
||||
)
|
||||
|
||||
// StorableCloudIntegration represents a cloud integration stored in the database.
|
||||
// This is also referred as "Account" in the context of cloud integrations.
|
||||
type StorableCloudIntegration struct {
|
||||
bun.BaseModel `bun:"table:cloud_integration"`
|
||||
types.Identifiable
|
||||
types.TimeAuditable
|
||||
|
||||
Provider CloudProviderType `bun:"provider,type:text"`
|
||||
Config string `bun:"config,type:text"` // Config is provider-specific data in JSON string format
|
||||
AccountID *string `bun:"account_id,type:text"`
|
||||
LastAgentReport *StorableAgentReport `bun:"last_agent_report,type:text"`
|
||||
RemovedAt *time.Time `bun:"removed_at,type:timestamp,nullzero"`
|
||||
OrgID valuer.UUID `bun:"org_id,type:text"`
|
||||
}
|
||||
|
||||
// StorableAgentReport represents the last heartbeat and arbitrary data sent by the agent
|
||||
// as of now there is no use case for Data field, but keeping it for backwards compatibility with older structure.
|
||||
type StorableAgentReport struct {
|
||||
TimestampMillis int64 `json:"timestamp_millis"` // backward compatibility
|
||||
Data map[string]any `json:"data"`
|
||||
}
|
||||
|
||||
// StorableCloudIntegrationService is to store service config for a cloud integration, which is a cloud provider specific configuration.
|
||||
type StorableCloudIntegrationService struct {
|
||||
bun.BaseModel `bun:"table:cloud_integration_service,alias:cis"`
|
||||
types.Identifiable
|
||||
types.TimeAuditable
|
||||
|
||||
Type ServiceID `bun:"type,type:text,notnull"` // Keeping Type field name as is, but it is a service id
|
||||
Config string `bun:"config,type:text"` // Config is cloud provider's service specific data in JSON string format
|
||||
CloudIntegrationID valuer.UUID `bun:"cloud_integration_id,type:text"`
|
||||
}
|
||||
|
||||
// Scan scans value from DB.
|
||||
func (r *StorableAgentReport) Scan(src any) error {
|
||||
var data []byte
|
||||
switch v := src.(type) {
|
||||
case []byte:
|
||||
data = v
|
||||
case string:
|
||||
data = []byte(v)
|
||||
default:
|
||||
return errors.NewInternalf(errors.CodeInternal, "tried to scan from %T instead of string or bytes", src)
|
||||
}
|
||||
return json.Unmarshal(data, r)
|
||||
}
|
||||
|
||||
// Value creates value to be stored in DB.
|
||||
func (r *StorableAgentReport) Value() (driver.Value, error) {
|
||||
if r == nil {
|
||||
return nil, errors.NewInternalf(errors.CodeInternal, "agent report is nil")
|
||||
}
|
||||
|
||||
serialized, err := json.Marshal(r)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(
|
||||
err, errors.CodeInternal, "couldn't serialize agent report to JSON",
|
||||
)
|
||||
}
|
||||
// Return as string instead of []byte to ensure PostgreSQL stores as text, not bytes
|
||||
return string(serialized), nil
|
||||
}
|
||||
@@ -1,41 +0,0 @@
|
||||
package cloudintegrationtypes
|
||||
|
||||
import (
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
// CloudProviderType type alias.
|
||||
type CloudProviderType struct{ valuer.String }
|
||||
|
||||
var (
|
||||
// cloud providers.
|
||||
CloudProviderTypeAWS = CloudProviderType{valuer.NewString("aws")}
|
||||
CloudProviderTypeAzure = CloudProviderType{valuer.NewString("azure")}
|
||||
|
||||
// errors.
|
||||
ErrCodeCloudProviderInvalidInput = errors.MustNewCode("invalid_cloud_provider")
|
||||
|
||||
AWSIntegrationUserEmail = valuer.MustNewEmail("aws-integration@signoz.io")
|
||||
AzureIntegrationUserEmail = valuer.MustNewEmail("azure-integration@signoz.io")
|
||||
)
|
||||
|
||||
// CloudIntegrationUserEmails is the list of valid emails for Cloud One Click integrations.
|
||||
// This is used for validation and restrictions in different contexts, across codebase.
|
||||
var CloudIntegrationUserEmails = []valuer.Email{
|
||||
AWSIntegrationUserEmail,
|
||||
AzureIntegrationUserEmail,
|
||||
}
|
||||
|
||||
// NewCloudProvider returns a new CloudProviderType from a string.
|
||||
// It validates the input and returns an error if the input is not valid cloud provider.
|
||||
func NewCloudProvider(provider string) (CloudProviderType, error) {
|
||||
switch provider {
|
||||
case CloudProviderTypeAWS.StringValue():
|
||||
return CloudProviderTypeAWS, nil
|
||||
case CloudProviderTypeAzure.StringValue():
|
||||
return CloudProviderTypeAzure, nil
|
||||
default:
|
||||
return CloudProviderType{}, errors.NewInvalidInputf(ErrCodeCloudProviderInvalidInput, "invalid cloud provider: %s", provider)
|
||||
}
|
||||
}
|
||||
@@ -1,88 +0,0 @@
|
||||
package cloudintegrationtypes
|
||||
|
||||
import "github.com/SigNoz/signoz/pkg/types/integrationtypes"
|
||||
|
||||
type ConnectionArtifactRequest struct {
|
||||
Aws *AWSConnectionArtifactRequest `json:"aws"`
|
||||
}
|
||||
|
||||
type AWSConnectionArtifactRequest struct {
|
||||
DeploymentRegion string `json:"deploymentRegion"`
|
||||
Regions []string `json:"regions"`
|
||||
}
|
||||
|
||||
type PostableConnectionArtifact = ConnectionArtifactRequest
|
||||
|
||||
type ConnectionArtifact struct {
|
||||
Aws *AWSConnectionArtifact `json:"aws"`
|
||||
}
|
||||
|
||||
type AWSConnectionArtifact struct {
|
||||
ConnectionUrl string `json:"connectionURL"`
|
||||
}
|
||||
|
||||
type GettableConnectionArtifact = ConnectionArtifact
|
||||
|
||||
type AccountStatus struct {
|
||||
Id string `json:"id"`
|
||||
ProviderAccountId *string `json:"providerAccountID,omitempty"`
|
||||
Status integrationtypes.AccountStatus `json:"status"`
|
||||
}
|
||||
|
||||
type GettableAccountStatus = AccountStatus
|
||||
|
||||
type AgentCheckInRequest struct {
|
||||
// older backward compatible fields are mapped to new fields
|
||||
// CloudIntegrationId string `json:"cloudIntegrationId"`
|
||||
// AccountId string `json:"accountId"`
|
||||
|
||||
// New fields
|
||||
ProviderAccountId string `json:"providerAccountId"`
|
||||
CloudAccountId string `json:"cloudAccountId"`
|
||||
|
||||
Data map[string]any `json:"data,omitempty"`
|
||||
}
|
||||
|
||||
type PostableAgentCheckInRequest struct {
|
||||
AgentCheckInRequest
|
||||
// following are backward compatible fields for older running agents
|
||||
// which gets mapped to new fields in AgentCheckInRequest
|
||||
CloudIntegrationId string `json:"cloud_integration_id"`
|
||||
CloudAccountId string `json:"cloud_account_id"`
|
||||
}
|
||||
|
||||
type GettableAgentCheckInResponse struct {
|
||||
AgentCheckInResponse
|
||||
|
||||
// For backward compatibility
|
||||
CloudIntegrationId string `json:"cloud_integration_id"`
|
||||
AccountId string `json:"account_id"`
|
||||
}
|
||||
|
||||
type AgentCheckInResponse struct {
|
||||
// Older fields for backward compatibility are mapped to new fields below
|
||||
// CloudIntegrationId string `json:"cloud_integration_id"`
|
||||
// AccountId string `json:"account_id"`
|
||||
|
||||
// New fields
|
||||
ProviderAccountId string `json:"providerAccountId"`
|
||||
CloudAccountId string `json:"cloudAccountId"`
|
||||
|
||||
// IntegrationConfig populates data related to integration that is required for an agent
|
||||
// to start collecting telemetry data
|
||||
// keeping JSON key snake_case for backward compatibility
|
||||
IntegrationConfig *IntegrationConfig `json:"integration_config,omitempty"`
|
||||
}
|
||||
|
||||
type IntegrationConfig struct {
|
||||
EnabledRegions []string `json:"enabledRegions"` // backward compatible
|
||||
Telemetry *AWSCollectionStrategy `json:"telemetry,omitempty"` // backward compatible
|
||||
|
||||
// new fields
|
||||
AWS *AWSIntegrationConfig `json:"aws,omitempty"`
|
||||
}
|
||||
|
||||
type AWSIntegrationConfig struct {
|
||||
EnabledRegions []string `json:"enabledRegions"`
|
||||
Telemetry *AWSCollectionStrategy `json:"telemetry,omitempty"`
|
||||
}
|
||||
@@ -1,103 +0,0 @@
|
||||
package cloudintegrationtypes
|
||||
|
||||
import (
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrCodeInvalidCloudRegion = errors.MustNewCode("invalid_cloud_region")
|
||||
ErrCodeMismatchCloudProvider = errors.MustNewCode("cloud_provider_mismatch")
|
||||
)
|
||||
|
||||
// List of all valid cloud regions on Amazon Web Services.
|
||||
var ValidAWSRegions = map[string]struct{}{
|
||||
"af-south-1": {}, // Africa (Cape Town).
|
||||
"ap-east-1": {}, // Asia Pacific (Hong Kong).
|
||||
"ap-northeast-1": {}, // Asia Pacific (Tokyo).
|
||||
"ap-northeast-2": {}, // Asia Pacific (Seoul).
|
||||
"ap-northeast-3": {}, // Asia Pacific (Osaka).
|
||||
"ap-south-1": {}, // Asia Pacific (Mumbai).
|
||||
"ap-south-2": {}, // Asia Pacific (Hyderabad).
|
||||
"ap-southeast-1": {}, // Asia Pacific (Singapore).
|
||||
"ap-southeast-2": {}, // Asia Pacific (Sydney).
|
||||
"ap-southeast-3": {}, // Asia Pacific (Jakarta).
|
||||
"ap-southeast-4": {}, // Asia Pacific (Melbourne).
|
||||
"ca-central-1": {}, // Canada (Central).
|
||||
"ca-west-1": {}, // Canada West (Calgary).
|
||||
"eu-central-1": {}, // Europe (Frankfurt).
|
||||
"eu-central-2": {}, // Europe (Zurich).
|
||||
"eu-north-1": {}, // Europe (Stockholm).
|
||||
"eu-south-1": {}, // Europe (Milan).
|
||||
"eu-south-2": {}, // Europe (Spain).
|
||||
"eu-west-1": {}, // Europe (Ireland).
|
||||
"eu-west-2": {}, // Europe (London).
|
||||
"eu-west-3": {}, // Europe (Paris).
|
||||
"il-central-1": {}, // Israel (Tel Aviv).
|
||||
"me-central-1": {}, // Middle East (UAE).
|
||||
"me-south-1": {}, // Middle East (Bahrain).
|
||||
"sa-east-1": {}, // South America (Sao Paulo).
|
||||
"us-east-1": {}, // US East (N. Virginia).
|
||||
"us-east-2": {}, // US East (Ohio).
|
||||
"us-west-1": {}, // US West (N. California).
|
||||
"us-west-2": {}, // US West (Oregon).
|
||||
}
|
||||
|
||||
// List of all valid cloud regions for Microsoft Azure.
|
||||
var ValidAzureRegions = map[string]struct{}{
|
||||
"australiacentral": {}, // Australia Central
|
||||
"australiacentral2": {}, // Australia Central 2
|
||||
"australiaeast": {}, // Australia East
|
||||
"australiasoutheast": {}, // Australia Southeast
|
||||
"austriaeast": {}, // Austria East
|
||||
"belgiumcentral": {}, // Belgium Central
|
||||
"brazilsouth": {}, // Brazil South
|
||||
"brazilsoutheast": {}, // Brazil Southeast
|
||||
"canadacentral": {}, // Canada Central
|
||||
"canadaeast": {}, // Canada East
|
||||
"centralindia": {}, // Central India
|
||||
"centralus": {}, // Central US
|
||||
"chilecentral": {}, // Chile Central
|
||||
"denmarkeast": {}, // Denmark East
|
||||
"eastasia": {}, // East Asia
|
||||
"eastus": {}, // East US
|
||||
"eastus2": {}, // East US 2
|
||||
"francecentral": {}, // France Central
|
||||
"francesouth": {}, // France South
|
||||
"germanynorth": {}, // Germany North
|
||||
"germanywestcentral": {}, // Germany West Central
|
||||
"indonesiacentral": {}, // Indonesia Central
|
||||
"israelcentral": {}, // Israel Central
|
||||
"italynorth": {}, // Italy North
|
||||
"japaneast": {}, // Japan East
|
||||
"japanwest": {}, // Japan West
|
||||
"koreacentral": {}, // Korea Central
|
||||
"koreasouth": {}, // Korea South
|
||||
"malaysiawest": {}, // Malaysia West
|
||||
"mexicocentral": {}, // Mexico Central
|
||||
"newzealandnorth": {}, // New Zealand North
|
||||
"northcentralus": {}, // North Central US
|
||||
"northeurope": {}, // North Europe
|
||||
"norwayeast": {}, // Norway East
|
||||
"norwaywest": {}, // Norway West
|
||||
"polandcentral": {}, // Poland Central
|
||||
"qatarcentral": {}, // Qatar Central
|
||||
"southafricanorth": {}, // South Africa North
|
||||
"southafricawest": {}, // South Africa West
|
||||
"southcentralus": {}, // South Central US
|
||||
"southindia": {}, // South India
|
||||
"southeastasia": {}, // Southeast Asia
|
||||
"spaincentral": {}, // Spain Central
|
||||
"swedencentral": {}, // Sweden Central
|
||||
"switzerlandnorth": {}, // Switzerland North
|
||||
"switzerlandwest": {}, // Switzerland West
|
||||
"uaecentral": {}, // UAE Central
|
||||
"uaenorth": {}, // UAE North
|
||||
"uksouth": {}, // UK South
|
||||
"ukwest": {}, // UK West
|
||||
"westcentralus": {}, // West Central US
|
||||
"westeurope": {}, // West Europe
|
||||
"westindia": {}, // West India
|
||||
"westus": {}, // West US
|
||||
"westus2": {}, // West US 2
|
||||
"westus3": {}, // West US 3
|
||||
}
|
||||
@@ -1,248 +0,0 @@
|
||||
package cloudintegrationtypes
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/types/dashboardtypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
var (
|
||||
S3Sync = valuer.NewString("s3sync")
|
||||
// ErrCodeInvalidServiceID is the error code for invalid service id.
|
||||
ErrCodeInvalidServiceID = errors.MustNewCode("invalid_service_id")
|
||||
)
|
||||
|
||||
type ServiceID struct{ valuer.String }
|
||||
|
||||
type CloudIntegrationService struct {
|
||||
types.Identifiable
|
||||
types.TimeAuditable
|
||||
Type ServiceID `json:"type"`
|
||||
Config *ServiceConfig `json:"config"`
|
||||
CloudIntegrationID valuer.UUID `json:"cloudIntegrationID"`
|
||||
}
|
||||
|
||||
// ServiceMetadata helps to quickly list available services and whether it is enabled or not.
|
||||
// As getting complete service definition is a heavy operation and the response is also large,
|
||||
// initial integration page load can be very slow.
|
||||
type ServiceMetadata struct {
|
||||
ServiceDefinitionMetadata
|
||||
// if the service is enabled for the account
|
||||
Enabled bool `json:"enabled"`
|
||||
}
|
||||
|
||||
type GettableServicesMetadata struct {
|
||||
Services []*ServiceMetadata `json:"services"`
|
||||
}
|
||||
|
||||
type Service struct {
|
||||
ServiceDefinition
|
||||
ServiceConfig *ServiceConfig `json:"serviceConfig"`
|
||||
}
|
||||
|
||||
type GettableService = Service
|
||||
|
||||
type UpdatableService struct {
|
||||
Config *ServiceConfig `json:"config"`
|
||||
}
|
||||
|
||||
type ServiceConfig struct {
|
||||
AWS *AWSServiceConfig `json:"aws,omitempty"`
|
||||
}
|
||||
|
||||
type AWSServiceConfig struct {
|
||||
Logs *AWSServiceLogsConfig `json:"logs"`
|
||||
Metrics *AWSServiceMetricsConfig `json:"metrics"`
|
||||
}
|
||||
|
||||
// AWSServiceLogsConfig is AWS specific logs config for a service
|
||||
// NOTE: the JSON keys are snake case for backward compatibility with existing agents.
|
||||
type AWSServiceLogsConfig struct {
|
||||
Enabled bool `json:"enabled"`
|
||||
S3Buckets map[string][]string `json:"s3_buckets,omitempty"`
|
||||
}
|
||||
|
||||
type AWSServiceMetricsConfig struct {
|
||||
Enabled bool `json:"enabled"`
|
||||
}
|
||||
|
||||
// ServiceDefinitionMetadata represents service definition metadata. This is useful for showing service tab in frontend.
|
||||
type ServiceDefinitionMetadata struct {
|
||||
Id string `json:"id"`
|
||||
Title string `json:"title"`
|
||||
Icon string `json:"icon"`
|
||||
}
|
||||
|
||||
type ServiceDefinition struct {
|
||||
ServiceDefinitionMetadata
|
||||
Overview string `json:"overview"` // markdown
|
||||
Assets Assets `json:"assets"`
|
||||
SupportedSignals SupportedSignals `json:"supported_signals"`
|
||||
DataCollected DataCollected `json:"dataCollected"`
|
||||
Strategy *CollectionStrategy `json:"telemetryCollectionStrategy"`
|
||||
}
|
||||
|
||||
// CollectionStrategy is cloud provider specific configuration for signal collection,
|
||||
// this is used by agent to understand the nitty-gritty for collecting telemetry for the cloud provider.
|
||||
type CollectionStrategy struct {
|
||||
AWS *AWSCollectionStrategy `json:"aws,omitempty"`
|
||||
}
|
||||
|
||||
// Assets represents the collection of dashboards.
|
||||
type Assets struct {
|
||||
Dashboards []Dashboard `json:"dashboards"`
|
||||
}
|
||||
|
||||
// SupportedSignals for cloud provider's service.
|
||||
type SupportedSignals struct {
|
||||
Logs bool `json:"logs"`
|
||||
Metrics bool `json:"metrics"`
|
||||
}
|
||||
|
||||
// DataCollected is curated static list of metrics and logs, this is shown as part of service overview.
|
||||
type DataCollected struct {
|
||||
Logs []CollectedLogAttribute `json:"logs"`
|
||||
Metrics []CollectedMetric `json:"metrics"`
|
||||
}
|
||||
|
||||
// CollectedLogAttribute represents a log attribute that is present in all log entries for a service,
|
||||
// this is shown as part of service overview.
|
||||
type CollectedLogAttribute struct {
|
||||
Name string `json:"name"`
|
||||
Path string `json:"path"`
|
||||
Type string `json:"type"`
|
||||
}
|
||||
|
||||
// CollectedMetric represents a metric that is collected for a service, this is shown as part of service overview.
|
||||
type CollectedMetric struct {
|
||||
Name string `json:"name"`
|
||||
Type string `json:"type"`
|
||||
Unit string `json:"unit"`
|
||||
Description string `json:"description"`
|
||||
}
|
||||
|
||||
// AWSCollectionStrategy represents signal collection strategy for AWS services.
|
||||
// this is AWS specific.
|
||||
// NOTE: this structure is still using snake case, for backward compatibility,
|
||||
// with existing agents.
|
||||
type AWSCollectionStrategy struct {
|
||||
Metrics *AWSMetricsStrategy `json:"aws_metrics,omitempty"`
|
||||
Logs *AWSLogsStrategy `json:"aws_logs,omitempty"`
|
||||
S3Buckets map[string][]string `json:"s3_buckets,omitempty"` // Only available in S3 Sync Service Type in AWS
|
||||
}
|
||||
|
||||
// AWSMetricsStrategy represents metrics collection strategy for AWS services.
|
||||
// this is AWS specific.
|
||||
// NOTE: this structure is still using snake case, for backward compatibility,
|
||||
// with existing agents.
|
||||
type AWSMetricsStrategy struct {
|
||||
// to be used as https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-cloudwatch-metricstream.html#cfn-cloudwatch-metricstream-includefilters
|
||||
StreamFilters []struct {
|
||||
// json tags here are in the shape expected by AWS API as detailed at
|
||||
// https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-cloudwatch-metricstream-metricstreamfilter.html
|
||||
Namespace string `json:"Namespace"`
|
||||
MetricNames []string `json:"MetricNames,omitempty"`
|
||||
} `json:"cloudwatch_metric_stream_filters"`
|
||||
}
|
||||
|
||||
// AWSLogsStrategy represents logs collection strategy for AWS services.
|
||||
// this is AWS specific.
|
||||
// NOTE: this structure is still using snake case, for backward compatibility,
|
||||
// with existing agents.
|
||||
type AWSLogsStrategy struct {
|
||||
Subscriptions []struct {
|
||||
// subscribe to all logs groups with specified prefix.
|
||||
// eg: `/aws/rds/`
|
||||
LogGroupNamePrefix string `json:"log_group_name_prefix"`
|
||||
|
||||
// https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html
|
||||
// "" implies no filtering is required.
|
||||
FilterPattern string `json:"filter_pattern"`
|
||||
} `json:"cloudwatch_logs_subscriptions"`
|
||||
}
|
||||
|
||||
// Dashboard represents a dashboard definition for cloud integration.
|
||||
// This is used to show available pre-made dashboards for a service,
|
||||
// hence has additional fields like id, title and description
|
||||
type Dashboard struct {
|
||||
Id string `json:"id"`
|
||||
Title string `json:"title"`
|
||||
Description string `json:"description"`
|
||||
Definition dashboardtypes.StorableDashboardData `json:"definition,omitempty"`
|
||||
}
|
||||
|
||||
// SupportedServices is the map of supported services for each cloud provider.
|
||||
var SupportedServices = map[CloudProviderType][]ServiceID{
|
||||
CloudProviderTypeAWS: {
|
||||
{valuer.NewString("alb")},
|
||||
{valuer.NewString("api-gateway")},
|
||||
{valuer.NewString("dynamodb")},
|
||||
{valuer.NewString("ec2")},
|
||||
{valuer.NewString("ecs")},
|
||||
{valuer.NewString("eks")},
|
||||
{valuer.NewString("elasticache")},
|
||||
{valuer.NewString("lambda")},
|
||||
{valuer.NewString("msk")},
|
||||
{valuer.NewString("rds")},
|
||||
{valuer.NewString("s3sync")},
|
||||
{valuer.NewString("sns")},
|
||||
{valuer.NewString("sqs")},
|
||||
},
|
||||
}
|
||||
|
||||
// NewServiceID returns a new ServiceID from a string, validated against the supported services for the given cloud provider.
|
||||
func NewServiceID(provider CloudProviderType, service string) (ServiceID, error) {
|
||||
services, ok := SupportedServices[provider]
|
||||
if !ok {
|
||||
return ServiceID{}, errors.NewInvalidInputf(ErrCodeInvalidServiceID, "no services defined for cloud provider: %s", provider)
|
||||
}
|
||||
for _, s := range services {
|
||||
if s.StringValue() == service {
|
||||
return s, nil
|
||||
}
|
||||
}
|
||||
return ServiceID{}, errors.NewInvalidInputf(ErrCodeInvalidServiceID, "invalid service id %q for cloud provider %s", service, provider)
|
||||
}
|
||||
|
||||
// UTILS
|
||||
|
||||
// GetCloudIntegrationDashboardID returns the dashboard id for a cloud integration, given the cloud provider, service id, and dashboard id.
|
||||
// This is used to generate unique dashboard ids for cloud integration, and also to parse the dashboard id to get the cloud provider and service id when needed.
|
||||
func GetCloudIntegrationDashboardID(cloudProvider CloudProviderType, svcId, dashboardId string) string {
|
||||
return fmt.Sprintf("cloud-integration--%s--%s--%s", cloudProvider, svcId, dashboardId)
|
||||
}
|
||||
|
||||
// GetDashboardsFromAssets returns the list of dashboards for the cloud provider service from definition.
|
||||
func GetDashboardsFromAssets(
|
||||
svcId string,
|
||||
orgID valuer.UUID,
|
||||
cloudProvider CloudProviderType,
|
||||
createdAt time.Time,
|
||||
assets Assets,
|
||||
) []*dashboardtypes.Dashboard {
|
||||
dashboards := make([]*dashboardtypes.Dashboard, 0)
|
||||
|
||||
for _, d := range assets.Dashboards {
|
||||
author := fmt.Sprintf("%s-integration", cloudProvider)
|
||||
dashboards = append(dashboards, &dashboardtypes.Dashboard{
|
||||
ID: GetCloudIntegrationDashboardID(cloudProvider, svcId, d.Id),
|
||||
Locked: true,
|
||||
OrgID: orgID,
|
||||
Data: d.Definition,
|
||||
TimeAuditable: types.TimeAuditable{
|
||||
CreatedAt: createdAt,
|
||||
UpdatedAt: createdAt,
|
||||
},
|
||||
UserAuditable: types.UserAuditable{
|
||||
CreatedBy: author,
|
||||
UpdatedBy: author,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
return dashboards
|
||||
}
|
||||
@@ -1,41 +0,0 @@
|
||||
package cloudintegrationtypes
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
type Store interface {
|
||||
// GetAccountByID returns a cloud integration account by id
|
||||
GetAccountByID(ctx context.Context, orgID, id valuer.UUID, provider CloudProviderType) (*StorableCloudIntegration, error)
|
||||
|
||||
// CreateAccount creates a new cloud integration account
|
||||
CreateAccount(ctx context.Context, account *StorableCloudIntegration) (*StorableCloudIntegration, error)
|
||||
|
||||
// UpdateAccount updates an existing cloud integration account
|
||||
UpdateAccount(ctx context.Context, account *StorableCloudIntegration) error
|
||||
|
||||
// RemoveAccount marks a cloud integration account as removed by setting the RemovedAt field
|
||||
RemoveAccount(ctx context.Context, orgID, id valuer.UUID, provider CloudProviderType) error
|
||||
|
||||
// ListConnectedAccounts returns all the cloud integration accounts for the org and cloud provider
|
||||
ListConnectedAccounts(ctx context.Context, orgID valuer.UUID, provider CloudProviderType) ([]*StorableCloudIntegration, error)
|
||||
|
||||
// GetConnectedAccount for a given provider
|
||||
GetConnectedAccount(ctx context.Context, orgID valuer.UUID, provider CloudProviderType, providerAccountID string) (*StorableCloudIntegration, error)
|
||||
|
||||
// cloud_integration_service related methods
|
||||
|
||||
// GetServiceByServiceID returns the cloud integration service for the given cloud integration id and service id
|
||||
GetServiceByServiceID(ctx context.Context, cloudIntegrationID valuer.UUID, serviceID ServiceID) (*StorableCloudIntegrationService, error)
|
||||
|
||||
// CreateService creates a new cloud integration service
|
||||
CreateService(ctx context.Context, service *StorableCloudIntegrationService) (*StorableCloudIntegrationService, error)
|
||||
|
||||
// UpdateService updates an existing cloud integration service
|
||||
UpdateService(ctx context.Context, service *StorableCloudIntegrationService) error
|
||||
|
||||
// ListServices returns all the cloud integration services for the given cloud integration id
|
||||
ListServices(ctx context.Context, cloudIntegrationID valuer.UUID) ([]*StorableCloudIntegrationService, error)
|
||||
}
|
||||
@@ -6,7 +6,6 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/open-telemetry/opamp-go/protobufs"
|
||||
"github.com/uptrace/bun"
|
||||
)
|
||||
|
||||
@@ -18,15 +17,6 @@ const (
|
||||
AgentStatusDisconnected
|
||||
)
|
||||
|
||||
var DeployStatusToProtoStatus = map[DeployStatus]protobufs.RemoteConfigStatuses{
|
||||
PendingDeploy: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET,
|
||||
Deploying: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING,
|
||||
Deployed: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED,
|
||||
DeployInitiated: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING,
|
||||
DeployFailed: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED,
|
||||
DeployStatusUnknown: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET,
|
||||
}
|
||||
|
||||
type StorableAgent struct {
|
||||
bun.BaseModel `bun:"table:agent"`
|
||||
|
||||
@@ -40,6 +30,16 @@ type StorableAgent struct {
|
||||
Config string `bun:"config,type:text,notnull"`
|
||||
}
|
||||
|
||||
func NewStorableAgent(store sqlstore.SQLStore, orgID valuer.UUID, agentID string, status AgentStatus) StorableAgent {
|
||||
return StorableAgent{
|
||||
OrgID: orgID,
|
||||
Identifiable: types.Identifiable{ID: valuer.GenerateUUID()},
|
||||
AgentID: agentID,
|
||||
TimeAuditable: types.TimeAuditable{CreatedAt: time.Now(), UpdatedAt: time.Now()},
|
||||
Status: status,
|
||||
}
|
||||
}
|
||||
|
||||
type ElementType struct{ valuer.String }
|
||||
|
||||
var (
|
||||
@@ -49,6 +49,24 @@ var (
|
||||
ElementTypeLbExporter = ElementType{valuer.NewString("lb_exporter")}
|
||||
)
|
||||
|
||||
// NewElementType creates a new ElementType from a string value.
|
||||
// Returns the corresponding ElementType constant if the string matches,
|
||||
// otherwise returns an empty ElementType.
|
||||
func NewElementType(value string) ElementType {
|
||||
switch valuer.NewString(value) {
|
||||
case ElementTypeSamplingRules.String:
|
||||
return ElementTypeSamplingRules
|
||||
case ElementTypeDropRules.String:
|
||||
return ElementTypeDropRules
|
||||
case ElementTypeLogPipelines.String:
|
||||
return ElementTypeLogPipelines
|
||||
case ElementTypeLbExporter.String:
|
||||
return ElementTypeLbExporter
|
||||
default:
|
||||
return ElementType{valuer.NewString("")}
|
||||
}
|
||||
}
|
||||
|
||||
type DeployStatus struct{ valuer.String }
|
||||
|
||||
var (
|
||||
@@ -80,26 +98,6 @@ type AgentConfigVersion struct {
|
||||
Config string `json:"config" bun:"config,type:text"`
|
||||
}
|
||||
|
||||
type AgentConfigElement struct {
|
||||
bun.BaseModel `bun:"table:agent_config_element"`
|
||||
|
||||
types.Identifiable
|
||||
types.TimeAuditable
|
||||
ElementID string `bun:"element_id,type:text,notnull,unique:element_type_version_idx"`
|
||||
ElementType string `bun:"element_type,type:text,notnull,unique:element_type_version_idx"`
|
||||
VersionID valuer.UUID `bun:"version_id,type:text,notnull,unique:element_type_version_idx"`
|
||||
}
|
||||
|
||||
func NewStorableAgent(store sqlstore.SQLStore, orgID valuer.UUID, agentID string, status AgentStatus) StorableAgent {
|
||||
return StorableAgent{
|
||||
OrgID: orgID,
|
||||
Identifiable: types.Identifiable{ID: valuer.GenerateUUID()},
|
||||
AgentID: agentID,
|
||||
TimeAuditable: types.TimeAuditable{CreatedAt: time.Now(), UpdatedAt: time.Now()},
|
||||
Status: status,
|
||||
}
|
||||
}
|
||||
|
||||
func NewAgentConfigVersion(orgId valuer.UUID, userId valuer.UUID, elementType ElementType) *AgentConfigVersion {
|
||||
return &AgentConfigVersion{
|
||||
TimeAuditable: types.TimeAuditable{
|
||||
@@ -120,20 +118,12 @@ func (a *AgentConfigVersion) IncrementVersion(lastVersion int) {
|
||||
a.Version = lastVersion + 1
|
||||
}
|
||||
|
||||
// NewElementType creates a new ElementType from a string value.
|
||||
// Returns the corresponding ElementType constant if the string matches,
|
||||
// otherwise returns an empty ElementType.
|
||||
func NewElementType(value string) ElementType {
|
||||
switch valuer.NewString(value) {
|
||||
case ElementTypeSamplingRules.String:
|
||||
return ElementTypeSamplingRules
|
||||
case ElementTypeDropRules.String:
|
||||
return ElementTypeDropRules
|
||||
case ElementTypeLogPipelines.String:
|
||||
return ElementTypeLogPipelines
|
||||
case ElementTypeLbExporter.String:
|
||||
return ElementTypeLbExporter
|
||||
default:
|
||||
return ElementType{valuer.NewString("")}
|
||||
}
|
||||
type AgentConfigElement struct {
|
||||
bun.BaseModel `bun:"table:agent_config_element"`
|
||||
|
||||
types.Identifiable
|
||||
types.TimeAuditable
|
||||
ElementID string `bun:"element_id,type:text,notnull,unique:element_type_version_idx"`
|
||||
ElementType string `bun:"element_type,type:text,notnull,unique:element_type_version_idx"`
|
||||
VersionID valuer.UUID `bun:"version_id,type:text,notnull,unique:element_type_version_idx"`
|
||||
}
|
||||
|
||||
@@ -58,6 +58,8 @@ def build_builder_query(
|
||||
step_interval: int = DEFAULT_STEP_INTERVAL,
|
||||
group_by: Optional[List[str]] = None,
|
||||
filter_expression: Optional[str] = None,
|
||||
order_by: Optional[List[Dict]] = None,
|
||||
limit: Optional[int] = None,
|
||||
functions: Optional[List[Dict]] = None,
|
||||
disabled: bool = False,
|
||||
) -> Dict:
|
||||
@@ -93,6 +95,12 @@ def build_builder_query(
|
||||
if filter_expression:
|
||||
spec["filter"] = {"expression": filter_expression}
|
||||
|
||||
if order_by:
|
||||
spec["order"] = order_by
|
||||
|
||||
if limit is not None:
|
||||
spec["limit"] = limit
|
||||
|
||||
if functions:
|
||||
spec["functions"] = functions
|
||||
|
||||
|
||||
@@ -2,16 +2,20 @@
|
||||
Look at the cumulative_counters_1h.jsonl file for the relevant data
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from http import HTTPStatus
|
||||
from typing import Any, Callable, List
|
||||
from typing import Any, Callable, List, Optional, Union
|
||||
|
||||
import pytest
|
||||
|
||||
from fixtures import types
|
||||
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
|
||||
from fixtures.metrics import Metrics
|
||||
from fixtures.querier import (
|
||||
build_builder_query,
|
||||
build_order_by,
|
||||
get_all_series,
|
||||
get_series_values,
|
||||
make_query_request,
|
||||
@@ -71,16 +75,200 @@ def test_rate_with_steady_values_and_reset(
|
||||
assert v["value"] >= 0, f"Rate should not be negative: {v['value']}"
|
||||
|
||||
|
||||
def _assert_endpoint_rate_values(endpoint_values: dict) -> None:
|
||||
# /health: 60 data points (t01-t60), steady +10/min
|
||||
# rate = 10/60 = 0.167
|
||||
if "/health" in endpoint_values:
|
||||
health_values = endpoint_values["/health"]
|
||||
assert (
|
||||
len(health_values) >= 58
|
||||
), f"Expected >= 58 values for /health, got {len(health_values)}"
|
||||
count_steady_health = sum(1 for v in health_values if v["value"] == 0.167)
|
||||
assert (
|
||||
count_steady_health >= 57
|
||||
), f"Expected >= 57 steady rate values (0.167) for /health, got {count_steady_health}"
|
||||
# all /health rates should be 0.167 except possibly first/last due to boundaries
|
||||
for v in health_values[1:-1]:
|
||||
assert v["value"] == 0.167, f"Expected /health rate 0.167, got {v['value']}"
|
||||
|
||||
# /products: 51 data points with 10-minute gap (t20-t29 missing), steady +20/min
|
||||
# rate = 20/60 = 0.333, gap causes lower averaged rate at boundary
|
||||
if "/products" in endpoint_values:
|
||||
products_values = endpoint_values["/products"]
|
||||
assert (
|
||||
len(products_values) >= 49
|
||||
), f"Expected >= 49 values for /products, got {len(products_values)}"
|
||||
count_steady_products = sum(1 for v in products_values if v["value"] == 0.333)
|
||||
# most values should be 0.333, some boundary values differ due to 10-min gap
|
||||
assert (
|
||||
count_steady_products >= 46
|
||||
), f"Expected >= 46 steady rate values (0.333) for /products, got {count_steady_products}"
|
||||
# check that non-0.333 values are due to gap averaging (should be lower)
|
||||
gap_boundary_values = [v["value"] for v in products_values if v["value"] != 0.333]
|
||||
for val in gap_boundary_values:
|
||||
assert (
|
||||
0 < val < 0.333
|
||||
), f"Gap boundary values should be between 0 and 0.333, got {val}"
|
||||
|
||||
# /checkout: 61 data points (t00-t60), +1/min normal, +50/min spike at t40-t44
|
||||
# normal rate = 1/60 = 0.0167, spike rate = 50/60 = 0.833
|
||||
if "/checkout" in endpoint_values:
|
||||
checkout_values = endpoint_values["/checkout"]
|
||||
assert (
|
||||
len(checkout_values) >= 59
|
||||
), f"Expected >= 59 values for /checkout, got {len(checkout_values)}"
|
||||
count_steady_checkout = sum(1 for v in checkout_values if v["value"] == 0.0167)
|
||||
assert (
|
||||
count_steady_checkout >= 53
|
||||
), f"Expected >= 53 steady rate values (0.0167) for /checkout, got {count_steady_checkout}"
|
||||
# check that spike values exist (traffic spike +50/min at t40-t44)
|
||||
count_spike_checkout = sum(1 for v in checkout_values if v["value"] == 0.833)
|
||||
assert (
|
||||
count_spike_checkout >= 4
|
||||
), f"Expected >= 4 spike rate values (0.833) for /checkout, got {count_spike_checkout}"
|
||||
# spike values should be consecutive
|
||||
spike_indices = [
|
||||
i for i, v in enumerate[Any](checkout_values) if v["value"] == 0.833
|
||||
]
|
||||
assert len(spike_indices) >= 4, f"Expected >= 4 spike indices, got {spike_indices}"
|
||||
for i in range(1, len(spike_indices)):
|
||||
assert (
|
||||
spike_indices[i] == spike_indices[i - 1] + 1
|
||||
), f"Spike indices should be consecutive, got {spike_indices}"
|
||||
|
||||
# /orders: 60 data points (t00-t60) with gap at t30, counter reset at t31 (150->2)
|
||||
# rate = 5/60 = 0.0833
|
||||
# reset at t31 causes: rate at t30 includes gap (lower), t31 has high rate after reset
|
||||
if "/orders" in endpoint_values:
|
||||
orders_values = endpoint_values["/orders"]
|
||||
assert (
|
||||
len(orders_values) >= 58
|
||||
), f"Expected >= 58 values for /orders, got {len(orders_values)}"
|
||||
count_steady_orders = sum(1 for v in orders_values if v["value"] == 0.0833)
|
||||
assert (
|
||||
count_steady_orders >= 55
|
||||
), f"Expected >= 55 steady rate values (0.0833) for /orders, got {count_steady_orders}"
|
||||
# check for counter reset effects - there should be some non-standard values
|
||||
non_standard_orders = [v["value"] for v in orders_values if v["value"] != 0.0833]
|
||||
assert (
|
||||
len(non_standard_orders) >= 2
|
||||
), f"Expected >= 2 non-standard values due to counter reset, got {non_standard_orders}"
|
||||
# post-reset value should be higher (new counter value / interval)
|
||||
high_rate_orders = [v for v in non_standard_orders if v > 0.0833]
|
||||
assert (
|
||||
len(high_rate_orders) >= 1
|
||||
), f"Expected at least one high rate value after counter reset, got {non_standard_orders}"
|
||||
|
||||
# /users: 56 data points (t05-t60), sparse +1 every 5 minutes
|
||||
# Rate = 1/60 = 0.0167 during increment, 0 during flat periods
|
||||
if "/users" in endpoint_values:
|
||||
users_values = endpoint_values["/users"]
|
||||
assert (
|
||||
len(users_values) >= 54
|
||||
), f"Expected >= 54 values for /users, got {len(users_values)}"
|
||||
count_zero_users = sum(1 for v in users_values if v["value"] == 0)
|
||||
# most values should be 0 (flat periods between increments)
|
||||
assert (
|
||||
count_zero_users >= 40
|
||||
), f"Expected >= 40 zero rate values for /users (sparse data), got {count_zero_users}"
|
||||
# non-zero values should be 0.0167 (1/60 increment rate)
|
||||
non_zero_users = [v["value"] for v in users_values if v["value"] != 0]
|
||||
count_increment_rate = sum(1 for v in non_zero_users if v == 0.0167)
|
||||
assert (
|
||||
count_increment_rate >= 8
|
||||
), f"Expected >= 8 increment rate values (0.0167) for /users, got {count_increment_rate}"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"metric_suffix,order_by,limit,expected_count,expected_endpoints",
|
||||
[
|
||||
(
|
||||
"no_order",
|
||||
None, # this is equivalent to sum(metric_name)
|
||||
None,
|
||||
5,
|
||||
["/products", "/health", "/checkout", "/orders", "/users"],
|
||||
),
|
||||
(
|
||||
"only_limit",
|
||||
None, # this is equivalent to sum(metric_name)
|
||||
3,
|
||||
3,
|
||||
["/products", "/health", "/checkout"],
|
||||
),
|
||||
(
|
||||
"asc",
|
||||
[build_order_by("endpoint", "asc")],
|
||||
None,
|
||||
5,
|
||||
["/checkout", "/health", "/orders", "/products", "/users"],
|
||||
),
|
||||
(
|
||||
"asc_lim3",
|
||||
[build_order_by("endpoint", "asc")],
|
||||
3,
|
||||
3,
|
||||
["/checkout", "/health", "/orders"],
|
||||
),
|
||||
(
|
||||
"desc",
|
||||
[build_order_by("endpoint", "desc")],
|
||||
None,
|
||||
5,
|
||||
["/users", "/products", "/orders", "/health", "/checkout"],
|
||||
),
|
||||
(
|
||||
"desc_lim3",
|
||||
[build_order_by("endpoint", "desc")],
|
||||
3,
|
||||
3,
|
||||
["/users", "/products", "/orders"],
|
||||
),
|
||||
(
|
||||
"asc_metric_name",
|
||||
[build_order_by("sum(test_rate_groupby_asc_metric_name)", "asc")],
|
||||
None,
|
||||
5,
|
||||
["/users", "/orders", "/checkout", "/health", "/products"],
|
||||
),
|
||||
(
|
||||
"asc_metric_name_lim3",
|
||||
[build_order_by("sum(test_rate_groupby_asc_metric_name_lim3)", "asc")],
|
||||
3,
|
||||
3,
|
||||
["/users", "/orders", "/checkout"],
|
||||
),
|
||||
(
|
||||
"desc_metric_name",
|
||||
[build_order_by("sum(test_rate_groupby_desc_metric_name)", "desc")],
|
||||
None,
|
||||
5,
|
||||
["/products", "/health", "/checkout", "/orders", "/users"],
|
||||
),
|
||||
(
|
||||
"desc_metric_name_lim3",
|
||||
[build_order_by("sum(test_rate_groupby_desc_metric_name_lim3)", "desc")],
|
||||
3,
|
||||
3,
|
||||
["/products", "/health", "/checkout"],
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_rate_group_by_endpoint(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_metrics: Callable[[List[Metrics]], None],
|
||||
metric_suffix: str,
|
||||
order_by: Optional[List],
|
||||
limit: Optional[int],
|
||||
expected_count: int,
|
||||
expected_endpoints: Union[set, List[str]],
|
||||
) -> None:
|
||||
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
|
||||
start_ms = int((now - timedelta(minutes=65)).timestamp() * 1000)
|
||||
end_ms = int(now.timestamp() * 1000)
|
||||
metric_name = "test_rate_groupby"
|
||||
metric_name = f"test_rate_groupby_{metric_suffix}"
|
||||
|
||||
metrics = Metrics.load_from_file(
|
||||
CUMULATIVE_COUNTERS_FILE,
|
||||
@@ -97,6 +285,8 @@ def test_rate_group_by_endpoint(
|
||||
"sum",
|
||||
temporality="cumulative",
|
||||
group_by=["endpoint"],
|
||||
order_by=order_by,
|
||||
limit=limit,
|
||||
)
|
||||
|
||||
response = make_query_request(signoz, token, start_ms, end_ms, [query])
|
||||
@@ -105,10 +295,23 @@ def test_rate_group_by_endpoint(
|
||||
data = response.json()
|
||||
all_series = get_all_series(data, "A")
|
||||
|
||||
# Should have 5 different endpoints
|
||||
assert (
|
||||
len(all_series) == 5
|
||||
), f"Expected 5 series for 5 endpoints, got {len(all_series)}"
|
||||
len(all_series) == expected_count
|
||||
), f"Expected {expected_count} series, got {len(all_series)}"
|
||||
|
||||
endpoint_labels = [
|
||||
series.get("labels", [{}])[0].get("value", "unknown")
|
||||
for series in all_series
|
||||
]
|
||||
|
||||
if isinstance(expected_endpoints, set):
|
||||
assert (
|
||||
set(endpoint_labels) == expected_endpoints
|
||||
), f"Expected endpoints {expected_endpoints}, got {set(endpoint_labels)}"
|
||||
else:
|
||||
assert endpoint_labels == expected_endpoints, (
|
||||
f"Expected endpoints {expected_endpoints}, got {endpoint_labels}"
|
||||
)
|
||||
|
||||
# endpoint -> values
|
||||
endpoint_values = {}
|
||||
@@ -117,11 +320,6 @@ def test_rate_group_by_endpoint(
|
||||
values = sorted(series.get("values", []), key=lambda x: x["timestamp"])
|
||||
endpoint_values[endpoint] = values
|
||||
|
||||
expected_endpoints = {"/products", "/health", "/checkout", "/orders", "/users"}
|
||||
assert (
|
||||
set(endpoint_values.keys()) == expected_endpoints
|
||||
), f"Expected endpoints {expected_endpoints}, got {set(endpoint_values.keys())}"
|
||||
|
||||
# at no point rate should be negative
|
||||
for endpoint, values in endpoint_values.items():
|
||||
for v in values:
|
||||
@@ -129,103 +327,4 @@ def test_rate_group_by_endpoint(
|
||||
v["value"] >= 0
|
||||
), f"Rate for {endpoint} should not be negative: {v['value']}"
|
||||
|
||||
# /health: 60 data points (t01-t60), steady +10/min
|
||||
# rate = 10/60 = 0.167
|
||||
health_values = endpoint_values["/health"]
|
||||
assert (
|
||||
len(health_values) >= 58
|
||||
), f"Expected >= 58 values for /health, got {len(health_values)}"
|
||||
count_steady_health = sum(1 for v in health_values if v["value"] == 0.167)
|
||||
assert (
|
||||
count_steady_health >= 57
|
||||
), f"Expected >= 57 steady rate values (0.167) for /health, got {count_steady_health}"
|
||||
# all /health rates should be 0.167 except possibly first/last due to boundaries
|
||||
for v in health_values[1:-1]:
|
||||
assert v["value"] == 0.167, f"Expected /health rate 0.167, got {v['value']}"
|
||||
|
||||
# /products: 51 data points with 10-minute gap (t20-t29 missing), steady +20/min
|
||||
# rate = 20/60 = 0.333, gap causes lower averaged rate at boundary
|
||||
products_values = endpoint_values["/products"]
|
||||
assert (
|
||||
len(products_values) >= 49
|
||||
), f"Expected >= 49 values for /products, got {len(products_values)}"
|
||||
count_steady_products = sum(1 for v in products_values if v["value"] == 0.333)
|
||||
|
||||
# most values should be 0.333, some boundary values differ due to 10-min gap
|
||||
assert (
|
||||
count_steady_products >= 46
|
||||
), f"Expected >= 46 steady rate values (0.333) for /products, got {count_steady_products}"
|
||||
|
||||
# check that non-0.333 values are due to gap averaging (should be lower)
|
||||
gap_boundary_values = [v["value"] for v in products_values if v["value"] != 0.333]
|
||||
for val in gap_boundary_values:
|
||||
assert (
|
||||
0 < val < 0.333
|
||||
), f"Gap boundary values should be between 0 and 0.333, got {val}"
|
||||
|
||||
# /checkout: 61 data points (t00-t60), +1/min normal, +50/min spike at t40-t44
|
||||
# normal rate = 1/60 = 0.0167, spike rate = 50/60 = 0.833
|
||||
checkout_values = endpoint_values["/checkout"]
|
||||
assert (
|
||||
len(checkout_values) >= 59
|
||||
), f"Expected >= 59 values for /checkout, got {len(checkout_values)}"
|
||||
count_steady_checkout = sum(1 for v in checkout_values if v["value"] == 0.0167)
|
||||
assert (
|
||||
count_steady_checkout >= 53
|
||||
), f"Expected >= 53 steady rate values (0.0167) for /checkout, got {count_steady_checkout}"
|
||||
# check that spike values exist (traffic spike +50/min at t40-t44)
|
||||
count_spike_checkout = sum(1 for v in checkout_values if v["value"] == 0.833)
|
||||
assert (
|
||||
count_spike_checkout >= 4
|
||||
), f"Expected >= 4 spike rate values (0.833) for /checkout, got {count_spike_checkout}"
|
||||
|
||||
# spike values should be consecutive
|
||||
spike_indices = [
|
||||
i for i, v in enumerate[Any](checkout_values) if v["value"] == 0.833
|
||||
]
|
||||
assert len(spike_indices) >= 4, f"Expected >= 4 spike indices, got {spike_indices}"
|
||||
# consecutiveness
|
||||
for i in range(1, len(spike_indices)):
|
||||
assert (
|
||||
spike_indices[i] == spike_indices[i - 1] + 1
|
||||
), f"Spike indices should be consecutive, got {spike_indices}"
|
||||
|
||||
# /orders: 60 data points (t00-t60) with gap at t30, counter reset at t31 (150->2)
|
||||
# rate = 5/60 = 0.0833
|
||||
# reset at t31 causes: rate at t30 includes gap (lower), t31 has high rate after reset
|
||||
orders_values = endpoint_values["/orders"]
|
||||
assert (
|
||||
len(orders_values) >= 58
|
||||
), f"Expected >= 58 values for /orders, got {len(orders_values)}"
|
||||
count_steady_orders = sum(1 for v in orders_values if v["value"] == 0.0833)
|
||||
assert (
|
||||
count_steady_orders >= 55
|
||||
), f"Expected >= 55 steady rate values (0.0833) for /orders, got {count_steady_orders}"
|
||||
# check for counter reset effects - there should be some non-standard values
|
||||
non_standard_orders = [v["value"] for v in orders_values if v["value"] != 0.0833]
|
||||
assert (
|
||||
len(non_standard_orders) >= 2
|
||||
), f"Expected >= 2 non-standard values due to counter reset, got {non_standard_orders}"
|
||||
# post-reset value should be higher (new counter value / interval)
|
||||
high_rate_orders = [v for v in non_standard_orders if v > 0.0833]
|
||||
assert (
|
||||
len(high_rate_orders) >= 1
|
||||
), f"Expected at least one high rate value after counter reset, got {non_standard_orders}"
|
||||
|
||||
# /users: 56 data points (t05-t60), sparse +1 every 5 minutes
|
||||
# Rate = 1/60 = 0.0167 during increment, 0 during flat periods
|
||||
users_values = endpoint_values["/users"]
|
||||
assert (
|
||||
len(users_values) >= 54
|
||||
), f"Expected >= 54 values for /users, got {len(users_values)}"
|
||||
count_zero_users = sum(1 for v in users_values if v["value"] == 0)
|
||||
# most values should be 0 (flat periods between increments)
|
||||
assert (
|
||||
count_zero_users >= 40
|
||||
), f"Expected >= 40 zero rate values for /users (sparse data), got {count_zero_users}"
|
||||
# non-zero values should be 0.0167 (1/60 increment rate)
|
||||
non_zero_users = [v["value"] for v in users_values if v["value"] != 0]
|
||||
count_increment_rate = sum(1 for v in non_zero_users if v == 0.0167)
|
||||
assert (
|
||||
count_increment_rate >= 8
|
||||
), f"Expected >= 8 increment rate values (0.0167) for /users, got {count_increment_rate}"
|
||||
_assert_endpoint_rate_values(endpoint_values)
|
||||
|
||||
@@ -5,7 +5,7 @@ Look at the multi_temporality_counters_1h.jsonl file for the relevant data
|
||||
import random
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from http import HTTPStatus
|
||||
from typing import Callable, List
|
||||
from typing import Callable, List, Optional, Union
|
||||
|
||||
import pytest
|
||||
|
||||
@@ -14,6 +14,7 @@ from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
|
||||
from fixtures.metrics import Metrics
|
||||
from fixtures.querier import (
|
||||
build_builder_query,
|
||||
build_order_by,
|
||||
get_all_series,
|
||||
get_series_values,
|
||||
make_query_request,
|
||||
@@ -91,6 +92,198 @@ def test_with_steady_values_and_reset(
|
||||
), f"{time_aggregation} should not be negative: {v['value']}"
|
||||
|
||||
|
||||
def _assert_endpoint_group_values( # pylint: disable=too-many-arguments
|
||||
endpoint_values: dict,
|
||||
stable_health_value: float,
|
||||
stable_products_value: float,
|
||||
stable_checkout_value: float,
|
||||
spike_checkout_value: float,
|
||||
stable_orders_value: float,
|
||||
spike_users_value: float,
|
||||
time_aggregation: str,
|
||||
) -> None:
|
||||
# /health: 60 data points (t01-t60), steady +10/min
|
||||
if "/health" in endpoint_values:
|
||||
health_values = endpoint_values["/health"]
|
||||
assert (
|
||||
len(health_values) >= 58
|
||||
), f"Expected >= 58 values for /health, got {len(health_values)}"
|
||||
count_steady_health = sum(
|
||||
1 for v in health_values if v["value"] == stable_health_value
|
||||
)
|
||||
assert (
|
||||
count_steady_health >= 57
|
||||
), f"Expected >= 57 steady rate values ({stable_health_value}) for /health, got {count_steady_health}"
|
||||
# all /health rates should be stable except possibly first/last due to boundaries
|
||||
for v in health_values[1:-1]:
|
||||
assert (
|
||||
v["value"] == stable_health_value
|
||||
), f"Expected /health rate {stable_health_value}, got {v['value']}"
|
||||
|
||||
# /products: 51 data points with 10-minute gap (t20-t29 missing), steady +20/min
|
||||
if "/products" in endpoint_values:
|
||||
products_values = endpoint_values["/products"]
|
||||
assert (
|
||||
len(products_values) >= 49
|
||||
), f"Expected >= 49 values for /products, got {len(products_values)}"
|
||||
count_steady_products = sum(
|
||||
1 for v in products_values if v["value"] == stable_products_value
|
||||
)
|
||||
# most values should be stable, some boundary values differ due to 10-min gap
|
||||
assert (
|
||||
count_steady_products >= 46
|
||||
), f"Expected >= 46 steady rate values ({stable_products_value}) for /products, got {count_steady_products}"
|
||||
# check that non-stable values are due to gap averaging (should be lower)
|
||||
gap_boundary_values = [
|
||||
v["value"] for v in products_values if v["value"] != stable_products_value
|
||||
]
|
||||
for val in gap_boundary_values:
|
||||
assert (
|
||||
0 < val < stable_products_value
|
||||
), f"Gap boundary values should be between 0 and {stable_products_value}, got {val}"
|
||||
|
||||
# /checkout: 61 data points (t00-t60), +1/min normal, +50/min spike at t40-t44
|
||||
if "/checkout" in endpoint_values:
|
||||
checkout_values = endpoint_values["/checkout"]
|
||||
assert (
|
||||
len(checkout_values) >= 59
|
||||
), f"Expected >= 59 values for /checkout, got {len(checkout_values)}"
|
||||
count_steady_checkout = sum(
|
||||
1 for v in checkout_values if v["value"] == stable_checkout_value
|
||||
)
|
||||
assert (
|
||||
count_steady_checkout >= 53
|
||||
), f"Expected >= 53 steady {time_aggregation} values ({stable_checkout_value}) for /checkout, got {count_steady_checkout}"
|
||||
# check that spike values exist (traffic spike +50/min at t40-t44)
|
||||
count_spike_checkout = sum(
|
||||
1 for v in checkout_values if v["value"] == spike_checkout_value
|
||||
)
|
||||
assert (
|
||||
count_spike_checkout >= 4
|
||||
), f"Expected >= 4 spike {time_aggregation} values ({spike_checkout_value}) for /checkout, got {count_spike_checkout}"
|
||||
# spike values should be consecutive
|
||||
spike_indices = [
|
||||
i for i, v in enumerate(checkout_values) if v["value"] == spike_checkout_value
|
||||
]
|
||||
assert len(spike_indices) >= 4, f"Expected >= 4 spike indices, got {spike_indices}"
|
||||
for i in range(1, len(spike_indices)):
|
||||
assert (
|
||||
spike_indices[i] == spike_indices[i - 1] + 1
|
||||
), f"Spike indices should be consecutive, got {spike_indices}"
|
||||
|
||||
# /orders: 60 data points (t00-t60) with gap at t30, counter reset at t31 (150->2)
|
||||
# reset at t31 causes: rate/increase at t30 includes gap (lower), t31 has high rate after reset
|
||||
if "/orders" in endpoint_values:
|
||||
orders_values = endpoint_values["/orders"]
|
||||
assert (
|
||||
len(orders_values) >= 58
|
||||
), f"Expected >= 58 values for /orders, got {len(orders_values)}"
|
||||
count_steady_orders = sum(
|
||||
1 for v in orders_values if v["value"] == stable_orders_value
|
||||
)
|
||||
assert (
|
||||
count_steady_orders >= 55
|
||||
), f"Expected >= 55 steady {time_aggregation} values ({stable_orders_value}) for /orders, got {count_steady_orders}"
|
||||
# check for counter reset effects - there should be some non-standard values
|
||||
non_standard_orders = [
|
||||
v["value"] for v in orders_values if v["value"] != stable_orders_value
|
||||
]
|
||||
assert (
|
||||
len(non_standard_orders) >= 2
|
||||
), f"Expected >= 2 non-standard values due to counter reset, got {non_standard_orders}"
|
||||
# post-reset value should be higher (new counter value / interval)
|
||||
high_rate_orders = [v for v in non_standard_orders if v > stable_orders_value]
|
||||
assert (
|
||||
len(high_rate_orders) >= 1
|
||||
), f"Expected at least one high {time_aggregation} value after counter reset, got {non_standard_orders}"
|
||||
|
||||
# /users: 56 data points (t05-t60), sparse +1 every 5 minutes
|
||||
if "/users" in endpoint_values:
|
||||
users_values = endpoint_values["/users"]
|
||||
assert (
|
||||
len(users_values) >= 54
|
||||
), f"Expected >= 54 values for /users, got {len(users_values)}"
|
||||
count_zero_users = sum(1 for v in users_values if v["value"] == 0)
|
||||
# most values should be 0 (flat periods between increments)
|
||||
assert (
|
||||
count_zero_users >= 40
|
||||
), f"Expected >= 40 zero {time_aggregation} values for /users (sparse data), got {count_zero_users}"
|
||||
# non-zero values should be stable increment rate
|
||||
non_zero_users = [v["value"] for v in users_values if v["value"] != 0]
|
||||
count_increment_rate = sum(1 for v in non_zero_users if v == spike_users_value)
|
||||
assert (
|
||||
count_increment_rate >= 8
|
||||
), f"Expected >= 8 increment {time_aggregation} values ({spike_users_value}) for /users, got {count_increment_rate}"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"order_suffix,order_by_spec,limit,expected_count,expected_endpoints",
|
||||
[
|
||||
(
|
||||
"no_order",
|
||||
None,
|
||||
None,
|
||||
5,
|
||||
["/products", "/health", "/checkout", "/orders", "/users"],
|
||||
),
|
||||
(
|
||||
"asc",
|
||||
("endpoint", "asc"),
|
||||
None,
|
||||
5,
|
||||
["/checkout", "/health", "/orders", "/products", "/users"],
|
||||
),
|
||||
(
|
||||
"asc_lim3",
|
||||
("endpoint", "asc"),
|
||||
3,
|
||||
3,
|
||||
["/checkout", "/health", "/orders"],
|
||||
),
|
||||
(
|
||||
"desc",
|
||||
("endpoint", "desc"),
|
||||
None,
|
||||
5,
|
||||
["/users", "/products", "/orders", "/health", "/checkout"],
|
||||
),
|
||||
(
|
||||
"desc_lim3",
|
||||
("endpoint", "desc"),
|
||||
3,
|
||||
3,
|
||||
["/users", "/products", "/orders"],
|
||||
),
|
||||
(
|
||||
"asc_metric_name",
|
||||
("sum_metric", "asc"),
|
||||
None,
|
||||
5,
|
||||
["/users", "/orders", "/checkout", "/health", "/products"],
|
||||
),
|
||||
(
|
||||
"asc_metric_name_lim3",
|
||||
("sum_metric", "asc"),
|
||||
3,
|
||||
3,
|
||||
["/users", "/orders", "/checkout"],
|
||||
),
|
||||
(
|
||||
"desc_metric_name",
|
||||
("sum_metric", "desc"),
|
||||
None,
|
||||
5,
|
||||
["/products", "/health", "/checkout", "/orders", "/users"],
|
||||
),
|
||||
(
|
||||
"desc_metric_name_lim3",
|
||||
("sum_metric", "desc"),
|
||||
3,
|
||||
3,
|
||||
["/products", "/health", "/checkout"],
|
||||
),
|
||||
],
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
"time_aggregation, stable_health_value, stable_products_value, stable_checkout_value, spike_checkout_value, stable_orders_value, spike_users_value",
|
||||
[
|
||||
@@ -110,11 +303,24 @@ def test_group_by_endpoint(
|
||||
spike_checkout_value: float,
|
||||
stable_orders_value: float,
|
||||
spike_users_value: float,
|
||||
order_suffix: str,
|
||||
order_by_spec: Optional[tuple],
|
||||
limit: Optional[int],
|
||||
expected_count: int,
|
||||
expected_endpoints: Union[set, List[str]],
|
||||
) -> None:
|
||||
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
|
||||
start_ms = int((now - timedelta(minutes=65)).timestamp() * 1000)
|
||||
end_ms = int(now.timestamp() * 1000)
|
||||
metric_name = f"test_{time_aggregation}_groupby"
|
||||
metric_name = f"test_{time_aggregation}_groupby_{order_suffix}"
|
||||
|
||||
# Build order_by at runtime so metric name reflects actual time_aggregation
|
||||
order_by = None
|
||||
if order_by_spec is not None:
|
||||
key, direction = order_by_spec
|
||||
if key == "sum_metric":
|
||||
key = f"sum({metric_name})"
|
||||
order_by = [build_order_by(key, direction)]
|
||||
|
||||
metrics = Metrics.load_from_file(
|
||||
MULTI_TEMPORALITY_FILE,
|
||||
@@ -130,6 +336,8 @@ def test_group_by_endpoint(
|
||||
time_aggregation,
|
||||
"sum",
|
||||
group_by=["endpoint"],
|
||||
order_by=order_by,
|
||||
limit=limit,
|
||||
)
|
||||
|
||||
response = make_query_request(signoz, token, start_ms, end_ms, [query])
|
||||
@@ -137,10 +345,23 @@ def test_group_by_endpoint(
|
||||
|
||||
data = response.json()
|
||||
all_series = get_all_series(data, "A")
|
||||
# Should have 5 different endpoints
|
||||
assert (
|
||||
len(all_series) == 5
|
||||
), f"Expected 5 series for 5 endpoints, got {len(all_series)}"
|
||||
len(all_series) == expected_count
|
||||
), f"Expected {expected_count} series, got {len(all_series)}"
|
||||
|
||||
endpoint_labels = [
|
||||
series.get("labels", [{}])[0].get("value", "unknown")
|
||||
for series in all_series
|
||||
]
|
||||
|
||||
if isinstance(expected_endpoints, set):
|
||||
assert (
|
||||
set(endpoint_labels) == expected_endpoints
|
||||
), f"Expected endpoints {expected_endpoints}, got {set(endpoint_labels)}"
|
||||
else:
|
||||
assert endpoint_labels == expected_endpoints, (
|
||||
f"Expected endpoints {expected_endpoints}, got {endpoint_labels}"
|
||||
)
|
||||
|
||||
# endpoint -> values
|
||||
endpoint_values = {}
|
||||
@@ -149,11 +370,6 @@ def test_group_by_endpoint(
|
||||
values = sorted(series.get("values", []), key=lambda x: x["timestamp"])
|
||||
endpoint_values[endpoint] = values
|
||||
|
||||
expected_endpoints = {"/products", "/health", "/checkout", "/orders", "/users"}
|
||||
assert (
|
||||
set(endpoint_values.keys()) == expected_endpoints
|
||||
), f"Expected endpoints {expected_endpoints}, got {set(endpoint_values.keys())}"
|
||||
|
||||
# at no point rate should be negative
|
||||
for endpoint, values in endpoint_values.items():
|
||||
for v in values:
|
||||
@@ -161,117 +377,16 @@ def test_group_by_endpoint(
|
||||
v["value"] >= 0
|
||||
), f"Rate for {endpoint} should not be negative: {v['value']}"
|
||||
|
||||
# /health: 60 data points (t01-t60), steady +10/min
|
||||
health_values = endpoint_values["/health"]
|
||||
assert (
|
||||
len(health_values) >= 58
|
||||
), f"Expected >= 58 values for /health, got {len(health_values)}"
|
||||
count_steady_health = sum(
|
||||
1 for v in health_values if v["value"] == stable_health_value
|
||||
_assert_endpoint_group_values(
|
||||
endpoint_values,
|
||||
stable_health_value,
|
||||
stable_products_value,
|
||||
stable_checkout_value,
|
||||
spike_checkout_value,
|
||||
stable_orders_value,
|
||||
spike_users_value,
|
||||
time_aggregation,
|
||||
)
|
||||
assert (
|
||||
count_steady_health >= 57
|
||||
), f"Expected >= 57 steady rate values ({stable_health_value}) for /health, got {count_steady_health}"
|
||||
# all /health rates should be state except possibly first/last due to boundaries
|
||||
for v in health_values[1:-1]:
|
||||
assert (
|
||||
v["value"] == stable_health_value
|
||||
), f"Expected /health rate {stable_health_value}, got {v['value']}"
|
||||
|
||||
# /products: 51 data points with 10-minute gap (t20-t29 missing), steady +20/min
|
||||
products_values = endpoint_values["/products"]
|
||||
assert (
|
||||
len(products_values) >= 49
|
||||
), f"Expected >= 49 values for /products, got {len(products_values)}"
|
||||
count_steady_products = sum(
|
||||
1 for v in products_values if v["value"] == stable_products_value
|
||||
)
|
||||
|
||||
# most values should be stable, some boundary values differ due to 10-min gap
|
||||
assert (
|
||||
count_steady_products >= 46
|
||||
), f"Expected >= 46 steady rate values ({stable_products_value}) for /products, got {count_steady_products}"
|
||||
|
||||
# check that non-stable values are due to gap averaging (should be lower)
|
||||
gap_boundary_values = [
|
||||
v["value"] for v in products_values if v["value"] != stable_products_value
|
||||
]
|
||||
for val in gap_boundary_values:
|
||||
assert (
|
||||
0 < val < stable_products_value
|
||||
), f"Gap boundary values should be between 0 and {stable_products_value}, got {val}"
|
||||
|
||||
# /checkout: 61 data points (t00-t60), +1/min normal, +50/min spike at t40-t44
|
||||
checkout_values = endpoint_values["/checkout"]
|
||||
assert (
|
||||
len(checkout_values) >= 59
|
||||
), f"Expected >= 59 values for /checkout, got {len(checkout_values)}"
|
||||
count_steady_checkout = sum(
|
||||
1 for v in checkout_values if v["value"] == stable_checkout_value
|
||||
)
|
||||
assert (
|
||||
count_steady_checkout >= 53
|
||||
), f"Expected >= 53 steady {time_aggregation} values ({stable_checkout_value}) for /checkout, got {count_steady_checkout}"
|
||||
# check that spike values exist (traffic spike +50/min at t40-t44)
|
||||
count_spike_checkout = sum(
|
||||
1 for v in checkout_values if v["value"] == spike_checkout_value
|
||||
)
|
||||
assert (
|
||||
count_spike_checkout >= 4
|
||||
), f"Expected >= 4 spike {time_aggregation} values ({spike_checkout_value}) for /checkout, got {count_spike_checkout}"
|
||||
|
||||
# spike values should be consecutive
|
||||
spike_indices = [
|
||||
i for i, v in enumerate(checkout_values) if v["value"] == spike_checkout_value
|
||||
]
|
||||
assert len(spike_indices) >= 4, f"Expected >= 4 spike indices, got {spike_indices}"
|
||||
# consecutiveness
|
||||
for i in range(1, len(spike_indices)):
|
||||
assert (
|
||||
spike_indices[i] == spike_indices[i - 1] + 1
|
||||
), f"Spike indices should be consecutive, got {spike_indices}"
|
||||
|
||||
# /orders: 60 data points (t00-t60) with gap at t30, counter reset at t31 (150->2)
|
||||
# reset at t31 causes: rate/increase at t30 includes gap (lower), t31 has high rate after reset
|
||||
orders_values = endpoint_values["/orders"]
|
||||
assert (
|
||||
len(orders_values) >= 58
|
||||
), f"Expected >= 58 values for /orders, got {len(orders_values)}"
|
||||
count_steady_orders = sum(
|
||||
1 for v in orders_values if v["value"] == stable_orders_value
|
||||
)
|
||||
assert (
|
||||
count_steady_orders >= 55
|
||||
), f"Expected >= 55 steady {time_aggregation} values ({stable_orders_value}) for /orders, got {count_steady_orders}"
|
||||
# check for counter reset effects - there should be some non-standard values
|
||||
non_standard_orders = [
|
||||
v["value"] for v in orders_values if v["value"] != stable_orders_value
|
||||
]
|
||||
assert (
|
||||
len(non_standard_orders) >= 2
|
||||
), f"Expected >= 2 non-standard values due to counter reset, got {non_standard_orders}"
|
||||
# post-reset value should be higher (new counter value / interval)
|
||||
high_rate_orders = [v for v in non_standard_orders if v > stable_orders_value]
|
||||
assert (
|
||||
len(high_rate_orders) >= 1
|
||||
), f"Expected at least one high {time_aggregation} value after counter reset, got {non_standard_orders}"
|
||||
|
||||
# /users: 56 data points (t05-t60), sparse +1 every 5 minutes
|
||||
users_values = endpoint_values["/users"]
|
||||
assert (
|
||||
len(users_values) >= 54
|
||||
), f"Expected >= 54 values for /users, got {len(users_values)}"
|
||||
count_zero_users = sum(1 for v in users_values if v["value"] == 0)
|
||||
# most values should be 0 (flat periods between increments)
|
||||
assert (
|
||||
count_zero_users >= 40
|
||||
), f"Expected >= 40 zero {time_aggregation} values for /users (sparse data), got {count_zero_users}"
|
||||
# non-zero values should be 0.0167 (1/60 increment rate)
|
||||
non_zero_users = [v["value"] for v in users_values if v["value"] != 0]
|
||||
count_increment_rate = sum(1 for v in non_zero_users if v == spike_users_value)
|
||||
assert (
|
||||
count_increment_rate >= 8
|
||||
), f"Expected >= 8 increment {time_aggregation} values ({spike_users_value}) for /users, got {count_increment_rate}"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
||||
@@ -4,7 +4,7 @@ Look at the histogram_data_1h.jsonl file for the relevant data
|
||||
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from http import HTTPStatus
|
||||
from typing import Callable, List
|
||||
from typing import Callable, List, Optional, Union
|
||||
|
||||
import pytest
|
||||
|
||||
@@ -13,13 +13,16 @@ from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
|
||||
from fixtures.metrics import Metrics
|
||||
from fixtures.querier import (
|
||||
build_builder_query,
|
||||
build_order_by,
|
||||
get_all_series,
|
||||
get_series_values,
|
||||
make_query_request,
|
||||
)
|
||||
from fixtures.utils import get_testdata_file_path
|
||||
|
||||
|
||||
FILE = get_testdata_file_path("histogram_data_1h.jsonl")
|
||||
FILE_WITH_MANY_GROUPS = get_testdata_file_path("histogram_data_1h_many_groups.jsonl")
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@@ -521,4 +524,564 @@ def test_histogram_percentile_for_delta_service(
|
||||
assert len(result_values) == 60
|
||||
assert result_values[0]["value"] == zeroth_value
|
||||
assert result_values[1]["value"] == first_value
|
||||
assert result_values[-1]["value"] == last_value
|
||||
assert result_values[-1]["value"] == last_value
|
||||
|
||||
|
||||
def _assert_series_endpoint_labels(
|
||||
series: list,
|
||||
expected_endpoints: Union[set, List[str]],
|
||||
prefix: str,
|
||||
) -> None:
|
||||
labels = [s.get("labels", [{}])[0].get("value", "unknown") for s in series]
|
||||
if isinstance(expected_endpoints, set):
|
||||
assert (
|
||||
set(labels) == expected_endpoints
|
||||
), f"Expected {prefix} endpoints {expected_endpoints}, got {set(labels)}"
|
||||
else:
|
||||
assert labels == expected_endpoints, (
|
||||
f"Expected {prefix} endpoints in order {expected_endpoints}, got {labels}"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"order_suffix,order_by,limit,expected_count,expected_endpoints",
|
||||
[
|
||||
(
|
||||
"no_order",
|
||||
None,
|
||||
None,
|
||||
3,
|
||||
["/checkout", "/health", "/orders"],
|
||||
),
|
||||
(
|
||||
"asc",
|
||||
[build_order_by("endpoint", "asc")],
|
||||
None,
|
||||
3,
|
||||
["/checkout", "/health", "/orders"],
|
||||
),
|
||||
(
|
||||
"asc_lim2",
|
||||
[build_order_by("endpoint", "asc")],
|
||||
2,
|
||||
2,
|
||||
["/checkout", "/health"],
|
||||
),
|
||||
(
|
||||
"desc",
|
||||
[build_order_by("endpoint", "desc")],
|
||||
None,
|
||||
3,
|
||||
["/orders", "/health", "/checkout"],
|
||||
),
|
||||
(
|
||||
"desc_lim2",
|
||||
[build_order_by("endpoint", "desc")],
|
||||
2,
|
||||
2,
|
||||
["/orders", "/health"],
|
||||
),
|
||||
(
|
||||
"asc_metric_name",
|
||||
[build_order_by("count(test_histogram_count_groupby_asc_metric_name)", "asc")],
|
||||
None,
|
||||
3,
|
||||
["/health", "/orders", "/checkout"], ## health and orders have the same size so they are then sorted endpoint as a tiebreaker
|
||||
),
|
||||
(
|
||||
"asc_metric_name_lim2",
|
||||
[build_order_by("count(test_histogram_count_groupby_asc_metric_name_lim2)", "asc")],
|
||||
2,
|
||||
2,
|
||||
["/health", "/orders"],
|
||||
),
|
||||
(
|
||||
"desc_metric_name",
|
||||
[build_order_by("count(test_histogram_count_groupby_desc_metric_name)", "desc")],
|
||||
None,
|
||||
3,
|
||||
["/checkout", "/health", "/orders"], ## health and orders have the same size so they are then sorted endpoint as a tiebreaker
|
||||
),
|
||||
(
|
||||
"desc_metric_name_lim2",
|
||||
[build_order_by("count(test_histogram_count_groupby_desc_metric_name_lim2)", "desc")],
|
||||
2,
|
||||
2,
|
||||
["/checkout", "/health"],
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_histogram_count_group_by_endpoint(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_metrics: Callable[[List[Metrics]], None],
|
||||
order_suffix: str,
|
||||
order_by: Optional[List],
|
||||
limit: Optional[int],
|
||||
expected_count: int,
|
||||
expected_endpoints: Union[set, List[str]],
|
||||
) -> None:
|
||||
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
|
||||
start_ms = int((now - timedelta(minutes=65)).timestamp() * 1000)
|
||||
end_ms = int(now.timestamp() * 1000)
|
||||
metric_name = f"test_histogram_count_groupby_{order_suffix}"
|
||||
|
||||
metrics = Metrics.load_from_file(
|
||||
FILE,
|
||||
base_time=now - timedelta(minutes=60),
|
||||
metric_name_override=metric_name,
|
||||
)
|
||||
insert_metrics(metrics)
|
||||
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
query_count = build_builder_query(
|
||||
"A",
|
||||
metric_name,
|
||||
"increase",
|
||||
"count",
|
||||
comparisonSpaceAggregationParam={"threshold": 1000, "operator": "<="},
|
||||
group_by=["endpoint"],
|
||||
order_by=order_by,
|
||||
limit=limit,
|
||||
)
|
||||
|
||||
response = make_query_request(signoz, token, start_ms, end_ms, [query_count])
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
||||
data = response.json()
|
||||
count_all_series = get_all_series(data, "A")
|
||||
|
||||
assert (
|
||||
len(count_all_series) == expected_count
|
||||
), f"Expected {expected_count} series, got {len(count_all_series)}"
|
||||
|
||||
_assert_series_endpoint_labels(count_all_series, expected_endpoints, "count")
|
||||
|
||||
count_values = {}
|
||||
for series in count_all_series:
|
||||
endpoint = series.get("labels", [{}])[0].get("value", "unknown")
|
||||
count_values[endpoint] = sorted(
|
||||
series.get("values", []), key=lambda x: x["timestamp"]
|
||||
)
|
||||
|
||||
for endpoint, values in count_values.items():
|
||||
for v in values:
|
||||
assert v["value"] >= 0, f"Count for {endpoint} should not be negative: {v['value']}"
|
||||
|
||||
# /health (cumulative, service=api): 59 points, increase starts at 11/min → 69/min
|
||||
if "/health" in count_values:
|
||||
vals = count_values["/health"]
|
||||
assert vals[0]["value"] == 11, f"Expected /health count first=11, got {vals[0]['value']}"
|
||||
assert vals[-1]["value"] == 69, f"Expected /health count last=69, got {vals[-1]['value']}"
|
||||
|
||||
# /orders (cumulative, service=api): same distribution as /health
|
||||
if "/orders" in count_values:
|
||||
vals = count_values["/orders"]
|
||||
assert vals[0]["value"] == 11, f"Expected /orders count first=11, got {vals[0]['value']}"
|
||||
assert vals[-1]["value"] == 69, f"Expected /orders count last=69, got {vals[-1]['value']}"
|
||||
|
||||
# /checkout (delta, service=web): 60 points, zeroth=12345 (raw delta), then 11/min → 69/min
|
||||
if "/checkout" in count_values:
|
||||
vals = count_values["/checkout"]
|
||||
assert vals[0]["value"] == 12345, f"Expected /checkout count zeroth=12345, got {vals[0]['value']}"
|
||||
assert vals[1]["value"] == 11, f"Expected /checkout count first=11, got {vals[1]['value']}"
|
||||
assert vals[-1]["value"] == 69, f"Expected /checkout count last=69, got {vals[-1]['value']}"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"order_suffix,order_by,limit,expected_count,expected_endpoints",
|
||||
[
|
||||
(
|
||||
"no_order",
|
||||
None,
|
||||
None,
|
||||
4,
|
||||
[ "/checkout", "/health", "/orders", "/coupon"],
|
||||
),
|
||||
(
|
||||
"only_limit",
|
||||
None,
|
||||
1,
|
||||
1,
|
||||
[ "/checkout"], ##health and checkout have the same size so they are then sorted endpoint as a tiebreaker, and only checkout makes the limit
|
||||
),
|
||||
(
|
||||
"asc",
|
||||
[build_order_by("endpoint", "asc")],
|
||||
None,
|
||||
4,
|
||||
[ "/checkout", "/coupon", "/health", "/orders"],
|
||||
),
|
||||
(
|
||||
"asc_lim2",
|
||||
[build_order_by("endpoint", "asc")],
|
||||
2,
|
||||
2,
|
||||
["/checkout", "/coupon"],
|
||||
),
|
||||
(
|
||||
"desc",
|
||||
[build_order_by("endpoint", "desc")],
|
||||
None,
|
||||
4,
|
||||
["/orders", "/health", "/coupon", "/checkout"],
|
||||
),
|
||||
(
|
||||
"desc_lim2",
|
||||
[build_order_by("endpoint", "desc")],
|
||||
2,
|
||||
2,
|
||||
["/orders", "/health"],
|
||||
),
|
||||
(
|
||||
"asc_metric_name",
|
||||
[build_order_by("p75(test_histogram_p75_groupby_asc_metric_name)", "asc")],
|
||||
None,
|
||||
4,
|
||||
["/coupon", "/orders", "/checkout", "/health"], ## health and checkout have the same size so they are then sorted endpoint as a tiebreaker
|
||||
),
|
||||
(
|
||||
"asc_metric_name_lim2",
|
||||
[build_order_by("p75(test_histogram_p75_groupby_asc_metric_name_lim2)", "asc")],
|
||||
2,
|
||||
2,
|
||||
["/coupon", "/orders"],
|
||||
),
|
||||
(
|
||||
"asc_metric_name_lim3",
|
||||
[build_order_by("p75(test_histogram_p75_groupby_asc_metric_name_lim3)", "asc")],
|
||||
3,
|
||||
3,
|
||||
["/coupon", "/orders", "/checkout"], ##health and checkout have the same size so they are then sorted endpoint as a tiebreaker, and only checkout makes the limit
|
||||
),
|
||||
(
|
||||
"desc_metric_name",
|
||||
[build_order_by("p75(test_histogram_p75_groupby_desc_metric_name)", "desc")],
|
||||
None,
|
||||
4,
|
||||
[ "/checkout", "/health", "/orders", "/coupon"],
|
||||
),
|
||||
(
|
||||
"desc_metric_name_lim2",
|
||||
[build_order_by("p75(test_histogram_p75_groupby_desc_metric_name_lim2)", "desc")],
|
||||
2,
|
||||
2,
|
||||
[ "/checkout", "/health"],
|
||||
),
|
||||
(
|
||||
"desc_metric_name_lim2",
|
||||
[build_order_by("p75(test_histogram_p75_groupby_desc_metric_name_lim2)", "desc")],
|
||||
1,
|
||||
1,
|
||||
[ "/checkout"], ##health and checkout have the same size so they are then sorted endpoint as a tiebreaker, and only checkout makes the limit
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_histogram_percentile_group_by_endpoint(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_metrics: Callable[[List[Metrics]], None],
|
||||
order_suffix: str,
|
||||
order_by: Optional[List],
|
||||
limit: Optional[int],
|
||||
expected_count: int,
|
||||
expected_endpoints: Union[set, List[str]],
|
||||
) -> None:
|
||||
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
|
||||
start_ms = int((now - timedelta(minutes=65)).timestamp() * 1000)
|
||||
end_ms = int(now.timestamp() * 1000)
|
||||
metric_name = f"test_histogram_p75_groupby_{order_suffix}"
|
||||
|
||||
metrics = Metrics.load_from_file(
|
||||
FILE_WITH_MANY_GROUPS,
|
||||
base_time=now - timedelta(minutes=60),
|
||||
metric_name_override=metric_name,
|
||||
)
|
||||
insert_metrics(metrics)
|
||||
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
query_p75 = build_builder_query(
|
||||
"A",
|
||||
metric_name,
|
||||
"doesnotreallymatter",
|
||||
"p75",
|
||||
group_by=["endpoint"],
|
||||
order_by=order_by,
|
||||
limit=limit,
|
||||
)
|
||||
|
||||
response = make_query_request(signoz, token, start_ms, end_ms, [query_p75])
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
||||
data = response.json()
|
||||
p75_series = get_all_series(data, "A")
|
||||
|
||||
for series in p75_series:
|
||||
endpoint = series.get("labels", [{}])[0].get("value", "unknown")
|
||||
|
||||
assert (
|
||||
len(p75_series) == expected_count
|
||||
), f"Expected {expected_count} p75 series, got {len(p75_series)}"
|
||||
|
||||
_assert_series_endpoint_labels(p75_series, expected_endpoints, "p75")
|
||||
|
||||
p75_values = {}
|
||||
for series in p75_series:
|
||||
endpoint = series.get("labels", [{}])[0].get("value", "unknown")
|
||||
p75_values[endpoint] = sorted(
|
||||
series.get("values", []), key=lambda x: x["timestamp"]
|
||||
)
|
||||
|
||||
for endpoint, values in p75_values.items():
|
||||
for v in values:
|
||||
assert v["value"] >= 0, f"p75 for {endpoint} should not be negative: {v['value']}"
|
||||
|
||||
# /health (cumulative, service=api)
|
||||
if "/health" in p75_values:
|
||||
vals = p75_values["/health"]
|
||||
assert vals[0]["value"] == 6000, f"Expected /health p75 first=6000, got {vals[0]['value']}"
|
||||
assert vals[-1]["value"] == 6000, f"Expected /health p75 last=6000, got {vals[-1]['value']}"
|
||||
|
||||
# /orders (cumulative, service=api): same distribution as /health
|
||||
if "/orders" in p75_values:
|
||||
vals = p75_values["/orders"]
|
||||
assert vals[0]["value"] == 4500, f"Expected /orders p75 first=4500, got {vals[0]['value']}"
|
||||
assert vals[-1]["value"] == 4500, f"Expected /orders p75 last=4500, got {vals[-1]['value']}"
|
||||
|
||||
# /checkout (delta, service=web): 60 points
|
||||
if "/checkout" in p75_values:
|
||||
vals = p75_values["/checkout"]
|
||||
assert vals[0]["value"] == 6000, f"Expected /checkout p75 zeroth=6000, got {vals[0]['value']}"
|
||||
assert vals[1]["value"] == 6000, f"Expected /checkout p75 first=6000, got {vals[1]['value']}"
|
||||
assert vals[-1]["value"] == 6000, f"Expected /checkout p75 last=6000, got {vals[-1]['value']}"
|
||||
|
||||
# /coupon (delta, service=web): 60 points
|
||||
if "/coupon" in p75_values:
|
||||
vals = p75_values["/coupon"]
|
||||
assert vals[0]["value"] == 1125, f"Expected /coupon p75 zeroth=1125, got {vals[0]['value']}"
|
||||
assert vals[1]["value"] == 1125, f"Expected /coupon p75 first=1125, got {vals[1]['value']}"
|
||||
assert vals[-1]["value"] == 1125, f"Expected /coupon p75 last=1125, got {vals[-1]['value']}"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"order_suffix,order_by,limit,expected_count,expected_endpoints, expected_status_codes",
|
||||
[
|
||||
(
|
||||
"no_order",
|
||||
None,
|
||||
None,
|
||||
5,
|
||||
[ "/checkout", "/health", "/orders", "/coupon", "/coupon"], ## coupon has 200 and 500 status codes so it will appear twice
|
||||
[ "200", "200", "200", "200", "500"],
|
||||
),
|
||||
(
|
||||
"only_limit",
|
||||
None,
|
||||
1,
|
||||
1,
|
||||
[ "/checkout"], ##health and checkout have the same size so they are then sorted endpoint as a tiebreaker, and only checkout makes the limit
|
||||
[ "200"]
|
||||
),
|
||||
(
|
||||
"asc_endpoint",
|
||||
[build_order_by("endpoint", "asc")],
|
||||
None,
|
||||
5,
|
||||
[ "/checkout", "/coupon", "/coupon", "/health", "/orders"],
|
||||
[ "200", "200", "500", "200", "200"],
|
||||
),
|
||||
(
|
||||
"asc_endpoint_status_code",
|
||||
[build_order_by("endpoint", "asc"), build_order_by("status_code", "asc")],
|
||||
None,
|
||||
5,
|
||||
[ "/checkout", "/coupon", "/coupon", "/health", "/orders"],
|
||||
[ "200", "200", "500", "200", "200"],
|
||||
),
|
||||
(
|
||||
"asc_status_code_endpoint",
|
||||
[build_order_by("status_code", "asc"), build_order_by("endpoint", "asc")],
|
||||
None,
|
||||
5,
|
||||
[ "/checkout", "/coupon", "/health", "/orders", "/coupon"],
|
||||
[ "200", "200", "200", "200", "500"],
|
||||
),
|
||||
(
|
||||
"asc_endpoint_limit_2",
|
||||
[build_order_by("endpoint", "asc")],
|
||||
2,
|
||||
2,
|
||||
[ "/checkout", "/coupon"],
|
||||
[ "200", "200"],
|
||||
),
|
||||
(
|
||||
"asc_endpoint_status_code_limit_2",
|
||||
[build_order_by("endpoint", "asc"), build_order_by("status_code", "asc")],
|
||||
2,
|
||||
2,
|
||||
[ "/checkout", "/coupon"],
|
||||
[ "200", "200"],
|
||||
),
|
||||
(
|
||||
"asc_status_code_endpoint_limit_4",
|
||||
[build_order_by("status_code", "asc"), build_order_by("endpoint", "asc")],
|
||||
4,
|
||||
4,
|
||||
[ "/checkout", "/coupon", "/health", "/orders"],
|
||||
[ "200", "200", "200", "200"],
|
||||
),
|
||||
(
|
||||
"desc_endpoint",
|
||||
[build_order_by("endpoint", "desc")],
|
||||
None,
|
||||
5,
|
||||
["/orders", "/health", "/coupon", "/coupon", "/checkout"],
|
||||
[ "200", "200", "200", "500", "200"],
|
||||
),
|
||||
(
|
||||
"desc_endpoint_status_code",
|
||||
[build_order_by("endpoint", "desc"), build_order_by("status_code", "desc")],
|
||||
None,
|
||||
5,
|
||||
["/orders", "/health", "/coupon", "/coupon", "/checkout"],
|
||||
[ "200", "200", "500", "200", "200"],
|
||||
),
|
||||
(
|
||||
"desc_status_code_endpoint",
|
||||
[build_order_by("status_code", "desc"), build_order_by("endpoint", "desc")],
|
||||
None,
|
||||
5,
|
||||
["/coupon", "/orders", "/health", "/coupon", "/checkout"],
|
||||
[ "500", "200", "200", "200", "200"],
|
||||
),
|
||||
(
|
||||
"desc_endpoint_limit2",
|
||||
[build_order_by("endpoint", "desc")],
|
||||
3,
|
||||
3,
|
||||
["/orders", "/health", "/coupon"],
|
||||
[ "200", "200", "200"],
|
||||
),
|
||||
(
|
||||
"desc_endpoint_status_code_limit3",
|
||||
[build_order_by("endpoint", "desc"), build_order_by("status_code", "desc")],
|
||||
3,
|
||||
3,
|
||||
["/orders", "/health", "/coupon"],
|
||||
[ "200", "200", "500"],
|
||||
),
|
||||
(
|
||||
"desc_status_code_endpoint_limit2",
|
||||
[build_order_by("status_code", "desc"), build_order_by("endpoint", "desc")],
|
||||
2,
|
||||
2,
|
||||
["/coupon", "/orders"],
|
||||
[ "500", "200"],
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_histogram_percentile_group_by_endpoint_and_status_code(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_metrics: Callable[[List[Metrics]], None],
|
||||
order_suffix: str,
|
||||
order_by: Optional[List],
|
||||
limit: Optional[int],
|
||||
expected_count: int,
|
||||
expected_endpoints: Union[set, List[str]],
|
||||
expected_status_codes: Union[set, List[str]],
|
||||
) -> None:
|
||||
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
|
||||
start_ms = int((now - timedelta(minutes=65)).timestamp() * 1000)
|
||||
end_ms = int(now.timestamp() * 1000)
|
||||
metric_name = f"test_histogram_p75_groupby_{order_suffix}"
|
||||
|
||||
metrics = Metrics.load_from_file(
|
||||
FILE_WITH_MANY_GROUPS,
|
||||
base_time=now - timedelta(minutes=60),
|
||||
metric_name_override=metric_name,
|
||||
)
|
||||
insert_metrics(metrics)
|
||||
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
query_p75 = build_builder_query(
|
||||
"A",
|
||||
metric_name,
|
||||
"doesnotreallymatter",
|
||||
"p75",
|
||||
group_by=["endpoint", "status_code"],
|
||||
order_by=order_by,
|
||||
limit=limit,
|
||||
)
|
||||
|
||||
response = make_query_request(signoz, token, start_ms, end_ms, [query_p75])
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
||||
data = response.json()
|
||||
p75_series = get_all_series(data, "A")
|
||||
|
||||
for series in p75_series:
|
||||
endpoint = series.get("labels", [{}])[0].get("value", "unknown")
|
||||
status_code = series.get("labels", [{}])[1].get("value", "unknown")
|
||||
|
||||
assert (
|
||||
len(p75_series) == expected_count
|
||||
), f"Expected {expected_count} p75 series, got {len(p75_series)}"
|
||||
|
||||
_assert_series_endpoint_labels(p75_series, expected_endpoints, "p75")
|
||||
|
||||
endpoints = [s.get("labels", [{}])[0].get("value", "unknown") for s in p75_series]
|
||||
assert endpoints == expected_endpoints, (
|
||||
f"Expected p75 endpoints in order {expected_endpoints}, got {endpoints}"
|
||||
)
|
||||
status_codes = [s.get("labels", [{}])[1].get("value", "unknown") for s in p75_series]
|
||||
assert status_codes == expected_status_codes, (
|
||||
f"Expected p75 endpoints in order {expected_status_codes}, got {status_codes}"
|
||||
)
|
||||
|
||||
p75_values = {}
|
||||
for series in p75_series:
|
||||
endpoint = series.get("labels", [{}])[0].get("value", "unknown")
|
||||
status_code = series.get("labels", [{}])[1].get("value", "unknown")
|
||||
p75_values[endpoint+status_code] = sorted(
|
||||
series.get("values", []), key=lambda x: x["timestamp"]
|
||||
)
|
||||
|
||||
for endpoint, values in p75_values.items():
|
||||
for v in values:
|
||||
assert v["value"] >= 0, f"p75 for {endpoint} should not be negative: {v['value']}"
|
||||
|
||||
# /health (cumulative, service=api)
|
||||
if "/health200" in p75_values:
|
||||
vals = p75_values["/health200"]
|
||||
assert vals[0]["value"] == 6000, f"Expected /health p75 first=6000, got {vals[0]['value']}"
|
||||
assert vals[-1]["value"] == 6000, f"Expected /health p75 last=6000, got {vals[-1]['value']}"
|
||||
|
||||
# /orders (cumulative, service=api): same distribution as /health
|
||||
if "/orders200" in p75_values:
|
||||
vals = p75_values["/orders200"]
|
||||
assert vals[0]["value"] == 4500, f"Expected /orders p75 first=4500, got {vals[0]['value']}"
|
||||
assert vals[-1]["value"] == 4500, f"Expected /orders p75 last=4500, got {vals[-1]['value']}"
|
||||
|
||||
# /checkout (delta, service=web): 60 points
|
||||
if "/checkout200" in p75_values:
|
||||
vals = p75_values["/checkout200"]
|
||||
assert vals[0]["value"] == 6000, f"Expected /checkout p75 zeroth=6000, got {vals[0]['value']}"
|
||||
assert vals[1]["value"] == 6000, f"Expected /checkout p75 first=6000, got {vals[1]['value']}"
|
||||
assert vals[-1]["value"] == 6000, f"Expected /checkout p75 last=6000, got {vals[-1]['value']}"
|
||||
|
||||
# /coupon (delta, service=web): 60 points
|
||||
if "/coupon200" in p75_values:
|
||||
vals = p75_values["/coupon200"]
|
||||
assert vals[0]["value"] == 1250, f"Expected /coupon200 p75 zeroth=1250, got {vals[0]['value']}"
|
||||
assert vals[1]["value"] == 1250, f"Expected /coupon200 p75 first=1250, got {vals[1]['value']}"
|
||||
assert vals[-1]["value"] == 1250, f"Expected /coupon200 p75 last=1250, got {vals[-1]['value']}"
|
||||
|
||||
if "/coupon500" in p75_values:
|
||||
vals = p75_values["/coupon500"]
|
||||
assert vals[0]["value"] == 750, f"Expected /coupon500 p75 zeroth=750, got {vals[0]['value']}"
|
||||
assert vals[1]["value"] == 750, f"Expected /coupon500 p75 first=750, got {vals[1]['value']}"
|
||||
assert vals[-1]["value"] == 750, f"Expected /coupon500 p75 last=750, got {vals[-1]['value']}"
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from http import HTTPStatus
|
||||
from typing import Callable, List
|
||||
from typing import Callable, List, Optional, Union
|
||||
|
||||
import pytest
|
||||
|
||||
@@ -9,6 +9,8 @@ from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
|
||||
from fixtures.metrics import Metrics
|
||||
from fixtures.querier import (
|
||||
build_builder_query,
|
||||
build_order_by,
|
||||
get_all_series,
|
||||
get_series_values,
|
||||
make_query_request,
|
||||
)
|
||||
@@ -139,3 +141,137 @@ def test_for_multiple_aggregations(
|
||||
assert result_values[19]["value"] == twentieth_min_val
|
||||
assert result_values[20]["value"] == twenty_first_min_val
|
||||
assert result_values[30]["value"] == thirty_first_min_val
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"metric_suffix,order_by,limit,expected_count,expected_services",
|
||||
[
|
||||
(
|
||||
"no_order",
|
||||
None, # default ordering: desc by avg of all metric values for a group
|
||||
None,
|
||||
3,
|
||||
["lab", "web", "api"], # sum of all values: lab=42000, api=36000, web=34000. avg of all sums: lab=700, api=600, web=680
|
||||
),
|
||||
(
|
||||
"only_limit",
|
||||
None,
|
||||
2,
|
||||
2,
|
||||
["lab", "web"], # top 2 by default desc: lab=42000, api=36000
|
||||
),
|
||||
(
|
||||
"asc",
|
||||
[build_order_by("service", "asc")],
|
||||
None,
|
||||
3,
|
||||
["api", "lab", "web"],
|
||||
),
|
||||
(
|
||||
"asc_lim2",
|
||||
[build_order_by("service", "asc")],
|
||||
2,
|
||||
2,
|
||||
["api", "lab"],
|
||||
),
|
||||
(
|
||||
"desc",
|
||||
[build_order_by("service", "desc")],
|
||||
None,
|
||||
3,
|
||||
["web", "lab", "api"],
|
||||
),
|
||||
(
|
||||
"desc_lim2",
|
||||
[build_order_by("service", "desc")],
|
||||
2,
|
||||
2,
|
||||
["web", "lab"],
|
||||
),
|
||||
(
|
||||
"asc_metric_name",
|
||||
[build_order_by("sum(test_gauge_groupby_asc_metric_name)", "asc")],
|
||||
None,
|
||||
3,
|
||||
["api", "web", "lab"],
|
||||
),
|
||||
(
|
||||
"asc_metric_name_lim2",
|
||||
[build_order_by("sum(test_gauge_groupby_asc_metric_name_lim2)", "asc")],
|
||||
2,
|
||||
2,
|
||||
["api", "web"],
|
||||
),
|
||||
(
|
||||
"desc_metric_name",
|
||||
[build_order_by("sum(test_gauge_groupby_desc_metric_name)", "desc")],
|
||||
None,
|
||||
3,
|
||||
["lab", "web", "api"],
|
||||
),
|
||||
(
|
||||
"desc_metric_name_lim2",
|
||||
[build_order_by("sum(test_gauge_groupby_desc_metric_name_lim2)", "desc")],
|
||||
2,
|
||||
2,
|
||||
["lab", "web"],
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_gauge_group_by_service(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_metrics: Callable[[List[Metrics]], None],
|
||||
metric_suffix: str,
|
||||
order_by: Optional[List],
|
||||
limit: Optional[int],
|
||||
expected_count: int,
|
||||
expected_services: Union[set, List[str]],
|
||||
) -> None:
|
||||
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
|
||||
start_ms = int((now - timedelta(minutes=65)).timestamp() * 1000)
|
||||
end_ms = int(now.timestamp() * 1000)
|
||||
metric_name = f"test_gauge_groupby_{metric_suffix}"
|
||||
|
||||
metrics = Metrics.load_from_file(
|
||||
FILE,
|
||||
base_time=now - timedelta(minutes=60),
|
||||
metric_name_override=metric_name,
|
||||
)
|
||||
insert_metrics(metrics)
|
||||
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
query = build_builder_query(
|
||||
"A",
|
||||
metric_name,
|
||||
"max",
|
||||
"sum",
|
||||
group_by=["service"],
|
||||
order_by=order_by,
|
||||
limit=limit,
|
||||
)
|
||||
|
||||
response = make_query_request(signoz, token, start_ms, end_ms, [query])
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
||||
data = response.json()
|
||||
all_series = get_all_series(data, "A")
|
||||
|
||||
assert (
|
||||
len(all_series) == expected_count
|
||||
), f"Expected {expected_count} series, got {len(all_series)}"
|
||||
|
||||
service_labels = [
|
||||
series.get("labels", [{}])[0].get("value", "unknown")
|
||||
for series in all_series
|
||||
]
|
||||
|
||||
if isinstance(expected_services, set):
|
||||
assert (
|
||||
set(service_labels) == expected_services
|
||||
), f"Expected services {expected_services}, got {set(service_labels)}"
|
||||
else:
|
||||
assert service_labels == expected_services, (
|
||||
f"Expected services {expected_services}, got {service_labels}"
|
||||
)
|
||||
|
||||
@@ -5,13 +5,16 @@ Look at the delta_counters_1h.jsonl file for the relevant data
|
||||
import os
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from http import HTTPStatus
|
||||
from typing import Any, Callable, List
|
||||
from typing import Any, Callable, List, Optional, Union
|
||||
|
||||
import pytest
|
||||
|
||||
from fixtures import types
|
||||
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
|
||||
from fixtures.metrics import Metrics
|
||||
from fixtures.querier import (
|
||||
build_builder_query,
|
||||
build_order_by,
|
||||
get_all_series,
|
||||
get_series_values,
|
||||
make_query_request,
|
||||
@@ -69,16 +72,61 @@ def test_rate_with_steady_values_and_reset(
|
||||
assert v["value"] >= 0, f"Rate should not be negative: {v['value']}"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"order_suffix,order_by,limit,expected_count,expected_endpoints",
|
||||
[
|
||||
(
|
||||
"no_order",
|
||||
None,
|
||||
None,
|
||||
5,
|
||||
{"/products", "/health", "/checkout", "/orders", "/users"},
|
||||
),
|
||||
(
|
||||
"asc",
|
||||
[build_order_by("endpoint", "asc")],
|
||||
None,
|
||||
5,
|
||||
["/checkout", "/health", "/orders", "/products", "/users"],
|
||||
),
|
||||
(
|
||||
"asc_lim3",
|
||||
[build_order_by("endpoint", "asc")],
|
||||
3,
|
||||
3,
|
||||
["/checkout", "/health", "/orders"],
|
||||
),
|
||||
(
|
||||
"desc",
|
||||
[build_order_by("endpoint", "desc")],
|
||||
None,
|
||||
5,
|
||||
["/users", "/products", "/orders", "/health", "/checkout"],
|
||||
),
|
||||
(
|
||||
"desc_lim3",
|
||||
[build_order_by("endpoint", "desc")],
|
||||
3,
|
||||
3,
|
||||
["/users", "/products", "/orders"],
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_rate_group_by_endpoint(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_metrics: Callable[[List[Metrics]], None],
|
||||
order_suffix: str,
|
||||
order_by: Optional[List],
|
||||
limit: Optional[int],
|
||||
expected_count: int,
|
||||
expected_endpoints: Union[set, List[str]],
|
||||
) -> None:
|
||||
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0)
|
||||
start_ms = int((now - timedelta(minutes=65)).timestamp() * 1000)
|
||||
end_ms = int(now.timestamp() * 1000)
|
||||
metric_name = "test_rate_groupby"
|
||||
metric_name = f"test_rate_groupby_{order_suffix}"
|
||||
|
||||
metrics = Metrics.load_from_file(
|
||||
DELTA_COUNTERS_FILE,
|
||||
@@ -94,6 +142,8 @@ def test_rate_group_by_endpoint(
|
||||
"rate",
|
||||
"sum",
|
||||
group_by=["endpoint"],
|
||||
order_by=order_by,
|
||||
limit=limit,
|
||||
)
|
||||
|
||||
response = make_query_request(signoz, token, start_ms, end_ms, [query])
|
||||
@@ -102,10 +152,23 @@ def test_rate_group_by_endpoint(
|
||||
data = response.json()
|
||||
all_series = get_all_series(data, "A")
|
||||
|
||||
# Should have 5 different endpoints
|
||||
assert (
|
||||
len(all_series) == 5
|
||||
), f"Expected 5 series for 5 endpoints, got {len(all_series)}"
|
||||
len(all_series) == expected_count
|
||||
), f"Expected {expected_count} series, got {len(all_series)}"
|
||||
|
||||
endpoint_labels = [
|
||||
series.get("labels", [{}])[0].get("value", "unknown")
|
||||
for series in all_series
|
||||
]
|
||||
|
||||
if isinstance(expected_endpoints, set):
|
||||
assert (
|
||||
set(endpoint_labels) == expected_endpoints
|
||||
), f"Expected endpoints {expected_endpoints}, got {set(endpoint_labels)}"
|
||||
else:
|
||||
assert endpoint_labels == expected_endpoints, (
|
||||
f"Expected endpoints {expected_endpoints}, got {endpoint_labels}"
|
||||
)
|
||||
|
||||
# endpoint -> values
|
||||
endpoint_values = {}
|
||||
@@ -114,11 +177,6 @@ def test_rate_group_by_endpoint(
|
||||
values = sorted(series.get("values", []), key=lambda x: x["timestamp"])
|
||||
endpoint_values[endpoint] = values
|
||||
|
||||
expected_endpoints = {"/products", "/health", "/checkout", "/orders", "/users"}
|
||||
assert (
|
||||
set(endpoint_values.keys()) == expected_endpoints
|
||||
), f"Expected endpoints {expected_endpoints}, got {set(endpoint_values.keys())}"
|
||||
|
||||
# at no point rate should be negative
|
||||
for endpoint, values in endpoint_values.items():
|
||||
for v in values:
|
||||
@@ -128,93 +186,95 @@ def test_rate_group_by_endpoint(
|
||||
|
||||
# /health: 60 data points (t01-t60), steady +10/min
|
||||
# rate = 10/60 = 0.167
|
||||
health_values = endpoint_values["/health"]
|
||||
assert (
|
||||
len(health_values) == 60
|
||||
), f"Expected 60 values for /health, got {len(health_values)}"
|
||||
count_steady_health = sum(1 for v in health_values if v["value"] == 0.167)
|
||||
assert (
|
||||
count_steady_health == 60
|
||||
), f"Expected == 60 steady rate values (0.167) for /health, got {count_steady_health}"
|
||||
# all /health rates should be 0.167 except possibly first/last due to boundaries
|
||||
for v in health_values[1:-1]:
|
||||
assert v["value"] == 0.167, f"Expected /health rate 0.167, got {v['value']}"
|
||||
if "/health" in endpoint_values:
|
||||
health_values = endpoint_values["/health"]
|
||||
assert (
|
||||
len(health_values) == 60
|
||||
), f"Expected 60 values for /health, got {len(health_values)}"
|
||||
count_steady_health = sum(1 for v in health_values if v["value"] == 0.167)
|
||||
assert (
|
||||
count_steady_health == 60
|
||||
), f"Expected == 60 steady rate values (0.167) for /health, got {count_steady_health}"
|
||||
# all /health rates should be 0.167 except possibly first/last due to boundaries
|
||||
for v in health_values[1:-1]:
|
||||
assert v["value"] == 0.167, f"Expected /health rate 0.167, got {v['value']}"
|
||||
|
||||
# /products: 51 data points with 10-minute gap (t20-t29 missing), steady +20/min
|
||||
# rate = 20/60 = 0.333, gap causes lower averaged rate at boundary
|
||||
products_values = endpoint_values["/products"]
|
||||
assert (
|
||||
len(products_values) == 51
|
||||
), f"Expected 51 values for /products, got {len(products_values)}"
|
||||
count_steady_products = sum(1 for v in products_values if v["value"] == 0.333)
|
||||
|
||||
assert (
|
||||
count_steady_products == 51
|
||||
), f"Expected 51 steady rate values (0.333) for /products, got {count_steady_products}"
|
||||
if "/products" in endpoint_values:
|
||||
products_values = endpoint_values["/products"]
|
||||
assert (
|
||||
len(products_values) == 51
|
||||
), f"Expected 51 values for /products, got {len(products_values)}"
|
||||
count_steady_products = sum(1 for v in products_values if v["value"] == 0.333)
|
||||
assert (
|
||||
count_steady_products == 51
|
||||
), f"Expected 51 steady rate values (0.333) for /products, got {count_steady_products}"
|
||||
|
||||
# /checkout: 61 data points (t00-t60), +1/min normal, +50/min spike at t40-t44
|
||||
# normal rate = 1/60 = 0.0167, spike rate = 50/60 = 0.833
|
||||
checkout_values = endpoint_values["/checkout"]
|
||||
assert (
|
||||
len(checkout_values) == 61
|
||||
), f"Expected 61 values for /checkout, got {len(checkout_values)}"
|
||||
count_steady_checkout = sum(1 for v in checkout_values if v["value"] == 0.0167)
|
||||
assert (
|
||||
count_steady_checkout == 56
|
||||
), f"Expected 56 steady rate values (0.0167) for /checkout, got {count_steady_checkout}"
|
||||
# check that spike values exist (traffic spike +50/min at t40-t44)
|
||||
count_spike_checkout = sum(1 for v in checkout_values if v["value"] == 0.833)
|
||||
assert (
|
||||
count_spike_checkout == 5
|
||||
), f"Expected 5 spike rate values (0.833) for /checkout, got {count_spike_checkout}"
|
||||
|
||||
# spike values should be consecutive
|
||||
spike_indices = [
|
||||
i for i, v in enumerate[Any](checkout_values) if v["value"] == 0.833
|
||||
]
|
||||
assert len(spike_indices) == 5, f"Expected 5 spike indices, got {spike_indices}"
|
||||
# consecutiveness
|
||||
for i in range(1, len(spike_indices)):
|
||||
if "/checkout" in endpoint_values:
|
||||
checkout_values = endpoint_values["/checkout"]
|
||||
assert (
|
||||
spike_indices[i] == spike_indices[i - 1] + 1
|
||||
), f"Spike indices should be consecutive, got {spike_indices}"
|
||||
len(checkout_values) == 61
|
||||
), f"Expected 61 values for /checkout, got {len(checkout_values)}"
|
||||
count_steady_checkout = sum(1 for v in checkout_values if v["value"] == 0.0167)
|
||||
assert (
|
||||
count_steady_checkout == 56
|
||||
), f"Expected 56 steady rate values (0.0167) for /checkout, got {count_steady_checkout}"
|
||||
# check that spike values exist (traffic spike +50/min at t40-t44)
|
||||
count_spike_checkout = sum(1 for v in checkout_values if v["value"] == 0.833)
|
||||
assert (
|
||||
count_spike_checkout == 5
|
||||
), f"Expected 5 spike rate values (0.833) for /checkout, got {count_spike_checkout}"
|
||||
# spike values should be consecutive
|
||||
spike_indices = [
|
||||
i for i, v in enumerate[Any](checkout_values) if v["value"] == 0.833
|
||||
]
|
||||
assert len(spike_indices) == 5, f"Expected 5 spike indices, got {spike_indices}"
|
||||
for i in range(1, len(spike_indices)):
|
||||
assert (
|
||||
spike_indices[i] == spike_indices[i - 1] + 1
|
||||
), f"Spike indices should be consecutive, got {spike_indices}"
|
||||
|
||||
# /orders: 60 data points (t00-t60) with gap at t30, counter reset at t31 (150->2)
|
||||
# rate = 5/60 = 0.0833
|
||||
# reset at t31 causes: rate at t30 includes gap (lower), t31 has high rate after reset
|
||||
orders_values = endpoint_values["/orders"]
|
||||
assert (
|
||||
len(orders_values) == 60
|
||||
), f"Expected 59 values for /orders, got {len(orders_values)}"
|
||||
count_steady_orders = sum(1 for v in orders_values if v["value"] == 0.0833)
|
||||
assert (
|
||||
count_steady_orders == 58
|
||||
), f"Expected 58 steady rate values (0.0833) for /orders, got {count_steady_orders}"
|
||||
# check for counter reset effects - there should be some non-standard values
|
||||
non_standard_orders = [v["value"] for v in orders_values if v["value"] != 0.0833]
|
||||
assert (
|
||||
len(non_standard_orders) == 2
|
||||
), f"Expected 2 non-standard values due to counter reset, got {non_standard_orders}"
|
||||
# post-reset value should be higher (new counter value / interval)
|
||||
high_rate_orders = [v for v in non_standard_orders if v > 0.0833]
|
||||
assert (
|
||||
len(high_rate_orders) == 1
|
||||
), f"Expected one high rate value after counter reset, got {non_standard_orders}"
|
||||
if "/orders" in endpoint_values:
|
||||
orders_values = endpoint_values["/orders"]
|
||||
assert (
|
||||
len(orders_values) == 60
|
||||
), f"Expected 60 values for /orders, got {len(orders_values)}"
|
||||
count_steady_orders = sum(1 for v in orders_values if v["value"] == 0.0833)
|
||||
assert (
|
||||
count_steady_orders == 58
|
||||
), f"Expected 58 steady rate values (0.0833) for /orders, got {count_steady_orders}"
|
||||
# check for counter reset effects - there should be some non-standard values
|
||||
non_standard_orders = [v["value"] for v in orders_values if v["value"] != 0.0833]
|
||||
assert (
|
||||
len(non_standard_orders) == 2
|
||||
), f"Expected 2 non-standard values due to counter reset, got {non_standard_orders}"
|
||||
# post-reset value should be higher (new counter value / interval)
|
||||
high_rate_orders = [v for v in non_standard_orders if v > 0.0833]
|
||||
assert (
|
||||
len(high_rate_orders) == 1
|
||||
), f"Expected one high rate value after counter reset, got {non_standard_orders}"
|
||||
|
||||
# /users: 56 data points (t05-t60), sparse +1 every 5 minutes (12 of them)
|
||||
# Rate = 1/60 = 0.0167 during increment, 0 during flat periods
|
||||
users_values = endpoint_values["/users"]
|
||||
assert (
|
||||
len(users_values) == 56
|
||||
), f"Expected 56 values for /users, got {len(users_values)}"
|
||||
count_zero_users = sum(1 for v in users_values if v["value"] == 0)
|
||||
# most values should be 0 (flat periods between increments)
|
||||
assert (
|
||||
count_zero_users == 44
|
||||
), f"Expected 44 zero rate values for /users (sparse data), got {count_zero_users}"
|
||||
# non-zero values should be 0.0167 (1/60 increment rate)
|
||||
non_zero_users = [v["value"] for v in users_values if v["value"] != 0]
|
||||
count_increment_rate = sum(1 for v in non_zero_users if v == 0.0167)
|
||||
assert (
|
||||
count_increment_rate == 12
|
||||
), f"Expected 12 increment rate values (0.0167) for /users, got {count_increment_rate}"
|
||||
if "/users" in endpoint_values:
|
||||
users_values = endpoint_values["/users"]
|
||||
assert (
|
||||
len(users_values) == 56
|
||||
), f"Expected 56 values for /users, got {len(users_values)}"
|
||||
count_zero_users = sum(1 for v in users_values if v["value"] == 0)
|
||||
# most values should be 0 (flat periods between increments)
|
||||
assert (
|
||||
count_zero_users == 44
|
||||
), f"Expected 44 zero rate values for /users (sparse data), got {count_zero_users}"
|
||||
# non-zero values should be 0.0167 (1/60 increment rate)
|
||||
non_zero_users = [v["value"] for v in users_values if v["value"] != 0]
|
||||
count_increment_rate = sum(1 for v in non_zero_users if v == 0.0167)
|
||||
assert (
|
||||
count_increment_rate == 12
|
||||
), f"Expected 12 increment rate values (0.0167) for /users, got {count_increment_rate}"
|
||||
|
||||
2400
tests/integration/testdata/histogram_data_1h_many_groups.jsonl
vendored
Normal file
2400
tests/integration/testdata/histogram_data_1h_many_groups.jsonl
vendored
Normal file
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user