Compare commits

..

26 Commits

Author SHA1 Message Date
Nikhil Soni
b4b2d7bb66 test: add test to show cross context matching 2026-06-24 18:34:38 +05:30
Nikhil Soni
e16416475b refactor: drop unused fields 2026-06-24 18:07:47 +05:30
Nikhil Soni
0ea7c1ae6e test: add more cases for scope name 2026-06-24 18:05:50 +05:30
Nikhil Soni
a023c8ed4a test: add integration test for scope fields 2026-06-24 15:25:17 +05:30
Nikhil Soni
a73ae62cd1 Merge remote-tracking branch 'origin/main' into ns/scope 2026-06-24 13:02:18 +05:30
Nikhil Soni
ec6fb58052 chore: add more tests 2026-06-24 12:37:42 +05:30
Nikhil Soni
d3d13eb7ff fix: remove handling of normalized properties for scope
Otherwise it will be impossible to query if scope attribute also
exists with same name - name and version
2026-05-21 11:19:22 +05:30
Nikhil Soni
782de2b210 fix: use correct error type for internal issues 2026-05-20 15:32:26 +05:30
Nikhil Soni
d3c38693f3 fix: allow 'scope.' prefix for keys with other context 2026-05-20 15:30:25 +05:30
Nikhil Soni
8791df3697 fix: avoid removing context prefix to support attr with prefix 2026-05-19 19:14:23 +05:30
Nikhil Soni
eb719c3d0d fix: use key selector with context prefix 2026-05-19 18:56:00 +05:30
Nikhil Soni
f10435c210 Merge remote-tracking branch 'origin' into ns/scope 2026-05-19 13:55:36 +05:30
Nikhil Soni
f3f1e9cb59 chore: add tests for denormalized field name as well 2026-05-19 13:55:25 +05:30
Nikhil Soni
d0370ce3ef fix: handle fields with included context for scope (select clause) 2026-05-14 17:02:56 +05:30
Nikhil Soni
d169761e65 Merge remote-tracking branch 'origin/main' into ns/scope 2026-05-14 11:50:33 +05:30
Nikhil Soni
87864ef5d4 chore: remove duplicates from .gitignore 2026-05-11 15:45:32 +05:30
Nikhil Soni
2e0bc8998e chore: use name as key name for scope instead of scope.name 2026-05-11 15:40:45 +05:30
Nikhil Soni
7e1f4aa50d Merge remote-tracking branch 'origin/main' into ns/scope 2026-05-11 14:27:19 +05:30
Nikhil Soni
35da39247c Merge branch 'main' into ns/scope 2026-05-07 17:41:11 +05:30
Nikhil Soni
ceccc47a34 fix: fix test for case without resource filter 2026-05-07 16:04:03 +05:30
Nikhil Soni
23da5e22ec Merge branch 'main' into ns/scope 2026-05-07 13:27:34 +05:30
Nikhil Soni
4c1b479149 chore: add tests for scope fields 2026-04-28 20:27:10 +05:30
Nikhil Soni
f72204a8b2 refactor: simplify field mapper for scope 2026-04-28 20:26:37 +05:30
Nikhil Soni
deb3f385fa chore: remove underscore version of scope fields 2026-04-23 10:26:55 +05:30
Nikhil Soni
77ce5f86b1 fix: use scope as json field instead with name and version 2026-04-23 01:15:02 +05:30
Nikhil Soni
ff211de441 feat: add support for scope fields in traces 2026-04-14 10:45:08 +05:30
41 changed files with 596 additions and 894 deletions

3
.gitignore vendored
View File

@@ -231,4 +231,5 @@ cython_debug/
# LSP config files
pyrightconfig.json
# agents
.claude/settings.local.json

View File

@@ -370,7 +370,7 @@ func (provider *provider) Delete(ctx context.Context, orgID valuer.UUID, id valu
}
for _, cb := range provider.onBeforeRoleDelete {
if err := cb(ctx, orgID, id, role.Name); err != nil {
if err := cb(ctx, orgID, id); err != nil {
return err
}
}

View File

@@ -48,15 +48,8 @@ export const createVariableSelectionSlice: StateCreator<
},
});
/**
* Stable empty map for dashboards with no stored selections. Returning an inline
* `{}` here would hand zustand's useSyncExternalStore a new reference every call,
* which it reads as a changed snapshot → infinite re-render loop.
*/
const EMPTY_SELECTION_MAP: VariableSelectionMap = {};
/** Selector: the selection map for a dashboard (empty if none). */
export const selectVariableValues =
(dashboardId: string) =>
(state: DashboardStore): VariableSelectionMap =>
state.variableValues[dashboardId] ?? EMPTY_SELECTION_MAP;
state.variableValues[dashboardId] ?? {};

View File

@@ -92,7 +92,7 @@ type AuthZ interface {
}
// OnBeforeRoleDelete is a callback invoked before a role is deleted.
type OnBeforeRoleDelete func(ctx context.Context, orgID valuer.UUID, roleID valuer.UUID, roleName string) error
type OnBeforeRoleDelete func(context.Context, valuer.UUID, valuer.UUID) error
type Handler interface {
Create(http.ResponseWriter, *http.Request)

View File

@@ -3,17 +3,16 @@ package flagger
import "github.com/SigNoz/signoz/pkg/types/featuretypes"
var (
FeatureUseSpanMetrics = featuretypes.MustNewName("use_span_metrics")
FeatureKafkaSpanEval = featuretypes.MustNewName("kafka_span_eval")
FeatureHideRootUser = featuretypes.MustNewName("hide_root_user")
FeatureGetMetersFromZeus = featuretypes.MustNewName("get_meters_from_zeus")
FeaturePutMetersInZeus = featuretypes.MustNewName("put_meters_in_zeus")
FeatureUseMeterReporter = featuretypes.MustNewName("use_meter_reporter")
FeatureUseJSONBody = featuretypes.MustNewName("use_json_body")
FeatureUseFineGrainedAuthz = featuretypes.MustNewName("use_fine_grained_authz")
FeatureUseDashboardV2 = featuretypes.MustNewName("use_dashboard_v2")
FeatureEnableAIObservability = featuretypes.MustNewName("enable_ai_observability")
FeatureEnableMetricsReduction = featuretypes.MustNewName("enable_metrics_reduction")
FeatureUseSpanMetrics = featuretypes.MustNewName("use_span_metrics")
FeatureKafkaSpanEval = featuretypes.MustNewName("kafka_span_eval")
FeatureHideRootUser = featuretypes.MustNewName("hide_root_user")
FeatureGetMetersFromZeus = featuretypes.MustNewName("get_meters_from_zeus")
FeaturePutMetersInZeus = featuretypes.MustNewName("put_meters_in_zeus")
FeatureUseMeterReporter = featuretypes.MustNewName("use_meter_reporter")
FeatureUseJSONBody = featuretypes.MustNewName("use_json_body")
FeatureUseFineGrainedAuthz = featuretypes.MustNewName("use_fine_grained_authz")
FeatureUseDashboardV2 = featuretypes.MustNewName("use_dashboard_v2")
FeatureEnableAIObservability = featuretypes.MustNewName("enable_ai_observability")
)
func MustNewRegistry() featuretypes.Registry {
@@ -98,14 +97,6 @@ func MustNewRegistry() featuretypes.Registry {
DefaultVariant: featuretypes.MustNewName("disabled"),
Variants: featuretypes.NewBooleanVariants(),
},
&featuretypes.Feature{
Name: FeatureEnableMetricsReduction,
Kind: featuretypes.KindBoolean,
Stage: featuretypes.StageExperimental,
Description: "Controls whether metrics cardinality reduction (buffer/reduced tables) is read by the querier",
DefaultVariant: featuretypes.MustNewName("disabled"),
Variants: featuretypes.NewBooleanVariants(),
},
)
if err != nil {
panic(err)

View File

@@ -44,7 +44,3 @@ type Handler interface {
Update(http.ResponseWriter, *http.Request)
Delete(http.ResponseWriter, *http.Request)
}
type Getter interface {
OnBeforeRoleDelete(ctx context.Context, orgID valuer.UUID, roleID valuer.UUID, roleName string) error
}

View File

@@ -1,45 +0,0 @@
package implauthdomain
import (
"context"
"strings"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/modules/authdomain"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
type getter struct {
store authtypes.AuthDomainStore
}
func NewGetter(store authtypes.AuthDomainStore) authdomain.Getter {
return &getter{store: store}
}
func (getter *getter) OnBeforeRoleDelete(ctx context.Context, orgID valuer.UUID, roleID valuer.UUID, roleName string) error {
domains, err := getter.store.ListByOrgID(ctx, orgID)
if err != nil {
return err
}
referencedBy := make([]string, 0)
for _, domain := range domains {
for _, mappedRole := range domain.AuthDomainConfig().RoleMapping.RoleNames() {
if mappedRole == roleName {
referencedBy = append(referencedBy, domain.StorableAuthDomain().Name)
break
}
}
}
if len(referencedBy) > 0 {
return errors.WithAdditionalf(
errors.New(errors.TypeInvalidInput, authtypes.ErrCodeRoleHasAuthDomainMappings, "role is referenced by an SSO role mapping, remove it before deleting"),
"referenced by auth domain(s): %s", strings.Join(referencedBy, ", "),
)
}
return nil
}

View File

@@ -4,7 +4,6 @@ import (
"context"
"github.com/SigNoz/signoz/pkg/authn"
"github.com/SigNoz/signoz/pkg/authz"
"github.com/SigNoz/signoz/pkg/modules/authdomain"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/valuer"
@@ -13,18 +12,13 @@ import (
type module struct {
store authtypes.AuthDomainStore
authNs map[authtypes.AuthNProvider]authn.AuthN
authz authz.AuthZ
}
func NewModule(store authtypes.AuthDomainStore, authNs map[authtypes.AuthNProvider]authn.AuthN, authz authz.AuthZ) authdomain.Module {
return &module{store: store, authNs: authNs, authz: authz}
func NewModule(store authtypes.AuthDomainStore, authNs map[authtypes.AuthNProvider]authn.AuthN) authdomain.Module {
return &module{store: store, authNs: authNs}
}
func (module *module) Create(ctx context.Context, domain *authtypes.AuthDomain) error {
if err := module.validateRoleMapping(ctx, domain); err != nil {
return err
}
return module.store.Create(ctx, domain)
}
@@ -56,10 +50,6 @@ func (module *module) ListByOrgID(ctx context.Context, orgID valuer.UUID) ([]*au
}
func (module *module) Update(ctx context.Context, domain *authtypes.AuthDomain) error {
if err := module.validateRoleMapping(ctx, domain); err != nil {
return err
}
return module.store.Update(ctx, domain)
}
@@ -84,13 +74,3 @@ func (module *module) Collect(ctx context.Context, orgID valuer.UUID) (map[strin
return stats, nil
}
func (module *module) validateRoleMapping(ctx context.Context, domain *authtypes.AuthDomain) error {
roleNames := domain.AuthDomainConfig().RoleMapping.RoleNames()
if len(roleNames) == 0 {
return nil
}
_, err := module.authz.ListByOrgIDAndNames(ctx, domain.StorableAuthDomain().OrgID, roleNames)
return err
}

View File

@@ -341,12 +341,12 @@ func alignedMetricWindow(startMs, endMs int64) (
}
tsAdjustedStartMs, _, distributedTSTable, localTSTable := telemetrymetrics.WhichTSTableToUse(
samplesAdjustedStartMs, flooredEndMs, false, nil,
samplesAdjustedStartMs, flooredEndMs, nil,
)
distributedSamplesTable, localSamplesTable := telemetrymetrics.WhichSamplesTableToUse(
samplesAdjustedStartMs, flooredEndMs,
metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, false, nil,
metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, nil,
)
return samplesAdjustedStartMs, flooredEndMs, tsAdjustedStartMs, distributedTSTable, localTSTable, distributedSamplesTable, localSamplesTable

View File

@@ -141,7 +141,7 @@ func (m *module) listMetrics(ctx context.Context, orgID valuer.UUID, params *met
sb.Select("DISTINCT metric_name")
if params.Start != nil && params.End != nil {
start, end, distributedTsTable, _ := telemetrymetrics.WhichTSTableToUse(uint64(*params.Start), uint64(*params.End), false, nil)
start, end, distributedTsTable, _ := telemetrymetrics.WhichTSTableToUse(uint64(*params.Start), uint64(*params.End), nil)
sb.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, distributedTsTable))
sb.Where(sb.Between("unix_milli", start, end))
} else {
@@ -527,7 +527,7 @@ func (m *module) InspectMetrics(
return nil, err
}
tsStart, _, tsTable, _ := telemetrymetrics.WhichTSTableToUse(start, end, false, nil)
tsStart, _, tsTable, _ := telemetrymetrics.WhichTSTableToUse(start, end, nil)
tsSb := sqlbuilder.NewSelectBuilder()
tsSb.Select("fingerprint", "labels")
tsSb.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, tsTable))
@@ -971,8 +971,8 @@ func (m *module) fetchMetricsStatsWithSamples(
}
}
start, end, distributedTsTable, localTsTable := telemetrymetrics.WhichTSTableToUse(uint64(req.Start), uint64(req.End), false, nil)
distributedSamplesTable, _ := telemetrymetrics.WhichSamplesTableToUse(uint64(req.Start), uint64(req.End), metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, false, nil)
start, end, distributedTsTable, localTsTable := telemetrymetrics.WhichTSTableToUse(uint64(req.Start), uint64(req.End), nil)
distributedSamplesTable, _ := telemetrymetrics.WhichSamplesTableToUse(uint64(req.Start), uint64(req.End), metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, nil)
countExp := telemetrymetrics.CountExpressionForSamplesTable(distributedSamplesTable)
// Timeseries counts per metric
@@ -1100,7 +1100,7 @@ func (m *module) computeTimeseriesTreemap(ctx context.Context, req *metricsexplo
}
}
start, end, distributedTsTable, _ := telemetrymetrics.WhichTSTableToUse(uint64(req.Start), uint64(req.End), false, nil)
start, end, distributedTsTable, _ := telemetrymetrics.WhichTSTableToUse(uint64(req.Start), uint64(req.End), nil)
totalTSBuilder := sqlbuilder.NewSelectBuilder()
totalTSBuilder.Select("uniq(fingerprint) AS total_time_series")
@@ -1176,8 +1176,8 @@ func (m *module) computeSamplesTreemap(ctx context.Context, req *metricsexplorer
}
}
start, end, distributedTsTable, localTsTable := telemetrymetrics.WhichTSTableToUse(uint64(req.Start), uint64(req.End), false, nil)
distributedSamplesTable, _ := telemetrymetrics.WhichSamplesTableToUse(uint64(req.Start), uint64(req.End), metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, false, nil)
start, end, distributedTsTable, localTsTable := telemetrymetrics.WhichTSTableToUse(uint64(req.Start), uint64(req.End), nil)
distributedSamplesTable, _ := telemetrymetrics.WhichSamplesTableToUse(uint64(req.Start), uint64(req.End), metrictypes.UnspecifiedType, metrictypes.TimeAggregationUnspecified, nil)
countExp := telemetrymetrics.CountExpressionForSamplesTable(distributedSamplesTable)
candidateLimit := req.Limit + 50

View File

@@ -114,10 +114,6 @@ func validateAndApplyDefaultExportLimits(queries []qbtypes.QueryEnvelope) error
return errors.NewInvalidInputf(errors.CodeInvalidInput, "limit cannot be more than %d", MaxExportRowCountLimit)
}
queries[idx].SetLimit(limit)
if queries[idx].GetOffset() < 0 {
return errors.NewInvalidInputf(errors.CodeInvalidInput, "offset must be non-negative")
}
}
return nil
}

View File

@@ -70,13 +70,12 @@ func exportRawDataForSingleQuery(querier querier.Querier, ctx context.Context, o
queries := rangeRequest.CompositeQuery.Queries
rowCountLimit := queries[queryIndex].GetLimit()
startingOffset := queries[queryIndex].GetOffset()
rowCount := 0
for rowCount < rowCountLimit {
chunkSize := min(ChunkSize, rowCountLimit-rowCount)
queries[queryIndex].SetLimit(chunkSize)
queries[queryIndex].SetOffset(startingOffset + rowCount)
queries[queryIndex].SetOffset(rowCount)
response, err := querier.QueryRange(ctx, orgID, rangeRequest)
if err != nil {

View File

@@ -18,7 +18,7 @@ func NewGetter(store serviceaccounttypes.Store) serviceaccount.Getter {
return &getter{store: store}
}
func (getter *getter) OnBeforeRoleDelete(ctx context.Context, orgID valuer.UUID, roleID valuer.UUID, _ string) error {
func (getter *getter) OnBeforeRoleDelete(ctx context.Context, orgID valuer.UUID, roleID valuer.UUID) error {
serviceAccounts, err := getter.store.GetServiceAccountsByOrgIDAndRoleID(ctx, orgID, roleID)
if err != nil {
return err

View File

@@ -13,7 +13,7 @@ import (
type Getter interface {
// OnBeforeRoleDelete checks if any service accounts are assigned to the role and rejects deletion if so.
OnBeforeRoleDelete(ctx context.Context, orgID valuer.UUID, roleID valuer.UUID, roleName string) error
OnBeforeRoleDelete(ctx context.Context, orgID valuer.UUID, roleID valuer.UUID) error
}
type Module interface {

View File

@@ -9,7 +9,6 @@ import (
"time"
"github.com/SigNoz/signoz/pkg/authn"
"github.com/SigNoz/signoz/pkg/authz"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/modules/authdomain"
@@ -30,10 +29,9 @@ type module struct {
authDomain authdomain.Module
tokenizer tokenizer.Tokenizer
orgGetter organization.Getter
authz authz.AuthZ
}
func NewModule(providerSettings factory.ProviderSettings, authNs map[authtypes.AuthNProvider]authn.AuthN, userSetter user.Setter, userGetter user.Getter, authDomain authdomain.Module, tokenizer tokenizer.Tokenizer, orgGetter organization.Getter, authz authz.AuthZ) session.Module {
func NewModule(providerSettings factory.ProviderSettings, authNs map[authtypes.AuthNProvider]authn.AuthN, userSetter user.Setter, userGetter user.Getter, authDomain authdomain.Module, tokenizer tokenizer.Tokenizer, orgGetter organization.Getter) session.Module {
return &module{
settings: factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/modules/session/implsession"),
authNs: authNs,
@@ -42,7 +40,6 @@ func NewModule(providerSettings factory.ProviderSettings, authNs map[authtypes.A
authDomain: authDomain,
tokenizer: tokenizer,
orgGetter: orgGetter,
authz: authz,
}
}
@@ -146,23 +143,15 @@ func (module *module) CreateCallbackAuthNSession(ctx context.Context, authNProvi
}
roleMapping := authDomain.AuthDomainConfig().RoleMapping
roleAttributeExists := false
if roleMapping != nil && roleMapping.UseRoleAttribute && callbackIdentity.Role != "" {
_, err := module.authz.GetByOrgIDAndName(ctx, callbackIdentity.OrgID, authtypes.NormalizeRoleName(callbackIdentity.Role))
if err == nil {
roleAttributeExists = true
}
}
roleNames := roleMapping.NewRolesFromCallbackIdentity(callbackIdentity, roleAttributeExists)
role := roleMapping.NewRoleFromCallbackIdentity(callbackIdentity)
signozManagedRole := authtypes.MustGetSigNozManagedRoleFromExistingRole(role)
newUser, err := types.NewUser(callbackIdentity.Name, callbackIdentity.Email, callbackIdentity.OrgID, types.UserStatusActive)
if err != nil {
return "", err
}
newUser, err = module.userSetter.GetOrCreateUser(ctx, newUser, user.WithRoleNames(roleNames))
newUser, err = module.userSetter.GetOrCreateUser(ctx, newUser, user.WithRoleNames([]string{signozManagedRole}))
if err != nil {
return "", err
}

View File

@@ -239,7 +239,7 @@ func (module *getter) VerifyResetPasswordToken(ctx context.Context, token string
return nil
}
func (module *getter) OnBeforeRoleDelete(ctx context.Context, orgID valuer.UUID, roleID valuer.UUID, _ string) error {
func (module *getter) OnBeforeRoleDelete(ctx context.Context, orgID valuer.UUID, roleID valuer.UUID) error {
users, err := module.GetUsersByOrgIDAndRoleID(ctx, orgID, roleID)
if err != nil {
return err

View File

@@ -96,7 +96,7 @@ type Getter interface {
GetUsersByOrgIDAndRoleID(ctx context.Context, orgID valuer.UUID, roleID valuer.UUID) ([]*types.User, error)
// OnBeforeRoleDelete checks if any users are assigned to the role and rejects deletion if so.
OnBeforeRoleDelete(ctx context.Context, orgID valuer.UUID, roleID valuer.UUID, roleName string) error
OnBeforeRoleDelete(ctx context.Context, orgID valuer.UUID, roleID valuer.UUID) error
// VerifyResetPasswordToken checks if a reset password token exists and is not expired.
VerifyResetPasswordToken(ctx context.Context, token string) error

View File

@@ -91,22 +91,13 @@ func (q *builderQuery[T]) Fingerprint() string {
if a.ComparisonSpaceAggregationParam != nil {
spaceAggParamStr = a.ComparisonSpaceAggregationParam.StringValue()
}
part := fmt.Sprintf("%s:%s:%s:%s:%s",
aggParts = append(aggParts, fmt.Sprintf("%s:%s:%s:%s:%s",
a.MetricName,
a.Temporality.StringValue(),
a.TimeAggregation.StringValue(),
a.SpaceAggregation.StringValue(),
spaceAggParamStr,
)
if a.Reduced {
oneDay := uint64(24 * time.Hour.Milliseconds())
route := "reduced"
if q.toMS-q.fromMS < oneDay && q.fromMS >= uint64(time.Now().UnixMilli())-oneDay {
route = "buffer"
}
part += ":" + route
}
aggParts = append(aggParts, part)
))
}
}
parts = append(parts, fmt.Sprintf("aggs=[%s]", strings.Join(aggParts, ",")))

View File

@@ -111,7 +111,7 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
// We need to set if it is unspecified or adjust it if value is not within recommended range
intervalWarnings := q.adjustStepInterval(req.CompositeQuery.Queries, req.Start, req.End)
missingMetricQueries, metricWarnings, err := q.resolveMetricMetadata(ctx, orgID, req.CompositeQuery.Queries, req.Start, req.End)
missingMetricQueries, metricWarnings, err := q.resolveMetricMetadata(ctx, req.CompositeQuery.Queries, req.Start, req.End)
if err != nil {
return nil, err
}
@@ -320,7 +320,7 @@ func (q *querier) populateQBEvent(event *qbtypes.QBEvent, queries []qbtypes.Quer
// resolved: never-seen metrics and dormant metrics (seen but no data in
// the query window).
// - err: Internal when a metadata fetch fails.
func (q *querier) resolveMetricMetadata(ctx context.Context, orgID valuer.UUID, queries []qbtypes.QueryEnvelope, start, end uint64) (missingMetricQueries []string, metricWarnings []string, err error) {
func (q *querier) resolveMetricMetadata(ctx context.Context, queries []qbtypes.QueryEnvelope, start, end uint64) (missingMetricQueries []string, metricWarnings []string, err error) {
metricNames := make([]string, 0)
for idx := range queries {
if queries[idx].Type != qbtypes.QueryTypeBuilder {
@@ -341,7 +341,7 @@ func (q *querier) resolveMetricMetadata(ctx context.Context, orgID valuer.UUID,
return nil, nil, nil
}
metricTemporality, metricTypes, reducedMetricsSet, err := q.metadataStore.FetchTemporalityAndTypeMulti(ctx, orgID, start, end, metricNames...)
metricTemporality, metricTypes, err := q.metadataStore.FetchTemporalityAndTypeMulti(ctx, start, end, metricNames...)
if err != nil {
q.logger.WarnContext(ctx, "failed to fetch metric temporality", errors.Attr(err), slog.Any("metrics", metricNames))
return nil, nil, errors.NewInternalf(errors.CodeInternal, "failed to fetch metrics temporality")
@@ -378,9 +378,6 @@ func (q *querier) resolveMetricMetadata(ctx context.Context, orgID valuer.UUID,
if err := spec.Aggregations[i].ValidateForType(); err != nil {
return nil, nil, err
}
if reducedMetricsSet[spec.Aggregations[i].MetricName] {
spec.Aggregations[i].Reduced = true
}
presentAggregations = append(presentAggregations, spec.Aggregations[i])
}
if len(presentAggregations) == 0 {

View File

@@ -56,6 +56,17 @@ func QueryStringToKeysSelectors(query string) []*telemetrytypes.FieldKeySelector
FieldDataType: key.FieldDataType,
})
}
// todo(tushar): consider reverting changes done to this method in below PR to avoid scope specific checks
// https://github.com/SigNoz/signoz/issues/11374
if key.FieldContext == telemetrytypes.FieldContextScope {
keys = append(keys, &telemetrytypes.FieldKeySelector{
Name: key.FieldContext.StringValue() + "." + key.Name,
Signal: key.Signal,
FieldContext: telemetrytypes.FieldContextUnspecified, // this allows 'scope.' prefix for keys with other context as well
FieldDataType: key.FieldDataType,
})
}
}
}

View File

@@ -72,6 +72,23 @@ func TestQueryToKeys(t *testing.T) {
},
},
},
{
query: `scope.version = '1.0.0'`,
expectedKeys: []telemetrytypes.FieldKeySelector{
{
Name: "version",
Signal: telemetrytypes.SignalUnspecified,
FieldContext: telemetrytypes.FieldContextScope,
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
},
{
Name: "scope.version",
Signal: telemetrytypes.SignalUnspecified,
FieldContext: telemetrytypes.FieldContextUnspecified,
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
},
},
},
}
for _, testCase := range testCases {

View File

@@ -128,7 +128,6 @@ func NewModules(
}
userSetter := impluser.NewSetter(impluser.NewStore(sqlstore, providerSettings), tokenizer, emailing, providerSettings, orgSetter, authz, analytics, config.User, userRoleStore, userGetter, onDeleteUser)
ruleStore := sqlrulestore.NewRuleStore(sqlstore, queryParser, providerSettings)
authDomainModule := implauthdomain.NewModule(implauthdomain.NewStore(sqlstore), authNs, authz)
return Modules{
OrgGetter: orgGetter,
@@ -143,8 +142,8 @@ func NewModules(
QuickFilter: quickfilter,
TraceFunnel: impltracefunnel.NewModule(impltracefunnel.NewStore(sqlstore)),
RawDataExport: implrawdataexport.NewModule(querier),
AuthDomain: authDomainModule,
Session: implsession.NewModule(providerSettings, authNs, userSetter, userGetter, authDomainModule, tokenizer, orgGetter, authz),
AuthDomain: implauthdomain.NewModule(implauthdomain.NewStore(sqlstore), authNs),
Session: implsession.NewModule(providerSettings, authNs, userSetter, userGetter, implauthdomain.NewModule(implauthdomain.NewStore(sqlstore), authNs), tokenizer, orgGetter),
SpanPercentile: implspanpercentile.NewModule(querier, providerSettings),
Services: implservices.NewModule(querier, telemetryStore),
MetricsExplorer: implmetricsexplorer.NewModule(telemetryStore, telemetryMetadataStore, cache, ruleStore, dashboard, providerSettings, config.MetricsExplorer),

View File

@@ -215,7 +215,6 @@ func NewSQLMigrationProviderFactories(
sqlmigration.NewRecreateUserDashboardPreferenceFactory(sqlstore, sqlschema),
sqlmigration.NewMigrateRecurrenceBoundsFactory(sqlstore),
sqlmigration.NewAddDashboardViewFactory(sqlstore, sqlschema),
sqlmigration.NewMigrateSSORoleMappingNamesFactory(sqlstore),
)
}

View File

@@ -24,7 +24,6 @@ import (
"github.com/SigNoz/signoz/pkg/instrumentation"
"github.com/SigNoz/signoz/pkg/licensing"
"github.com/SigNoz/signoz/pkg/meterreporter"
"github.com/SigNoz/signoz/pkg/modules/authdomain/implauthdomain"
"github.com/SigNoz/signoz/pkg/modules/cloudintegration"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/modules/organization"
@@ -350,13 +349,10 @@ func New(
// Initialize service account getter
serviceAccountGetter := implserviceaccount.NewGetter(implserviceaccount.NewStore(sqlstore))
authDomainGetter := implauthdomain.NewGetter(implauthdomain.NewStore(sqlstore))
// Build pre-delete callbacks from modules
onBeforeRoleDelete := []authz.OnBeforeRoleDelete{
userGetter.OnBeforeRoleDelete,
serviceAccountGetter.OnBeforeRoleDelete,
authDomainGetter.OnBeforeRoleDelete,
}
// Initialize authz

View File

@@ -1,127 +0,0 @@
package sqlmigration
import (
"context"
"encoding/json"
"log/slog"
"strings"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
)
type migrateSSORoleMappingNames struct {
sqlstore sqlstore.SQLStore
logger *slog.Logger
}
type authDomainRow struct {
bun.BaseModel `bun:"table:auth_domain"`
ID string `bun:"id"`
Data string `bun:"data"`
}
var legacyRoleToManagedRoleName = map[string]string{
"ADMIN": "signoz-admin",
"EDITOR": "signoz-editor",
"VIEWER": "signoz-viewer",
}
type ssoRoleMapping struct {
DefaultRole string `json:"defaultRole"`
GroupMappings map[string]string `json:"groupMappings"`
UseRoleAttribute bool `json:"useRoleAttribute"`
}
func NewMigrateSSORoleMappingNamesFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[SQLMigration, Config] {
return factory.NewProviderFactory(
factory.MustNewName("migrate_sso_role_mapping_names"),
func(ctx context.Context, ps factory.ProviderSettings, c Config) (SQLMigration, error) {
return &migrateSSORoleMappingNames{sqlstore: sqlstore, logger: ps.Logger}, nil
},
)
}
func (migration *migrateSSORoleMappingNames) Register(migrations *migrate.Migrations) error {
return migrations.Register(migration.Up, migration.Down)
}
func (migration *migrateSSORoleMappingNames) Up(ctx context.Context, db *bun.DB) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer func() {
_ = tx.Rollback()
}()
rows := make([]*authDomainRow, 0)
if err := tx.NewSelect().Model(&rows).Scan(ctx); err != nil {
return err
}
for _, row := range rows {
config := make(map[string]json.RawMessage)
if err := json.Unmarshal([]byte(row.Data), &config); err != nil {
migration.logger.WarnContext(ctx, "skipping auth domain with unreadable data", slog.String("auth_domain_id", row.ID), errors.Attr(err))
continue
}
roleMappingRaw, ok := config["roleMapping"]
if !ok || string(roleMappingRaw) == "null" {
continue
}
var roleMapping ssoRoleMapping
if err := json.Unmarshal(roleMappingRaw, &roleMapping); err != nil {
migration.logger.WarnContext(ctx, "skipping auth domain with unreadable role mapping", slog.String("auth_domain_id", row.ID), errors.Attr(err))
continue
}
changed := false
if managed, ok := legacyRoleToManagedRoleName[strings.ToUpper(roleMapping.DefaultRole)]; ok {
roleMapping.DefaultRole = managed
changed = true
}
for group, role := range roleMapping.GroupMappings {
if managed, ok := legacyRoleToManagedRoleName[strings.ToUpper(role)]; ok {
roleMapping.GroupMappings[group] = managed
changed = true
}
}
if !changed {
continue
}
newRoleMapping, err := json.Marshal(roleMapping)
if err != nil {
return err
}
config["roleMapping"] = newRoleMapping
newData, err := json.Marshal(config)
if err != nil {
return err
}
if _, err := tx.NewUpdate().
Model((*authDomainRow)(nil)).
Set("data = ?", string(newData)).
Where("id = ?", row.ID).
Exec(ctx); err != nil {
return err
}
}
return tx.Commit()
}
func (migration *migrateSSORoleMappingNames) Down(context.Context, *bun.DB) error {
return nil
}

View File

@@ -200,7 +200,7 @@ func (t *telemetryMetaStore) getTracesKeys(ctx context.Context, fieldKeySelector
`CASE
// WHEN tagType = 'spanfield' THEN 1
WHEN tagType = 'resource' THEN 2
// WHEN tagType = 'scope' THEN 3
WHEN tagType = 'scope' THEN 3
WHEN tagType = 'tag' THEN 4
ELSE 5
END as priority`,
@@ -2136,12 +2136,12 @@ func (t *telemetryMetaStore) GetAllValues(ctx context.Context, fieldValueSelecto
return values, complete, nil
}
func (t *telemetryMetaStore) FetchTemporality(ctx context.Context, orgID valuer.UUID, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricName string) (metrictypes.Temporality, error) {
func (t *telemetryMetaStore) FetchTemporality(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricName string) (metrictypes.Temporality, error) {
if metricName == "" {
return metrictypes.Unknown, errors.Newf(errors.TypeInternal, errors.CodeInternal, "metric name cannot be empty")
}
temporalityMap, err := t.FetchTemporalityMulti(ctx, orgID, queryTimeRangeStartTs, queryTimeRangeEndTs, metricName)
temporalityMap, err := t.FetchTemporalityMulti(ctx, queryTimeRangeStartTs, queryTimeRangeEndTs, metricName)
if err != nil {
return metrictypes.Unknown, err
}
@@ -2154,27 +2154,25 @@ func (t *telemetryMetaStore) FetchTemporality(ctx context.Context, orgID valuer.
return temporality, nil
}
func (t *telemetryMetaStore) FetchTemporalityMulti(ctx context.Context, orgID valuer.UUID, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, error) {
temporalities, _, _, err := t.FetchTemporalityAndTypeMulti(ctx, orgID, queryTimeRangeStartTs, queryTimeRangeEndTs, metricNames...)
func (t *telemetryMetaStore) FetchTemporalityMulti(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, error) {
temporalities, _, err := t.FetchTemporalityAndTypeMulti(ctx, queryTimeRangeStartTs, queryTimeRangeEndTs, metricNames...)
return temporalities, err
}
func (t *telemetryMetaStore) FetchTemporalityAndTypeMulti(ctx context.Context, orgID valuer.UUID, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, map[string]metrictypes.Type, map[string]bool, error) {
func (t *telemetryMetaStore) FetchTemporalityAndTypeMulti(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, map[string]metrictypes.Type, error) {
if len(metricNames) == 0 {
return make(map[string]metrictypes.Temporality), make(map[string]metrictypes.Type), make(map[string]bool), nil
return make(map[string]metrictypes.Temporality), make(map[string]metrictypes.Type), nil
}
reductionEnabled := t.fl.BooleanOrEmpty(ctx, flagger.FeatureEnableMetricsReduction, featuretypes.NewFlaggerEvaluationContext(orgID))
temporalities := make(map[string]metrictypes.Temporality)
types := make(map[string]metrictypes.Type)
metricsTemporality, metricTypes, reduced, err := t.fetchMetricsTemporalityAndType(ctx, queryTimeRangeStartTs, queryTimeRangeEndTs, reductionEnabled, metricNames...)
metricsTemporality, metricTypes, err := t.fetchMetricsTemporalityAndType(ctx, queryTimeRangeStartTs, queryTimeRangeEndTs, metricNames...)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
meterMetricsTemporality, meterMetricsTypes, err := t.fetchMeterSourceMetricsTemporalityAndType(ctx, metricNames...)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
// For metrics not found in the database, set to Unknown
@@ -2199,10 +2197,10 @@ func (t *telemetryMetaStore) FetchTemporalityAndTypeMulti(ctx context.Context, o
}
}
return temporalities, types, reduced, nil
return temporalities, types, nil
}
func (t *telemetryMetaStore) fetchMetricsTemporalityAndType(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, reductionEnabled bool, metricNames ...string) (map[string][]metrictypes.Temporality, map[string]metrictypes.Type, map[string]bool, error) {
func (t *telemetryMetaStore) fetchMetricsTemporalityAndType(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string][]metrictypes.Temporality, map[string]metrictypes.Type, error) {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
instrumentationtypes.TelemetrySignal: telemetrytypes.SignalMetrics.StringValue(),
instrumentationtypes.CodeNamespace: "metadata",
@@ -2210,58 +2208,48 @@ func (t *telemetryMetaStore) fetchMetricsTemporalityAndType(ctx context.Context,
})
temporalities := make(map[string][]metrictypes.Temporality)
types := make(map[string]metrictypes.Type)
reduced := make(map[string]bool)
adjustedStartTs, adjustedEndTs, tsTableName, _ := telemetrymetrics.WhichTSTableToUse(queryTimeRangeStartTs, queryTimeRangeEndTs, false, nil)
adjustedStartTs, adjustedEndTs, tsTableName, _ := telemetrymetrics.WhichTSTableToUse(queryTimeRangeStartTs, queryTimeRangeEndTs, nil)
cols := []string{"metric_name", "temporality", "any(type) AS type", "any(is_monotonic) as is_monotonic"}
// Build query to fetch temporality for all metrics
// We use attr_string_value where attr_name = '__temporality__'
// Note: The columns are mixed in the current data - temporality column contains metric_name
// and metric_name column contains temporality value, so we use the correct mapping
sb := sqlbuilder.Select(
"metric_name",
"temporality",
"any(type) AS type",
"any(is_monotonic) as is_monotonic",
).
From(t.metricsDBName + "." + tsTableName)
// When reduction is enabled, fold the reduced-catalog presence check into the
// same query so a metric's reduced status comes back in one round trip.
var reducedArgs []any
if reductionEnabled {
rs := sqlbuilder.NewSelectBuilder()
rs.Select("metric_name")
rs.From(t.metricsDBName + "." + telemetrymetrics.TimeseriesV4ReducedTableName)
rs.Where(rs.In("metric_name", metricNames), rs.GTE("unix_milli", adjustedStartTs), rs.LT("unix_milli", adjustedEndTs))
rs.GroupBy("metric_name")
rsQuery, rsArgs := rs.BuildWithFlavor(sqlbuilder.ClickHouse)
cols = append(cols, fmt.Sprintf("metric_name GLOBAL IN (%s) AS reduced", rsQuery))
reducedArgs = rsArgs
}
sb := sqlbuilder.NewSelectBuilder()
sb.Select(cols...)
sb.From(t.metricsDBName + "." + tsTableName)
// Filter by metric names (in the temporality column due to data mix-up)
sb.Where(
sb.In("metric_name", metricNames),
sb.GTE("unix_milli", adjustedStartTs),
sb.LT("unix_milli", adjustedEndTs),
)
sb.GroupBy("metric_name", "temporality")
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse, reducedArgs...)
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
t.logger.DebugContext(ctx, "fetching metric temporality", slog.String("query", query), slog.Any("args", args))
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, nil, nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to fetch metric temporality")
return nil, nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to fetch metric temporality")
}
defer rows.Close()
// Process results
for rows.Next() {
var metricName string
var temporality metrictypes.Temporality
var metricType metrictypes.Type
var isMonotonic bool
var isReduced uint8
dest := []any{&metricName, &temporality, &metricType, &isMonotonic}
if reductionEnabled {
dest = append(dest, &isReduced)
}
if err := rows.Scan(dest...); err != nil {
return nil, nil, nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to scan temporality result")
if err := rows.Scan(&metricName, &temporality, &metricType, &isMonotonic); err != nil {
return nil, nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to scan temporality result")
}
if temporality != metrictypes.Unknown {
temporalities[metricName] = append(temporalities[metricName], temporality)
@@ -2270,15 +2258,12 @@ func (t *telemetryMetaStore) fetchMetricsTemporalityAndType(ctx context.Context,
metricType = metrictypes.GaugeType
}
types[metricName] = metricType
if isReduced != 0 {
reduced[metricName] = true
}
}
if err := rows.Err(); err != nil {
return nil, nil, nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "error iterating over metrics temporality rows")
return nil, nil, errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "error iterating over metrics temporality rows")
}
return temporalities, types, reduced, nil
return temporalities, types, nil
}
func (t *telemetryMetaStore) fetchMeterSourceMetricsTemporalityAndType(ctx context.Context, metricNames ...string) (map[string]metrictypes.Temporality, map[string]metrictypes.Type, error) {

View File

@@ -1,157 +0,0 @@
package telemetrymetrics
import (
"context"
"testing"
"time"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
"github.com/stretchr/testify/require"
)
func reducedQuery(metric string, ty metrictypes.Type, temp metrictypes.Temporality, ta metrictypes.TimeAggregation, sa metrictypes.SpaceAggregation) qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation] {
return qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
StepInterval: qbtypes.Step{Duration: 5 * time.Minute},
Aggregations: []qbtypes.MetricAggregation{{
MetricName: metric,
Type: ty,
Temporality: temp,
TimeAggregation: ta,
SpaceAggregation: sa,
Reduced: true,
}},
}
}
func TestReducedStatementBuilder(t *testing.T) {
cases := []struct {
name string
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]
expected qbtypes.Statement
}{
{
name: "gauge_sum_latest",
query: reducedQuery("test.metric", metrictypes.GaugeType, metrictypes.Unspecified, metrictypes.TimeAggregationLatest, metrictypes.SpaceAggregationSum),
expected: qbtypes.Statement{
Query: "SELECT * FROM (WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, anyLast(last) AS per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m AS points INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY fingerprint) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts) SELECT * FROM __spatial_aggregation_cte ORDER BY ts) UNION ALL SELECT * FROM (WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, argMax(value, unix_milli) AS per_series_value FROM (SELECT reduced_fingerprint AS fingerprint, unix_milli, argMax(`sum_last`, computed_at) AS value FROM signoz_metrics.distributed_samples_v4_reduced_last_60s WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY reduced_fingerprint, unix_milli) AS points INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v4_reduced WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND __normalized = ? GROUP BY fingerprint) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint GROUP BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, sum(per_series_value) AS value FROM __temporal_aggregation_cte GROUP BY ts) SELECT * FROM __spatial_aggregation_cte ORDER BY ts) ORDER BY ts",
Args: []any{"test.metric", uint64(1746921600000), uint64(1747172760000), "unspecified", false, "test.metric", uint64(1746999900000), uint64(1747172760000), 0, "test.metric", uint64(1746999900000), uint64(1747172760000), "test.metric", uint64(1746999900000), uint64(1747172760000), false},
},
},
{
name: "gauge_avg_avg",
query: reducedQuery("test.metric", metrictypes.GaugeType, metrictypes.Unspecified, metrictypes.TimeAggregationAvg, metrictypes.SpaceAggregationAvg),
expected: qbtypes.Statement{
Query: "SELECT * FROM (WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, sum(sum) / sum(count) AS per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m AS points INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY fingerprint) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, avg(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts) SELECT * FROM __spatial_aggregation_cte ORDER BY ts) UNION ALL SELECT * FROM (WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, avg(value) AS per_series_value, avg(weight) AS per_series_weight FROM (SELECT reduced_fingerprint AS fingerprint, unix_milli, argMax(`sum_last`, computed_at) AS value, argMax(`count_series`, computed_at) AS weight FROM signoz_metrics.distributed_samples_v4_reduced_last_60s WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY reduced_fingerprint, unix_milli) AS points INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v4_reduced WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND __normalized = ? GROUP BY fingerprint) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint GROUP BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, sum(per_series_value) / sum(per_series_weight) AS value FROM __temporal_aggregation_cte GROUP BY ts) SELECT * FROM __spatial_aggregation_cte ORDER BY ts) ORDER BY ts",
Args: []any{"test.metric", uint64(1746921600000), uint64(1747172760000), "unspecified", false, "test.metric", uint64(1746999900000), uint64(1747172760000), 0, "test.metric", uint64(1746999900000), uint64(1747172760000), "test.metric", uint64(1746999900000), uint64(1747172760000), false},
},
},
{
name: "gauge_min_min",
query: reducedQuery("test.metric", metrictypes.GaugeType, metrictypes.Unspecified, metrictypes.TimeAggregationMin, metrictypes.SpaceAggregationMin),
expected: qbtypes.Statement{
Query: "SELECT * FROM (WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, min(min) AS per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m AS points INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY fingerprint) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, min(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts) SELECT * FROM __spatial_aggregation_cte ORDER BY ts) UNION ALL SELECT * FROM (WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, min(value) AS per_series_value FROM (SELECT reduced_fingerprint AS fingerprint, unix_milli, argMax(`min`, computed_at) AS value FROM signoz_metrics.distributed_samples_v4_reduced_last_60s WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY reduced_fingerprint, unix_milli) AS points INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v4_reduced WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND __normalized = ? GROUP BY fingerprint) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint GROUP BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, min(per_series_value) AS value FROM __temporal_aggregation_cte GROUP BY ts) SELECT * FROM __spatial_aggregation_cte ORDER BY ts) ORDER BY ts",
Args: []any{"test.metric", uint64(1746921600000), uint64(1747172760000), "unspecified", false, "test.metric", uint64(1746999900000), uint64(1747172760000), 0, "test.metric", uint64(1746999900000), uint64(1747172760000), "test.metric", uint64(1746999900000), uint64(1747172760000), false},
},
},
{
name: "gauge_max_max",
query: reducedQuery("test.metric", metrictypes.GaugeType, metrictypes.Unspecified, metrictypes.TimeAggregationMax, metrictypes.SpaceAggregationMax),
expected: qbtypes.Statement{
Query: "SELECT * FROM (WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, max(max) AS per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m AS points INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY fingerprint) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, max(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts) SELECT * FROM __spatial_aggregation_cte ORDER BY ts) UNION ALL SELECT * FROM (WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, max(value) AS per_series_value FROM (SELECT reduced_fingerprint AS fingerprint, unix_milli, argMax(`max`, computed_at) AS value FROM signoz_metrics.distributed_samples_v4_reduced_last_60s WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY reduced_fingerprint, unix_milli) AS points INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v4_reduced WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND __normalized = ? GROUP BY fingerprint) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint GROUP BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, max(per_series_value) AS value FROM __temporal_aggregation_cte GROUP BY ts) SELECT * FROM __spatial_aggregation_cte ORDER BY ts) ORDER BY ts",
Args: []any{"test.metric", uint64(1746921600000), uint64(1747172760000), "unspecified", false, "test.metric", uint64(1746999900000), uint64(1747172760000), 0, "test.metric", uint64(1746999900000), uint64(1747172760000), "test.metric", uint64(1746999900000), uint64(1747172760000), false},
},
},
{
name: "counter_sum_rate",
query: reducedQuery("test.metric.sum", metrictypes.SumType, metrictypes.Cumulative, metrictypes.TimeAggregationRate, metrictypes.SpaceAggregationSum),
expected: qbtypes.Statement{
Query: "SELECT * FROM (WITH __temporal_aggregation_cte AS (SELECT ts, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, max(max) AS per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m AS points INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY fingerprint) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts) SELECT * FROM __spatial_aggregation_cte ORDER BY ts) UNION ALL SELECT * FROM (WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, sum(value) / 300 AS per_series_value FROM (SELECT reduced_fingerprint AS fingerprint, unix_milli, argMax(`sum`, computed_at) AS value FROM signoz_metrics.distributed_samples_v4_reduced_sum_60s WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY reduced_fingerprint, unix_milli) AS points INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v4_reduced WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND __normalized = ? GROUP BY fingerprint) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint GROUP BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, sum(per_series_value) AS value FROM __temporal_aggregation_cte GROUP BY ts) SELECT * FROM __spatial_aggregation_cte ORDER BY ts) ORDER BY ts",
Args: []any{"test.metric.sum", uint64(1746921600000), uint64(1747172760000), "cumulative", false, "test.metric.sum", uint64(1746999600000), uint64(1747172760000), 0, "test.metric.sum", uint64(1746999600000), uint64(1747172760000), "test.metric.sum", uint64(1746999600000), uint64(1747172760000), false},
},
},
{
name: "counter_avg_increase",
query: reducedQuery("test.metric", metrictypes.SumType, metrictypes.Cumulative, metrictypes.TimeAggregationIncrease, metrictypes.SpaceAggregationAvg),
expected: qbtypes.Statement{
Query: "SELECT * FROM (WITH __temporal_aggregation_cte AS (SELECT ts, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value, per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, max(max) AS per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m AS points INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY fingerprint) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, avg(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts) SELECT * FROM __spatial_aggregation_cte ORDER BY ts) UNION ALL SELECT * FROM (WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, sum(value) AS per_series_value, avg(weight) AS per_series_weight FROM (SELECT reduced_fingerprint AS fingerprint, unix_milli, argMax(`sum`, computed_at) AS value, argMax(`count_series`, computed_at) AS weight FROM signoz_metrics.distributed_samples_v4_reduced_sum_60s WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY reduced_fingerprint, unix_milli) AS points INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v4_reduced WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND __normalized = ? GROUP BY fingerprint) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint GROUP BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, sum(per_series_value) / sum(per_series_weight) AS value FROM __temporal_aggregation_cte GROUP BY ts) SELECT * FROM __spatial_aggregation_cte ORDER BY ts) ORDER BY ts",
Args: []any{"test.metric", uint64(1746921600000), uint64(1747172760000), "cumulative", false, "test.metric", uint64(1746999600000), uint64(1747172760000), 0, "test.metric", uint64(1746999600000), uint64(1747172760000), "test.metric", uint64(1746999600000), uint64(1747172760000), false},
},
},
{
name: "counter_min_omitted",
query: reducedQuery("test.metric", metrictypes.SumType, metrictypes.Cumulative, metrictypes.TimeAggregationRate, metrictypes.SpaceAggregationMin),
expected: qbtypes.Statement{
Query: "WITH __temporal_aggregation_cte AS (SELECT ts, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, max(max) AS per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m AS points INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY fingerprint) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, min(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts) SELECT * FROM __spatial_aggregation_cte ORDER BY ts",
Args: []any{"test.metric", uint64(1746921600000), uint64(1747172760000), "cumulative", false, "test.metric", uint64(1746999600000), uint64(1747172760000), 0},
},
},
{
name: "counter_max_omitted",
query: reducedQuery("test.metric", metrictypes.SumType, metrictypes.Cumulative, metrictypes.TimeAggregationRate, metrictypes.SpaceAggregationMax),
expected: qbtypes.Statement{
Query: "WITH __temporal_aggregation_cte AS (SELECT ts, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, max(max) AS per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m AS points INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY fingerprint) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, max(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts) SELECT * FROM __spatial_aggregation_cte ORDER BY ts",
Args: []any{"test.metric", uint64(1746921600000), uint64(1747172760000), "cumulative", false, "test.metric", uint64(1746999600000), uint64(1747172760000), 0},
},
},
{
name: "histogram_p99",
query: reducedQuery("test.metric.bucket", metrictypes.HistogramType, metrictypes.Cumulative, metrictypes.TimeAggregationUnspecified, metrictypes.SpaceAggregationPercentile99),
expected: qbtypes.Statement{
Query: "SELECT * FROM (WITH __temporal_aggregation_cte AS (SELECT ts, `le`, multiIf(row_number() OVER rate_window = 1, nan, (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) < 0, per_series_value / (ts - lagInFrame(ts, 1) OVER rate_window), (per_series_value - lagInFrame(per_series_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, `le`, max(max) AS per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_1day WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY fingerprint, `le`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts, `le` ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `le`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts, `le`) SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) AS value FROM __spatial_aggregation_cte GROUP BY ts ORDER BY ts) UNION ALL SELECT * FROM (WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, `le`, sum(value) / 300 AS per_series_value FROM (SELECT reduced_fingerprint AS fingerprint, unix_milli, argMax(`sum`, computed_at) AS value FROM signoz_metrics.distributed_samples_v4_reduced_sum_60s WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY reduced_fingerprint, unix_milli) AS points INNER JOIN (SELECT fingerprint, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.distributed_time_series_v4_reduced WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND __normalized = ? GROUP BY fingerprint, `le`) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint GROUP BY fingerprint, ts, `le`), __spatial_aggregation_cte AS (SELECT ts, `le`, sum(per_series_value) AS value FROM __temporal_aggregation_cte GROUP BY ts, `le`) SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) AS value FROM __spatial_aggregation_cte GROUP BY ts ORDER BY ts) ORDER BY ts",
Args: []any{"test.metric.bucket", uint64(1746921600000), uint64(1747172760000), "cumulative", false, "test.metric.bucket", uint64(1746999900000), uint64(1747172760000), 0, "test.metric.bucket", uint64(1746999900000), uint64(1747172760000), "test.metric.bucket", uint64(1746999900000), uint64(1747172760000), false},
},
},
{
name: "summary_avg",
query: reducedQuery("test.metric", metrictypes.SummaryType, metrictypes.Unspecified, metrictypes.TimeAggregationAvg, metrictypes.SpaceAggregationAvg),
expected: qbtypes.Statement{
Query: "SELECT * FROM (WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, sum(sum) / sum(count) AS per_series_value FROM signoz_metrics.distributed_samples_v4_agg_5m AS points INNER JOIN (SELECT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? GROUP BY fingerprint) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY fingerprint, ts ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, avg(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ts) SELECT * FROM __spatial_aggregation_cte ORDER BY ts) UNION ALL SELECT * FROM (WITH __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(300)) AS ts, avg(value) AS per_series_value, avg(weight) AS per_series_weight FROM (SELECT reduced_fingerprint AS fingerprint, unix_milli, argMax(`sum_last`, computed_at) AS value, argMax(`count_series`, computed_at) AS weight FROM signoz_metrics.distributed_samples_v4_reduced_last_60s WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY reduced_fingerprint, unix_milli) AS points INNER JOIN (SELECT fingerprint FROM signoz_metrics.distributed_time_series_v4_reduced WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND __normalized = ? GROUP BY fingerprint) AS filtered_time_series ON points.fingerprint = filtered_time_series.fingerprint GROUP BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, sum(per_series_value) / sum(per_series_weight) AS value FROM __temporal_aggregation_cte GROUP BY ts) SELECT * FROM __spatial_aggregation_cte ORDER BY ts) ORDER BY ts",
Args: []any{"test.metric", uint64(1746921600000), uint64(1747172760000), "unspecified", false, "test.metric", uint64(1746999900000), uint64(1747172760000), 0, "test.metric", uint64(1746999900000), uint64(1747172760000), "test.metric", uint64(1746999900000), uint64(1747172760000), false},
},
},
}
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
fl, err := flagger.New(context.Background(), instrumentationtest.New().ToProviderSettings(), flagger.Config{}, flagger.MustNewRegistry())
require.NoError(t, err)
sb := NewMetricQueryStatementBuilder(instrumentationtest.New().ToProviderSettings(), telemetrytypestest.NewMockMetadataStore(), fm, cb, fl)
const start, end = uint64(1747000000000), uint64(1747172800000)
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
got, err := sb.Build(context.Background(), start, end, qbtypes.RequestTypeTimeSeries, c.query, nil)
require.NoError(t, err)
require.Equal(t, c.expected.Query, got.Query)
require.Equal(t, c.expected.Args, got.Args)
})
}
t.Run("buffer_recent_window", func(t *testing.T) {
now := time.Now().UnixMilli()
q := reducedQuery("test.metric", metrictypes.GaugeType, metrictypes.Unspecified, metrictypes.TimeAggregationLatest, metrictypes.SpaceAggregationSum)
got, err := sb.Build(context.Background(), uint64(now-2*time.Hour.Milliseconds()), uint64(now), qbtypes.RequestTypeTimeSeries, q, nil)
require.NoError(t, err)
require.Contains(t, got.Query, "signoz_metrics.distributed_samples_v4_buffer")
require.Contains(t, got.Query, "signoz_metrics.time_series_v4_buffer")
require.Contains(t, got.Query, "is_reduced")
require.NotContains(t, got.Query, "UNION ALL")
})
t.Run("not_reduced", func(t *testing.T) {
q := reducedQuery("test.metric", metrictypes.GaugeType, metrictypes.Unspecified, metrictypes.TimeAggregationLatest, metrictypes.SpaceAggregationSum)
q.Aggregations[0].Reduced = false
got, err := sb.Build(context.Background(), start, end, qbtypes.RequestTypeTimeSeries, q, nil)
require.NoError(t, err)
require.NotContains(t, got.Query, "UNION ALL")
require.NotContains(t, got.Query, "reduced")
require.NotContains(t, got.Query, "buffer")
})
}

View File

@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"log/slog"
"time"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/flagger"
@@ -178,30 +177,19 @@ func (b *MetricQueryStatementBuilder) buildPipelineStatement(
query.Aggregations[0].SpaceAggregation = metrictypes.SpaceAggregationSum
}
agg := query.Aggregations[0]
// A reduced metric reads the raw buffer for recent short windows, and
// samples_v4/agg (unioned with the reduced tables) otherwise. The buffer is
// shaped exactly like samples_v4 / time_series_v4, so once the table names are
// chosen the rest of the pipeline is unchanged.
useBuffer := agg.Reduced &&
end-start < oneDayInMilliseconds &&
start >= uint64(time.Now().UnixMilli())-oneDayInMilliseconds
samplesTable, _ := WhichSamplesTableToUse(start, end, agg.Type, agg.TimeAggregation, useBuffer, agg.TableHints)
tsStart, tsEnd, _, tsTable := WhichTSTableToUse(start, end, useBuffer, agg.TableHints)
var timeSeriesCTE string
var timeSeriesCTEArgs []any
var err error
if timeSeriesCTE, timeSeriesCTEArgs, err = b.buildTimeSeriesCTE(ctx, tsStart, tsEnd, query, keys, variables, tsTable); err != nil {
// time_series_cte
// this is applicable for all the queries
if timeSeriesCTE, timeSeriesCTEArgs, err = b.buildTimeSeriesCTE(ctx, start, end, query, keys, variables); err != nil {
return nil, err
}
if qbtypes.CanShortCircuitDelta(query.Aggregations[0]) {
// spatial_aggregation_cte directly for certain delta queries
if frag, args, err := b.buildTemporalAggDeltaFastPath(start, end, query, samplesTable, timeSeriesCTE, timeSeriesCTEArgs); err != nil {
if frag, args, err := b.buildTemporalAggDeltaFastPath(start, end, query, timeSeriesCTE, timeSeriesCTEArgs); err != nil {
return nil, err
} else if frag != "" {
cteFragments = append(cteFragments, frag)
@@ -209,7 +197,7 @@ func (b *MetricQueryStatementBuilder) buildPipelineStatement(
}
} else {
// temporal_aggregation_cte
if frag, args, err := b.buildTemporalAggregationCTE(ctx, start, end, query, keys, samplesTable, timeSeriesCTE, timeSeriesCTEArgs); err != nil {
if frag, args, err := b.buildTemporalAggregationCTE(ctx, start, end, query, keys, timeSeriesCTE, timeSeriesCTEArgs); err != nil {
return nil, err
} else if frag != "" {
cteFragments = append(cteFragments, frag)
@@ -223,188 +211,18 @@ func (b *MetricQueryStatementBuilder) buildPipelineStatement(
}
}
var reducedFragments []string
var reducedArgs [][]any
if agg.Reduced && !useBuffer {
var tsCTE string
var tsArgs []any
if tsCTE, tsArgs, err = b.buildReducedTimeSeriesCTE(ctx, start, end, query, keys, variables); err != nil {
return nil, err
}
if temporalFrag, temporalArgs, ok := b.buildReducedTemporalAggregationCTE(start, end, query, tsCTE, tsArgs); ok {
spatialFrag, spatialArgs := b.buildReducedSpatialAggregationCTE(query)
reducedFragments = []string{temporalFrag, spatialFrag}
reducedArgs = [][]any{temporalArgs, spatialArgs}
}
}
// reset the query to the original state
query.Aggregations[0].SpaceAggregation = origSpaceAgg
query.Aggregations[0].TimeAggregation = origTimeAgg
query.GroupBy = origGroupBy
mainStmt, err := b.BuildFinalSelect(cteFragments, cteArgs, query)
if err != nil {
return nil, err
}
if reducedFragments == nil {
return mainStmt, nil
}
reducedStmt, err := b.BuildFinalSelect(reducedFragments, reducedArgs, query)
if err != nil {
return nil, err
}
return unionStatements(mainStmt, reducedStmt, query)
}
func unionStatements(main, reduced *qbtypes.Statement, query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]) (*qbtypes.Statement, error) {
orderBy := "ts"
for _, g := range query.GroupBy {
orderBy = fmt.Sprintf("`%s`, ", g.Name) + orderBy
}
q := fmt.Sprintf("SELECT * FROM (%s) UNION ALL SELECT * FROM (%s) ORDER BY %s", main.Query, reduced.Query, orderBy)
args := append(append([]any{}, main.Args...), reduced.Args...)
warnings := append(append([]string{}, main.Warnings...), reduced.Warnings...)
return &qbtypes.Statement{Query: q, Args: args, Warnings: warnings}, nil
}
func (b *MetricQueryStatementBuilder) buildReducedTimeSeriesCTE(
ctx context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
keys map[string][]*telemetrytypes.TelemetryFieldKey,
variables map[string]qbtypes.VariableItem,
) (string, []any, error) {
sb := sqlbuilder.NewSelectBuilder()
var preparedWhereClause querybuilder.PreparedWhereClause
var err error
if query.Filter != nil && query.Filter.Expression != "" {
preparedWhereClause, err = querybuilder.PrepareWhereClause(query.Filter.Expression, querybuilder.FilterExprVisitorOpts{
Context: ctx,
Logger: b.logger,
FieldMapper: b.fm,
ConditionBuilder: b.cb,
FieldKeys: keys,
FullTextColumn: &telemetrytypes.TelemetryFieldKey{Name: "labels"},
Variables: variables,
StartNs: start,
EndNs: end,
})
if err != nil {
return "", nil, err
}
}
sb.From(fmt.Sprintf("%s.%s", DBName, TimeseriesV4ReducedTableName))
sb.Select("fingerprint")
for _, g := range query.GroupBy {
col, err := b.fm.ColumnExpressionFor(ctx, start, end, &g.TelemetryFieldKey, keys)
if err != nil {
return "", nil, err
}
sb.SelectMore(col)
}
sb.Where(
sb.In("metric_name", query.Aggregations[0].MetricName),
sb.GTE("unix_milli", start),
sb.LTE("unix_milli", end),
sb.EQ("__normalized", false),
)
if !preparedWhereClause.IsEmpty() {
sb.AddWhereClause(preparedWhereClause.WhereClause)
}
sb.GroupBy("fingerprint")
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
return fmt.Sprintf("(%s) AS filtered_time_series", q), args, nil
}
func (b *MetricQueryStatementBuilder) buildReducedTemporalAggregationCTE(
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
timeSeriesCTE string,
timeSeriesCTEArgs []any,
) (string, []any, bool) {
agg := query.Aggregations[0]
stepSec := int64(query.StepInterval.Seconds())
value, weight, ok := ReducedValueColumn(agg.Type, agg.SpaceAggregation)
if !ok {
return "", nil, false
}
// dedup recomputed buckets: latest computed_at wins per (series, 60s bucket)
dedup := sqlbuilder.NewSelectBuilder()
dedup.Select("reduced_fingerprint AS fingerprint", "unix_milli")
dedup.SelectMore(fmt.Sprintf("argMax(%s, computed_at) AS value", value))
if weight != "" {
dedup.SelectMore(fmt.Sprintf("argMax(%s, computed_at) AS weight", weight))
}
dedup.From(fmt.Sprintf("%s.%s", DBName, WhichReducedSamplesTableToUse(agg.Type)))
dedup.Where(
dedup.In("metric_name", agg.MetricName),
dedup.GTE("unix_milli", start),
dedup.LT("unix_milli", end),
)
dedup.GroupBy("reduced_fingerprint", "unix_milli")
dedupQuery, dedupArgs := dedup.BuildWithFlavor(sqlbuilder.ClickHouse)
sb := sqlbuilder.NewSelectBuilder()
sb.Select("fingerprint")
sb.SelectMore(fmt.Sprintf("toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts", stepSec))
for _, g := range query.GroupBy {
sb.SelectMore(fmt.Sprintf("`%s`", g.Name))
}
sb.SelectMore(fmt.Sprintf("%s AS per_series_value", ReducedTimeAggregationColumn(agg.TimeAggregation, stepSec)))
if weight != "" {
// count_series is a series count, not additive over time, so the avg
// denominator is reduced with avg
sb.SelectMore("avg(weight) AS per_series_weight")
}
sb.From(fmt.Sprintf("(%s) AS points", dedupQuery))
sb.JoinWithOption(sqlbuilder.InnerJoin, timeSeriesCTE, "points.fingerprint = filtered_time_series.fingerprint")
sb.GroupBy("fingerprint", "ts")
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
initArgs := append(append([]any{}, dedupArgs...), timeSeriesCTEArgs...)
q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse, initArgs...)
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, true
}
func (b *MetricQueryStatementBuilder) buildReducedSpatialAggregationCTE(
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
) (string, []any) {
spatial := "sum(per_series_value)"
switch query.Aggregations[0].SpaceAggregation {
case metrictypes.SpaceAggregationAvg:
spatial = "sum(per_series_value) / sum(per_series_weight)"
case metrictypes.SpaceAggregationMin:
spatial = "min(per_series_value)"
case metrictypes.SpaceAggregationMax:
spatial = "max(per_series_value)"
}
sb := sqlbuilder.NewSelectBuilder()
sb.Select("ts")
for _, g := range query.GroupBy {
sb.SelectMore(fmt.Sprintf("`%s`", g.Name))
}
sb.SelectMore(spatial + " AS value")
sb.From("__temporal_aggregation_cte")
sb.GroupBy("ts")
sb.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
q, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
return fmt.Sprintf("__spatial_aggregation_cte AS (%s)", q), args
// final SELECT
return b.BuildFinalSelect(cteFragments, cteArgs, query)
}
func (b *MetricQueryStatementBuilder) buildTemporalAggDeltaFastPath(
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
samplesTable string,
timeSeriesCTE string,
timeSeriesCTEArgs []any,
) (string, []any, error) {
@@ -421,7 +239,8 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggDeltaFastPath(
}
aggCol, err := AggregationColumnForSamplesTable(
samplesTable, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation,
start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality,
query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints,
)
if err != nil {
return "", nil, err
@@ -438,7 +257,8 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggDeltaFastPath(
sb.SelectMore(fmt.Sprintf("%s AS value", aggCol))
sb.From(fmt.Sprintf("%s.%s AS points", DBName, samplesTable))
tbl, _ := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
sb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
sb.JoinWithOption(sqlbuilder.InnerJoin, timeSeriesCTE, "points.fingerprint = filtered_time_series.fingerprint")
sb.Where(
sb.In("metric_name", query.Aggregations[0].MetricName),
@@ -458,7 +278,6 @@ func (b *MetricQueryStatementBuilder) buildTimeSeriesCTE(
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
keys map[string][]*telemetrytypes.TelemetryFieldKey,
variables map[string]qbtypes.VariableItem,
tsTable string,
) (string, []any, error) {
sb := sqlbuilder.NewSelectBuilder()
@@ -482,7 +301,8 @@ func (b *MetricQueryStatementBuilder) buildTimeSeriesCTE(
}
}
sb.From(fmt.Sprintf("%s.%s", DBName, tsTable))
start, end, _, tbl := WhichTSTableToUse(start, end, query.Aggregations[0].TableHints)
sb.From(fmt.Sprintf("%s.%s", DBName, tbl))
sb.Select("fingerprint")
for _, g := range query.GroupBy {
@@ -508,12 +328,6 @@ func (b *MetricQueryStatementBuilder) buildTimeSeriesCTE(
sb.EQ("__normalized", false),
)
// the buffer holds both raw rows and the reduced catalog rows; the raw read
// only wants the original series
if tsTable == TimeseriesV4BufferLocalTableName {
sb.Where(sb.EQ("is_reduced", false))
}
if !preparedWhereClause.IsEmpty() {
sb.AddWhereClause(preparedWhereClause.WhereClause)
}
@@ -530,23 +344,21 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggregationCTE(
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
_ map[string][]*telemetrytypes.TelemetryFieldKey,
samplesTable string,
timeSeriesCTE string,
timeSeriesCTEArgs []any,
) (string, []any, error) {
if query.Aggregations[0].Temporality == metrictypes.Delta {
return b.buildTemporalAggDelta(ctx, start, end, query, samplesTable, timeSeriesCTE, timeSeriesCTEArgs)
return b.buildTemporalAggDelta(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
} else if query.Aggregations[0].Temporality != metrictypes.Multiple {
return b.buildTemporalAggCumulativeOrUnspecified(ctx, start, end, query, samplesTable, timeSeriesCTE, timeSeriesCTEArgs)
return b.buildTemporalAggCumulativeOrUnspecified(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
}
return b.buildTemporalAggForMultipleTemporalities(ctx, start, end, query, samplesTable, timeSeriesCTE, timeSeriesCTEArgs)
return b.buildTemporalAggForMultipleTemporalities(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
}
func (b *MetricQueryStatementBuilder) buildTemporalAggDelta(
_ context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
samplesTable string,
timeSeriesCTE string,
timeSeriesCTEArgs []any,
) (string, []any, error) {
@@ -563,7 +375,7 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggDelta(
sb.SelectMore(fmt.Sprintf("`%s`", g.Name))
}
aggCol, err := AggregationColumnForSamplesTable(samplesTable, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation)
aggCol, err := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
if err != nil {
return "", nil, err
}
@@ -574,7 +386,8 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggDelta(
sb.SelectMore(fmt.Sprintf("%s AS per_series_value", aggCol))
sb.From(fmt.Sprintf("%s.%s AS points", DBName, samplesTable))
tbl, _ := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
sb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
sb.JoinWithOption(sqlbuilder.InnerJoin, timeSeriesCTE, "points.fingerprint = filtered_time_series.fingerprint")
sb.Where(
sb.In("metric_name", query.Aggregations[0].MetricName),
@@ -593,7 +406,6 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
_ context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
samplesTable string,
timeSeriesCTE string,
timeSeriesCTEArgs []any,
) (string, []any, error) {
@@ -609,13 +421,14 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
baseSb.SelectMore(fmt.Sprintf("`%s`", g.Name))
}
aggCol, err := AggregationColumnForSamplesTable(samplesTable, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation)
aggCol, err := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
if err != nil {
return "", nil, err
}
baseSb.SelectMore(fmt.Sprintf("%s AS per_series_value", aggCol))
baseSb.From(fmt.Sprintf("%s.%s AS points", DBName, samplesTable))
tbl, _ := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
baseSb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
baseSb.JoinWithOption(sqlbuilder.InnerJoin, timeSeriesCTE, "points.fingerprint = filtered_time_series.fingerprint")
baseSb.Where(
baseSb.In("metric_name", query.Aggregations[0].MetricName),
@@ -659,7 +472,6 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggForMultipleTemporalities(
_ context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
samplesTable string,
timeSeriesCTE string,
timeSeriesCTEArgs []any,
) (string, []any, error) {
@@ -674,11 +486,11 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggForMultipleTemporalities(
sb.SelectMore(fmt.Sprintf("`%s`", g.Name))
}
aggForDeltaTemporality, err := AggregationColumnForSamplesTable(samplesTable, metrictypes.Delta, query.Aggregations[0].TimeAggregation)
aggForDeltaTemporality, err := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, metrictypes.Delta, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
if err != nil {
return "", nil, err
}
aggForCumulativeTemporality, err := AggregationColumnForSamplesTable(samplesTable, metrictypes.Cumulative, query.Aggregations[0].TimeAggregation)
aggForCumulativeTemporality, err := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, metrictypes.Cumulative, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
if err != nil {
return "", nil, err
}
@@ -706,7 +518,8 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggForMultipleTemporalities(
sb.SelectMore(expr)
}
sb.From(fmt.Sprintf("%s.%s AS points", DBName, samplesTable))
tbl, _ := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
sb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
sb.JoinWithOption(sqlbuilder.InnerJoin, timeSeriesCTE, "points.fingerprint = filtered_time_series.fingerprint")
sb.Where(
sb.In("metric_name", query.Aggregations[0].MetricName),

View File

@@ -30,17 +30,6 @@ const (
TimeseriesV41weekLocalTableName = "time_series_v4_1week"
AttributesMetadataTableName = "distributed_metadata"
AttributesMetadataLocalTableName = "metadata"
// The buffer holds raw points for ~24h; the reduced tables hold 60s
// aggregates of dropped-label series.
SamplesV4BufferTableName = "distributed_samples_v4_buffer"
SamplesV4BufferLocalTableName = "samples_v4_buffer"
TimeseriesV4BufferTableName = "distributed_time_series_v4_buffer"
TimeseriesV4BufferLocalTableName = "time_series_v4_buffer"
SamplesV4ReducedLastTableName = "distributed_samples_v4_reduced_last_60s"
SamplesV4ReducedSumTableName = "distributed_samples_v4_reduced_sum_60s"
TimeseriesV4ReducedTableName = "distributed_time_series_v4_reduced"
TimeseriesV4ReducedLocalTableName = "time_series_v4_reduced"
)
var (
@@ -60,16 +49,8 @@ var (
// in that order.
func WhichTSTableToUse(
start, end uint64,
useBuffer bool,
tableHints *metrictypes.MetricTableHints,
) (uint64, uint64, string, string) {
// the buffer holds the recent raw window for reduced metrics and has the same
// shape as time_series_v4; round the start to the hour like the v4 table.
if useBuffer {
start = start - (start % (oneHourInMilliseconds))
return start, end, TimeseriesV4BufferTableName, TimeseriesV4BufferLocalTableName
}
// if we have a hint for the table, we need to use it
// the hint will be used to override the default table selection logic
if tableHints != nil {
@@ -168,20 +149,14 @@ func WhichSamplesTableToUse(
start, end uint64,
metricType metrictypes.Type,
timeAggregation metrictypes.TimeAggregation,
useBuffer bool,
tableHints *metrictypes.MetricTableHints,
) (string, string) {
// the buffer holds the recent raw window for reduced metrics; same shape as samples_v4
if useBuffer {
return SamplesV4BufferTableName, SamplesV4BufferLocalTableName
}
// if we have a hint for the table, we need to use it
// the hint will be used to override the default table selection logic.
// SamplesTableName is the distributed name; derive the local via switch.
if tableHints != nil && tableHints.SamplesTableName != "" {
switch tableHints.SamplesTableName {
case SamplesV4TableName, SamplesV4BufferTableName:
case SamplesV4TableName:
return SamplesV4TableName, SamplesV4LocalTableName
case SamplesV4Agg5mTableName:
return SamplesV4Agg5mTableName, SamplesV4Agg5mLocalTableName
@@ -213,10 +188,13 @@ func WhichSamplesTableToUse(
}
func AggregationColumnForSamplesTable(
tableName string,
start, end uint64,
metricType metrictypes.Type,
temporality metrictypes.Temporality,
timeAggregation metrictypes.TimeAggregation,
tableHints *metrictypes.MetricTableHints,
) (string, error) {
tableName, _ := WhichSamplesTableToUse(start, end, metricType, timeAggregation, tableHints)
var aggregationColumn string
switch temporality {
case metrictypes.Delta:
@@ -224,7 +202,7 @@ func AggregationColumnForSamplesTable(
// although it doesn't make sense to use anyLast, avg, min, max, count on delta metrics,
// we are keeping it here to make sure that query will not be invalid
switch tableName {
case SamplesV4TableName, SamplesV4BufferTableName:
case SamplesV4TableName:
switch timeAggregation {
case metrictypes.TimeAggregationLatest:
aggregationColumn = "anyLast(value)"
@@ -266,7 +244,7 @@ func AggregationColumnForSamplesTable(
// for cumulative metrics, we only support `RATE`/`INCREASE`. The max value in window is
// used to calculate the sum which is then divided by the window size to get the rate
switch tableName {
case SamplesV4TableName, SamplesV4BufferTableName:
case SamplesV4TableName:
switch timeAggregation {
case metrictypes.TimeAggregationLatest:
aggregationColumn = "anyLast(value)"
@@ -306,7 +284,7 @@ func AggregationColumnForSamplesTable(
}
case metrictypes.Unspecified:
switch tableName {
case SamplesV4TableName, SamplesV4BufferTableName:
case SamplesV4TableName:
switch timeAggregation {
case metrictypes.TimeAggregationLatest:
aggregationColumn = "anyLast(value)"
@@ -354,65 +332,6 @@ func AggregationColumnForSamplesTable(
return aggregationColumn, nil
}
// WhichReducedSamplesTableToUse returns the 60s reduced samples table for a metric
// type: the last_60s table for gauge-like series, the sum_60s table for counters
// and histograms.
func WhichReducedSamplesTableToUse(metricType metrictypes.Type) string {
if metricType == metrictypes.SumType || metricType == metrictypes.HistogramType {
return SamplesV4ReducedSumTableName
}
return SamplesV4ReducedLastTableName
}
// ReducedValueColumn returns the reduced value column (and the avg-denominator
// weight) for a space aggregation. The reduced columns are pre-aggregated across
// the original series, so the space aggregation picks the underlying value; the
// sum table only has `sum`, so min/max across series have no column (ok=false).
func ReducedValueColumn(metricType metrictypes.Type, space metrictypes.SpaceAggregation) (value, weight string, ok bool) {
if metricType == metrictypes.SumType || metricType == metrictypes.HistogramType {
switch space {
case metrictypes.SpaceAggregationSum:
return "`sum`", "", true
case metrictypes.SpaceAggregationAvg:
return "`sum`", "`count_series`", true
}
return "", "", false
}
switch space {
case metrictypes.SpaceAggregationSum:
return "`sum_last`", "", true
case metrictypes.SpaceAggregationAvg:
return "`sum_last`", "`count_series`", true
case metrictypes.SpaceAggregationMin:
return "`min`", "", true
case metrictypes.SpaceAggregationMax:
return "`max`", "", true
}
return "", "", false
}
// ReducedTimeAggregationColumn applies the time aggregation to the reduced `value`
// column over the step's 60s buckets. latest uses argMax over the bucket timestamp
// (the buckets have no read order); rate divides the per-step sum by the step.
func ReducedTimeAggregationColumn(timeAggregation metrictypes.TimeAggregation, stepSec int64) string {
switch timeAggregation {
case metrictypes.TimeAggregationLatest:
return "argMax(value, unix_milli)"
case metrictypes.TimeAggregationAvg:
return "avg(value)"
case metrictypes.TimeAggregationMin:
return "min(value)"
case metrictypes.TimeAggregationMax:
return "max(value)"
case metrictypes.TimeAggregationCount:
return "count(value)"
case metrictypes.TimeAggregationRate:
return fmt.Sprintf("sum(value) / %d", stepSec)
default: // sum, increase
return "sum(value)"
}
}
func AggregationQueryForHistogramCountWithParams(param *metrictypes.ComparisonSpaceAggregationParam) (string, error) {
if param == nil {
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "no aggregation param provided for histogram count")

View File

@@ -51,6 +51,7 @@ var (
ValueType: schema.ColumnTypeString,
}},
"resource": {Name: "resource", Type: schema.JSONColumnType{}},
"scope": {Name: "scope", Type: schema.JSONColumnType{}},
"events": {Name: "events", Type: schema.ArrayColumnType{
ElementType: schema.ColumnTypeString,
@@ -176,7 +177,7 @@ func (m *defaultFieldMapper) getColumn(
case telemetrytypes.FieldContextResource:
return []*schema.Column{indexV3Columns["resources_string"], indexV3Columns["resource"]}, nil
case telemetrytypes.FieldContextScope:
return []*schema.Column{}, qbtypes.ErrColumnNotFound
return []*schema.Column{indexV3Columns["scope"]}, nil
case telemetrytypes.FieldContextAttribute:
switch key.FieldDataType {
case telemetrytypes.FieldDataTypeString:
@@ -278,14 +279,24 @@ func (m *defaultFieldMapper) FieldFor(
switch column.Type.GetType() {
case schema.ColumnTypeEnumJSON:
// json is only supported for resource context as of now
if key.FieldContext != telemetrytypes.FieldContextResource {
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource context fields are supported for json columns, got %s", key.FieldContext.String)
}
// have to add ::string as clickHouse throws an error :- data types Variant/Dynamic are not allowed in GROUP BY
// once clickHouse dependency is updated, we need to check if we can remove it.
exprs = append(exprs, fmt.Sprintf("%s.`%s`::String", columnName, key.Name))
existExpr = append(existExpr, fmt.Sprintf("%s.`%s` IS NOT NULL", columnName, key.Name))
switch key.FieldContext {
case telemetrytypes.FieldContextResource:
exprs = append(exprs, fmt.Sprintf("%s.`%s`::String", columnName, key.Name))
existExpr = append(existExpr, fmt.Sprintf("%s.`%s` IS NOT NULL", columnName, key.Name))
case telemetrytypes.FieldContextScope:
switch key.Name {
case "scope.name", "scope.version":
exprs = append(exprs, fmt.Sprintf("%s::String", key.Name))
existExpr = append(existExpr, fmt.Sprintf("%s IS NOT NULL", key.Name))
default:
exprs = append(exprs, fmt.Sprintf("%s.attributes.`%s`::String", columnName, key.Name))
existExpr = append(existExpr, fmt.Sprintf("%s.attributes.`%s` IS NOT NULL", columnName, key.Name))
}
default:
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource and scope context fields are supported for json columns, got %s", key.FieldContext.String)
}
case schema.ColumnTypeEnumString,
schema.ColumnTypeEnumUInt64,
schema.ColumnTypeEnumUInt32,

View File

@@ -82,6 +82,33 @@ func TestGetFieldKeyName(t *testing.T) {
expectedResult: "multiIf(resource.`deployment.environment` IS NOT NULL, resource.`deployment.environment`::String, `resource_string_deployment$$environment_exists`==true, `resource_string_deployment$$environment`, NULL)",
expectedError: nil,
},
{
name: "Scope field - scope.name",
key: telemetrytypes.TelemetryFieldKey{
Name: "scope.name",
FieldContext: telemetrytypes.FieldContextScope,
},
expectedResult: "scope.name::String",
expectedError: nil,
},
{
name: "Scope field - scope.version",
key: telemetrytypes.TelemetryFieldKey{
Name: "scope.version",
FieldContext: telemetrytypes.FieldContextScope,
},
expectedResult: "scope.version::String",
expectedError: nil,
},
{
name: "Scope field - custom attribute",
key: telemetrytypes.TelemetryFieldKey{
Name: "custom.attr",
FieldContext: telemetrytypes.FieldContextScope,
},
expectedResult: "scope.attributes.`custom.attr`::String",
expectedError: nil,
},
{
// Query like `attribute.attribute_string:string` should resolve to `attributes_string['attribute_string']`.
name: "Attribute key whose name collides with contextual map column resolves as a map lookup",

View File

@@ -370,6 +370,94 @@ func TestStatementBuilder(t *testing.T) {
},
expectedErr: nil,
},
{
name: "scope.name filter and group by",
requestType: qbtypes.RequestTypeTimeSeries,
query: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Signal: telemetrytypes.SignalTraces,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
Aggregations: []qbtypes.TraceAggregation{
{
Expression: "count()",
},
},
Filter: &qbtypes.Filter{
Expression: "scope.name = 'opentelemetry-io'",
},
Limit: 10,
GroupBy: []qbtypes.GroupByKey{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "scope.name",
FieldContext: telemetrytypes.FieldContextScope,
},
},
},
},
expected: qbtypes.Statement{
Query: "WITH __limit_cte AS (SELECT toString(multiIf(scope.name::String IS NOT NULL, scope.name::String, NULL)) AS `scope.name`, count() AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE (scope.name::String = ? AND scope.name::String IS NOT NULL) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY `scope.name` ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(timestamp, INTERVAL 30 SECOND) AS ts, toString(multiIf(scope.name::String IS NOT NULL, scope.name::String, NULL)) AS `scope.name`, count() AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE (scope.name::String = ? AND scope.name::String IS NOT NULL) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`scope.name`) GLOBAL IN (SELECT `scope.name` FROM __limit_cte) GROUP BY ts, `scope.name`",
Args: []any{"opentelemetry-io", "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10, "opentelemetry-io", "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448)},
},
expectedErr: nil,
},
{
name: "scope.version filter with scope.name group by",
requestType: qbtypes.RequestTypeTimeSeries,
query: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Signal: telemetrytypes.SignalTraces,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
Aggregations: []qbtypes.TraceAggregation{
{
Expression: "count()",
},
},
Filter: &qbtypes.Filter{
Expression: "scope.version = '1.0.0'",
},
Limit: 10,
GroupBy: []qbtypes.GroupByKey{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "scope.name",
FieldContext: telemetrytypes.FieldContextScope,
},
},
},
},
expected: qbtypes.Statement{
Query: "WITH __limit_cte AS (SELECT toString(multiIf(scope.name::String IS NOT NULL, scope.name::String, NULL)) AS `scope.name`, count() AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE (scope.version::String = ? AND scope.version::String IS NOT NULL) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY `scope.name` ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(timestamp, INTERVAL 30 SECOND) AS ts, toString(multiIf(scope.name::String IS NOT NULL, scope.name::String, NULL)) AS `scope.name`, count() AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE (scope.version::String = ? AND scope.version::String IS NOT NULL) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`scope.name`) GLOBAL IN (SELECT `scope.name` FROM __limit_cte) GROUP BY ts, `scope.name`",
Args: []any{"1.0.0", "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10, "1.0.0", "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448)},
},
expectedErr: nil,
},
{
name: "scope.version filter only (no scope field in group by)",
requestType: qbtypes.RequestTypeTimeSeries,
query: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Signal: telemetrytypes.SignalTraces,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
Aggregations: []qbtypes.TraceAggregation{
{
Expression: "count()",
},
},
Filter: &qbtypes.Filter{
Expression: "scope.version = '1.0.0'",
},
Limit: 10,
GroupBy: []qbtypes.GroupByKey{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
},
},
},
},
expected: qbtypes.Statement{
Query: "WITH __limit_cte AS (SELECT toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, count() AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE (scope.version::String = ? AND scope.version::String IS NOT NULL) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY `service.name` ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(timestamp, INTERVAL 30 SECOND) AS ts, toString(multiIf(multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL), NULL)) AS `service.name`, count() AS __result_0 FROM signoz_traces.distributed_signoz_index_v3 WHERE (scope.version::String = ? AND scope.version::String IS NOT NULL) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) GLOBAL IN (SELECT `service.name` FROM __limit_cte) GROUP BY ts, `service.name`",
Args: []any{"1.0.0", "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10, "1.0.0", "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448)},
},
},
}
fm := NewFieldMapper()
@@ -793,6 +881,32 @@ func TestStatementBuilderListQueryWithCorruptData(t *testing.T) {
},
expectedErr: nil,
},
{
name: "List query with scope filter only (no scope in select or group by)",
requestType: qbtypes.RequestTypeRaw,
keysMap: map[string][]*telemetrytypes.TelemetryFieldKey{
"scope.version": {
{
Name: "scope.version",
Signal: telemetrytypes.SignalTraces,
FieldContext: telemetrytypes.FieldContextScope,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
},
},
query: qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]{
Signal: telemetrytypes.SignalTraces,
StepInterval: qbtypes.Step{Duration: 30 * time.Second},
Filter: &qbtypes.Filter{
Expression: "scope.version = '1.0.0'",
},
Limit: 10,
},
expected: qbtypes.Statement{
Query: "SELECT timestamp AS `timestamp`, trace_id AS `trace_id`, span_id AS `span_id`, trace_state AS `trace_state`, parent_span_id AS `parent_span_id`, flags AS `flags`, name AS `name`, kind AS `kind`, kind_string AS `kind_string`, duration_nano AS `duration_nano`, status_code AS `status_code`, status_message AS `status_message`, status_code_string AS `status_code_string`, events AS `events`, links AS `links`, response_status_code AS `response_status_code`, external_http_url AS `external_http_url`, http_url AS `http_url`, external_http_method AS `external_http_method`, http_method AS `http_method`, http_host AS `http_host`, db_name AS `db_name`, db_operation AS `db_operation`, has_error AS `has_error`, is_remote AS `is_remote`, attributes_string, attributes_number, attributes_bool, resources_string FROM signoz_traces.distributed_signoz_index_v3 WHERE (scope.version::String = ? AND scope.version::String IS NOT NULL) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? LIMIT ?",
Args: []any{"1.0.0", "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
},
},
}
for _, c := range cases {

View File

@@ -113,6 +113,20 @@ func buildCompleteFieldKeyMap(releaseTime time.Time) map[string][]*telemetrytype
FieldDataType: telemetrytypes.FieldDataTypeBool,
},
},
"scope.name": {
{
Name: "scope.name",
FieldContext: telemetrytypes.FieldContextScope,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
},
"scope.version": {
{
Name: "scope.version",
FieldContext: telemetrytypes.FieldContextScope,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
},
}
for _, keys := range keysMap {
for _, key := range keys {

View File

@@ -2,6 +2,10 @@ package authtypes
import (
"encoding/json"
"strings"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types"
)
type AttributeMapping struct {
@@ -47,95 +51,83 @@ func (attr *AttributeMapping) UnmarshalJSON(data []byte) error {
}
type RoleMapping struct {
// Default role assigned to new SSO users when no group mapping applies.
// Default role any new SSO users. Defaults to "VIEWER"
DefaultRole string `json:"defaultRole"`
// Map of IDP group name to SigNoz role name.
// Map of IDP group names to SigNoz roles. Key is group name, value is SigNoz role
GroupMappings map[string]string `json:"groupMappings"`
// If true, use the role claim directly from IDP instead of group mappings.
// If true, use the role claim directly from IDP instead of group mappings
UseRoleAttribute bool `json:"useRoleAttribute"`
}
func (roleMapping *RoleMapping) UnmarshalJSON(data []byte) error {
type alias RoleMapping
func (typ *RoleMapping) UnmarshalJSON(data []byte) error {
type Alias RoleMapping
var temp alias
var temp Alias
if err := json.Unmarshal(data, &temp); err != nil {
return err
}
temp.DefaultRole = NormalizeRoleName(temp.DefaultRole)
for group, role := range temp.GroupMappings {
temp.GroupMappings[group] = NormalizeRoleName(role)
if temp.DefaultRole != "" {
if _, err := types.NewRole(strings.ToUpper(temp.DefaultRole)); err != nil {
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid default role %s", temp.DefaultRole)
}
}
*roleMapping = RoleMapping(temp)
for group, role := range temp.GroupMappings {
if _, err := types.NewRole(strings.ToUpper(role)); err != nil {
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid role %s for group %s", role, group)
}
}
*typ = RoleMapping(temp)
return nil
}
func (roleMapping *RoleMapping) NewRolesFromCallbackIdentity(callbackIdentity *CallbackIdentity, roleAttributeExists bool) []string {
func (roleMapping *RoleMapping) NewRoleFromCallbackIdentity(callbackIdentity *CallbackIdentity) types.Role {
if roleMapping == nil {
return []string{SigNozViewerRoleName}
return types.RoleViewer
}
if roleAttributeExists {
return []string{NormalizeRoleName(callbackIdentity.Role)}
if roleMapping.UseRoleAttribute && callbackIdentity.Role != "" {
if role, err := types.NewRole(strings.ToUpper(callbackIdentity.Role)); err == nil {
return role
}
}
if len(roleMapping.GroupMappings) > 0 && len(callbackIdentity.Groups) > 0 {
roleNames := make([]string, 0)
seen := make(map[string]struct{})
highestRole := types.RoleViewer
found := false
for _, group := range callbackIdentity.Groups {
roleName, exists := roleMapping.GroupMappings[group]
if !exists {
continue
if mappedRole, exists := roleMapping.GroupMappings[group]; exists {
found = true
if role, err := types.NewRole(strings.ToUpper(mappedRole)); err == nil {
if compareRoles(role, highestRole) > 0 {
highestRole = role
}
}
}
if _, duplicate := seen[roleName]; duplicate {
continue
}
seen[roleName] = struct{}{}
roleNames = append(roleNames, roleName)
}
if len(roleNames) > 0 {
return roleNames
if found {
return highestRole
}
}
return []string{roleMapping.DefaultRoleName()}
}
func (roleMapping *RoleMapping) DefaultRoleName() string {
if roleMapping.DefaultRole != "" {
return roleMapping.DefaultRole
}
return SigNozViewerRoleName
}
func (roleMapping *RoleMapping) RoleNames() []string {
if roleMapping == nil {
return nil
}
seen := make(map[string]struct{})
roleNames := make([]string, 0, len(roleMapping.GroupMappings)+1)
if roleMapping.DefaultRole != "" {
seen[roleMapping.DefaultRole] = struct{}{}
roleNames = append(roleNames, roleMapping.DefaultRole)
if role, err := types.NewRole(strings.ToUpper(roleMapping.DefaultRole)); err == nil {
return role
}
}
for _, roleName := range roleMapping.GroupMappings {
if roleName == "" {
continue
}
if _, duplicate := seen[roleName]; duplicate {
continue
}
seen[roleName] = struct{}{}
roleNames = append(roleNames, roleName)
}
return roleNames
return types.RoleViewer
}
func compareRoles(a, b types.Role) int {
order := map[types.Role]int{
types.RoleViewer: 0,
types.RoleEditor: 1,
types.RoleAdmin: 2,
}
return order[a] - order[b]
}

View File

@@ -25,7 +25,6 @@ var (
ErrCodeRoleUnsupported = errors.MustNewCode("role_unsupported")
ErrCodeRoleHasUserAssignees = errors.MustNewCode("role_has_user_assignees")
ErrCodeRoleHasServiceAccountAssignees = errors.MustNewCode("role_has_service_account_assignees")
ErrCodeRoleHasAuthDomainMappings = errors.MustNewCode("role_has_auth_domain_mappings")
)
var (
@@ -299,20 +298,6 @@ func MustGetSigNozManagedRoleFromExistingRole(role types.Role) string {
return managedRole
}
func NormalizeRoleName(role string) string {
legacyRole, err := types.NewRole(strings.ToUpper(role))
if err != nil {
return role
}
managedRole, ok := ExistingRoleToSigNozManagedRoleMap[legacyRole]
if !ok {
return role
}
return managedRole
}
type RoleStore interface {
Create(context.Context, *Role) error
Get(context.Context, valuer.UUID, valuer.UUID) (*Role, error)

View File

@@ -480,9 +480,7 @@ type MetricAggregation struct {
// value filter to apply to the query
ValueFilter *metrictypes.MetricValueFilter `json:"-"`
// reduce to operator for metric scalar requests
ReduceTo ReduceTo `json:"reduceTo,omitzero"`
Reduced bool `json:"-"`
ReduceTo ReduceTo `json:"reduceTo,omitempty"`
}
// Copy creates a deep copy of MetricAggregation.

View File

@@ -4,7 +4,6 @@ import (
"context"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// MetadataStore is the interface for the telemetry metadata store.
@@ -27,12 +26,12 @@ type MetadataStore interface {
GetAllValues(ctx context.Context, fieldValueSelector *FieldValueSelector) (*TelemetryFieldValues, bool, error)
// FetchTemporality fetches the temporality for metric
FetchTemporality(ctx context.Context, orgID valuer.UUID, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricName string) (metrictypes.Temporality, error)
FetchTemporality(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricName string) (metrictypes.Temporality, error)
// FetchTemporalityMulti fetches the temporality for multiple metrics
FetchTemporalityMulti(ctx context.Context, orgID valuer.UUID, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, error)
FetchTemporalityMulti(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, error)
FetchTemporalityAndTypeMulti(ctx context.Context, orgID valuer.UUID, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, map[string]metrictypes.Type, map[string]bool, error)
FetchTemporalityAndTypeMulti(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, map[string]metrictypes.Type, error)
// ListLogsJSONIndexes lists the JSON indexes for the logs table.
ListLogsJSONIndexes(ctx context.Context, filters ...string) ([]TelemetryFieldKeySkipIndex, error)

View File

@@ -6,7 +6,6 @@ import (
"github.com/SigNoz/signoz/pkg/types/metrictypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// MockMetadataStore implements the MetadataStore interface for testing purposes.
@@ -17,7 +16,6 @@ type MockMetadataStore struct {
AllValuesMap map[string]*telemetrytypes.TelemetryFieldValues
TemporalityMap map[string]metrictypes.Temporality
TypeMap map[string]metrictypes.Type
ReducedMap map[string]bool
PromotedPathsMap map[string]bool
LogsJSONIndexes []telemetrytypes.TelemetryFieldKeySkipIndex
ColumnEvolutionMetadataMap map[string][]*telemetrytypes.EvolutionEntry
@@ -308,7 +306,7 @@ func (m *MockMetadataStore) SetAllValues(lookupKey string, values *telemetrytype
}
// FetchTemporality fetches the temporality for a metric.
func (m *MockMetadataStore) FetchTemporality(ctx context.Context, orgID valuer.UUID, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricName string) (metrictypes.Temporality, error) {
func (m *MockMetadataStore) FetchTemporality(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricName string) (metrictypes.Temporality, error) {
if temporality, exists := m.TemporalityMap[metricName]; exists {
return temporality, nil
}
@@ -316,7 +314,7 @@ func (m *MockMetadataStore) FetchTemporality(ctx context.Context, orgID valuer.U
}
// FetchTemporalityMulti fetches the temporality for multiple metrics.
func (m *MockMetadataStore) FetchTemporalityMulti(ctx context.Context, orgID valuer.UUID, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, error) {
func (m *MockMetadataStore) FetchTemporalityMulti(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, error) {
result := make(map[string]metrictypes.Temporality)
for _, metricName := range metricNames {
@@ -331,10 +329,9 @@ func (m *MockMetadataStore) FetchTemporalityMulti(ctx context.Context, orgID val
}
// FetchTemporalityMulti fetches the temporality for multiple metrics.
func (m *MockMetadataStore) FetchTemporalityAndTypeMulti(ctx context.Context, orgID valuer.UUID, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, map[string]metrictypes.Type, map[string]bool, error) {
func (m *MockMetadataStore) FetchTemporalityAndTypeMulti(ctx context.Context, queryTimeRangeStartTs, queryTimeRangeEndTs uint64, metricNames ...string) (map[string]metrictypes.Temporality, map[string]metrictypes.Type, error) {
temporalities := make(map[string]metrictypes.Temporality)
types := make(map[string]metrictypes.Type)
reduced := make(map[string]bool)
for _, metricName := range metricNames {
if temporality, exists := m.TemporalityMap[metricName]; exists {
@@ -347,12 +344,9 @@ func (m *MockMetadataStore) FetchTemporalityAndTypeMulti(ctx context.Context, or
} else {
types[metricName] = metrictypes.UnspecifiedType
}
if m.ReducedMap[metricName] {
reduced[metricName] = true
}
}
return temporalities, types, reduced, nil
return temporalities, types, nil
}
// SetTemporality sets the temporality for a metric in the mock store.

View File

@@ -862,6 +862,8 @@ def generate_traces_with_corrupt_metadata() -> list[Traces]:
"cloud.provider": "integration",
"cloud.account.id": "000",
"trace_id": "corrupt_data",
"scope_name": "corrupt_data",
"scope.scope.name": "corrupt_data",
},
attributes={
"net.transport": "IP.TCP",
@@ -870,7 +872,10 @@ def generate_traces_with_corrupt_metadata() -> list[Traces]:
"http.request.method": "POST",
"http.response.status_code": "200",
"timestamp": "corrupt_data",
"version": "1.0.0",
"scope.scope.version": "1.0.0",
},
scope={"name": "io.signoz.http.server", "version": "2.0.0"},
),
Traces(
timestamp=now - timedelta(seconds=3.5),
@@ -890,12 +895,24 @@ def generate_traces_with_corrupt_metadata() -> list[Traces]:
"cloud.provider": "integration",
"cloud.account.id": "000",
"timestamp": "corrupt_data",
"scope.attributes.name": "corrupt_data",
},
attributes={
"db.name": "integration",
"db.operation": "SELECT",
"db.statement": "SELECT * FROM integration",
"trace_d": "corrupt_data",
"scope.attributes.version": "corrupt_data",
},
scope={
"name": "io.opentelemetry.contrib.http",
"version": "1.0.0",
"attributes": {
"telemetry.sdk.language": "cpp",
"name": "not-the-real-name",
"version": "not-the-real-version",
"attributes": "literally-a-key-named-attributes",
},
},
),
Traces(
@@ -916,12 +933,15 @@ def generate_traces_with_corrupt_metadata() -> list[Traces]:
"cloud.provider": "integration",
"cloud.account.id": "000",
"duration_nano": "corrupt_data",
"scope.scope.attributes.version": "corrupt_data",
},
attributes={
"http.request.method": "PATCH",
"http.status_code": "404",
"id": "1",
"scope.scope.version": "corrupt_data",
},
scope={"name": "io.signoz.http.client", "version": "2.0.0"},
),
Traces(
timestamp=now - timedelta(seconds=1),
@@ -940,6 +960,7 @@ def generate_traces_with_corrupt_metadata() -> list[Traces]:
"host.name": "linux-001",
"cloud.provider": "integration",
"cloud.account.id": "001",
"scope.scope.version": "corrupt_data",
},
attributes={
"message.type": "SENT",
@@ -947,6 +968,9 @@ def generate_traces_with_corrupt_metadata() -> list[Traces]:
"messaging.message.id": "001",
"duration_nano": "corrupt_data",
"id": 1,
"scope": "corrupt_data",
"scope.attributes.name": "corrupt_data",
},
scope={"name": "io.signoz.messaging", "version": "3.0.0"},
),
]

View File

@@ -286,6 +286,7 @@ class Traces(ABC):
db_operation: str
has_error: bool
is_remote: str
scope_json: dict[str, Any]
resource: list[TracesResource]
tag_attributes: list[TracesTagAttributes]
@@ -311,6 +312,7 @@ class Traces(ABC):
links: list[TracesLink] = [],
trace_state: str = "",
flags: np.uint32 = 0,
scope: dict[str, Any] = {},
resource_write_mode: Literal["legacy_only", "dual_write"] = "dual_write",
) -> None:
if timestamp is None:
@@ -392,6 +394,35 @@ class Traces(ABC):
# Calculate resource fingerprint
self.resource_fingerprint = LogsOrTracesFingerprint(self.resources_string).calculate()
# Process scope mirroring the InstrumentationScope on the OTLP span.
scope_name = scope.get("name", "")
scope_version = scope.get("version", "")
scope_string = {k: str(v) for k, v in scope.get("attributes", {}).items()}
self.scope_json = {
"name": scope_name,
"version": scope_version,
"attributes": scope_string,
}
scope_keys = {"scope.name": scope_name, "scope.version": scope_version}
scope_keys.update(scope_string)
for k, v in scope_keys.items():
if v == "":
continue
self.tag_attributes.append(
TracesTagAttributes(
timestamp=timestamp,
tag_key=k,
tag_type="scope",
tag_data_type="string",
string_value=v,
number_value=None,
)
)
self.attribute_keys.append(
TracesResourceOrAttributeKeys(name=k, datatype="string", tag_type="scope")
)
# Process attributes by type and populate custom fields
self.attribute_string = {}
self.attributes_number = {}
@@ -644,6 +675,7 @@ class Traces(ABC):
self.has_error,
self.is_remote,
self.resource_json,
self.scope_json,
],
dtype=object,
)
@@ -675,6 +707,7 @@ class Traces(ABC):
attributes=data.get("attributes", {}),
trace_state=data.get("trace_state", ""),
flags=data.get("flags", 0),
scope=data.get("scope", {}),
)
@classmethod
@@ -814,6 +847,7 @@ def insert_traces_to_clickhouse(conn, traces: list[Traces]) -> None:
"has_error",
"is_remote",
"resource",
"scope",
],
data=[trace.np_arr() for trace in traces],
)

View File

@@ -709,6 +709,26 @@ def test_traces_list(
x[1].trace_id,
], # type: Callable[[List[Traces]], List[Any]]
),
# Case 9: filter on the intrinsic scope.version. Only x[1] should match
pytest.param(
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"disabled": False,
"selectFields": [{"name": "timestamp"}],
"filter": {"expression": "scope.version = '1.0.0'"},
"limit": 1,
},
},
HTTPStatus.OK,
lambda x: [
x[1].span_id,
format_timestamp(x[1].timestamp),
x[1].trace_id,
], # type: Callable[[List[Traces]], List[Any]]
),
],
)
def test_traces_list_with_corrupt_data(
@@ -755,6 +775,153 @@ def test_traces_list_with_corrupt_data(
assert data[key] == value
@pytest.mark.parametrize(
"filter_expression,expected_indices",
[
# Intrinsic scope.name / scope.version resolve to the JSON sub-columns.
pytest.param("scope.name = 'io.signoz.payment'", [1]),
pytest.param("scope.version = '2.3.1'", [0]),
# A scope attribute resolves against the scope JSON column's attributes.
pytest.param("scope.telemetry.sdk.language = 'python'", [1]),
# `env.tier` is a span attribute on span 0 and a scope attribute on
# span 1. Unprefixed -> no explicit context, so it is checked in every
# applicable context (attribute OR scope) and both spans match.
pytest.param("env.tier = 'gold'", [0, 1]),
# The explicit `scope.` prefix forces scope context only, so span 0's
# span attribute is ignored — only span 1 matches.
pytest.param("scope.env.tier = 'gold'", [1]),
# `scope.name` matches BOTH the intrinsic scope.name field (span 0) and a
# scope attribute literally named `name` (span 1's scope attribute
# name='io.signoz.checkout').
pytest.param("scope.name = 'io.signoz.checkout'", [0, 1]),
# `scope.name` also matches a span attribute literally named `scope.name`
# (attribute context) — span 2 carries attribute scope.name='attr-scope-name'.
pytest.param("scope.name = 'attr-scope-name'", [2]),
# An unprefixed `name` resolves to the intrinsic span `name` column and a
# `name` scope attribute, but NOT the scope.name field. Span 2's span
# name and span 1's scope attribute `name` both equal 'io.signoz.checkout';
# span 0's scope.name field equals it too but is NOT matched.
pytest.param("name = 'io.signoz.checkout'", [1, 2]),
# A value that no resolvable key holds (scope.name/scope.version field,
# a `name`/`version` scope attribute, or a same-named attribute/resource)
# returns nothing.
pytest.param("scope.version = 'corrupt_data'", []),
pytest.param("scope.name = 'corrupt_data'", []),
],
)
def test_traces_list_with_scope_filter(
signoz: types.SigNoz,
create_user_admin: None, # pylint: disable=unused-argument
get_token: Callable[[str, str], str],
insert_traces: Callable[[list[Traces]], None],
filter_expression: str,
expected_indices: list[int],
) -> None:
"""
Setup three spans that have different scope key resolution.
Tests:
- Filtering on scope.name / scope.version / a scope attribute.
- An unprefixed key is resolved across contexts (scope checked alongside
attribute / intrinsic), while a `scope.`-prefixed key is scope-only.
- `scope.name` hits the intrinsic field, a `name` scope attribute, and a
span attribute `scope.name` (cross-context), while a bare
`name` hits the span name column (and a `name` scope attribute) but never
the scope.name field.
"""
trace_id = TraceIdGenerator.trace_id()
span_ids = [TraceIdGenerator.span_id() for _ in range(3)]
now = datetime.now(tz=UTC).replace(second=0, microsecond=0)
traces = [
Traces(
timestamp=now - timedelta(seconds=4),
duration=timedelta(seconds=2),
trace_id=trace_id,
span_id=span_ids[0],
parent_span_id="",
name="GET /checkout",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
resources={"service.name": "checkout"},
attributes={"http.request.method": "GET", "env.tier": "gold"},
scope={
"name": "io.signoz.checkout",
"version": "2.3.1",
"attributes": {"telemetry.sdk.language": "go"},
},
),
Traces(
timestamp=now - timedelta(seconds=2),
duration=timedelta(seconds=1),
trace_id=trace_id,
span_id=span_ids[1],
parent_span_id="",
name="POST /pay",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
resources={"service.name": "payment"},
attributes={"http.request.method": "POST"},
# env.tier is a scope attribute here (cross-context with span 0);
# `name` is a scope attribute colliding with span 0's scope.name.
scope={
"name": "io.signoz.payment",
"version": "4.5.6",
"attributes": {
"telemetry.sdk.language": "python",
"env.tier": "gold",
"name": "io.signoz.checkout",
},
},
),
Traces(
timestamp=now - timedelta(seconds=1),
duration=timedelta(seconds=1),
trace_id=trace_id,
span_id=span_ids[2],
parent_span_id="",
# span name collides with span 0's scope.name value
name="io.signoz.checkout",
kind=TracesKind.SPAN_KIND_SERVER,
status_code=TracesStatusCode.STATUS_CODE_OK,
resources={"service.name": "probe"},
# a span attribute named `scope.name`
attributes={"scope.name": "attr-scope-name"},
scope={"name": "span-gamma", "version": "9.9.9"},
),
]
insert_traces(traces)
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
response = make_query_request(
signoz,
token,
start_ms=int((datetime.now(tz=UTC) - timedelta(minutes=5)).timestamp() * 1000),
end_ms=int(datetime.now(tz=UTC).timestamp() * 1000),
request_type="raw",
queries=[
{
"type": "builder_query",
"spec": {
"name": "A",
"signal": "traces",
"disabled": False,
"selectFields": [{"name": "timestamp"}],
"filter": {"expression": filter_expression},
"limit": 10,
},
}
],
)
assert response.status_code == HTTPStatus.OK
rows = response.json()["data"]["data"]["results"][0]["rows"] or []
got_span_ids = {row["data"]["span_id"] for row in rows}
expected_span_ids = {traces[i].span_id for i in expected_indices}
assert got_span_ids == expected_span_ids
def _verify_events_links_full(rows: list[dict], traces: list[Traces]) -> None:
"""Empty-selectFields case: events/links arrive parsed into structured objects.
Every row's events/links should match the fixture's stored parsed shape