Compare commits

...

9 Commits

Author SHA1 Message Date
Srikanth Chekuri
deb09219f4 Merge branch 'main' into add-additional-validations 2026-04-04 16:20:01 +05:30
Srikanth Chekuri
b5b89bb678 fix: several panic errors (#10817)
* fix: several panic errors

* fix: add recover for dashboard sql
2026-04-04 10:38:42 +00:00
Pandey
b5eab118dd refactor: move and internalize resource filter statement builder (#10824)
* refactor: move resourcefilter to pkg/telemetryresourcefilter

Move pkg/querybuilder/resourcefilter to pkg/telemetryresourcefilter
to align with the existing telemetry package naming convention
(telemetrylogs, telemetrytraces, telemetrymetrics, telemetrymeter).
The resource filter is a statement builder, not a query builder utility.

* refactor: internalize resource filter construction in statement builders

Each telemetry statement builder (logs, traces) now creates its own
resource filter internally instead of receiving it as an injected
dependency. This makes it impossible to wire the wrong resource table
and simplifies the provider.

Delete telemetryresourcefilter/tables.go — each telemetry package now
owns its resource table constant (LogsResourceV2TableName in
telemetrylogs, TracesResourceV3TableName in telemetrytraces).

* refactor: create field mapper and condition builder inside resource filter New

Remove fieldMapper and conditionBuilder params from
telemetryresourcefilter.New — they are always the same
(NewFieldMapper + NewConditionBuilder) so create them internally.
2026-04-04 10:27:46 +00:00
Vishal Sharma
98d0bdfe49 chore(frontend): sync pylon chat widget theme with app theme (#10830)
Some checks failed
build-staging / prepare (push) Has been cancelled
build-staging / js-build (push) Has been cancelled
build-staging / go-build (push) Has been cancelled
build-staging / staging (push) Has been cancelled
Release Drafter / update_release_draft (push) Has been cancelled
2026-04-04 09:29:08 +00:00
Srikanth Chekuri
9fe237215f Merge branch 'main' into add-additional-validations 2026-04-04 12:28:04 +05:30
Vikrant Gupta
a71006b662 chore(user): add partial index for email,org_id (#10828)
Some checks failed
build-staging / prepare (push) Has been cancelled
build-staging / js-build (push) Has been cancelled
build-staging / go-build (push) Has been cancelled
build-staging / staging (push) Has been cancelled
Release Drafter / update_release_draft (push) Has been cancelled
* chore(user): add partial index for email,org_id

* chore(user): fix integration test fmt

* chore(user): fix integration test lint
2026-04-03 20:27:04 +00:00
Vishal Sharma
769e36ec84 feat(frontend): add new onboarding datasource entries (#10829) 2026-04-03 20:13:08 +00:00
srikanthccv
30de6f545e chore: address reivew comments 2026-04-03 11:20:34 +05:30
srikanthccv
f698ac9a21 chore: add schema version specific validations 2026-04-02 15:05:55 +05:30
51 changed files with 3025 additions and 587 deletions

View File

@@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"log/slog"
"strings"
"sync"
"time"
@@ -64,12 +63,12 @@ func NewAnomalyRule(
BaseRule: baseRule,
}
switch strings.ToLower(p.RuleCondition.Seasonality) {
case "hourly":
switch p.RuleCondition.Seasonality {
case ruletypes.SeasonalityHourly:
t.seasonality = anomaly.SeasonalityHourly
case "daily":
case ruletypes.SeasonalityDaily:
t.seasonality = anomaly.SeasonalityDaily
case "weekly":
case ruletypes.SeasonalityWeekly:
t.seasonality = anomaly.SeasonalityWeekly
default:
t.seasonality = anomaly.SeasonalityDaily

View File

@@ -67,7 +67,7 @@ func TestAnomalyRule_NoData_AlertOnAbsent(t *testing.T) {
}},
},
SelectedQuery: "A",
Seasonality: "daily",
Seasonality: ruletypes.SeasonalityDaily,
Thresholds: &ruletypes.RuleThresholdData{
Kind: ruletypes.BasicThresholdKind,
Spec: ruletypes.BasicRuleThresholds{{
@@ -170,7 +170,7 @@ func TestAnomalyRule_NoData_AbsentFor(t *testing.T) {
}},
},
SelectedQuery: "A",
Seasonality: "daily",
Seasonality: ruletypes.SeasonalityDaily,
Thresholds: &ruletypes.RuleThresholdData{
Kind: ruletypes.BasicThresholdKind,
Spec: ruletypes.BasicRuleThresholds{{

View File

@@ -0,0 +1 @@
<svg xmlns="http://www.w3.org/2000/svg" fill="#1eb4d4" viewBox="0 0 24 24"><title>Hasura</title><path d="M23.558 8.172c.707-2.152.282-6.447-1.09-8.032a.42.42 0 0 0-.664.051l-1.69 2.59a1.32 1.32 0 0 1-1.737.276C16.544 1.885 14.354 1.204 12 1.204s-4.544.68-6.378 1.853a1.326 1.326 0 0 1-1.736-.276L2.196.191A.42.42 0 0 0 1.532.14C.16 1.728-.265 6.023.442 8.172c.236.716.3 1.472.16 2.207-.137.73-.276 1.61-.276 2.223C.326 18.898 5.553 24 11.997 24c6.447 0 11.671-5.105 11.671-11.398 0-.613-.138-1.494-.276-2.223a4.47 4.47 0 0 1 .166-2.207m-11.56 13.284c-4.984 0-9.036-3.96-9.036-8.827q0-.239.014-.473c.18-3.316 2.243-6.15 5.16-7.5 1.17-.546 2.481-.848 3.864-.848s2.69.302 3.864.85c2.917 1.351 4.98 4.187 5.16 7.501q.013.236.014.473c-.003 4.864-4.057 8.824-9.04 8.824m3.915-5.43-2.31-3.91-1.98-3.26a.26.26 0 0 0-.223-.125H9.508a.26.26 0 0 0-.227.13.25.25 0 0 0 .003.254l1.895 3.109-2.542 3.787a.25.25 0 0 0-.011.259.26.26 0 0 0 .23.132h1.905a.26.26 0 0 0 .218-.116l1.375-2.096 1.233 2.088a.26.26 0 0 0 .224.127h1.878c.094 0 .18-.049.224-.127a.24.24 0 0 0 0-.251z"/></svg>

After

Width:  |  Height:  |  Size: 1.0 KiB

View File

@@ -0,0 +1 @@
<svg xmlns="http://www.w3.org/2000/svg" fill="#ea4b71" viewBox="0 0 24 24"><title>n8n</title><path d="M21.474 5.684a2.53 2.53 0 0 0-2.447 1.895H16.13a2.526 2.526 0 0 0-2.492 2.11l-.103.624a1.26 1.26 0 0 1-1.246 1.055h-1.001a2.527 2.527 0 0 0-4.893 0H4.973a2.527 2.527 0 1 0 0 1.264h1.422a2.527 2.527 0 0 0 4.894 0h1a1.26 1.26 0 0 1 1.247 1.055l.103.623a2.526 2.526 0 0 0 2.492 2.111h.37a2.527 2.527 0 1 0 0-1.263h-.37a1.26 1.26 0 0 1-1.246-1.056l-.103-.623A2.52 2.52 0 0 0 13.96 12a2.52 2.52 0 0 0 .82-1.48l.104-.622a1.26 1.26 0 0 1 1.246-1.056h2.896a2.527 2.527 0 1 0 2.447-3.158m0 1.263a1.263 1.263 0 0 1 1.263 1.263 1.263 1.263 0 0 1-1.263 1.264A1.263 1.263 0 0 1 20.21 8.21a1.263 1.263 0 0 1 1.264-1.263m-18.948 3.79A1.263 1.263 0 0 1 3.79 12a1.263 1.263 0 0 1-1.264 1.263A1.263 1.263 0 0 1 1.263 12a1.263 1.263 0 0 1 1.263-1.263m6.316 0A1.263 1.263 0 0 1 10.105 12a1.263 1.263 0 0 1-1.263 1.263A1.263 1.263 0 0 1 7.58 12a1.263 1.263 0 0 1 1.263-1.263m10.106 3.79a1.263 1.263 0 0 1 1.263 1.263 1.263 1.263 0 0 1-1.263 1.263 1.263 1.263 0 0 1-1.264-1.263 1.263 1.263 0 0 1 1.263-1.264"/></svg>

After

Width:  |  Height:  |  Size: 1.1 KiB

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 5.8 KiB

View File

@@ -18,7 +18,7 @@ import AppLayout from 'container/AppLayout';
import Hex from 'crypto-js/enc-hex';
import HmacSHA256 from 'crypto-js/hmac-sha256';
import { KeyboardHotkeysProvider } from 'hooks/hotkeys/useKeyboardHotkeys';
import { useThemeConfig } from 'hooks/useDarkMode';
import { useIsDarkMode, useThemeConfig } from 'hooks/useDarkMode';
import { useGetTenantLicense } from 'hooks/useGetTenantLicense';
import { NotificationProvider } from 'hooks/useNotifications';
import { ResourceProvider } from 'hooks/useResourceAttribute';
@@ -212,6 +212,12 @@ function App(): JSX.Element {
activeLicenseFetchError,
]);
const isDarkMode = useIsDarkMode();
useEffect(() => {
window.Pylon?.('setTheme', isDarkMode ? 'dark' : 'light');
}, [isDarkMode]);
useEffect(() => {
if (
pathname === ROUTES.ONBOARDING ||

View File

@@ -5,7 +5,7 @@
"tags": [
"quickstart"
],
"module": "apm",
"module": "home",
"relatedSearchKeywords": [
"apm",
"application performance monitoring",
@@ -22,6 +22,28 @@
"imgUrl": "/Logos/quickstart.svg",
"link": "/docs/cloud/quickstart/"
},
{
"dataSource": "signoz-mcp-server",
"label": "SigNoz MCP Server",
"tags": [
"quickstart"
],
"module": "home",
"relatedSearchKeywords": [
"agent",
"ai",
"mcp",
"mcp server",
"model context protocol",
"quickstart",
"signoz",
"signoz mcp",
"signoz mcp server",
"setup"
],
"imgUrl": "/Logos/signoz-brand-logo.svg",
"link": "/docs/ai/signoz-mcp-server/"
},
{
"dataSource": "migrate-from-datadog",
"label": "From Datadog",
@@ -1524,18 +1546,24 @@
"link": "/docs/userguide/collect_docker_logs/"
},
{
"dataSource": "vercel-logs",
"label": "Vercel logs",
"dataSource": "vercel",
"label": "Vercel",
"imgUrl": "/Logos/vercel.svg",
"tags": [
"apm/traces",
"logs"
],
"module": "logs",
"module": "home",
"relatedSearchKeywords": [
"collect vercel logs",
"logging",
"logs",
"opentelemetry drains",
"trace drain",
"traces",
"tracing",
"vercel",
"vercel drains",
"vercel functions logs",
"vercel log forwarding",
"vercel log monitoring",
@@ -1545,10 +1573,12 @@
"vercel observability",
"vercel opentelemetry integration",
"vercel to otel",
"vercel trace drain",
"vercel traces",
"vercel-logs"
],
"id": "vercel-logs",
"link": "/docs/userguide/vercel_logs_to_signoz/"
"link": "/docs/userguide/vercel-to-signoz/"
},
{
"dataSource": "heroku-logs",
@@ -4029,6 +4059,57 @@
],
"link": "/docs/pydantic-ai-observability/"
},
{
"dataSource": "qwen-observability",
"label": "Qwen",
"imgUrl": "/Logos/qwen.svg",
"tags": [
"LLM Monitoring"
],
"module": "apm",
"relatedSearchKeywords": [
"alibaba cloud",
"dashscope",
"llm",
"llm monitoring",
"monitoring",
"observability",
"otel qwen",
"qwen",
"qwen logs",
"qwen metrics",
"qwen monitoring",
"qwen observability",
"qwen response time",
"qwen traces"
],
"id": "qwen-observability",
"link": "/docs/qwen-observability/"
},
{
"dataSource": "n8n-cloud",
"label": "n8n Cloud",
"imgUrl": "/Logos/n8n.svg",
"tags": [
"LLM Monitoring"
],
"module": "apm",
"relatedSearchKeywords": [
"llm monitoring",
"monitoring",
"n8n",
"n8n cloud",
"n8n monitoring",
"n8n observability",
"n8n traces",
"observability",
"otel n8n",
"workflow monitoring",
"workflow traces"
],
"id": "n8n-cloud",
"link": "/docs/n8n-monitoring/"
},
{
"dataSource": "mastra-monitoring",
"label": "Mastra",
@@ -5158,6 +5239,31 @@
"id": "microsoft-sql-server",
"link": "/docs/integrations/sql-server/"
},
{
"dataSource": "hasura",
"label": "Hasura",
"imgUrl": "/Logos/hasura.svg",
"tags": [
"database"
],
"module": "apm",
"relatedSearchKeywords": [
"database",
"graphql",
"graphql engine",
"hasura",
"hasura graphql",
"hasura logs",
"hasura metrics",
"hasura monitoring",
"hasura observability",
"hasura traces",
"opentelemetry hasura",
"telemetry"
],
"id": "hasura",
"link": "/docs/integrations/opentelemetry-hasura/"
},
{
"dataSource": "supabase",
"label": "Supabase",

View File

@@ -279,7 +279,6 @@ func (store *store) SoftDeleteUser(ctx context.Context, orgID string, id string)
_, err = tx.NewUpdate().
Model(new(types.User)).
Set("status = ?", types.UserStatusDeleted).
Set("deleted_at = ?", now).
Set("updated_at = ?", now).
Where("org_id = ?", orgID).
Where("id = ?", id).

View File

@@ -124,8 +124,12 @@ func (q *querier) postProcessResults(ctx context.Context, results map[string]any
continue
}
stepInterval, err := req.StepIntervalForQuery(name)
if err != nil {
return nil, err
}
funcs := []qbtypes.Function{{Name: qbtypes.FunctionNameFillZero}}
funcs = q.prepareFillZeroArgsWithStep(funcs, req, req.StepIntervalForQuery(name))
funcs = q.prepareFillZeroArgsWithStep(funcs, req, stepInterval)
// empty time series if it doesn't exist
tsData, ok := typedResults[name].Value.(*qbtypes.TimeSeriesData)
if !ok {

View File

@@ -9,7 +9,6 @@ import (
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
"github.com/SigNoz/signoz/pkg/telemetrymetadata"
"github.com/SigNoz/signoz/pkg/telemetrymeter"
@@ -73,22 +72,12 @@ func newProvider(
traceFieldMapper := telemetrytraces.NewFieldMapper()
traceConditionBuilder := telemetrytraces.NewConditionBuilder(traceFieldMapper)
resourceFilterFieldMapper := resourcefilter.NewFieldMapper()
resourceFilterConditionBuilder := resourcefilter.NewConditionBuilder(resourceFilterFieldMapper)
resourceFilterStmtBuilder := resourcefilter.NewTraceResourceFilterStatementBuilder(
settings,
resourceFilterFieldMapper,
resourceFilterConditionBuilder,
telemetryMetadataStore,
)
traceAggExprRewriter := querybuilder.NewAggExprRewriter(settings, nil, traceFieldMapper, traceConditionBuilder, nil)
traceStmtBuilder := telemetrytraces.NewTraceQueryStatementBuilder(
settings,
telemetryMetadataStore,
traceFieldMapper,
traceConditionBuilder,
resourceFilterStmtBuilder,
traceAggExprRewriter,
telemetryStore,
)
@@ -99,22 +88,13 @@ func newProvider(
telemetryMetadataStore,
traceFieldMapper,
traceConditionBuilder,
traceStmtBuilder, // Pass the regular trace statement builder
resourceFilterStmtBuilder, // Pass the resource filter statement builder
traceStmtBuilder, // Pass the regular trace statement builder
traceAggExprRewriter,
)
// Create log statement builder
logFieldMapper := telemetrylogs.NewFieldMapper()
logConditionBuilder := telemetrylogs.NewConditionBuilder(logFieldMapper)
logResourceFilterStmtBuilder := resourcefilter.NewLogResourceFilterStatementBuilder(
settings,
resourceFilterFieldMapper,
resourceFilterConditionBuilder,
telemetryMetadataStore,
telemetrylogs.DefaultFullTextColumn,
telemetrylogs.GetBodyJSONKey,
)
logAggExprRewriter := querybuilder.NewAggExprRewriter(
settings,
telemetrylogs.DefaultFullTextColumn,
@@ -127,7 +107,6 @@ func newProvider(
telemetryMetadataStore,
logFieldMapper,
logConditionBuilder,
logResourceFilterStmtBuilder,
logAggExprRewriter,
telemetrylogs.DefaultFullTextColumn,
telemetrylogs.GetBodyJSONKey,

View File

@@ -14,10 +14,11 @@ func ApplyHavingClause(result []*v3.Result, queryRangeParams *v3.QueryRangeParam
builderQueries := queryRangeParams.CompositeQuery.BuilderQueries
// apply having clause for metrics and formula
if builderQueries != nil &&
(builderQueries[result.QueryName].DataSource == v3.DataSourceMetrics ||
builderQueries[result.QueryName].QueryName != builderQueries[result.QueryName].Expression) {
havingClause := builderQueries[result.QueryName].Having
builderQuery := builderQueries[result.QueryName]
if builderQuery != nil &&
(builderQuery.DataSource == v3.DataSourceMetrics ||
builderQuery.QueryName != builderQuery.Expression) {
havingClause := builderQuery.Having
for i := 0; i < len(result.Series); i++ {
for j := 0; j < len(result.Series[i].Points); j++ {

View File

@@ -312,6 +312,72 @@ func TestApplyHavingCaluse(t *testing.T) {
},
},
},
{
name: "query not in builder queries should not panic",
results: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Points: []v3.Point{
{Value: 1.0},
{Value: 2.0},
},
},
},
},
},
params: &v3.QueryRangeParamsV3{
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: map[string]*v3.BuilderQuery{},
},
},
want: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Points: []v3.Point{
{Value: 1.0},
{Value: 2.0},
},
},
},
},
},
},
{
name: "nil builder queries should not panic",
results: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Points: []v3.Point{
{Value: 1.0},
},
},
},
},
},
params: &v3.QueryRangeParamsV3{
CompositeQuery: &v3.CompositeQuery{
BuilderQueries: nil,
},
},
want: []*v3.Result{
{
QueryName: "A",
Series: []*v3.Series{
{
Points: []v3.Point{
{Value: 1.0},
},
},
},
},
},
},
}
for _, tc := range testCases {

View File

@@ -139,9 +139,6 @@ func WithRuleStateHistoryModule(module rulestatehistory.Module) RuleOption {
}
func NewBaseRule(id string, orgID valuer.UUID, p *ruletypes.PostableRule, opts ...RuleOption) (*BaseRule, error) {
if p.RuleCondition == nil || !p.RuleCondition.IsValid() {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid rule condition")
}
threshold, err := p.RuleCondition.Thresholds.GetRuleThreshold()
if err != nil {
return nil, err

View File

@@ -336,7 +336,9 @@ func (m *Manager) EditRule(ctx context.Context, ruleStr string, id valuer.UUID)
if err != nil {
return err
}
if err := parsedRule.Validate(); err != nil {
return err
}
existingRule, err := m.ruleStore.GetStoredRule(ctx, id)
if err != nil {
return err
@@ -533,7 +535,9 @@ func (m *Manager) CreateRule(ctx context.Context, ruleStr string) (*ruletypes.Ge
if err != nil {
return nil, err
}
if err := parsedRule.Validate(); err != nil {
return nil, err
}
now := time.Now()
storedRule := &ruletypes.Rule{
Identifiable: types.Identifiable{
@@ -920,7 +924,9 @@ func (m *Manager) PatchRule(ctx context.Context, ruleStr string, id valuer.UUID)
m.logger.ErrorContext(ctx, "failed to unmarshal patched rule with given id", slog.String("rule.id", id.StringValue()), errors.Attr(err))
return nil, err
}
if err := storedRule.Validate(); err != nil {
return nil, err
}
// deploy or un-deploy task according to patched (new) rule state
if err := m.syncRuleStateWithTask(ctx, orgID, taskName, &storedRule); err != nil {
m.logger.ErrorContext(ctx, "failed to sync stored rule state with the task", slog.String("task.name", taskName), errors.Attr(err))
@@ -971,6 +977,9 @@ func (m *Manager) TestNotification(ctx context.Context, orgID valuer.UUID, ruleS
if err != nil {
return 0, errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "failed to unmarshal rule")
}
if err := parsedRule.Validate(); err != nil {
return 0, err
}
if !parsedRule.NotificationSettings.UsePolicy {
parsedRule.NotificationSettings.GroupBy = append(parsedRule.NotificationSettings.GroupBy, ruletypes.LabelThresholdName)
}

View File

@@ -8,7 +8,6 @@ import (
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
"github.com/SigNoz/signoz/pkg/telemetrymetrics"
"github.com/SigNoz/signoz/pkg/telemetrystore"
@@ -66,19 +65,8 @@ func prepareQuerierForLogs(telemetryStore telemetrystore.TelemetryStore, keysMap
}
metadataStore.KeysMap = keysMap
resourceFilterFieldMapper := resourcefilter.NewFieldMapper()
resourceFilterConditionBuilder := resourcefilter.NewConditionBuilder(resourceFilterFieldMapper)
logFieldMapper := telemetrylogs.NewFieldMapper()
logConditionBuilder := telemetrylogs.NewConditionBuilder(logFieldMapper)
logResourceFilterStmtBuilder := resourcefilter.NewLogResourceFilterStatementBuilder(
providerSettings,
resourceFilterFieldMapper,
resourceFilterConditionBuilder,
metadataStore,
telemetrylogs.DefaultFullTextColumn,
telemetrylogs.GetBodyJSONKey,
)
logAggExprRewriter := querybuilder.NewAggExprRewriter(
providerSettings,
telemetrylogs.DefaultFullTextColumn,
@@ -91,7 +79,6 @@ func prepareQuerierForLogs(telemetryStore telemetrystore.TelemetryStore, keysMap
metadataStore,
logFieldMapper,
logConditionBuilder,
logResourceFilterStmtBuilder,
logAggExprRewriter,
telemetrylogs.DefaultFullTextColumn,
telemetrylogs.GetBodyJSONKey,
@@ -127,22 +114,12 @@ func prepareQuerierForTraces(telemetryStore telemetrystore.TelemetryStore, keysM
traceFieldMapper := telemetrytraces.NewFieldMapper()
traceConditionBuilder := telemetrytraces.NewConditionBuilder(traceFieldMapper)
resourceFilterFieldMapper := resourcefilter.NewFieldMapper()
resourceFilterConditionBuilder := resourcefilter.NewConditionBuilder(resourceFilterFieldMapper)
resourceFilterStmtBuilder := resourcefilter.NewTraceResourceFilterStatementBuilder(
providerSettings,
resourceFilterFieldMapper,
resourceFilterConditionBuilder,
metadataStore,
)
traceAggExprRewriter := querybuilder.NewAggExprRewriter(providerSettings, nil, traceFieldMapper, traceConditionBuilder, nil)
traceStmtBuilder := telemetrytraces.NewTraceQueryStatementBuilder(
providerSettings,
metadataStore,
traceFieldMapper,
traceConditionBuilder,
resourceFilterStmtBuilder,
traceAggExprRewriter,
telemetryStore,
)

View File

@@ -1,8 +0,0 @@
package resourcefilter
const (
TracesDBName = "signoz_traces"
TraceResourceV3TableName = "distributed_traces_v3_resource"
LogsDBName = "signoz_logs"
LogsResourceV2TableName = "distributed_logs_v2_resource"
)

View File

@@ -104,8 +104,15 @@ func extractCHOriginFieldFromQuery(query string) (string, error) {
return "", errors.NewInternalf(errors.CodeInternal, "failed to parse origin field from query: %s", err.Error())
}
if len(stmts) == 0 {
return "", errors.NewInternalf(errors.CodeInternal, "no statements found in query")
}
// Get the first statement which should be a SELECT
selectStmt := stmts[0].(*parser.SelectQuery)
selectStmt, ok := stmts[0].(*parser.SelectQuery)
if !ok {
return "", errors.NewInternalf(errors.CodeInternal, "expected SELECT query, got %T", stmts[0])
}
// If query has multiple select items, return blank string as we don't expect multiple select items
if len(selectStmt.SelectItems) > 1 {

View File

@@ -2,6 +2,7 @@ package queryparser
import (
"context"
"fmt"
"strings"
@@ -23,7 +24,15 @@ func New(settings factory.ProviderSettings) QueryParser {
}
}
func (p *queryParserImpl) AnalyzeQueryFilter(ctx context.Context, queryType qbtypes.QueryType, query string) (*queryfilterextractor.FilterResult, error) {
func (p *queryParserImpl) AnalyzeQueryFilter(ctx context.Context, queryType qbtypes.QueryType, query string) (result *queryfilterextractor.FilterResult, err error) {
// the third-party clickhouse sql parser can panic on certain inputs, recover gracefully
defer func() {
if r := recover(); r != nil {
result = nil
err = errors.NewInternalf(errors.CodeInternal, "failed to analyze query filter: %s", fmt.Sprint(r))
}
}()
var extractorType queryfilterextractor.ExtractorType
switch queryType {
case qbtypes.QueryTypePromQL:

View File

@@ -194,6 +194,7 @@ func NewSQLMigrationProviderFactories(
sqlmigration.NewAddServiceAccountFactory(sqlstore, sqlschema),
sqlmigration.NewDeprecateAPIKeyFactory(sqlstore, sqlschema),
sqlmigration.NewServiceAccountAuthzactory(sqlstore),
sqlmigration.NewDropUserDeletedAtFactory(sqlstore, sqlschema),
)
}

View File

@@ -0,0 +1,84 @@
package sqlmigration
import (
"context"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/sqlschema"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
)
type dropUserDeletedAt struct {
sqlstore sqlstore.SQLStore
sqlschema sqlschema.SQLSchema
}
func NewDropUserDeletedAtFactory(sqlstore sqlstore.SQLStore, sqlschema sqlschema.SQLSchema) factory.ProviderFactory[SQLMigration, Config] {
return factory.NewProviderFactory(factory.MustNewName("drop_user_deleted_at"), func(ctx context.Context, ps factory.ProviderSettings, c Config) (SQLMigration, error) {
return &dropUserDeletedAt{sqlstore: sqlstore, sqlschema: sqlschema}, nil
})
}
func (migration *dropUserDeletedAt) Register(migrations *migrate.Migrations) error {
if err := migrations.Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
func (migration *dropUserDeletedAt) Up(ctx context.Context, db *bun.DB) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer func() {
_ = tx.Rollback()
}()
table, _, err := migration.sqlschema.GetTable(ctx, sqlschema.TableName("users"))
if err != nil {
return err
}
deletedAtColumn := &sqlschema.Column{
Name: sqlschema.ColumnName("deleted_at"),
DataType: sqlschema.DataTypeTimestamp,
Nullable: false,
}
sqls := [][]byte{}
dropIndexSQLs := migration.sqlschema.Operator().DropIndex(&sqlschema.UniqueIndex{TableName: "users", ColumnNames: []sqlschema.ColumnName{"org_id", "email", "deleted_at"}})
sqls = append(sqls, dropIndexSQLs...)
dropSQLs := migration.sqlschema.Operator().DropColumn(table, deletedAtColumn)
sqls = append(sqls, dropSQLs...)
indexSQLs := migration.sqlschema.Operator().CreateIndex(
&sqlschema.PartialUniqueIndex{
TableName: "users",
ColumnNames: []sqlschema.ColumnName{"email", "org_id"},
Where: "status != 'deleted'",
})
sqls = append(sqls, indexSQLs...)
for _, sql := range sqls {
if _, err := tx.ExecContext(ctx, string(sql)); err != nil {
return err
}
}
if err := tx.Commit(); err != nil {
return err
}
return nil
}
func (migration *dropUserDeletedAt) Down(context.Context, *bun.DB) error {
return nil
}

View File

@@ -7,7 +7,6 @@ import (
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
@@ -833,25 +832,13 @@ func buildJSONTestStatementBuilder(t *testing.T) *logQueryStatementBuilder {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
resourceFilterFM := resourcefilter.NewFieldMapper()
resourceFilterCB := resourcefilter.NewConditionBuilder(resourceFilterFM)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourcefilter.NewLogResourceFilterStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
resourceFilterFM,
resourceFilterCB,
mockMetadataStore,
DefaultFullTextColumn,
GetBodyJSONKey,
)
statementBuilder := NewLogQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
resourceFilterStmtBuilder,
aggExprRewriter,
DefaultFullTextColumn,
GetBodyJSONKey,

View File

@@ -9,6 +9,7 @@ import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/telemetryresourcefilter"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
@@ -33,13 +34,22 @@ func NewLogQueryStatementBuilder(
metadataStore telemetrytypes.MetadataStore,
fieldMapper qbtypes.FieldMapper,
conditionBuilder qbtypes.ConditionBuilder,
resourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation],
aggExprRewriter qbtypes.AggExprRewriter,
fullTextColumn *telemetrytypes.TelemetryFieldKey,
jsonKeyToKey qbtypes.JsonKeyToFieldFunc,
) *logQueryStatementBuilder {
logsSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrylogs")
resourceFilterStmtBuilder := telemetryresourcefilter.New[qbtypes.LogAggregation](
settings,
DBName,
LogsResourceV2TableName,
telemetrytypes.SignalLogs,
metadataStore,
fullTextColumn,
jsonKeyToKey,
)
return &logQueryStatementBuilder{
logger: logsSettings.Logger(),
metadataStore: metadataStore,

View File

@@ -8,35 +8,12 @@ import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
"github.com/stretchr/testify/require"
)
func resourceFilterStmtBuilder() qbtypes.StatementBuilder[qbtypes.LogAggregation] {
fm := resourcefilter.NewFieldMapper()
cb := resourcefilter.NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
keysMap := buildCompleteFieldKeyMap(time.Now())
for _, keys := range keysMap {
for _, key := range keys {
key.Signal = telemetrytypes.SignalLogs
}
}
mockMetadataStore.KeysMap = keysMap
return resourcefilter.NewLogResourceFilterStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
fm,
cb,
mockMetadataStore,
DefaultFullTextColumn,
GetBodyJSONKey,
)
}
func TestStatementBuilderTimeSeries(t *testing.T) {
// Create a test release time
@@ -225,14 +202,11 @@ func TestStatementBuilderTimeSeries(t *testing.T) {
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
statementBuilder := NewLogQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
resourceFilterStmtBuilder,
aggExprRewriter,
DefaultFullTextColumn,
GetBodyJSONKey,
@@ -349,14 +323,11 @@ func TestStatementBuilderListQuery(t *testing.T) {
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
statementBuilder := NewLogQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
resourceFilterStmtBuilder,
aggExprRewriter,
DefaultFullTextColumn,
GetBodyJSONKey,
@@ -492,14 +463,11 @@ func TestStatementBuilderListQueryResourceTests(t *testing.T) {
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
statementBuilder := NewLogQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
resourceFilterStmtBuilder,
aggExprRewriter,
DefaultFullTextColumn,
GetBodyJSONKey,
@@ -569,14 +537,11 @@ func TestStatementBuilderTimeSeriesBodyGroupBy(t *testing.T) {
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
statementBuilder := NewLogQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
resourceFilterStmtBuilder,
aggExprRewriter,
DefaultFullTextColumn,
GetBodyJSONKey,
@@ -665,14 +630,11 @@ func TestStatementBuilderListQueryServiceCollision(t *testing.T) {
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
statementBuilder := NewLogQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
resourceFilterStmtBuilder,
aggExprRewriter,
DefaultFullTextColumn,
GetBodyJSONKey,
@@ -890,14 +852,11 @@ func TestAdjustKey(t *testing.T) {
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
statementBuilder := NewLogQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
resourceFilterStmtBuilder,
aggExprRewriter,
DefaultFullTextColumn,
GetBodyJSONKey,
@@ -1045,13 +1004,11 @@ func TestStmtBuilderBodyField(t *testing.T) {
mockMetadataStore.KeysMap[field.Name] = append(mockMetadataStore.KeysMap[field.Name], &f)
}
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
statementBuilder := NewLogQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
resourceFilterStmtBuilder,
aggExprRewriter,
DefaultFullTextColumn,
GetBodyJSONKey,
@@ -1135,13 +1092,11 @@ func TestStmtBuilderBodyFullTextSearch(t *testing.T) {
mockMetadataStore.KeysMap[field.Name] = append(mockMetadataStore.KeysMap[field.Name], &f)
}
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
statementBuilder := NewLogQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
resourceFilterStmtBuilder,
aggExprRewriter,
DefaultFullTextColumn,
GetBodyJSONKey,

View File

@@ -8,6 +8,7 @@ const (
TagAttributesV2LocalTableName = "tag_attributes_v2"
LogAttributeKeysTblName = "distributed_logs_attribute_keys"
LogResourceKeysTblName = "distributed_logs_resource_keys"
LogsResourceV2TableName = "distributed_logs_v2_resource"
PathTypesTableName = "distributed_json_path_types"
PromotedPathsTableName = "distributed_json_promoted_paths"
SkipIndexTableName = "system.data_skipping_indices"

View File

@@ -1,4 +1,4 @@
package resourcefilter
package telemetryresourcefilter
import (
"context"

View File

@@ -1,4 +1,4 @@
package resourcefilter
package telemetryresourcefilter
import (
"context"

View File

@@ -1,4 +1,4 @@
package resourcefilter
package telemetryresourcefilter
import (
"context"

View File

@@ -1,11 +1,10 @@
package resourcefilter
package telemetryresourcefilter
import (
"context"
"fmt"
"log/slog"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
@@ -13,30 +12,11 @@ import (
"github.com/huandu/go-sqlbuilder"
)
var (
ErrUnsupportedSignal = errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported signal type")
)
// Configuration for different signal types.
type signalConfig struct {
dbName string
tableName string
}
var signalConfigs = map[telemetrytypes.Signal]signalConfig{
telemetrytypes.SignalTraces: {
dbName: TracesDBName,
tableName: TraceResourceV3TableName,
},
telemetrytypes.SignalLogs: {
dbName: LogsDBName,
tableName: LogsResourceV2TableName,
},
}
// Generic resource filter statement builder.
// resourceFilterStatementBuilder builds resource fingerprint filter CTEs.
type resourceFilterStatementBuilder[T any] struct {
logger *slog.Logger
dbName string
tableName string
fieldMapper qbtypes.FieldMapper
conditionBuilder qbtypes.ConditionBuilder
metadataStore telemetrytypes.MetadataStore
@@ -52,38 +32,26 @@ var (
_ qbtypes.StatementBuilder[qbtypes.LogAggregation] = (*resourceFilterStatementBuilder[qbtypes.LogAggregation])(nil)
)
// NewTraceResourceFilterStatementBuilder creates a new trace resource filter statement builder.
func NewTraceResourceFilterStatementBuilder(
func New[T any](
settings factory.ProviderSettings,
fieldMapper qbtypes.FieldMapper,
conditionBuilder qbtypes.ConditionBuilder,
metadataStore telemetrytypes.MetadataStore,
) *resourceFilterStatementBuilder[qbtypes.TraceAggregation] {
set := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter")
return &resourceFilterStatementBuilder[qbtypes.TraceAggregation]{
logger: set.Logger(),
fieldMapper: fieldMapper,
conditionBuilder: conditionBuilder,
metadataStore: metadataStore,
signal: telemetrytypes.SignalTraces,
}
}
func NewLogResourceFilterStatementBuilder(
settings factory.ProviderSettings,
fieldMapper qbtypes.FieldMapper,
conditionBuilder qbtypes.ConditionBuilder,
dbName string,
tableName string,
signal telemetrytypes.Signal,
metadataStore telemetrytypes.MetadataStore,
fullTextColumn *telemetrytypes.TelemetryFieldKey,
jsonKeyToKey qbtypes.JsonKeyToFieldFunc,
) *resourceFilterStatementBuilder[qbtypes.LogAggregation] {
set := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter")
return &resourceFilterStatementBuilder[qbtypes.LogAggregation]{
) *resourceFilterStatementBuilder[T] {
set := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetryresourcefilter")
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
return &resourceFilterStatementBuilder[T]{
logger: set.Logger(),
fieldMapper: fieldMapper,
conditionBuilder: conditionBuilder,
dbName: dbName,
tableName: tableName,
fieldMapper: fm,
conditionBuilder: cb,
metadataStore: metadataStore,
signal: telemetrytypes.SignalLogs,
signal: signal,
fullTextColumn: fullTextColumn,
jsonKeyToKey: jsonKeyToKey,
}
@@ -120,14 +88,9 @@ func (b *resourceFilterStatementBuilder[T]) Build(
query qbtypes.QueryBuilderQuery[T],
variables map[string]qbtypes.VariableItem,
) (*qbtypes.Statement, error) {
config, exists := signalConfigs[b.signal]
if !exists {
return nil, errors.WrapInvalidInputf(ErrUnsupportedSignal, errors.CodeInvalidInput, "unsupported signal: %s", b.signal)
}
q := sqlbuilder.NewSelectBuilder()
q.Select("fingerprint")
q.From(fmt.Sprintf("%s.%s", config.dbName, config.tableName))
q.From(fmt.Sprintf("%s.%s", b.dbName, b.tableName))
keySelectors := b.getKeySelectors(query)
keys, _, err := b.metadataStore.GetKeysMulti(ctx, keySelectors)

View File

@@ -1,4 +1,4 @@
package resourcefilter
package telemetryresourcefilter
import (
"context"
@@ -367,16 +367,17 @@ func TestResourceFilterStatementBuilder_Traces(t *testing.T) {
},
}
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildTestFieldKeyMap(telemetrytypes.SignalTraces)
builder := NewTraceResourceFilterStatementBuilder(
builder := New[qbtypes.TraceAggregation](
instrumentationtest.New().ToProviderSettings(),
fm,
cb,
"signoz_traces",
"distributed_traces_v3_resource",
telemetrytypes.SignalTraces,
mockMetadataStore,
nil,
nil,
)
for _, c := range cases {
@@ -583,15 +584,14 @@ func TestResourceFilterStatementBuilder_Logs(t *testing.T) {
},
}
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildTestFieldKeyMap(telemetrytypes.SignalLogs)
builder := NewLogResourceFilterStatementBuilder(
builder := New[qbtypes.LogAggregation](
instrumentationtest.New().ToProviderSettings(),
fm,
cb,
"signoz_logs",
"distributed_logs_v2_resource",
telemetrytypes.SignalLogs,
mockMetadataStore,
nil,
nil,
@@ -645,16 +645,17 @@ func TestResourceFilterStatementBuilder_Variables(t *testing.T) {
},
}
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildTestFieldKeyMap(telemetrytypes.SignalTraces)
builder := NewTraceResourceFilterStatementBuilder(
builder := New[qbtypes.TraceAggregation](
instrumentationtest.New().ToProviderSettings(),
fm,
cb,
"signoz_traces",
"distributed_traces_v3_resource",
telemetrytypes.SignalTraces,
mockMetadataStore,
nil,
nil,
)
for _, c := range cases {

View File

@@ -10,6 +10,7 @@ import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/telemetryresourcefilter"
"github.com/SigNoz/signoz/pkg/telemetrystore"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
@@ -38,11 +39,21 @@ func NewTraceQueryStatementBuilder(
metadataStore telemetrytypes.MetadataStore,
fieldMapper qbtypes.FieldMapper,
conditionBuilder qbtypes.ConditionBuilder,
resourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation],
aggExprRewriter qbtypes.AggExprRewriter,
telemetryStore telemetrystore.TelemetryStore,
) *traceQueryStatementBuilder {
tracesSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrytraces")
resourceFilterStmtBuilder := telemetryresourcefilter.New[qbtypes.TraceAggregation](
settings,
DBName,
TracesResourceV3TableName,
telemetrytypes.SignalTraces,
metadataStore,
nil,
nil,
)
return &traceQueryStatementBuilder{
logger: tracesSettings.Logger(),
metadataStore: metadataStore,

View File

@@ -8,27 +8,12 @@ import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
"github.com/stretchr/testify/require"
)
func resourceFilterStmtBuilder() qbtypes.StatementBuilder[qbtypes.TraceAggregation] {
fm := resourcefilter.NewFieldMapper()
cb := resourcefilter.NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
return resourcefilter.NewTraceResourceFilterStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
fm,
cb,
mockMetadataStore,
)
}
func TestStatementBuilder(t *testing.T) {
cases := []struct {
name string
@@ -372,14 +357,11 @@ func TestStatementBuilder(t *testing.T) {
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
statementBuilder := NewTraceQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
resourceFilterStmtBuilder,
aggExprRewriter,
nil,
)
@@ -668,14 +650,11 @@ func TestStatementBuilderListQuery(t *testing.T) {
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
statementBuilder := NewTraceQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
resourceFilterStmtBuilder,
aggExprRewriter,
nil,
)
@@ -778,14 +757,11 @@ func TestStatementBuilderListQueryWithCorruptData(t *testing.T) {
}
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
statementBuilder := NewTraceQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
resourceFilterStmtBuilder,
aggExprRewriter,
nil,
)
@@ -931,14 +907,11 @@ func TestStatementBuilderTraceQuery(t *testing.T) {
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
statementBuilder := NewTraceQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
resourceFilterStmtBuilder,
aggExprRewriter,
nil,
)
@@ -1147,14 +1120,11 @@ func TestAdjustKey(t *testing.T) {
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
statementBuilder := NewTraceQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
resourceFilterStmtBuilder,
aggExprRewriter,
nil,
)
@@ -1422,14 +1392,11 @@ func TestAdjustKeys(t *testing.T) {
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
statementBuilder := NewTraceQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
resourceFilterStmtBuilder,
aggExprRewriter,
nil,
)

View File

@@ -9,4 +9,5 @@ const (
TopLevelOperationsTableName = "distributed_top_level_operations"
TraceSummaryTableName = "distributed_trace_summary"
SpanAttributesKeysTblName = "distributed_span_attributes_keys"
TracesResourceV3TableName = "distributed_traces_v3_resource"
)

View File

@@ -392,13 +392,11 @@ func TestTraceOperatorStatementBuilder(t *testing.T) {
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
traceStmtBuilder := NewTraceQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
resourceFilterStmtBuilder,
aggExprRewriter,
nil,
)
@@ -409,7 +407,6 @@ func TestTraceOperatorStatementBuilder(t *testing.T) {
fm,
cb,
traceStmtBuilder,
resourceFilterStmtBuilder,
aggExprRewriter,
)
@@ -508,13 +505,11 @@ func TestTraceOperatorStatementBuilderErrors(t *testing.T) {
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
traceStmtBuilder := NewTraceQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
mockMetadataStore,
fm,
cb,
resourceFilterStmtBuilder,
aggExprRewriter,
nil,
)
@@ -525,7 +520,6 @@ func TestTraceOperatorStatementBuilderErrors(t *testing.T) {
fm,
cb,
traceStmtBuilder,
resourceFilterStmtBuilder,
aggExprRewriter,
)

View File

@@ -7,6 +7,7 @@ import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/telemetryresourcefilter"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
@@ -29,10 +30,20 @@ func NewTraceOperatorStatementBuilder(
fieldMapper qbtypes.FieldMapper,
conditionBuilder qbtypes.ConditionBuilder,
traceStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation],
resourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.TraceAggregation],
aggExprRewriter qbtypes.AggExprRewriter,
) *traceOperatorStatementBuilder {
tracesSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrytraces")
resourceFilterStmtBuilder := telemetryresourcefilter.New[qbtypes.TraceAggregation](
settings,
DBName,
TracesResourceV3TableName,
telemetrytypes.SignalTraces,
metadataStore,
nil,
nil,
)
return &traceOperatorStatementBuilder{
logger: tracesSettings.Logger(),
metadataStore: metadataStore,

View File

@@ -7,7 +7,6 @@ import (
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
@@ -35,15 +34,6 @@ func TestTraceTimeRangeOptimization(t *testing.T) {
Signal: telemetrytypes.SignalTraces,
}}
resourceFilterFM := resourcefilter.NewFieldMapper()
resourceFilterCB := resourcefilter.NewConditionBuilder(resourceFilterFM)
resourceFilterStmtBuilder := resourcefilter.NewTraceResourceFilterStatementBuilder(
instrumentationtest.New().ToProviderSettings(),
resourceFilterFM,
resourceFilterCB,
mockMetadataStore,
)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
statementBuilder := NewTraceQueryStatementBuilder(
@@ -51,7 +41,6 @@ func TestTraceTimeRangeOptimization(t *testing.T) {
mockMetadataStore,
fm,
cb,
resourceFilterStmtBuilder,
aggExprRewriter,
nil, // telemetryStore is nil - optimization won't happen but code path is tested
)

View File

@@ -91,6 +91,16 @@ func (f QueryBuilderFormula) Validate() error {
)
}
// Validate expression is parseable
if _, err := govaluate.NewEvaluableExpressionWithFunctions(f.Expression, EvalFuncs()); err != nil {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"failed to parse expression for formula query %q: %s",
f.Name,
err.Error(),
)
}
// Validate functions if present
for i, fn := range f.Functions {
if err := fn.Validate(); err != nil {

View File

@@ -305,7 +305,7 @@ func (q *QueryRangeRequest) PrepareJSONSchema(schema *jsonschema.Schema) error {
return nil
}
func (r *QueryRangeRequest) StepIntervalForQuery(name string) int64 {
func (r *QueryRangeRequest) StepIntervalForQuery(name string) (int64, error) {
stepsMap := make(map[string]int64)
for _, query := range r.CompositeQuery.Queries {
switch spec := query.Spec.(type) {
@@ -317,11 +317,13 @@ func (r *QueryRangeRequest) StepIntervalForQuery(name string) int64 {
stepsMap[spec.Name] = spec.StepInterval.Milliseconds()
case PromQuery:
stepsMap[spec.Name] = spec.Step.Milliseconds()
case QueryBuilderTraceOperator:
stepsMap[spec.Name] = spec.StepInterval.Milliseconds()
}
}
if step, ok := stepsMap[name]; ok {
return step
return step, nil
}
exprStr := ""
@@ -335,12 +337,15 @@ func (r *QueryRangeRequest) StepIntervalForQuery(name string) int64 {
}
}
expression, _ := govaluate.NewEvaluableExpressionWithFunctions(exprStr, EvalFuncs())
expression, err := govaluate.NewEvaluableExpressionWithFunctions(exprStr, EvalFuncs())
if err != nil {
return 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "failed to parse expression for formula query %q: %s", name, err.Error())
}
steps := []int64{}
for _, v := range expression.Vars() {
steps = append(steps, stepsMap[v])
}
return LCMList(steps)
return LCMList(steps), nil
}
func (r *QueryRangeRequest) NumAggregationForQuery(name string) int64 {

View File

@@ -1798,3 +1798,108 @@ func TestQueryRangeRequest_GetQueriesSupportingZeroDefault(t *testing.T) {
})
}
}
func TestQueryRangeRequest_StepIntervalForQuery(t *testing.T) {
tests := []struct {
name string
request QueryRangeRequest
queryName string
wantStep int64
wantErr bool
}{
{
name: "trace operator returns its step interval",
request: QueryRangeRequest{
CompositeQuery: CompositeQuery{
Queries: []QueryEnvelope{
{
Type: QueryTypeBuilder,
Spec: QueryBuilderQuery[TraceAggregation]{
Name: "A",
Signal: telemetrytypes.SignalTraces,
StepInterval: Step{Duration: 60 * time.Second},
Aggregations: []TraceAggregation{{Expression: "count()"}},
},
},
{
Type: QueryTypeTraceOperator,
Spec: QueryBuilderTraceOperator{
Name: "Trace Operator",
StepInterval: Step{Duration: 120 * time.Second},
Expression: "A",
},
},
},
},
},
queryName: "Trace Operator",
wantStep: 120000,
},
{
name: "formula computes LCM of referenced query steps",
request: QueryRangeRequest{
CompositeQuery: CompositeQuery{
Queries: []QueryEnvelope{
{
Type: QueryTypeBuilder,
Spec: QueryBuilderQuery[TraceAggregation]{
Name: "A",
Signal: telemetrytypes.SignalTraces,
StepInterval: Step{Duration: 60 * time.Second},
Aggregations: []TraceAggregation{{Expression: "count()"}},
},
},
{
Type: QueryTypeBuilder,
Spec: QueryBuilderQuery[TraceAggregation]{
Name: "B",
Signal: telemetrytypes.SignalTraces,
StepInterval: Step{Duration: 90 * time.Second},
Aggregations: []TraceAggregation{{Expression: "count()"}},
},
},
{
Type: QueryTypeFormula,
Spec: QueryBuilderFormula{
Name: "F1",
Expression: "A + B",
},
},
},
},
},
queryName: "F1",
wantStep: 180000, // LCM of 60s and 90s = 180s
},
{
name: "invalid formula expression returns error",
request: QueryRangeRequest{
CompositeQuery: CompositeQuery{
Queries: []QueryEnvelope{
{
Type: QueryTypeFormula,
Spec: QueryBuilderFormula{
Name: "F1",
Expression: "A +",
},
},
},
},
},
queryName: "F1",
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := tt.request.StepIntervalForQuery(tt.queryName)
if tt.wantErr {
require.Error(t, err)
return
}
require.NoError(t, err)
assert.Equal(t, tt.wantStep, got)
})
}
}

View File

@@ -496,6 +496,19 @@ func (r *QueryRangeRequest) Validate(opts ...ValidationOption) error {
)
}
// raw/trace request types don't support metric queries;
// metrics are always aggregated and there is no raw form.
if r.RequestType == RequestTypeRaw || r.RequestType == RequestTypeRawStream || r.RequestType == RequestTypeTrace {
for _, envelope := range r.CompositeQuery.Queries {
if envelope.GetSignal() == telemetrytypes.SignalMetrics {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"raw request type is not supported for metric queries",
)
}
}
}
// Validate composite query
if err := r.CompositeQuery.Validate(opts...); err != nil {
return err
@@ -584,13 +597,7 @@ func validateQueryEnvelope(envelope QueryEnvelope, opts ...ValidationOption) err
"invalid formula spec",
)
}
if spec.Expression == "" {
return errors.NewInvalidInputf(
errors.CodeInvalidInput,
"formula expression is required",
)
}
return nil
return spec.Validate()
case QueryTypeJoin:
_, ok := envelope.Spec.(QueryBuilderJoin)
if !ok {

View File

@@ -518,7 +518,7 @@ func TestQueryRangeRequest_ValidateCompositeQuery(t *testing.T) {
},
},
wantErr: true,
errMsg: "expression is required",
errMsg: "expression cannot be blank",
},
{
name: "promql with empty query should return error",
@@ -665,6 +665,57 @@ func TestQueryRangeRequest_ValidateCompositeQuery(t *testing.T) {
},
wantErr: false,
},
{
name: "raw request with metric query should return error",
request: QueryRangeRequest{
Start: 1640995200000,
End: 1640998800000,
RequestType: RequestTypeRaw,
CompositeQuery: CompositeQuery{
Queries: []QueryEnvelope{
{
Type: QueryTypeBuilder,
Spec: QueryBuilderQuery[MetricAggregation]{
Name: "A",
Disabled: true,
Signal: telemetrytypes.SignalMetrics,
Aggregations: []MetricAggregation{},
},
},
{
Type: QueryTypeFormula,
Spec: QueryBuilderFormula{
Name: "F1",
Expression: "A",
},
},
},
},
},
wantErr: true,
errMsg: "raw request type is not supported for metric queries",
},
{
name: "raw request with log query without aggregations should pass",
request: QueryRangeRequest{
Start: 1640995200000,
End: 1640998800000,
RequestType: RequestTypeRaw,
CompositeQuery: CompositeQuery{
Queries: []QueryEnvelope{
{
Type: QueryTypeBuilder,
Spec: QueryBuilderQuery[LogAggregation]{
Name: "A",
Signal: telemetrytypes.SignalLogs,
Aggregations: []LogAggregation{},
},
},
},
},
},
wantErr: false,
},
}
for _, tt := range tests {
@@ -733,7 +784,7 @@ func TestValidateQueryEnvelope(t *testing.T) {
},
requestType: RequestTypeTimeSeries,
wantErr: true,
errMsg: "expression is required",
errMsg: "expression cannot be blank",
},
{
name: "valid join spec",

View File

@@ -103,7 +103,7 @@ type RuleCondition struct {
MatchType MatchType `json:"matchType"`
TargetUnit string `json:"targetUnit,omitempty"`
Algorithm string `json:"algorithm,omitempty"`
Seasonality string `json:"seasonality,omitempty"`
Seasonality Seasonality `json:"seasonality,omitzero"`
SelectedQuery string `json:"selectedQueryName,omitempty"`
RequireMinPoints bool `json:"requireMinPoints,omitempty"`
RequiredNumPoints int `json:"requiredNumPoints,omitempty"`
@@ -158,10 +158,6 @@ func (rc *RuleCondition) SelectedQueryName() string {
return keys[len(keys)-1]
}
func (rc *RuleCondition) IsValid() bool {
return true
}
// ShouldEval checks if the further series should be evaluated at all for alerts.
func (rc *RuleCondition) ShouldEval(series *qbtypes.TimeSeries) bool {
return !rc.RequireMinPoints || len(series.Values) >= rc.RequiredNumPoints

View File

@@ -12,6 +12,7 @@ import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types/alertmanagertypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/valuer"
)
@@ -25,7 +26,8 @@ const (
)
const (
DefaultSchemaVersion = "v1"
DefaultSchemaVersion = "v1"
SchemaVersionV2Alpha1 = "v2alpha1"
)
type RuleDataKind string
@@ -39,9 +41,9 @@ type PostableRule struct {
AlertName string `json:"alert"`
AlertType AlertType `json:"alertType,omitempty"`
Description string `json:"description,omitempty"`
RuleType RuleType `json:"ruleType,omitempty"`
EvalWindow valuer.TextDuration `json:"evalWindow,omitempty"`
Frequency valuer.TextDuration `json:"frequency,omitempty"`
RuleType RuleType `json:"ruleType,omitzero"`
EvalWindow valuer.TextDuration `json:"evalWindow,omitzero"`
Frequency valuer.TextDuration `json:"frequency,omitzero"`
RuleCondition *RuleCondition `json:"condition,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
@@ -64,7 +66,7 @@ type PostableRule struct {
type NotificationSettings struct {
GroupBy []string `json:"groupBy,omitempty"`
Renotify Renotify `json:"renotify,omitempty"`
Renotify Renotify `json:"renotify,omitzero"`
UsePolicy bool `json:"usePolicy,omitempty"`
// NewGroupEvalDelay is the grace period for new series to be excluded from alerts evaluation
NewGroupEvalDelay valuer.TextDuration `json:"newGroupEvalDelay,omitzero"`
@@ -185,15 +187,19 @@ func (r *PostableRule) processRuleDefaults() {
r.SchemaVersion = DefaultSchemaVersion
}
if r.EvalWindow.IsZero() {
r.EvalWindow = valuer.MustParseTextDuration("5m")
// v2alpha1 uses the Evaluation envelope for window/frequency;
// only default top-level fields for v1.
if r.SchemaVersion != SchemaVersionV2Alpha1 {
if r.EvalWindow.IsZero() {
r.EvalWindow = valuer.MustParseTextDuration("5m")
}
if r.Frequency.IsZero() {
r.Frequency = valuer.MustParseTextDuration("1m")
}
}
if r.Frequency.IsZero() {
r.Frequency = valuer.MustParseTextDuration("1m")
}
if r.RuleCondition != nil {
if r.RuleCondition != nil && r.RuleCondition.CompositeQuery != nil {
switch r.RuleCondition.CompositeQuery.QueryType {
case QueryTypeBuilder:
if r.RuleType.IsZero() {
@@ -259,6 +265,10 @@ func (r *PostableRule) MarshalJSON() ([]byte, error) {
aux.SchemaVersion = ""
aux.NotificationSettings = nil
return json.Marshal(aux)
case SchemaVersionV2Alpha1:
copyStruct := *r
aux := Alias(copyStruct)
return json.Marshal(aux)
default:
copyStruct := *r
aux := Alias(copyStruct)
@@ -292,23 +302,24 @@ func isValidLabelValue(v string) bool {
return utf8.ValidString(v)
}
// validate runs during UnmarshalJSON (read + write path).
// Preserves the original pre-existing checks only so that stored rules
// continue to load without errors.
func (r *PostableRule) validate() error {
var errs []error
if r.RuleCondition == nil {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "rule condition is required")
return errors.NewInvalidInputf(errors.CodeInvalidInput, "condition: field is required")
}
if r.Version != "v5" {
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput, "only version v5 is supported, got %q", r.Version))
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput, "version: only v5 is supported, got %q", r.Version))
}
for k, v := range r.Labels {
if !isValidLabelName(k) {
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid label name: %s", k))
}
if !isValidLabelValue(v) {
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid label value: %s", v))
}
@@ -324,6 +335,185 @@ func (r *PostableRule) validate() error {
return errors.Join(errs...)
}
// Validate enforces all validation rules. For now, this is invoked on the write path
// (create, update, patch, test) before persisting. This is intentionally
// not called from UnmarshalJSON so that existing stored rules can always
// be loaded regardless of new validation rules.
func (r *PostableRule) Validate() error {
var errs []error
if r.AlertName == "" {
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput, "alert: field is required"))
}
if r.RuleCondition == nil {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "condition: field is required")
}
if r.Version != "v5" {
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput, "version: only v5 is supported, got %q", r.Version))
}
if r.AlertType != "" {
switch r.AlertType {
case AlertTypeMetric, AlertTypeTraces, AlertTypeLogs, AlertTypeExceptions:
default:
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput,
"alertType: unsupported value %q; must be one of %q, %q, %q, %q",
r.AlertType, AlertTypeMetric, AlertTypeTraces, AlertTypeLogs, AlertTypeExceptions))
}
}
if !r.RuleType.IsZero() {
if err := r.RuleType.Validate(); err != nil {
errs = append(errs, err)
}
}
if r.RuleType == RuleTypeAnomaly && !r.RuleCondition.Seasonality.IsZero() {
if err := r.RuleCondition.Seasonality.Validate(); err != nil {
errs = append(errs, err)
}
}
if r.RuleCondition.CompositeQuery == nil {
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput, "condition.compositeQuery: field is required"))
} else {
if len(r.RuleCondition.CompositeQuery.Queries) == 0 {
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput, "condition.compositeQuery.queries: must have at least one query"))
} else {
cq := &qbtypes.CompositeQuery{Queries: r.RuleCondition.CompositeQuery.Queries}
if err := cq.Validate(qbtypes.GetValidationOptions(qbtypes.RequestTypeTimeSeries)...); err != nil {
errs = append(errs, err)
}
}
}
if r.RuleCondition.SelectedQuery != "" && r.RuleCondition.CompositeQuery != nil && len(r.RuleCondition.CompositeQuery.Queries) > 0 {
found := false
for _, query := range r.RuleCondition.CompositeQuery.Queries {
if query.GetQueryName() == r.RuleCondition.SelectedQuery {
found = true
break
}
}
if !found {
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput,
"condition.selectedQueryName: %q does not match any query in compositeQuery",
r.RuleCondition.SelectedQuery))
}
}
if r.RuleCondition.RequireMinPoints && r.RuleCondition.RequiredNumPoints <= 0 {
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput,
"condition.requiredNumPoints: must be greater than 0 when requireMinPoints is enabled"))
}
errs = append(errs, r.validateSchemaVersion()...)
for k, v := range r.Labels {
if !isValidLabelName(k) {
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid label name: %s", k))
}
if !isValidLabelValue(v) {
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid label value: %s", v))
}
}
for k := range r.Annotations {
if !isValidLabelName(k) {
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid annotation name: %s", k))
}
}
errs = append(errs, testTemplateParsing(r)...)
return errors.Join(errs...)
}
func (r *PostableRule) validateSchemaVersion() []error {
switch r.SchemaVersion {
case DefaultSchemaVersion:
return r.validateV1()
case SchemaVersionV2Alpha1:
return r.validateV2Alpha1()
default:
return []error{errors.NewInvalidInputf(errors.CodeInvalidInput,
"schemaVersion: unsupported value %q; must be one of %q, %q",
r.SchemaVersion, DefaultSchemaVersion, SchemaVersionV2Alpha1)}
}
}
func (r *PostableRule) validateV1() []error {
var errs []error
if r.RuleCondition.Target == nil {
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput,
"condition.target: field is required for schemaVersion %q", DefaultSchemaVersion))
}
if r.RuleCondition.CompareOperator.IsZero() {
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput,
"condition.op: field is required for schemaVersion %q", DefaultSchemaVersion))
} else if err := r.RuleCondition.CompareOperator.Validate(); err != nil {
errs = append(errs, err)
}
if r.RuleCondition.MatchType.IsZero() {
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput,
"condition.matchType: field is required for schemaVersion %q", DefaultSchemaVersion))
} else if err := r.RuleCondition.MatchType.Validate(); err != nil {
errs = append(errs, err)
}
return errs
}
func (r *PostableRule) validateV2Alpha1() []error {
var errs []error
// TODO(srikanthccv): reject v1-only fields?
// if r.RuleCondition.Target != nil {
// errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput,
// "condition.target: field is not used in schemaVersion %q; set target in condition.thresholds entries instead",
// SchemaVersionV2Alpha1))
// }
// if !r.RuleCondition.CompareOperator.IsZero() {
// errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput,
// "condition.op: field is not used in schemaVersion %q; set op in condition.thresholds entries instead",
// SchemaVersionV2Alpha1))
// }
// if !r.RuleCondition.MatchType.IsZero() {
// errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput,
// "condition.matchType: field is not used in schemaVersion %q; set matchType in condition.thresholds entries instead",
// SchemaVersionV2Alpha1))
// }
// if len(r.PreferredChannels) > 0 {
// errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput,
// "preferredChannels: field is not used in schemaVersion %q; set channels in condition.thresholds entries instead",
// SchemaVersionV2Alpha1))
// }
// Require v2alpha1-specific fields
if r.RuleCondition.Thresholds == nil {
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput,
"condition.thresholds: field is required for schemaVersion %q", SchemaVersionV2Alpha1))
}
if r.Evaluation == nil {
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput,
"evaluation: field is required for schemaVersion %q", SchemaVersionV2Alpha1))
}
if r.NotificationSettings == nil {
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput,
"notificationSettings: field is required for schemaVersion %q", SchemaVersionV2Alpha1))
} else {
if r.NotificationSettings.Renotify.Enabled && !r.NotificationSettings.Renotify.ReNotifyInterval.IsPositive() {
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput,
"notificationSettings.renotify.interval: must be a positive duration when renotify is enabled"))
}
}
return errs
}
func testTemplateParsing(rl *PostableRule) (errs []error) {
if rl.AlertName == "" {
// Not an alerting rule.
@@ -393,6 +583,10 @@ func (g *GettableRule) MarshalJSON() ([]byte, error) {
aux.SchemaVersion = ""
aux.NotificationSettings = nil
return json.Marshal(aux)
case SchemaVersionV2Alpha1:
copyStruct := *g
aux := Alias(copyStruct)
return json.Marshal(aux)
default:
copyStruct := *g
aux := Alias(copyStruct)

View File

@@ -34,15 +34,15 @@ func TestParseIntoRule(t *testing.T) {
"condition": {
"compositeQuery": {
"queryType": "builder",
"builderQueries": {
"A": {
"expression": "A",
"disabled": false,
"aggregateAttribute": {
"key": "test_metric"
}
"queries": [{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "metrics",
"aggregations": [{"metricName": "test_metric", "spaceAggregation": "p50"}],
"stepInterval": "5m"
}
}
}]
},
"target": 10.0,
"matchType": "1",
@@ -77,14 +77,15 @@ func TestParseIntoRule(t *testing.T) {
"condition": {
"compositeQuery": {
"queryType": "builder",
"builderQueries": {
"A": {
"disabled": false,
"aggregateAttribute": {
"key": "test_metric"
}
"queries": [{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "metrics",
"aggregations": [{"metricName": "test_metric", "spaceAggregation": "p50"}],
"stepInterval": "5m"
}
}
}]
},
"target": 5.0,
"matchType": "1",
@@ -112,12 +113,14 @@ func TestParseIntoRule(t *testing.T) {
"condition": {
"compositeQuery": {
"queryType": "promql",
"promQueries": {
"A": {
"queries": [{
"type": "promql",
"spec": {
"name": "A",
"query": "rate(http_requests_total[5m])",
"disabled": false
}
}
}]
},
"target": 10.0,
"matchType": "1",
@@ -165,12 +168,13 @@ func TestParseIntoRule(t *testing.T) {
func TestParseIntoRuleSchemaVersioning(t *testing.T) {
tests := []struct {
name string
initRule PostableRule
content []byte
kind RuleDataKind
expectError bool
validate func(*testing.T, *PostableRule)
name string
initRule PostableRule
content []byte
kind RuleDataKind
expectError bool // unmarshal error (read path)
expectValidateError bool // Validate() error (write path only)
validate func(*testing.T, *PostableRule)
}{
{
name: "schema v1 - threshold name from severity label",
@@ -182,13 +186,15 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
"condition": {
"compositeQuery": {
"queryType": "builder",
"builderQueries": {
"A": {
"aggregateAttribute": {
"key": "cpu_usage"
}
"queries": [{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "metrics",
"aggregations": [{"metricName": "cpu_usage", "spaceAggregation": "p50"}],
"stepInterval": "5m"
}
},
}],
"unit": "percent"
},
"target": 85.0,
@@ -271,13 +277,15 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
"condition": {
"compositeQuery": {
"queryType": "builder",
"builderQueries": {
"A": {
"aggregateAttribute": {
"key": "memory_usage"
}
"queries": [{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "metrics",
"aggregations": [{"metricName": "memory_usage", "spaceAggregation": "p50"}],
"stepInterval": "5m"
}
}
}]
},
"target": 90.0,
"matchType": "1",
@@ -312,13 +320,15 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
"condition": {
"compositeQuery": {
"queryType": "builder",
"builderQueries": {
"A": {
"aggregateAttribute": {
"key": "cpu_usage"
}
"queries": [{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "metrics",
"aggregations": [{"metricName": "cpu_usage", "spaceAggregation": "p50"}],
"stepInterval": "5m"
}
},
}],
"unit": "percent"
},
"target": 80.0,
@@ -394,49 +404,253 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
},
},
{
name: "schema v2 - does not populate thresholds and evaluation",
name: "schema v2alpha1 - uses explicit thresholds and evaluation",
initRule: PostableRule{},
content: []byte(`{
"alert": "V2Test",
"schemaVersion": "v2",
"alert": "V2Alpha1Test",
"schemaVersion": "v2alpha1",
"version": "v5",
"ruleType": "threshold_rule",
"condition": {
"compositeQuery": {
"queryType": "builder",
"builderQueries": {
"A": {
"aggregateAttribute": {
"key": "test_metric"
}
"queries": [{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "metrics",
"aggregations": [{"metricName": "test_metric", "spaceAggregation": "p50"}],
"stepInterval": "5m"
}
}
}]
},
"thresholds": {
"kind": "basic",
"spec": [{
"name": "critical",
"target": 100.0,
"matchType": "1",
"op": "1"
}]
}
},
"evaluation": {
"kind": "rolling",
"spec": {
"evalWindow": "5m",
"frequency": "1m"
}
},
"notificationSettings": {
"renotify": {
"enabled": true,
"interval": "4h",
"alertStates": ["firing"]
}
}
}`),
kind: RuleDataKindJson,
expectError: false,
validate: func(t *testing.T, rule *PostableRule) {
if rule.SchemaVersion != SchemaVersionV2Alpha1 {
t.Errorf("Expected schemaVersion %q, got %q", SchemaVersionV2Alpha1, rule.SchemaVersion)
}
if rule.RuleCondition.Thresholds == nil {
t.Error("Expected Thresholds to be present for v2alpha1")
}
if rule.Evaluation == nil {
t.Error("Expected Evaluation to be present for v2alpha1")
}
if rule.NotificationSettings == nil {
t.Error("Expected NotificationSettings to be present for v2alpha1")
}
if rule.RuleType != RuleTypeThreshold {
t.Error("Expected RuleType to be auto-detected")
}
},
},
{
name: "schema v2alpha1 - rejects v1-only fields with suggestions",
initRule: PostableRule{},
content: []byte(`{
"alert": "MixedFieldsTest",
"schemaVersion": "v2alpha1",
"version": "v5",
"ruleType": "threshold_rule",
"preferredChannels": ["slack"],
"condition": {
"compositeQuery": {
"queryType": "builder",
"queries": [{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "metrics",
"aggregations": [{"metricName": "test_metric", "spaceAggregation": "p50"}],
"stepInterval": "5m"
}
}]
},
"target": 100.0,
"matchType": "1",
"op": "1"
}
}`),
kind: RuleDataKindJson,
expectError: false,
validate: func(t *testing.T, rule *PostableRule) {
if rule.SchemaVersion != "v2" {
t.Errorf("Expected schemaVersion 'v2', got '%s'", rule.SchemaVersion)
kind: RuleDataKindJson,
expectValidateError: true,
},
{
name: "schema v2alpha1 - requires evaluation",
initRule: PostableRule{},
content: []byte(`{
"alert": "MissingEvalTest",
"schemaVersion": "v2alpha1",
"version": "v5",
"ruleType": "threshold_rule",
"condition": {
"compositeQuery": {
"queryType": "builder",
"queries": [{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "metrics",
"aggregations": [{"metricName": "test_metric", "spaceAggregation": "p50"}],
"stepInterval": "5m"
}
}]
},
"thresholds": {
"kind": "basic",
"spec": [{
"name": "critical",
"target": 100.0,
"matchType": "1",
"op": "1"
}]
}
},
"notificationSettings": {
"renotify": {
"enabled": true,
"interval": "4h",
"alertStates": ["firing"]
}
}
if rule.RuleCondition.Thresholds != nil {
t.Error("Expected Thresholds to be nil for v2")
}`),
kind: RuleDataKindJson,
expectValidateError: true,
},
{
name: "schema v2alpha1 - requires notificationSettings",
initRule: PostableRule{},
content: []byte(`{
"alert": "MissingNotifTest",
"schemaVersion": "v2alpha1",
"version": "v5",
"ruleType": "threshold_rule",
"condition": {
"compositeQuery": {
"queryType": "builder",
"queries": [{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "metrics",
"aggregations": [{"metricName": "test_metric", "spaceAggregation": "p50"}],
"stepInterval": "5m"
}
}]
},
"thresholds": {
"kind": "basic",
"spec": [{
"name": "critical",
"target": 100.0,
"matchType": "1",
"op": "1"
}]
}
},
"evaluation": {
"kind": "rolling",
"spec": {
"evalWindow": "5m",
"frequency": "1m"
}
}
if rule.Evaluation != nil {
t.Error("Expected Evaluation to be nil for v2")
}`),
kind: RuleDataKindJson,
expectValidateError: true,
},
{
name: "schema v2alpha1 - requires thresholds for non-promql rules",
initRule: PostableRule{},
content: []byte(`{
"alert": "MissingThresholdsTest",
"schemaVersion": "v2alpha1",
"version": "v5",
"ruleType": "threshold_rule",
"condition": {
"compositeQuery": {
"queryType": "builder",
"queries": [{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "metrics",
"aggregations": [{"metricName": "test_metric", "spaceAggregation": "p50"}],
"stepInterval": "5m"
}
}]
}
},
"evaluation": {
"kind": "rolling",
"spec": {
"evalWindow": "5m",
"frequency": "1m"
}
},
"notificationSettings": {
"renotify": {
"enabled": true,
"interval": "4h",
"alertStates": ["firing"]
}
}
if rule.EvalWindow.Duration() != 5*time.Minute {
t.Error("Expected default EvalWindow to be applied")
}`),
kind: RuleDataKindJson,
expectValidateError: true,
},
{
name: "unsupported schema version",
initRule: PostableRule{},
content: []byte(`{
"alert": "BadSchemaTest",
"schemaVersion": "v3",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
"queries": [{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "metrics",
"aggregations": [{"metricName": "test_metric", "spaceAggregation": "p50"}],
"stepInterval": "5m"
}
}]
},
"target": 100.0,
"matchType": "1",
"op": "1"
}
if rule.RuleType != RuleTypeThreshold {
t.Error("Expected RuleType to be auto-detected")
}
},
}`),
kind: RuleDataKindJson,
expectValidateError: true,
},
{
name: "default schema version - defaults to v1 behavior",
@@ -447,13 +661,15 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
"condition": {
"compositeQuery": {
"queryType": "builder",
"builderQueries": {
"A": {
"aggregateAttribute": {
"key": "test_metric"
}
"queries": [{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "metrics",
"aggregations": [{"metricName": "test_metric", "spaceAggregation": "p50"}],
"stepInterval": "5m"
}
}
}]
},
"target": 75.0,
"matchType": "1",
@@ -480,13 +696,23 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
rule := tt.initRule
err := json.Unmarshal(tt.content, &rule)
if tt.expectError && err == nil {
t.Errorf("Expected error but got none")
if tt.expectError {
if err == nil {
t.Errorf("Expected unmarshal error but got none")
}
return
}
if !tt.expectError && err != nil {
t.Errorf("Unexpected error: %v", err)
if err != nil {
t.Errorf("Unexpected unmarshal error: %v", err)
return
}
if tt.validate != nil && err == nil {
if tt.expectValidateError {
if err := rule.Validate(); err == nil {
t.Errorf("Expected Validate() error but got none")
}
return
}
if tt.validate != nil {
tt.validate(t, &rule)
}
})
@@ -500,15 +726,15 @@ func TestParseIntoRuleThresholdGeneration(t *testing.T) {
"condition": {
"compositeQuery": {
"queryType": "builder",
"builderQueries": {
"A": {
"expression": "A",
"disabled": false,
"aggregateAttribute": {
"key": "response_time"
}
"queries": [{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "metrics",
"aggregations": [{"metricName": "response_time", "spaceAggregation": "p50"}],
"stepInterval": "5m"
}
}
}]
},
"target": 100.0,
"matchType": "1",
@@ -571,7 +797,7 @@ func TestParseIntoRuleThresholdGeneration(t *testing.T) {
func TestParseIntoRuleMultipleThresholds(t *testing.T) {
content := []byte(`{
"schemaVersion": "v2",
"schemaVersion": "v2alpha1",
"alert": "MultiThresholdAlert",
"ruleType": "threshold_rule",
"version": "v5",
@@ -579,19 +805,16 @@ func TestParseIntoRuleMultipleThresholds(t *testing.T) {
"compositeQuery": {
"queryType": "builder",
"unit": "%",
"builderQueries": {
"A": {
"expression": "A",
"disabled": false,
"aggregateAttribute": {
"key": "cpu_usage"
}
"queries": [{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "metrics",
"aggregations": [{"metricName": "cpu_usage", "spaceAggregation": "p50"}],
"stepInterval": "5m"
}
}
}]
},
"target": 90.0,
"matchType": "1",
"op": "1",
"selectedQuery": "A",
"thresholds": {
"kind": "basic",
@@ -616,6 +839,20 @@ func TestParseIntoRuleMultipleThresholds(t *testing.T) {
}
]
}
},
"evaluation": {
"kind": "rolling",
"spec": {
"evalWindow": "5m",
"frequency": "1m"
}
},
"notificationSettings": {
"renotify": {
"enabled": true,
"interval": "4h",
"alertStates": ["firing"]
}
}
}`)
rule := PostableRule{}

View File

@@ -54,6 +54,29 @@ func (CompareOperator) Enum() []any {
}
}
// Normalize returns the canonical (numeric) form of the operator.
// This ensures evaluation logic can use simple == checks against the canonical values.
func (c CompareOperator) Normalize() CompareOperator {
switch c {
case ValueIsAbove, ValueIsAboveLiteral, ValueIsAboveSymbol:
return ValueIsAbove
case ValueIsBelow, ValueIsBelowLiteral, ValueIsBelowSymbol:
return ValueIsBelow
case ValueIsEq, ValueIsEqLiteral, ValueIsEqLiteralShort, ValueIsEqSymbol:
return ValueIsEq
case ValueIsNotEq, ValueIsNotEqLiteral, ValueIsNotEqLiteralShort, ValueIsNotEqSymbol:
return ValueIsNotEq
case ValueAboveOrEq, ValueAboveOrEqLiteral, ValueAboveOrEqLiteralShort, ValueAboveOrEqSymbol:
return ValueAboveOrEq
case ValueBelowOrEq, ValueBelowOrEqLiteral, ValueBelowOrEqLiteralShort, ValueBelowOrEqSymbol:
return ValueBelowOrEq
case ValueOutsideBounds, ValueOutsideBoundsLiteral:
return ValueOutsideBounds
default:
return c
}
}
func (c CompareOperator) Validate() error {
switch c {
case ValueIsAbove,
@@ -70,10 +93,18 @@ func (c CompareOperator) Validate() error {
ValueIsNotEqLiteral,
ValueIsNotEqLiteralShort,
ValueIsNotEqSymbol,
ValueAboveOrEq,
ValueAboveOrEqLiteral,
ValueAboveOrEqLiteralShort,
ValueAboveOrEqSymbol,
ValueBelowOrEq,
ValueBelowOrEqLiteral,
ValueBelowOrEqLiteralShort,
ValueBelowOrEqSymbol,
ValueOutsideBounds,
ValueOutsideBoundsLiteral:
return nil
default:
return errors.NewInvalidInputf(errors.CodeInvalidInput, "unknown comparison operator, known values are: ")
return errors.NewInvalidInputf(errors.CodeInvalidInput, "condition.op: unsupported value %q; must be one of above, below, equal, not_equal, above_or_equal, below_or_equal, outside_bounds", c.StringValue())
}
}

View File

@@ -11,7 +11,7 @@ type MatchType struct {
var (
AtleastOnce = MatchType{valuer.NewString("1")}
AtleastOnceLiteral = MatchType{valuer.NewString("atleast_once")}
AtleastOnceLiteral = MatchType{valuer.NewString("at_least_once")}
AllTheTimes = MatchType{valuer.NewString("2")}
AllTheTimesLiteral = MatchType{valuer.NewString("all_the_times")}
@@ -38,6 +38,24 @@ func (MatchType) Enum() []any {
}
}
// Normalize returns the canonical (numeric) form of the match type.
func (m MatchType) Normalize() MatchType {
switch m {
case AtleastOnce, AtleastOnceLiteral:
return AtleastOnce
case AllTheTimes, AllTheTimesLiteral:
return AllTheTimes
case OnAverage, OnAverageLiteral, OnAverageShort:
return OnAverage
case InTotal, InTotalLiteral, InTotalShort:
return InTotal
case Last, LastLiteral:
return Last
default:
return m
}
}
func (m MatchType) Validate() error {
switch m {
case
@@ -55,6 +73,6 @@ func (m MatchType) Validate() error {
LastLiteral:
return nil
default:
return errors.NewInvalidInputf(errors.CodeInvalidInput, "unknown match type operator, known values are")
return errors.NewInvalidInputf(errors.CodeInvalidInput, "condition.matchType: unsupported value %q; must be one of at_least_once, all_the_times, on_average, in_total, last", m.StringValue())
}
}

View File

@@ -31,6 +31,6 @@ func (r RuleType) Validate() error {
RuleTypeAnomaly:
return nil
default:
return errors.NewInvalidInputf(errors.CodeInvalidInput, "unknown rule type, known values are")
return errors.NewInvalidInputf(errors.CodeInvalidInput, "ruleType: unsupported value %q; must be one of threshold_rule, promql_rule, anomaly_rule", r.StringValue())
}
}

View File

@@ -0,0 +1,35 @@
package ruletypes
import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/valuer"
)
type Seasonality struct {
valuer.String
}
var (
SeasonalityHourly = Seasonality{valuer.NewString("hourly")}
SeasonalityDaily = Seasonality{valuer.NewString("daily")}
SeasonalityWeekly = Seasonality{valuer.NewString("weekly")}
)
func (Seasonality) Enum() []any {
return []any{
SeasonalityHourly,
SeasonalityDaily,
SeasonalityWeekly,
}
}
func (s Seasonality) Validate() error {
switch s {
case SeasonalityHourly, SeasonalityDaily, SeasonalityWeekly:
return nil
default:
return errors.NewInvalidInputf(errors.CodeInvalidInput,
"condition.seasonality: unsupported value %q; must be one of hourly, daily, weekly",
s.StringValue())
}
}

View File

@@ -189,7 +189,7 @@ func sortThresholds(thresholds []BasicRuleThreshold) {
targetI := thresholds[i].target(thresholds[i].TargetUnit) //for sorting we dont need rule unit
targetJ := thresholds[j].target(thresholds[j].TargetUnit)
switch thresholds[i].CompareOperator {
switch thresholds[i].CompareOperator.Normalize() {
case ValueIsAbove, ValueAboveOrEq, ValueOutsideBounds:
// For "above" operations, sort descending (higher values first)
return targetI > targetJ
@@ -234,16 +234,11 @@ func (b BasicRuleThreshold) Validate() error {
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput, "target value cannot be nil"))
}
switch b.CompareOperator {
case ValueIsAbove, ValueIsBelow, ValueIsEq, ValueIsNotEq, ValueAboveOrEq, ValueBelowOrEq, ValueOutsideBounds:
default:
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid compare operation: %s", b.CompareOperator.StringValue()))
if err := b.CompareOperator.Validate(); err != nil {
errs = append(errs, err)
}
switch b.MatchType {
case AtleastOnce, AllTheTimes, OnAverage, InTotal, Last:
default:
errs = append(errs, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid match type: %s", b.MatchType.StringValue()))
if err := b.MatchType.Validate(); err != nil {
errs = append(errs, err)
}
return errors.Join(errs...)
@@ -268,6 +263,33 @@ func PrepareSampleLabelsForRule(seriesLabels []*qbtypes.Label, thresholdName str
return lb.Labels()
}
// matchesCompareOp checks if a value matches the compare operator against target.
func matchesCompareOp(op CompareOperator, value, target float64) bool {
switch op {
case ValueIsAbove:
return value > target
case ValueIsBelow:
return value < target
case ValueIsEq:
return value == target
case ValueIsNotEq:
return value != target
case ValueAboveOrEq:
return value >= target
case ValueBelowOrEq:
return value <= target
case ValueOutsideBounds:
return math.Abs(value) >= target
default:
return false
}
}
// negatesCompareOp checks if a value does NOT match the compare operator against target.
func negatesCompareOp(op CompareOperator, value, target float64) bool {
return !matchesCompareOp(op, value, target)
}
func (b BasicRuleThreshold) shouldAlertWithTarget(series *qbtypes.TimeSeries, target float64) (Sample, bool) {
var shouldAlert bool
var alertSmpl Sample
@@ -278,63 +300,35 @@ func (b BasicRuleThreshold) shouldAlertWithTarget(series *qbtypes.TimeSeries, ta
return alertSmpl, false
}
switch b.MatchType {
// Normalize to canonical forms so evaluation uses simple == checks
op := b.CompareOperator.Normalize()
matchType := b.MatchType.Normalize()
switch matchType {
case AtleastOnce:
// If any sample matches the condition, the rule is firing.
if b.CompareOperator == ValueIsAbove {
for _, smpl := range series.Values {
if smpl.Value > target {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
shouldAlert = true
break
}
}
} else if b.CompareOperator == ValueIsBelow {
for _, smpl := range series.Values {
if smpl.Value < target {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
shouldAlert = true
break
}
}
} else if b.CompareOperator == ValueIsEq {
for _, smpl := range series.Values {
if smpl.Value == target {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
shouldAlert = true
break
}
}
} else if b.CompareOperator == ValueIsNotEq {
for _, smpl := range series.Values {
if smpl.Value != target {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
shouldAlert = true
break
}
}
} else if b.CompareOperator == ValueOutsideBounds {
for _, smpl := range series.Values {
if math.Abs(smpl.Value) >= target {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
shouldAlert = true
break
}
for _, smpl := range series.Values {
if matchesCompareOp(op, smpl.Value, target) {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
shouldAlert = true
break
}
}
case AllTheTimes:
// If all samples match the condition, the rule is firing.
shouldAlert = true
alertSmpl = Sample{Point: Point{V: target}, Metric: lbls}
if b.CompareOperator == ValueIsAbove {
for _, smpl := range series.Values {
if smpl.Value <= target {
shouldAlert = false
break
}
for _, smpl := range series.Values {
if negatesCompareOp(op, smpl.Value, target) {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
shouldAlert = false
break
}
// use min value from the series
if shouldAlert {
}
if shouldAlert {
switch op {
case ValueIsAbove, ValueAboveOrEq, ValueOutsideBounds:
// use min value from the series
var minValue = math.Inf(1)
for _, smpl := range series.Values {
if smpl.Value < minValue {
@@ -342,15 +336,8 @@ func (b BasicRuleThreshold) shouldAlertWithTarget(series *qbtypes.TimeSeries, ta
}
}
alertSmpl = Sample{Point: Point{V: minValue}, Metric: lbls}
}
} else if b.CompareOperator == ValueIsBelow {
for _, smpl := range series.Values {
if smpl.Value >= target {
shouldAlert = false
break
}
}
if shouldAlert {
case ValueIsBelow, ValueBelowOrEq:
// use max value from the series
var maxValue = math.Inf(-1)
for _, smpl := range series.Values {
if smpl.Value > maxValue {
@@ -358,23 +345,8 @@ func (b BasicRuleThreshold) shouldAlertWithTarget(series *qbtypes.TimeSeries, ta
}
}
alertSmpl = Sample{Point: Point{V: maxValue}, Metric: lbls}
}
} else if b.CompareOperator == ValueIsEq {
for _, smpl := range series.Values {
if smpl.Value != target {
shouldAlert = false
break
}
}
} else if b.CompareOperator == ValueIsNotEq {
for _, smpl := range series.Values {
if smpl.Value == target {
shouldAlert = false
break
}
}
// use any non-inf or nan value from the series
if shouldAlert {
case ValueIsNotEq:
// use any non-inf and non-nan value from the series
for _, smpl := range series.Values {
if !math.IsInf(smpl.Value, 0) && !math.IsNaN(smpl.Value) {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
@@ -382,14 +354,6 @@ func (b BasicRuleThreshold) shouldAlertWithTarget(series *qbtypes.TimeSeries, ta
}
}
}
} else if b.CompareOperator == ValueOutsideBounds {
for _, smpl := range series.Values {
if math.Abs(smpl.Value) < target {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
shouldAlert = false
break
}
}
}
case OnAverage:
// If the average of all samples matches the condition, the rule is firing.
@@ -403,32 +367,10 @@ func (b BasicRuleThreshold) shouldAlertWithTarget(series *qbtypes.TimeSeries, ta
}
avg := sum / count
alertSmpl = Sample{Point: Point{V: avg}, Metric: lbls}
switch b.CompareOperator {
case ValueIsAbove:
if avg > target {
shouldAlert = true
}
case ValueIsBelow:
if avg < target {
shouldAlert = true
}
case ValueIsEq:
if avg == target {
shouldAlert = true
}
case ValueIsNotEq:
if avg != target {
shouldAlert = true
}
case ValueOutsideBounds:
if math.Abs(avg) >= target {
shouldAlert = true
}
}
shouldAlert = matchesCompareOp(op, avg, target)
case InTotal:
// If the sum of all samples matches the condition, the rule is firing.
var sum float64
for _, smpl := range series.Values {
if math.IsNaN(smpl.Value) || math.IsInf(smpl.Value, 0) {
continue
@@ -436,50 +378,12 @@ func (b BasicRuleThreshold) shouldAlertWithTarget(series *qbtypes.TimeSeries, ta
sum += smpl.Value
}
alertSmpl = Sample{Point: Point{V: sum}, Metric: lbls}
switch b.CompareOperator {
case ValueIsAbove:
if sum > target {
shouldAlert = true
}
case ValueIsBelow:
if sum < target {
shouldAlert = true
}
case ValueIsEq:
if sum == target {
shouldAlert = true
}
case ValueIsNotEq:
if sum != target {
shouldAlert = true
}
case ValueOutsideBounds:
if math.Abs(sum) >= target {
shouldAlert = true
}
}
shouldAlert = matchesCompareOp(op, sum, target)
case Last:
// If the last sample matches the condition, the rule is firing.
shouldAlert = false
alertSmpl = Sample{Point: Point{V: series.Values[len(series.Values)-1].Value}, Metric: lbls}
switch b.CompareOperator {
case ValueIsAbove:
if series.Values[len(series.Values)-1].Value > target {
shouldAlert = true
}
case ValueIsBelow:
if series.Values[len(series.Values)-1].Value < target {
shouldAlert = true
}
case ValueIsEq:
if series.Values[len(series.Values)-1].Value == target {
shouldAlert = true
}
case ValueIsNotEq:
if series.Values[len(series.Values)-1].Value != target {
shouldAlert = true
}
}
lastValue := series.Values[len(series.Values)-1].Value
alertSmpl = Sample{Point: Point{V: lastValue}, Metric: lbls}
shouldAlert = matchesCompareOp(op, lastValue, target)
}
return alertSmpl, shouldAlert
}

File diff suppressed because it is too large Load Diff

View File

@@ -42,7 +42,6 @@ type User struct {
OrgID valuer.UUID `bun:"org_id" json:"orgId"`
IsRoot bool `bun:"is_root" json:"isRoot"`
Status valuer.String `bun:"status" json:"status"`
DeletedAt time.Time `bun:"deleted_at" json:"-"`
TimeAuditable
}
@@ -136,7 +135,6 @@ func NewUserFromDeprecatedUser(deprecatedUser *DeprecatedUser) *User {
OrgID: deprecatedUser.OrgID,
IsRoot: deprecatedUser.IsRoot,
Status: deprecatedUser.Status,
DeletedAt: deprecatedUser.DeletedAt,
TimeAuditable: deprecatedUser.TimeAuditable,
}
}

View File

@@ -18,13 +18,13 @@ def test_unique_index_allows_multiple_deleted_rows(
get_token: Callable[[str, str], str],
):
"""
Verify that the composite unique index on (org_id, email, deleted_at) allows multiple
deleted rows for the same (org_id, email) while still enforcing uniqueness among
non-deleted rows.
Verify that the partial unique index on (email, org_id) WHERE status != 'deleted'
allows multiple deleted rows for the same (org_id, email) while still enforcing
uniqueness among non-deleted rows.
Non-deleted users share deleted_at=zero-time, so the unique index prevents duplicates.
Soft-deleted users each have a distinct deleted_at timestamp, so the index allows
multiple deleted rows for the same (org_id, email).
The partial unique index only covers rows where status != 'deleted', so active
users cannot share the same (org_id, email). Deleted users are excluded from
the index, allowing multiple deleted rows for the same (org_id, email).
Steps:
1. Invite and soft-delete a user via the API (first deleted row).
@@ -32,9 +32,9 @@ def test_unique_index_allows_multiple_deleted_rows(
3. Assert via SQL that exactly two deleted rows exist for the email.
4. Assert via SQL that inserting one active row succeeds (no conflict — only
deleted rows exist), then inserting a second active row for the same
(org_id, email) fails with a unique constraint error (both have deleted_at=zero-time).
(org_id, email) fails with a unique constraint error.
5. Assert via SQL that inserting a third deleted row for the same (org_id, email)
with a unique deleted_at succeeds — confirming the index does not cover deleted rows.
succeeds — confirming the index does not cover deleted rows.
6. Assert via SQL that the final count of deleted rows is 3.
"""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
@@ -86,10 +86,9 @@ def test_unique_index_allows_multiple_deleted_rows(
with signoz.sqlstore.conn.connect() as conn:
result = conn.execute(
sql.text(
"SELECT id, deleted_at FROM users"
" WHERE email = :email AND deleted_at != :zero_time"
"SELECT id FROM users WHERE email = :email AND status = 'deleted'"
),
{"email": UNIQUE_INDEX_USER_EMAIL, "zero_time": "0001-01-01 00:00:00"},
{"email": UNIQUE_INDEX_USER_EMAIL},
)
deleted_rows = result.fetchall()
@@ -109,24 +108,24 @@ def test_unique_index_allows_multiple_deleted_rows(
org_id = result.fetchone()[0]
# Step 4: the unique index must still block a duplicate non-deleted row.
# Both active rows have deleted_at=zero-time, so they share the same (org_id, email, zero-time)
# tuple. First insert must succeed (only deleted rows exist so far).
# Second insert for the same (org_id, email) with deleted_at=zero-time must fail.
# The partial unique index covers rows WHERE status != 'deleted', so two active
# rows with the same (org_id, email) must conflict.
# First insert must succeed (only deleted rows exist so far).
# Second insert for the same (org_id, email) with status='active' must fail.
active_id = str(uuid.uuid4())
with signoz.sqlstore.conn.connect() as conn:
conn.execute(
sql.text(
"INSERT INTO users"
" (id, display_name, email, org_id, is_root, status, created_at, updated_at, deleted_at)"
" (id, display_name, email, org_id, is_root, status, created_at, updated_at)"
" VALUES (:id, :display_name, :email, :org_id,"
" false, 'active', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, :zero_time)"
" false, 'active', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)"
),
{
"id": active_id,
"display_name": "first active row",
"email": UNIQUE_INDEX_USER_EMAIL,
"org_id": org_id,
"zero_time": "0001-01-01 00:00:00",
},
)
conn.commit()
@@ -136,27 +135,26 @@ def test_unique_index_allows_multiple_deleted_rows(
conn.execute(
sql.text(
"INSERT INTO users"
" (id, display_name, email, org_id, is_root, status, created_at, updated_at, deleted_at)"
" (id, display_name, email, org_id, is_root, status, created_at, updated_at)"
" VALUES (:id, :display_name, :email, :org_id,"
" false, 'active', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, :zero_time)"
" false, 'active', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)"
),
{
"id": str(uuid.uuid4()),
"display_name": "should violate index",
"email": UNIQUE_INDEX_USER_EMAIL,
"org_id": org_id,
"zero_time": "0001-01-01 00:00:00",
},
)
# Step 5: a third deleted row with a unique deleted_at must be accepted
# Step 5: a third deleted row must be accepted (excluded from partial index)
with signoz.sqlstore.conn.connect() as conn:
conn.execute(
sql.text(
"INSERT INTO users"
" (id, display_name, email, org_id, is_root, status, created_at, updated_at, deleted_at)"
" (id, display_name, email, org_id, is_root, status, created_at, updated_at)"
" VALUES (:id, :display_name, :email, :org_id,"
" false, 'deleted', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)"
" false, 'deleted', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)"
),
{
"id": str(uuid.uuid4()),
@@ -172,9 +170,9 @@ def test_unique_index_allows_multiple_deleted_rows(
result = conn.execute(
sql.text(
"SELECT COUNT(*) FROM users"
" WHERE email = :email AND deleted_at != :zero_time"
" WHERE email = :email AND status = 'deleted'"
),
{"email": UNIQUE_INDEX_USER_EMAIL, "zero_time": "0001-01-01 00:00:00"},
{"email": UNIQUE_INDEX_USER_EMAIL},
)
count = result.fetchone()[0]