Compare commits

...

11 Commits

Author SHA1 Message Date
Swapnil Nakade
10b55bb00f Merge branch 'main' into refactor/update-cloud-integration-version 2026-01-08 19:57:01 +05:30
Amlan Kumar Nandy
e9afbede24 chore: add fillZero function to query builder functions list (#9651) 2026-01-08 21:19:04 +07:00
swapnil-signoz
479cba7dd2 chore: update cloud integration agent version to v0.0.8 (#9956) 2026-01-08 12:55:52 +00:00
Vikrant Gupta
5449374ad8 fix(alertmanager): remove ambiguous reference to org_id (#9955)
* fix(alertmanager): remove ambiguous reference to org_id

* fix(alertmanager): remove ambiguous reference to org_id
2026-01-08 16:23:36 +05:30
Swapnil Nakade
0f6cce2c24 chore: update cloud integration agent version to v0.0.8 2026-01-08 16:10:23 +05:30
Abhishek Kumar Singh
68b9cc2b81 fix: test alert should open valid link with more info (#9896) 2026-01-08 15:48:05 +05:30
Srikanth Chekuri
0f62a04f92 chore: add query step intervals in response meta (#9954)
* chore: add query step intervals in response meta

* chore: regenerate api spec
2026-01-08 15:04:55 +05:30
Piyush Singariya
b4706743ba chore: JSON Logs Query Experience (#9381) 2026-01-07 20:28:03 +00:00
Nikhil Mantri
1eba57b250 chore: add Open API spec defs for metrics explorer (#9934) 2026-01-08 01:48:28 +05:30
Jatinderjit Singh
c9cbc8d9ad chore: ignore logs for context.Canceled errors (#9945) 2026-01-08 01:25:41 +05:30
Pandey
23ba9dacd1 fix(alertmanager): disallow creating invalid channels (#9946) 2026-01-08 00:13:46 +05:30
66 changed files with 2861 additions and 317 deletions

View File

@@ -51,6 +51,7 @@ jobs:
- sqlite
clickhouse-version:
- 25.5.6
- 25.10.1
schema-migrator-version:
- v0.129.7
postgres-version:

View File

@@ -1,6 +1,10 @@
package cmd
import (
"context"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"go.uber.org/zap" //nolint:depguard
"go.uber.org/zap/zapcore" //nolint:depguard
)
@@ -10,6 +14,97 @@ func newZapLogger() *zap.Logger {
config := zap.NewProductionConfig()
config.EncoderConfig.TimeKey = "timestamp"
config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
// Extract sampling config before building the logger.
// We need to disable sampling in the config and apply it manually later
// to ensure correct core ordering. See filteringCore documentation for details.
samplerConfig := config.Sampling
config.Sampling = nil
logger, _ := config.Build()
return logger
// Wrap with custom core wrapping to filter certain log entries.
// The order of wrapping is important:
// 1. First wrap with filteringCore
// 2. Then wrap with sampler
//
// This creates the call chain: sampler -> filteringCore -> ioCore
//
// During logging:
// - sampler.Check decides whether to sample the log entry
// - If sampled, filteringCore.Check is called
// - filteringCore adds itself to CheckedEntry.cores
// - All cores in CheckedEntry.cores have their Write method called
// - filteringCore.Write can now filter the entry before passing to ioCore
//
// If we didn't disable the sampler above, filteringCore would have wrapped
// sampler. By calling sampler.Check we would have allowed it to call
// ioCore.Check that adds itself to CheckedEntry.cores. Then ioCore.Write
// would have bypassed our checks, making filtering impossible.
return logger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
core = &filteringCore{core}
if samplerConfig != nil {
core = zapcore.NewSamplerWithOptions(
core,
time.Second,
samplerConfig.Initial,
samplerConfig.Thereafter,
)
}
return core
}))
}
// filteringCore wraps a zapcore.Core to filter out log entries based on a
// custom logic.
//
// Note: This core must be positioned before the sampler in the core chain
// to ensure Write is called. See newZapLogger for ordering details.
type filteringCore struct {
zapcore.Core
}
// filter determines whether a log entry should be written based on its fields.
// Returns false if the entry should be suppressed, true otherwise.
//
// Current filters:
// - context.Canceled: These are expected errors from cancelled operations,
// and create noise in logs.
func (c *filteringCore) filter(fields []zapcore.Field) bool {
for _, field := range fields {
if field.Type == zapcore.ErrorType {
if loggedErr, ok := field.Interface.(error); ok {
// Suppress logs containing context.Canceled errors
if errors.Is(loggedErr, context.Canceled) {
return false
}
}
}
}
return true
}
// With implements zapcore.Core.With
// It returns a new copy with the added context.
func (c *filteringCore) With(fields []zapcore.Field) zapcore.Core {
return &filteringCore{c.Core.With(fields)}
}
// Check implements zapcore.Core.Check.
// It adds this core to the CheckedEntry if the log level is enabled,
// ensuring that Write will be called for this entry.
func (c *filteringCore) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
if c.Enabled(ent.Level) {
return ce.AddCore(ent, c)
}
return ce
}
// Write implements zapcore.Core.Write.
// It filters log entries based on their fields before delegating to the wrapped core.
func (c *filteringCore) Write(ent zapcore.Entry, fields []zapcore.Field) error {
if !c.filter(fields) {
return nil
}
return c.Core.Write(ent, fields)
}

View File

@@ -2067,6 +2067,431 @@ paths:
summary: Get features
tags:
- features
/api/v2/metric/alerts:
get:
deprecated: false
description: This endpoint returns associated alerts for a specified metric
operationId: GetMetricAlerts
responses:
"200":
content:
application/json:
schema:
properties:
data:
$ref: '#/components/schemas/MetricsexplorertypesMetricAlertsResponse'
status:
type: string
type: object
description: OK
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Bad Request
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
security:
- api_key:
- VIEWER
- tokenizer:
- VIEWER
summary: Get metric alerts
tags:
- metrics
/api/v2/metric/dashboards:
get:
deprecated: false
description: This endpoint returns associated dashboards for a specified metric
operationId: GetMetricDashboards
responses:
"200":
content:
application/json:
schema:
properties:
data:
$ref: '#/components/schemas/MetricsexplorertypesMetricDashboardsResponse'
status:
type: string
type: object
description: OK
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Bad Request
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
security:
- api_key:
- VIEWER
- tokenizer:
- VIEWER
summary: Get metric dashboards
tags:
- metrics
/api/v2/metric/highlights:
get:
deprecated: false
description: This endpoint returns highlights like number of datapoints, totaltimeseries,
active time series, last received time for a specified metric
operationId: GetMetricHighlights
responses:
"200":
content:
application/json:
schema:
properties:
data:
$ref: '#/components/schemas/MetricsexplorertypesMetricHighlightsResponse'
status:
type: string
type: object
description: OK
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Bad Request
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
security:
- api_key:
- VIEWER
- tokenizer:
- VIEWER
summary: Get metric highlights
tags:
- metrics
/api/v2/metrics/{metric_name}/metadata:
post:
deprecated: false
description: This endpoint helps to update metadata information like metric
description, unit, type, temporality, monotonicity for a specified metric
operationId: UpdateMetricMetadata
parameters:
- in: path
name: metric_name
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/MetricsexplorertypesUpdateMetricMetadataRequest'
responses:
"200":
content:
application/json:
schema:
type: string
description: OK
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Bad Request
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
security:
- api_key:
- EDITOR
- tokenizer:
- EDITOR
summary: Update metric metadata
tags:
- metrics
/api/v2/metrics/attributes:
post:
deprecated: false
description: This endpoint returns attribute keys and their unique values for
a specified metric
operationId: GetMetricAttributes
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/MetricsexplorertypesMetricAttributesRequest'
responses:
"200":
content:
application/json:
schema:
properties:
data:
$ref: '#/components/schemas/MetricsexplorertypesMetricAttributesResponse'
status:
type: string
type: object
description: OK
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Bad Request
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
security:
- api_key:
- VIEWER
- tokenizer:
- VIEWER
summary: Get metric attributes
tags:
- metrics
/api/v2/metrics/metadata:
get:
deprecated: false
description: This endpoint returns metadata information like metric description,
unit, type, temporality, monotonicity for a specified metric
operationId: GetMetricMetadata
responses:
"200":
content:
application/json:
schema:
properties:
data:
$ref: '#/components/schemas/MetricsexplorertypesMetricMetadata'
status:
type: string
type: object
description: OK
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Bad Request
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"404":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Not Found
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
security:
- api_key:
- VIEWER
- tokenizer:
- VIEWER
summary: Get metric metadata
tags:
- metrics
/api/v2/metrics/stats:
post:
deprecated: false
description: This endpoint provides list of metrics with their number of samples
and timeseries for the given time range
operationId: GetMetricsStats
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/MetricsexplorertypesStatsRequest'
responses:
"200":
content:
application/json:
schema:
properties:
data:
$ref: '#/components/schemas/MetricsexplorertypesStatsResponse'
status:
type: string
type: object
description: OK
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Bad Request
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
security:
- api_key:
- VIEWER
- tokenizer:
- VIEWER
summary: Get metrics statistics
tags:
- metrics
/api/v2/metrics/treemap:
post:
deprecated: false
description: This endpoint returns a treemap visualization showing the proportional
distribution of metrics by sample count or time series count
operationId: GetMetricsTreemap
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/MetricsexplorertypesTreemapRequest'
responses:
"200":
content:
application/json:
schema:
properties:
data:
$ref: '#/components/schemas/MetricsexplorertypesTreemapResponse'
status:
type: string
type: object
description: OK
"400":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Bad Request
"401":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Unauthorized
"403":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Forbidden
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/RenderErrorResponse'
description: Internal Server Error
security:
- api_key:
- VIEWER
- tokenizer:
- VIEWER
summary: Get metrics treemap
tags:
- metrics
/api/v2/orgs/me:
get:
deprecated: false
@@ -2586,6 +3011,202 @@ components:
nullable: true
type: object
type: object
MetricsexplorertypesMetricAlert:
properties:
alertId:
type: string
alertName:
type: string
type: object
MetricsexplorertypesMetricAlertsResponse:
properties:
alerts:
items:
$ref: '#/components/schemas/MetricsexplorertypesMetricAlert'
nullable: true
type: array
type: object
MetricsexplorertypesMetricAttribute:
properties:
key:
type: string
valueCount:
minimum: 0
type: integer
values:
items:
type: string
nullable: true
type: array
type: object
MetricsexplorertypesMetricAttributesRequest:
properties:
end:
nullable: true
type: integer
metricName:
type: string
start:
nullable: true
type: integer
type: object
MetricsexplorertypesMetricAttributesResponse:
properties:
attributes:
items:
$ref: '#/components/schemas/MetricsexplorertypesMetricAttribute'
nullable: true
type: array
totalKeys:
format: int64
type: integer
type: object
MetricsexplorertypesMetricDashboard:
properties:
dashboardId:
type: string
dashboardName:
type: string
widgetId:
type: string
widgetName:
type: string
type: object
MetricsexplorertypesMetricDashboardsResponse:
properties:
dashboards:
items:
$ref: '#/components/schemas/MetricsexplorertypesMetricDashboard'
nullable: true
type: array
type: object
MetricsexplorertypesMetricHighlightsResponse:
properties:
activeTimeSeries:
minimum: 0
type: integer
dataPoints:
minimum: 0
type: integer
lastReceived:
minimum: 0
type: integer
totalTimeSeries:
minimum: 0
type: integer
type: object
MetricsexplorertypesMetricMetadata:
properties:
description:
type: string
isMonotonic:
type: boolean
temporality:
type: string
type:
type: string
unit:
type: string
type: object
MetricsexplorertypesStat:
properties:
description:
type: string
metricName:
type: string
samples:
minimum: 0
type: integer
timeseries:
minimum: 0
type: integer
type:
type: string
unit:
type: string
type: object
MetricsexplorertypesStatsRequest:
properties:
end:
format: int64
type: integer
filter:
$ref: '#/components/schemas/Querybuildertypesv5Filter'
limit:
type: integer
offset:
type: integer
orderBy:
$ref: '#/components/schemas/Querybuildertypesv5OrderBy'
start:
format: int64
type: integer
type: object
MetricsexplorertypesStatsResponse:
properties:
metrics:
items:
$ref: '#/components/schemas/MetricsexplorertypesStat'
nullable: true
type: array
total:
minimum: 0
type: integer
type: object
MetricsexplorertypesTreemapEntry:
properties:
metricName:
type: string
percentage:
format: double
type: number
totalValue:
minimum: 0
type: integer
type: object
MetricsexplorertypesTreemapRequest:
properties:
end:
format: int64
type: integer
filter:
$ref: '#/components/schemas/Querybuildertypesv5Filter'
limit:
type: integer
mode:
type: string
start:
format: int64
type: integer
type: object
MetricsexplorertypesTreemapResponse:
properties:
samples:
items:
$ref: '#/components/schemas/MetricsexplorertypesTreemapEntry'
nullable: true
type: array
timeseries:
items:
$ref: '#/components/schemas/MetricsexplorertypesTreemapEntry'
nullable: true
type: array
type: object
MetricsexplorertypesUpdateMetricMetadataRequest:
properties:
description:
type: string
isMonotonic:
type: boolean
metricName:
type: string
temporality:
type: string
type:
type: string
unit:
type: string
type: object
PreferencetypesPreference:
properties:
allowedScopes:
@@ -2646,6 +3267,38 @@ components:
rowsScanned:
minimum: 0
type: integer
stepIntervals:
additionalProperties:
minimum: 0
type: integer
type: object
type: object
Querybuildertypesv5Filter:
properties:
expression:
type: string
type: object
Querybuildertypesv5OrderBy:
properties:
direction:
type: string
key:
$ref: '#/components/schemas/Querybuildertypesv5OrderByKey'
type: object
Querybuildertypesv5OrderByKey:
properties:
description:
type: string
fieldContext:
type: string
fieldDataType:
type: string
name:
type: string
signal:
type: string
unit:
type: string
type: object
Querybuildertypesv5QueryData:
properties:

View File

@@ -257,18 +257,20 @@ func (aH *APIHandler) queryRangeV5(rw http.ResponseWriter, req *http.Request) {
results = append(results, item)
}
// Build step intervals from the anomaly query
stepIntervals := make(map[string]uint64)
if anomalyQuery.StepInterval.Duration > 0 {
stepIntervals[anomalyQuery.Name] = uint64(anomalyQuery.StepInterval.Duration.Seconds())
}
finalResp := &qbtypes.QueryRangeResponse{
Type: queryRangeRequest.RequestType,
Data: struct {
Results []any `json:"results"`
}{
Data: qbtypes.QueryData{
Results: results,
},
Meta: struct {
RowsScanned uint64 `json:"rowsScanned"`
BytesScanned uint64 `json:"bytesScanned"`
DurationMS uint64 `json:"durationMs"`
}{},
Meta: qbtypes.ExecStats{
StepIntervals: stepIntervals,
},
}
render.Success(rw, http.StatusOK, finalResp)

View File

@@ -68,8 +68,8 @@ export const metricQueryFunctionOptions: SelectOption<string, string>[] = [
label: 'Time Shift',
},
{
value: QueryFunctionsTypes.TIME_SHIFT,
label: 'Time Shift',
value: QueryFunctionsTypes.FILL_ZERO,
label: 'Fill Zero',
},
];
@@ -156,4 +156,7 @@ export const queryFunctionsTypesConfig: QueryFunctionConfigType = {
showInput: true,
inputType: 'text',
},
fillZero: {
showInput: false,
},
};

View File

@@ -208,6 +208,7 @@ export enum QueryFunctionsTypes {
MEDIAN_5 = 'median5',
MEDIAN_7 = 'median7',
TIME_SHIFT = 'timeShift',
FILL_ZERO = 'fillZero',
}
export type PanelTypeKeys =

View File

@@ -59,6 +59,9 @@ describe('functionNameNormalizer', () => {
expect(normalizeFunctionName('median5')).toBe(QueryFunctionsTypes.MEDIAN_5);
expect(normalizeFunctionName('median7')).toBe(QueryFunctionsTypes.MEDIAN_7);
expect(normalizeFunctionName('anomaly')).toBe(QueryFunctionsTypes.ANOMALY);
expect(normalizeFunctionName('fillzero')).toBe(
QueryFunctionsTypes.FILL_ZERO,
);
});
});

View File

@@ -27,6 +27,7 @@ export const normalizeFunctionName = (functionName: string): string => {
median5: QueryFunctionsTypes.MEDIAN_5,
median7: QueryFunctionsTypes.MEDIAN_7,
anomaly: QueryFunctionsTypes.ANOMALY,
fillzero: QueryFunctionsTypes.FILL_ZERO,
};
// Convert to lowercase for case-insensitive matching

2
go.mod
View File

@@ -11,6 +11,7 @@ require (
github.com/SigNoz/signoz-otel-collector v0.129.10-rc.9
github.com/antlr4-go/antlr/v4 v4.13.1
github.com/antonmedv/expr v1.15.3
github.com/bytedance/sonic v1.14.1
github.com/cespare/xxhash/v2 v2.3.0
github.com/coreos/go-oidc/v3 v3.14.1
github.com/dgraph-io/ristretto/v2 v2.3.0
@@ -89,7 +90,6 @@ require (
require (
github.com/bytedance/gopkg v0.1.3 // indirect
github.com/bytedance/sonic v1.14.1 // indirect
github.com/bytedance/sonic/loader v0.3.0 // indirect
github.com/cloudwego/base64x v0.1.6 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect

View File

@@ -157,7 +157,7 @@ func New(ctx context.Context, logger *slog.Logger, registry prometheus.Registere
return 0, err
}
return c, server.stateStore.Set(ctx, server.orgID, storableSilences)
return c, server.stateStore.Set(ctx, storableSilences)
})
}()
@@ -186,7 +186,7 @@ func New(ctx context.Context, logger *slog.Logger, registry prometheus.Registere
return 0, err
}
return c, server.stateStore.Set(ctx, server.orgID, storableNFLog)
return c, server.stateStore.Set(ctx, storableNFLog)
})
}()

View File

@@ -40,7 +40,7 @@ func (store *state) Get(ctx context.Context, orgID string) (*alertmanagertypes.S
}
// Set implements alertmanagertypes.StateStore.
func (store *state) Set(ctx context.Context, orgID string, storeableState *alertmanagertypes.StoreableState) error {
func (store *state) Set(ctx context.Context, storeableState *alertmanagertypes.StoreableState) error {
tx, err := store.sqlstore.BunDB().BeginTx(ctx, nil)
if err != nil {
return err
@@ -55,7 +55,6 @@ func (store *state) Set(ctx context.Context, orgID string, storeableState *alert
Set("silences = EXCLUDED.silences").
Set("nflog = EXCLUDED.nflog").
Set("updated_at = EXCLUDED.updated_at").
Where("org_id = ?", orgID).
Exec(ctx)
if err != nil {
return err

View File

@@ -2,9 +2,10 @@ package signozalertmanager
import (
"context"
"time"
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"github.com/prometheus/common/model"
"time"
amConfig "github.com/prometheus/alertmanager/config"
@@ -191,7 +192,11 @@ func (provider *provider) CreateChannel(ctx context.Context, orgID string, recei
return err
}
channel := alertmanagertypes.NewChannelFromReceiver(receiver, orgID)
channel, err := alertmanagertypes.NewChannelFromReceiver(receiver, orgID)
if err != nil {
return err
}
return provider.configStore.CreateChannel(ctx, channel, alertmanagertypes.WithCb(func(ctx context.Context) error {
return provider.configStore.Set(ctx, config)
}))

View File

@@ -0,0 +1,166 @@
package signozapiserver
import (
"net/http"
"github.com/SigNoz/signoz/pkg/http/handler"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/metricsexplorertypes"
"github.com/gorilla/mux"
)
func (provider *provider) addMetricsExplorerV2Routes(router *mux.Router) error {
if err := router.Handle("/api/v2/metrics/stats", handler.New(
provider.authZ.ViewAccess(provider.metricsExplorerHandler.GetStats),
handler.OpenAPIDef{
ID: "GetMetricsStats",
Tags: []string{"metrics"},
Summary: "Get metrics statistics",
Description: "This endpoint provides list of metrics with their number of samples and timeseries for the given time range",
Request: new(metricsexplorertypes.StatsRequest),
RequestContentType: "application/json",
Response: new(metricsexplorertypes.StatsResponse),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusUnauthorized, http.StatusInternalServerError},
Deprecated: false,
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
})).Methods(http.MethodPost).GetError(); err != nil {
return err
}
if err := router.Handle("/api/v2/metrics/treemap", handler.New(
provider.authZ.ViewAccess(provider.metricsExplorerHandler.GetTreemap),
handler.OpenAPIDef{
ID: "GetMetricsTreemap",
Tags: []string{"metrics"},
Summary: "Get metrics treemap",
Description: "This endpoint returns a treemap visualization showing the proportional distribution of metrics by sample count or time series count",
Request: new(metricsexplorertypes.TreemapRequest),
RequestContentType: "application/json",
Response: new(metricsexplorertypes.TreemapResponse),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusUnauthorized, http.StatusInternalServerError},
Deprecated: false,
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
})).Methods(http.MethodPost).GetError(); err != nil {
return err
}
if err := router.Handle("/api/v2/metrics/attributes", handler.New(
provider.authZ.ViewAccess(provider.metricsExplorerHandler.GetMetricAttributes),
handler.OpenAPIDef{
ID: "GetMetricAttributes",
Tags: []string{"metrics"},
Summary: "Get metric attributes",
Description: "This endpoint returns attribute keys and their unique values for a specified metric",
Request: new(metricsexplorertypes.MetricAttributesRequest),
RequestContentType: "application/json",
Response: new(metricsexplorertypes.MetricAttributesResponse),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusUnauthorized, http.StatusInternalServerError},
Deprecated: false,
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
})).Methods(http.MethodPost).GetError(); err != nil {
return err
}
if err := router.Handle("/api/v2/metrics/metadata", handler.New(
provider.authZ.ViewAccess(provider.metricsExplorerHandler.GetMetricMetadata),
handler.OpenAPIDef{
ID: "GetMetricMetadata",
Tags: []string{"metrics"},
Summary: "Get metric metadata",
Description: "This endpoint returns metadata information like metric description, unit, type, temporality, monotonicity for a specified metric",
Request: nil,
RequestContentType: "",
Response: new(metricsexplorertypes.MetricMetadata),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusUnauthorized, http.StatusNotFound, http.StatusInternalServerError},
Deprecated: false,
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
})).Methods(http.MethodGet).GetError(); err != nil {
return err
}
if err := router.Handle("/api/v2/metrics/{metric_name}/metadata", handler.New(
provider.authZ.EditAccess(provider.metricsExplorerHandler.UpdateMetricMetadata),
handler.OpenAPIDef{
ID: "UpdateMetricMetadata",
Tags: []string{"metrics"},
Summary: "Update metric metadata",
Description: "This endpoint helps to update metadata information like metric description, unit, type, temporality, monotonicity for a specified metric",
Request: new(metricsexplorertypes.UpdateMetricMetadataRequest),
RequestContentType: "application/json",
Response: nil,
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusUnauthorized, http.StatusInternalServerError},
Deprecated: false,
SecuritySchemes: newSecuritySchemes(types.RoleEditor),
})).Methods(http.MethodPost).GetError(); err != nil {
return err
}
if err := router.Handle("/api/v2/metric/highlights", handler.New(
provider.authZ.ViewAccess(provider.metricsExplorerHandler.GetMetricHighlights),
handler.OpenAPIDef{
ID: "GetMetricHighlights",
Tags: []string{"metrics"},
Summary: "Get metric highlights",
Description: "This endpoint returns highlights like number of datapoints, totaltimeseries, active time series, last received time for a specified metric",
Request: nil,
RequestContentType: "",
Response: new(metricsexplorertypes.MetricHighlightsResponse),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusUnauthorized, http.StatusInternalServerError},
Deprecated: false,
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
})).Methods(http.MethodGet).GetError(); err != nil {
return err
}
if err := router.Handle("/api/v2/metric/alerts", handler.New(
provider.authZ.ViewAccess(provider.metricsExplorerHandler.GetMetricAlerts),
handler.OpenAPIDef{
ID: "GetMetricAlerts",
Tags: []string{"metrics"},
Summary: "Get metric alerts",
Description: "This endpoint returns associated alerts for a specified metric",
Request: nil,
RequestContentType: "",
Response: new(metricsexplorertypes.MetricAlertsResponse),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusUnauthorized, http.StatusInternalServerError},
Deprecated: false,
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
})).Methods(http.MethodGet).GetError(); err != nil {
return err
}
if err := router.Handle("/api/v2/metric/dashboards", handler.New(
provider.authZ.ViewAccess(provider.metricsExplorerHandler.GetMetricDashboards),
handler.OpenAPIDef{
ID: "GetMetricDashboards",
Tags: []string{"metrics"},
Summary: "Get metric dashboards",
Description: "This endpoint returns associated dashboards for a specified metric",
Request: nil,
RequestContentType: "",
Response: new(metricsexplorertypes.MetricDashboardsResponse),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusUnauthorized, http.StatusInternalServerError},
Deprecated: false,
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
})).Methods(http.MethodGet).GetError(); err != nil {
return err
}
return nil
}

View File

@@ -12,6 +12,7 @@ import (
"github.com/SigNoz/signoz/pkg/http/middleware"
"github.com/SigNoz/signoz/pkg/modules/authdomain"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/preference"
"github.com/SigNoz/signoz/pkg/modules/promote"
@@ -23,20 +24,21 @@ import (
)
type provider struct {
config apiserver.Config
settings factory.ScopedProviderSettings
router *mux.Router
authZ *middleware.AuthZ
orgHandler organization.Handler
userHandler user.Handler
sessionHandler session.Handler
authDomainHandler authdomain.Handler
preferenceHandler preference.Handler
globalHandler global.Handler
promoteHandler promote.Handler
flaggerHandler flagger.Handler
dashboardModule dashboard.Module
dashboardHandler dashboard.Handler
config apiserver.Config
settings factory.ScopedProviderSettings
router *mux.Router
authZ *middleware.AuthZ
orgHandler organization.Handler
userHandler user.Handler
sessionHandler session.Handler
authDomainHandler authdomain.Handler
preferenceHandler preference.Handler
globalHandler global.Handler
promoteHandler promote.Handler
flaggerHandler flagger.Handler
dashboardModule dashboard.Module
dashboardHandler dashboard.Handler
metricsExplorerHandler metricsexplorer.Handler
}
func NewFactory(
@@ -52,9 +54,10 @@ func NewFactory(
flaggerHandler flagger.Handler,
dashboardModule dashboard.Module,
dashboardHandler dashboard.Handler,
metricsExplorerHandler metricsexplorer.Handler,
) factory.ProviderFactory[apiserver.APIServer, apiserver.Config] {
return factory.NewProviderFactory(factory.MustNewName("signoz"), func(ctx context.Context, providerSettings factory.ProviderSettings, config apiserver.Config) (apiserver.APIServer, error) {
return newProvider(ctx, providerSettings, config, orgGetter, authz, orgHandler, userHandler, sessionHandler, authDomainHandler, preferenceHandler, globalHandler, promoteHandler, flaggerHandler, dashboardModule, dashboardHandler)
return newProvider(ctx, providerSettings, config, orgGetter, authz, orgHandler, userHandler, sessionHandler, authDomainHandler, preferenceHandler, globalHandler, promoteHandler, flaggerHandler, dashboardModule, dashboardHandler, metricsExplorerHandler)
})
}
@@ -74,24 +77,26 @@ func newProvider(
flaggerHandler flagger.Handler,
dashboardModule dashboard.Module,
dashboardHandler dashboard.Handler,
metricsExplorerHandler metricsexplorer.Handler,
) (apiserver.APIServer, error) {
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/apiserver/signozapiserver")
router := mux.NewRouter().UseEncodedPath()
provider := &provider{
config: config,
settings: settings,
router: router,
orgHandler: orgHandler,
userHandler: userHandler,
sessionHandler: sessionHandler,
authDomainHandler: authDomainHandler,
preferenceHandler: preferenceHandler,
globalHandler: globalHandler,
promoteHandler: promoteHandler,
flaggerHandler: flaggerHandler,
dashboardModule: dashboardModule,
dashboardHandler: dashboardHandler,
config: config,
settings: settings,
router: router,
orgHandler: orgHandler,
userHandler: userHandler,
sessionHandler: sessionHandler,
authDomainHandler: authDomainHandler,
preferenceHandler: preferenceHandler,
globalHandler: globalHandler,
promoteHandler: promoteHandler,
flaggerHandler: flaggerHandler,
dashboardModule: dashboardModule,
dashboardHandler: dashboardHandler,
metricsExplorerHandler: metricsExplorerHandler,
}
provider.authZ = middleware.NewAuthZ(settings.Logger(), orgGetter, authz)
@@ -144,6 +149,10 @@ func (provider *provider) AddToRouter(router *mux.Router) error {
return err
}
if err := provider.addMetricsExplorerV2Routes(router); err != nil {
return err
}
return nil
}

View File

@@ -61,7 +61,7 @@ func (m *module) ListPromotedAndIndexedPaths(ctx context.Context) ([]promotetype
response := []promotetypes.PromotePath{}
for _, path := range promotedPaths {
fullPath := telemetrylogs.BodyPromotedColumnPrefix + path
path = telemetrylogs.BodyJSONStringSearchPrefix + path
path = telemetrytypes.BodyJSONStringSearchPrefix + path
item := promotetypes.PromotePath{
Path: path,
Promote: true,
@@ -77,7 +77,7 @@ func (m *module) ListPromotedAndIndexedPaths(ctx context.Context) ([]promotetype
// add the paths that are not promoted but have indexes
for path, indexes := range aggr {
path := strings.TrimPrefix(path, telemetrylogs.BodyJSONColumnPrefix)
path = telemetrylogs.BodyJSONStringSearchPrefix + path
path = telemetrytypes.BodyJSONStringSearchPrefix + path
response = append(response, promotetypes.PromotePath{
Path: path,
Indexes: indexes,

View File

@@ -10,9 +10,11 @@ import (
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
"github.com/SigNoz/signoz/pkg/telemetrystore"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/bytedance/sonic"
)
type builderQuery[T any] struct {
@@ -248,6 +250,40 @@ func (q *builderQuery[T]) executeWithContext(ctx context.Context, query string,
return nil, err
}
// merge body_json and promoted into body
if q.spec.Signal == telemetrytypes.SignalLogs {
switch typedPayload := payload.(type) {
case *qbtypes.RawData:
for _, rr := range typedPayload.Rows {
seeder := func() error {
body, ok := rr.Data[telemetrylogs.LogsV2BodyJSONColumn].(map[string]any)
if !ok {
return nil
}
promoted, ok := rr.Data[telemetrylogs.LogsV2BodyPromotedColumn].(map[string]any)
if !ok {
return nil
}
seed(promoted, body)
str, err := sonic.MarshalString(body)
if err != nil {
return errors.Wrapf(err, errors.TypeInternal, errors.CodeInternal, "failed to marshal body")
}
rr.Data["body"] = str
return nil
}
err := seeder()
if err != nil {
return nil, err
}
delete(rr.Data, telemetrylogs.LogsV2BodyJSONColumn)
delete(rr.Data, telemetrylogs.LogsV2BodyPromotedColumn)
}
payload = typedPayload
}
}
return &qbtypes.Result{
Type: q.kind,
Value: payload,
@@ -375,3 +411,18 @@ func decodeCursor(cur string) (int64, error) {
}
return strconv.ParseInt(string(b), 10, 64)
}
func seed(promoted map[string]any, body map[string]any) {
for key, fromValue := range promoted {
if toValue, ok := body[key]; !ok {
body[key] = fromValue
} else {
if fromValue, ok := fromValue.(map[string]any); ok {
if toValue, ok := toValue.(map[string]any); ok {
seed(fromValue, toValue)
body[key] = toValue
}
}
}
}
}

View File

@@ -14,6 +14,7 @@ import (
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/bytedance/sonic"
)
var (
@@ -51,7 +52,6 @@ func consume(rows driver.Rows, kind qbtypes.RequestType, queryWindow *qbtypes.Ti
}
func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbtypes.Step, queryName string) (*qbtypes.TimeSeriesData, error) {
colTypes := rows.ColumnTypes()
colNames := rows.Columns()
@@ -354,10 +354,22 @@ func readAsRaw(rows driver.Rows, queryName string) (*qbtypes.RawData, error) {
colTypes := rows.ColumnTypes()
colCnt := len(colNames)
// Helper that decides scan target per column based on DB type
makeScanTarget := func(i int) any {
dbt := strings.ToUpper(colTypes[i].DatabaseTypeName())
if strings.HasPrefix(dbt, "JSON") {
// Since the driver fails to decode JSON/Dynamic into native Go values, we read it as raw bytes
// TODO: check in future if fixed in the driver
var v []byte
return &v
}
return reflect.New(colTypes[i].ScanType()).Interface()
}
// Build a template slice of correctly-typed pointers once
scanTpl := make([]any, colCnt)
for i, ct := range colTypes {
scanTpl[i] = reflect.New(ct.ScanType()).Interface()
for i := range colTypes {
scanTpl[i] = makeScanTarget(i)
}
var outRows []*qbtypes.RawRow
@@ -366,7 +378,7 @@ func readAsRaw(rows driver.Rows, queryName string) (*qbtypes.RawData, error) {
// fresh copy of the scan slice (otherwise the driver reuses pointers)
scan := make([]any, colCnt)
for i := range scanTpl {
scan[i] = reflect.New(colTypes[i].ScanType()).Interface()
scan[i] = makeScanTarget(i)
}
if err := rows.Scan(scan...); err != nil {
@@ -383,6 +395,21 @@ func readAsRaw(rows driver.Rows, queryName string) (*qbtypes.RawData, error) {
// de-reference the typed pointer to any
val := reflect.ValueOf(cellPtr).Elem().Interface()
// Post-process JSON columns: normalize into structured values
if strings.HasPrefix(strings.ToUpper(colTypes[i].DatabaseTypeName()), "JSON") {
switch x := val.(type) {
case []byte:
if len(x) > 0 {
var v any
if err := sonic.Unmarshal(x, &v); err == nil {
val = v
}
}
default:
// already a structured type (map[string]any, []any, etc.)
}
}
// special-case: timestamp column
if name == "timestamp" || name == "timestamp_datetime" {
switch t := val.(type) {

View File

@@ -593,19 +593,31 @@ func (q *querier) run(
return nil, err
}
// attach step interval to metadata so client can make informed decisions, ex: width of the bar
// or go to related logs/traces from a point in line/bar chart with correct time range
stepIntervals := make(map[string]uint64, len(steps))
for name, step := range steps {
stepIntervals[name] = uint64(step.Duration.Seconds())
}
for _, query := range req.CompositeQuery.Queries {
if query.Type == qbtypes.QueryTypeFormula {
if formula, ok := query.Spec.(qbtypes.QueryBuilderFormula); ok {
formulaStepMs := q.calculateFormulaStep(formula.Expression, req)
stepIntervals[formula.Name] = uint64(formulaStepMs / 1000) // convert ms to seconds
}
}
}
resp := &qbtypes.QueryRangeResponse{
Type: req.RequestType,
Data: qbtypes.QueryData{
Results: maps.Values(processedResults),
},
Meta: struct {
RowsScanned uint64 `json:"rowsScanned"`
BytesScanned uint64 `json:"bytesScanned"`
DurationMS uint64 `json:"durationMs"`
}{
RowsScanned: stats.RowsScanned,
BytesScanned: stats.BytesScanned,
DurationMS: stats.DurationMS,
Meta: qbtypes.ExecStats{
RowsScanned: stats.RowsScanned,
BytesScanned: stats.BytesScanned,
DurationMS: stats.DurationMS,
StepIntervals: stepIntervals,
},
}

View File

@@ -81,7 +81,7 @@ func newProvider(
telemetryMetadataStore,
)
traceAggExprRewriter := querybuilder.NewAggExprRewriter(settings, nil, traceFieldMapper, traceConditionBuilder, "", nil)
traceAggExprRewriter := querybuilder.NewAggExprRewriter(settings, nil, traceFieldMapper, traceConditionBuilder, nil)
traceStmtBuilder := telemetrytraces.NewTraceQueryStatementBuilder(
settings,
telemetryMetadataStore,
@@ -105,14 +105,13 @@ func newProvider(
// Create log statement builder
logFieldMapper := telemetrylogs.NewFieldMapper()
logConditionBuilder := telemetrylogs.NewConditionBuilder(logFieldMapper)
logConditionBuilder := telemetrylogs.NewConditionBuilder(logFieldMapper, telemetryMetadataStore)
logResourceFilterStmtBuilder := resourcefilter.NewLogResourceFilterStatementBuilder(
settings,
resourceFilterFieldMapper,
resourceFilterConditionBuilder,
telemetryMetadataStore,
telemetrylogs.DefaultFullTextColumn,
telemetrylogs.BodyJSONStringSearchPrefix,
telemetrylogs.GetBodyJSONKey,
)
logAggExprRewriter := querybuilder.NewAggExprRewriter(
@@ -120,7 +119,6 @@ func newProvider(
telemetrylogs.DefaultFullTextColumn,
logFieldMapper,
logConditionBuilder,
telemetrylogs.BodyJSONStringSearchPrefix,
telemetrylogs.GetBodyJSONKey,
)
logStmtBuilder := telemetrylogs.NewLogQueryStatementBuilder(
@@ -131,7 +129,6 @@ func newProvider(
logResourceFilterStmtBuilder,
logAggExprRewriter,
telemetrylogs.DefaultFullTextColumn,
telemetrylogs.BodyJSONStringSearchPrefix,
telemetrylogs.GetBodyJSONKey,
)

View File

@@ -116,7 +116,7 @@ func (c *Controller) GenerateConnectionUrl(ctx context.Context, orgId string, cl
return nil, model.WrapApiError(apiErr, "couldn't upsert cloud account")
}
agentVersion := "v0.0.7"
agentVersion := "v0.0.8"
if req.AgentConfig.Version != "" {
agentVersion = req.AgentConfig.Version
}

View File

@@ -626,15 +626,6 @@ func (ah *APIHandler) MetricExplorerRoutes(router *mux.Router, am *middleware.Au
router.HandleFunc("/api/v1/metrics/{metric_name}/metadata",
am.ViewAccess(ah.UpdateMetricsMetadata)).
Methods(http.MethodPost)
// v2 endpoints
router.HandleFunc("/api/v2/metrics/stats", am.ViewAccess(ah.Signoz.Handlers.MetricsExplorer.GetStats)).Methods(http.MethodPost)
router.HandleFunc("/api/v2/metrics/treemap", am.ViewAccess(ah.Signoz.Handlers.MetricsExplorer.GetTreemap)).Methods(http.MethodPost)
router.HandleFunc("/api/v2/metrics/attributes", am.ViewAccess(ah.Signoz.Handlers.MetricsExplorer.GetMetricAttributes)).Methods(http.MethodPost)
router.HandleFunc("/api/v2/metrics/metadata", am.ViewAccess(ah.Signoz.Handlers.MetricsExplorer.GetMetricMetadata)).Methods(http.MethodGet)
router.HandleFunc("/api/v2/metrics/{metric_name}/metadata", am.EditAccess(ah.Signoz.Handlers.MetricsExplorer.UpdateMetricMetadata)).Methods(http.MethodPost)
router.HandleFunc("/api/v2/metric/highlights", am.ViewAccess(ah.Signoz.Handlers.MetricsExplorer.GetMetricHighlights)).Methods(http.MethodGet)
router.HandleFunc("/api/v2/metric/alerts", am.ViewAccess(ah.Signoz.Handlers.MetricsExplorer.GetMetricAlerts)).Methods(http.MethodGet)
router.HandleFunc("/api/v2/metric/dashboards", am.ViewAccess(ah.Signoz.Handlers.MetricsExplorer.GetMetricDashboards)).Methods(http.MethodGet)
}
func Intersection(a, b []int) (c []int) {

View File

@@ -11,13 +11,16 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/query-service/constants"
"github.com/SigNoz/signoz/pkg/query-service/model"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/utils"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/google/uuid"
"go.uber.org/zap"
)
@@ -128,6 +131,40 @@ func (ic *LogParsingPipelineController) ValidatePipelines(ctx context.Context,
return err
}
func (ic *LogParsingPipelineController) getDefaultPipelines() ([]pipelinetypes.GettablePipeline, error) {
defaultPipelines := []pipelinetypes.GettablePipeline{}
if querybuilder.BodyJSONQueryEnabled {
preprocessingPipeline := pipelinetypes.GettablePipeline{
StoreablePipeline: pipelinetypes.StoreablePipeline{
Name: "Default Pipeline - PreProcessing Body",
Alias: "NormalizeBodyDefault",
Enabled: true,
},
Filter: &v3.FilterSet{
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "body",
},
Operator: v3.FilterOperatorExists,
},
},
},
Config: []pipelinetypes.PipelineOperator{
{
ID: uuid.NewString(),
Type: "normalize",
Enabled: true,
If: "body != nil",
},
},
}
defaultPipelines = append(defaultPipelines, preprocessingPipeline)
}
return defaultPipelines, nil
}
// Returns effective list of pipelines including user created
// pipelines and pipelines for installed integrations
func (ic *LogParsingPipelineController) getEffectivePipelinesByVersion(
@@ -258,6 +295,13 @@ func (pc *LogParsingPipelineController) RecommendAgentConfig(
return nil, "", err
}
// recommend default pipelines along with user created pipelines
defaultPipelines, err := pc.getDefaultPipelines()
if err != nil {
return nil, "", model.InternalError(fmt.Errorf("failed to get default pipelines: %w", err))
}
pipelinesResp.Pipelines = append(pipelinesResp.Pipelines, defaultPipelines...)
updatedConf, err := GenerateCollectorConfigWithPipelines(currentConfYaml, pipelinesResp.Pipelines)
if err != nil {
return nil, "", err

View File

@@ -132,7 +132,7 @@ func SignozLogsToPLogs(logs []model.SignozLog) []plog.Logs {
slRecord.SetSeverityText(log.SeverityText)
slRecord.SetSeverityNumber(plog.SeverityNumber(log.SeverityNumber))
slRecord.Body().SetStr(log.Body)
slRecord.Body().FromRaw(log.Body)
slAttribs := slRecord.Attributes()
for k, v := range log.Attributes_int64 {

View File

@@ -764,8 +764,10 @@ func (m *Manager) prepareTestNotifyFunc() NotifyFunc {
a := &alertmanagertypes.PostableAlert{}
a.Annotations = alert.Annotations.Map()
a.StartsAt = strfmt.DateTime(alert.FiredAt)
labelsMap := alert.Labels.Map()
labelsMap[labels.TestAlertLabel] = "true"
a.Alert = alertmanagertypes.AlertModel{
Labels: alert.Labels.Map(),
Labels: labelsMap,
GeneratorURL: strfmt.URI(generatorURL),
}
if !alert.ResolvedAt.IsZero() {

View File

@@ -17,6 +17,7 @@ const (
MetricNameLabel = "__name__"
TemporalityLabel = "__temporality__"
AlertNameLabel = "alertname"
TestAlertLabel = "testalert"
NoDataLabel = "nodata"
// AlertStateLabel is the label name indicating the state of an alert.

View File

@@ -20,7 +20,6 @@ type aggExprRewriter struct {
fullTextColumn *telemetrytypes.TelemetryFieldKey
fieldMapper qbtypes.FieldMapper
conditionBuilder qbtypes.ConditionBuilder
jsonBodyPrefix string
jsonKeyToKey qbtypes.JsonKeyToFieldFunc
}
@@ -31,7 +30,6 @@ func NewAggExprRewriter(
fullTextColumn *telemetrytypes.TelemetryFieldKey,
fieldMapper qbtypes.FieldMapper,
conditionBuilder qbtypes.ConditionBuilder,
jsonBodyPrefix string,
jsonKeyToKey qbtypes.JsonKeyToFieldFunc,
) *aggExprRewriter {
set := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/querybuilder/agg_rewrite")
@@ -41,7 +39,6 @@ func NewAggExprRewriter(
fullTextColumn: fullTextColumn,
fieldMapper: fieldMapper,
conditionBuilder: conditionBuilder,
jsonBodyPrefix: jsonBodyPrefix,
jsonKeyToKey: jsonKeyToKey,
}
}
@@ -81,7 +78,6 @@ func (r *aggExprRewriter) Rewrite(
r.fullTextColumn,
r.fieldMapper,
r.conditionBuilder,
r.jsonBodyPrefix,
r.jsonKeyToKey,
)
// Rewrite the first select item (our expression)
@@ -129,7 +125,6 @@ type exprVisitor struct {
fullTextColumn *telemetrytypes.TelemetryFieldKey
fieldMapper qbtypes.FieldMapper
conditionBuilder qbtypes.ConditionBuilder
jsonBodyPrefix string
jsonKeyToKey qbtypes.JsonKeyToFieldFunc
Modified bool
chArgs []any
@@ -142,7 +137,6 @@ func newExprVisitor(
fullTextColumn *telemetrytypes.TelemetryFieldKey,
fieldMapper qbtypes.FieldMapper,
conditionBuilder qbtypes.ConditionBuilder,
jsonBodyPrefix string,
jsonKeyToKey qbtypes.JsonKeyToFieldFunc,
) *exprVisitor {
return &exprVisitor{
@@ -151,7 +145,6 @@ func newExprVisitor(
fullTextColumn: fullTextColumn,
fieldMapper: fieldMapper,
conditionBuilder: conditionBuilder,
jsonBodyPrefix: jsonBodyPrefix,
jsonKeyToKey: jsonKeyToKey,
}
}
@@ -190,7 +183,7 @@ func (v *exprVisitor) VisitFunctionExpr(fn *chparser.FunctionExpr) error {
if aggFunc.FuncCombinator {
// Map the predicate (last argument)
origPred := args[len(args)-1].String()
whereClause, err := PrepareWhereClause(
whereClause, err := PrepareWhereClause(
origPred,
FilterExprVisitorOpts{
Logger: v.logger,
@@ -199,7 +192,7 @@ func (v *exprVisitor) VisitFunctionExpr(fn *chparser.FunctionExpr) error {
ConditionBuilder: v.conditionBuilder,
FullTextColumn: v.fullTextColumn,
JsonKeyToKey: v.jsonKeyToKey,
}, 0, 0,
}, 0, 0,
)
if err != nil {
return err
@@ -219,7 +212,7 @@ func (v *exprVisitor) VisitFunctionExpr(fn *chparser.FunctionExpr) error {
for i := 0; i < len(args)-1; i++ {
origVal := args[i].String()
fieldKey := telemetrytypes.GetFieldKeyFromKeyText(origVal)
expr, exprArgs, err := CollisionHandledFinalExpr(context.Background(), &fieldKey, v.fieldMapper, v.conditionBuilder, v.fieldKeys, dataType, v.jsonBodyPrefix, v.jsonKeyToKey)
expr, exprArgs, err := CollisionHandledFinalExpr(context.Background(), &fieldKey, v.fieldMapper, v.conditionBuilder, v.fieldKeys, dataType, v.jsonKeyToKey)
if err != nil {
return errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "failed to get table field name for %q", origVal)
}
@@ -237,7 +230,7 @@ func (v *exprVisitor) VisitFunctionExpr(fn *chparser.FunctionExpr) error {
for i, arg := range args {
orig := arg.String()
fieldKey := telemetrytypes.GetFieldKeyFromKeyText(orig)
expr, exprArgs, err := CollisionHandledFinalExpr(context.Background(), &fieldKey, v.fieldMapper, v.conditionBuilder, v.fieldKeys, dataType, v.jsonBodyPrefix, v.jsonKeyToKey)
expr, exprArgs, err := CollisionHandledFinalExpr(context.Background(), &fieldKey, v.fieldMapper, v.conditionBuilder, v.fieldKeys, dataType, v.jsonKeyToKey)
if err != nil {
return err
}

View File

@@ -24,7 +24,6 @@ func CollisionHandledFinalExpr(
cb qbtypes.ConditionBuilder,
keys map[string][]*telemetrytypes.TelemetryFieldKey,
requiredDataType telemetrytypes.FieldDataType,
jsonBodyPrefix string,
jsonKeyToKey qbtypes.JsonKeyToFieldFunc,
) (string, []any, error) {
@@ -45,7 +44,7 @@ func CollisionHandledFinalExpr(
addCondition := func(key *telemetrytypes.TelemetryFieldKey) error {
sb := sqlbuilder.NewSelectBuilder()
condition, err := cb.ConditionFor(ctx, key, qbtypes.FilterOperatorExists, nil, sb, 0, 0)
condition, err := cb.ConditionFor(ctx, key, qbtypes.FilterOperatorExists, nil, sb, 0, 0)
if err != nil {
return err
}
@@ -58,8 +57,8 @@ func CollisionHandledFinalExpr(
return nil
}
colName, err := fm.FieldFor(ctx, field)
if errors.Is(err, qbtypes.ErrColumnNotFound) {
colName, fieldForErr := fm.FieldFor(ctx, field)
if errors.Is(fieldForErr, qbtypes.ErrColumnNotFound) {
// the key didn't have the right context to be added to the query
// we try to use the context we know of
keysForField := keys[field.Name]
@@ -82,10 +81,10 @@ func CollisionHandledFinalExpr(
correction, found := telemetrytypes.SuggestCorrection(field.Name, maps.Keys(keys))
if found {
// we found a close match, in the error message send the suggestion
return "", nil, errors.Wrap(err, errors.TypeInvalidInput, errors.CodeInvalidInput, correction)
return "", nil, errors.WithAdditionalf(fieldForErr, "%s", correction)
} else {
// not even a close match, return an error
return "", nil, errors.Wrapf(err, errors.TypeInvalidInput, errors.CodeInvalidInput, "field `%s` not found", field.Name)
return "", nil, errors.WithAdditionalf(fieldForErr, "field `%s` not found", field.Name)
}
} else {
for _, key := range keysForField {
@@ -104,10 +103,11 @@ func CollisionHandledFinalExpr(
return "", nil, err
}
if strings.HasPrefix(field.Name, jsonBodyPrefix) && jsonBodyPrefix != "" && jsonKeyToKey != nil {
// TODO(nitya): enable group by on body column?
// first if condition covers the older tests and second if condition covers the array conditions
if !BodyJSONQueryEnabled && field.FieldContext == telemetrytypes.FieldContextBody && jsonKeyToKey != nil {
return "", nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "Group by/Aggregation isn't available for the body column")
// colName, _ = jsonKeyToKey(context.Background(), field, qbtypes.FilterOperatorUnknown, dummyValue)
} else if strings.Contains(field.Name, telemetrytypes.ArraySep) || strings.Contains(field.Name, telemetrytypes.ArrayAnyIndex) {
return "", nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "Group by/Aggregation isn't available for the Array Paths: %s", field.Name)
} else {
colName, _ = DataTypeCollisionHandledFieldName(field, dummyValue, colName, qbtypes.FilterOperatorUnknown)
}
@@ -204,7 +204,7 @@ func DataTypeCollisionHandledFieldName(key *telemetrytypes.TelemetryFieldKey, va
// While we expect user not to send the mixed data types, it inevitably happens
// So we handle the data type collisions here
switch key.FieldDataType {
case telemetrytypes.FieldDataTypeString:
case telemetrytypes.FieldDataTypeString, telemetrytypes.FieldDataTypeArrayString:
switch v := value.(type) {
case float64:
// try to convert the string value to to number
@@ -220,7 +220,12 @@ func DataTypeCollisionHandledFieldName(key *telemetrytypes.TelemetryFieldKey, va
value = fmt.Sprintf("%t", v)
}
case telemetrytypes.FieldDataTypeFloat64, telemetrytypes.FieldDataTypeInt64, telemetrytypes.FieldDataTypeNumber:
case telemetrytypes.FieldDataTypeInt64,
telemetrytypes.FieldDataTypeArrayInt64,
telemetrytypes.FieldDataTypeNumber,
telemetrytypes.FieldDataTypeArrayNumber,
telemetrytypes.FieldDataTypeFloat64,
telemetrytypes.FieldDataTypeArrayFloat64:
switch v := value.(type) {
// why? ; CH returns an error for a simple check
// attributes_number['http.status_code'] = 200 but not for attributes_number['http.status_code'] >= 200
@@ -258,7 +263,8 @@ func DataTypeCollisionHandledFieldName(key *telemetrytypes.TelemetryFieldKey, va
}
}
case telemetrytypes.FieldDataTypeBool:
case telemetrytypes.FieldDataTypeBool,
telemetrytypes.FieldDataTypeArrayBool:
switch v := value.(type) {
case string:
tblFieldName = castString(tblFieldName)

View File

@@ -43,7 +43,6 @@ type resourceFilterStatementBuilder[T any] struct {
signal telemetrytypes.Signal
fullTextColumn *telemetrytypes.TelemetryFieldKey
jsonBodyPrefix string
jsonKeyToKey qbtypes.JsonKeyToFieldFunc
}
@@ -76,7 +75,6 @@ func NewLogResourceFilterStatementBuilder(
conditionBuilder qbtypes.ConditionBuilder,
metadataStore telemetrytypes.MetadataStore,
fullTextColumn *telemetrytypes.TelemetryFieldKey,
jsonBodyPrefix string,
jsonKeyToKey qbtypes.JsonKeyToFieldFunc,
) *resourceFilterStatementBuilder[qbtypes.LogAggregation] {
set := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/querybuilder/resourcefilter")
@@ -87,7 +85,6 @@ func NewLogResourceFilterStatementBuilder(
metadataStore: metadataStore,
signal: telemetrytypes.SignalLogs,
fullTextColumn: fullTextColumn,
jsonBodyPrefix: jsonBodyPrefix,
jsonKeyToKey: jsonKeyToKey,
}
}
@@ -100,12 +97,18 @@ func (b *resourceFilterStatementBuilder[T]) getKeySelectors(query qbtypes.QueryB
keySelectors = append(keySelectors, whereClauseSelectors...)
}
// exclude out the body related key selectors
filteredKeySelectors := []*telemetrytypes.FieldKeySelector{}
for idx := range keySelectors {
if keySelectors[idx].FieldContext == telemetrytypes.FieldContextBody {
continue
}
keySelectors[idx].Signal = b.signal
keySelectors[idx].SelectorMatchType = telemetrytypes.FieldSelectorMatchTypeExact
filteredKeySelectors = append(filteredKeySelectors, keySelectors[idx])
}
return keySelectors
return filteredKeySelectors
}
// Build builds a SQL query based on the given parameters
@@ -168,7 +171,7 @@ func (b *resourceFilterStatementBuilder[T]) addConditions(
// there is no need for "key" not found error for resource filtering
IgnoreNotFoundKeys: true,
Variables: variables,
}, start, end)
}, start, end)
if err != nil {
return err

View File

@@ -16,14 +16,13 @@ type provider struct {
ruleStore ruletypes.RuleStore
}
func NewFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[ruler.Ruler, ruler.Config] {
func NewFactory(sqlstore sqlstore.SQLStore, queryParser queryparser.QueryParser) factory.ProviderFactory[ruler.Ruler, ruler.Config] {
return factory.NewProviderFactory(factory.MustNewName("signoz"), func(ctx context.Context, settings factory.ProviderSettings, config ruler.Config) (ruler.Ruler, error) {
return New(ctx, settings, config, sqlstore)
return New(ctx, settings, config, sqlstore, queryParser)
})
}
func New(ctx context.Context, settings factory.ProviderSettings, config ruler.Config, sqlstore sqlstore.SQLStore) (ruler.Ruler, error) {
queryParser := queryparser.New(settings)
func New(ctx context.Context, settings factory.ProviderSettings, config ruler.Config, sqlstore sqlstore.SQLStore, queryParser queryparser.QueryParser) (ruler.Ruler, error) {
return &provider{ruleStore: sqlrulestore.NewRuleStore(sqlstore, queryParser, settings)}, nil
}

View File

@@ -14,6 +14,7 @@ import (
"github.com/SigNoz/signoz/pkg/instrumentation"
"github.com/SigNoz/signoz/pkg/modules/authdomain"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/preference"
"github.com/SigNoz/signoz/pkg/modules/promote"
@@ -45,6 +46,7 @@ func NewOpenAPI(ctx context.Context, instrumentation instrumentation.Instrumenta
struct{ flagger.Handler }{},
struct{ dashboard.Module }{},
struct{ dashboard.Handler }{},
struct{ metricsexplorer.Handler }{},
).New(ctx, instrumentation.ToProviderSettings(), apiserver.Config{})
if err != nil {
return nil, err

View File

@@ -34,6 +34,7 @@ import (
"github.com/SigNoz/signoz/pkg/prometheus/clickhouseprometheus"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/querier/signozquerier"
"github.com/SigNoz/signoz/pkg/queryparser"
"github.com/SigNoz/signoz/pkg/ruler"
"github.com/SigNoz/signoz/pkg/ruler/signozruler"
"github.com/SigNoz/signoz/pkg/sharder"
@@ -197,9 +198,9 @@ func NewAlertmanagerProviderFactories(sqlstore sqlstore.SQLStore, orgGetter orga
)
}
func NewRulerProviderFactories(sqlstore sqlstore.SQLStore) factory.NamedMap[factory.ProviderFactory[ruler.Ruler, ruler.Config]] {
func NewRulerProviderFactories(sqlstore sqlstore.SQLStore, queryParser queryparser.QueryParser) factory.NamedMap[factory.ProviderFactory[ruler.Ruler, ruler.Config]] {
return factory.MustNewNamedMap(
signozruler.NewFactory(sqlstore),
signozruler.NewFactory(sqlstore, queryParser),
)
}
@@ -245,6 +246,7 @@ func NewAPIServerProviderFactories(orgGetter organization.Getter, authz authz.Au
handlers.FlaggerHandler,
modules.Dashboard,
handlers.Dashboard,
handlers.MetricsExplorer,
),
)
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
"github.com/SigNoz/signoz/pkg/modules/user/impluser"
"github.com/SigNoz/signoz/pkg/queryparser"
"github.com/SigNoz/signoz/pkg/sqlschema"
"github.com/SigNoz/signoz/pkg/sqlschema/sqlschematest"
"github.com/SigNoz/signoz/pkg/sqlstore"
@@ -61,7 +62,8 @@ func TestNewProviderFactories(t *testing.T) {
})
assert.NotPanics(t, func() {
NewRulerProviderFactories(sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherEqual))
queryParser := queryparser.New(instrumentationtest.New().ToProviderSettings())
NewRulerProviderFactories(sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherEqual), queryParser)
})
assert.NotPanics(t, func() {

View File

@@ -311,12 +311,15 @@ func New(
return nil, err
}
// Initialize query parser
queryParser := queryparser.New(providerSettings)
// Initialize ruler from the available ruler provider factories
ruler, err := factory.NewProviderFromNamedMap(
ctx,
providerSettings,
config.Ruler,
NewRulerProviderFactories(sqlstore),
NewRulerProviderFactories(sqlstore, queryParser),
"signoz",
)
if err != nil {
@@ -333,9 +336,6 @@ func New(
return nil, err
}
// Initialize query parser
queryParser := queryparser.New(providerSettings)
// Initialize authns
store := sqlauthnstore.NewStore(sqlstore)
authNs, err := authNsCallback(ctx, providerSettings, store, licensing)

View File

@@ -16,11 +16,12 @@ import (
)
type conditionBuilder struct {
fm qbtypes.FieldMapper
fm qbtypes.FieldMapper
metadataStore telemetrytypes.MetadataStore
}
func NewConditionBuilder(fm qbtypes.FieldMapper) *conditionBuilder {
return &conditionBuilder{fm: fm}
func NewConditionBuilder(fm qbtypes.FieldMapper, metadataStore telemetrytypes.MetadataStore) *conditionBuilder {
return &conditionBuilder{fm: fm, metadataStore: metadataStore}
}
func (c *conditionBuilder) conditionFor(
@@ -30,22 +31,23 @@ func (c *conditionBuilder) conditionFor(
value any,
sb *sqlbuilder.SelectBuilder,
) (string, error) {
switch operator {
case qbtypes.FilterOperatorContains,
qbtypes.FilterOperatorNotContains,
qbtypes.FilterOperatorILike,
qbtypes.FilterOperatorNotILike,
qbtypes.FilterOperatorLike,
qbtypes.FilterOperatorNotLike:
value = querybuilder.FormatValueForContains(value)
}
column, err := c.fm.ColumnFor(ctx, key)
if err != nil {
return "", err
}
if column.IsJSONColumn() && querybuilder.BodyJSONQueryEnabled {
cond, err := c.buildJSONCondition(ctx, key, operator, value, sb)
if err != nil {
return "", err
}
return cond, nil
}
if operator.IsStringSearchOperator() {
value = querybuilder.FormatValueForContains(value)
}
tblFieldName, err := c.fm.FieldFor(ctx, key)
if err != nil {
return "", err
@@ -163,9 +165,7 @@ func (c *conditionBuilder) conditionFor(
// in the UI based query builder, `exists` and `not exists` are used for
// key membership checks, so depending on the column type, the condition changes
case qbtypes.FilterOperatorExists, qbtypes.FilterOperatorNotExists:
// Check if this is a body JSON search - by FieldContext
if key.FieldContext == telemetrytypes.FieldContextBody {
if key.FieldContext == telemetrytypes.FieldContextBody && !querybuilder.BodyJSONQueryEnabled {
if operator == qbtypes.FilterOperatorExists {
return GetBodyJSONKeyForExists(ctx, key, operator, value), nil
} else {
@@ -247,7 +247,7 @@ func (c *conditionBuilder) ConditionFor(
return "", err
}
if operator.AddDefaultExistsFilter() {
if !(key.FieldContext == telemetrytypes.FieldContextBody && querybuilder.BodyJSONQueryEnabled) && operator.AddDefaultExistsFilter() {
// skip adding exists filter for intrinsic fields
// with an exception for body json search
field, _ := c.fm.FieldFor(ctx, key)

View File

@@ -373,7 +373,8 @@ func TestConditionFor(t *testing.T) {
}
fm := NewFieldMapper()
conditionBuilder := NewConditionBuilder(fm)
mockMetadataStore := buildTestTelemetryMetadataStore()
conditionBuilder := NewConditionBuilder(fm, mockMetadataStore)
for _, tc := range testCases {
sb := sqlbuilder.NewSelectBuilder()
@@ -426,7 +427,8 @@ func TestConditionForMultipleKeys(t *testing.T) {
}
fm := NewFieldMapper()
conditionBuilder := NewConditionBuilder(fm)
mockMetadataStore := buildTestTelemetryMetadataStore()
conditionBuilder := NewConditionBuilder(fm, mockMetadataStore)
for _, tc := range testCases {
sb := sqlbuilder.NewSelectBuilder()
@@ -685,7 +687,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
}
fm := NewFieldMapper()
conditionBuilder := NewConditionBuilder(fm)
mockMetadataStore := buildTestTelemetryMetadataStore()
conditionBuilder := NewConditionBuilder(fm, mockMetadataStore)
for _, tc := range testCases {
sb := sqlbuilder.NewSelectBuilder()

View File

@@ -2,7 +2,6 @@ package telemetrylogs
import (
"github.com/SigNoz/signoz-otel-collector/constants"
"github.com/SigNoz/signoz-otel-collector/exporter/jsontypeexporter"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
@@ -37,8 +36,6 @@ const (
BodyJSONColumnPrefix = constants.BodyJSONColumnPrefix
BodyPromotedColumnPrefix = constants.BodyPromotedColumnPrefix
ArraySep = jsontypeexporter.ArraySeparator
ArrayAnyIndex = "[*]."
)
var (
@@ -48,8 +45,7 @@ var (
FieldContext: telemetrytypes.FieldContextLog,
FieldDataType: telemetrytypes.FieldDataTypeString,
}
BodyJSONStringSearchPrefix = `body.`
IntrinsicFields = map[string]telemetrytypes.TelemetryFieldKey{
IntrinsicFields = map[string]telemetrytypes.TelemetryFieldKey{
"body": {
Name: "body",
Signal: telemetrytypes.SignalLogs,

View File

@@ -6,7 +6,9 @@ import (
"strings"
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz-otel-collector/utils"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
@@ -28,6 +30,11 @@ var (
"severity_text": {Name: "severity_text", Type: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}},
"severity_number": {Name: "severity_number", Type: schema.ColumnTypeUInt8},
"body": {Name: "body", Type: schema.ColumnTypeString},
LogsV2BodyJSONColumn: {Name: LogsV2BodyJSONColumn, Type: schema.JSONColumnType{
MaxDynamicTypes: utils.ToPointer(uint(32)),
MaxDynamicPaths: utils.ToPointer(uint(0)),
}},
LogsV2BodyPromotedColumn: {Name: LogsV2BodyPromotedColumn, Type: schema.JSONColumnType{}},
"attributes_string": {Name: "attributes_string", Type: schema.MapColumnType{
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
ValueType: schema.ColumnTypeString,
@@ -83,13 +90,23 @@ func (m *fieldMapper) getColumn(_ context.Context, key *telemetrytypes.Telemetry
return logsV2Columns["attributes_bool"], nil
}
case telemetrytypes.FieldContextBody:
// body context fields are stored in the body column
// Body context is for JSON body fields
// Use body_json if feature flag is enabled
if querybuilder.BodyJSONQueryEnabled {
return logsV2Columns[LogsV2BodyJSONColumn], nil
}
// Fall back to legacy body column
return logsV2Columns["body"], nil
case telemetrytypes.FieldContextLog, telemetrytypes.FieldContextUnspecified:
col, ok := logsV2Columns[key.Name]
if !ok {
// check if the key has body JSON search (backward compatibility)
if strings.HasPrefix(key.Name, BodyJSONStringSearchPrefix) {
// check if the key has body JSON search
if strings.HasPrefix(key.Name, telemetrytypes.BodyJSONStringSearchPrefix) {
// Use body_json if feature flag is enabled and we have a body condition builder
if querybuilder.BodyJSONQueryEnabled {
return logsV2Columns[LogsV2BodyJSONColumn], nil
}
// Fall back to legacy body column
return logsV2Columns["body"], nil
}
return nil, qbtypes.ErrColumnNotFound
@@ -109,20 +126,36 @@ func (m *fieldMapper) FieldFor(ctx context.Context, key *telemetrytypes.Telemetr
switch column.Type.GetType() {
case schema.ColumnTypeEnumJSON:
// json is only supported for resource context as of now
if key.FieldContext != telemetrytypes.FieldContextResource {
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource context fields are supported for json columns, got %s", key.FieldContext.String)
}
oldColumn := logsV2Columns["resources_string"]
oldKeyName := fmt.Sprintf("%s['%s']", oldColumn.Name, key.Name)
switch key.FieldContext {
case telemetrytypes.FieldContextResource:
oldColumn := logsV2Columns["resources_string"]
oldKeyName := fmt.Sprintf("%s['%s']", oldColumn.Name, key.Name)
// have to add ::string as clickHouse throws an error :- data types Variant/Dynamic are not allowed in GROUP BY
// once clickHouse dependency is updated, we need to check if we can remove it.
if key.Materialized {
oldKeyName = telemetrytypes.FieldKeyToMaterializedColumnName(key)
oldKeyNameExists := telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)
return fmt.Sprintf("multiIf(%s.`%s` IS NOT NULL, %s.`%s`::String, %s==true, %s, NULL)", column.Name, key.Name, column.Name, key.Name, oldKeyNameExists, oldKeyName), nil
} else {
// have to add ::string as clickHouse throws an error :- data types Variant/Dynamic are not allowed in GROUP BY
// once clickHouse dependency is updated, we need to check if we can remove it.
if key.Materialized {
oldKeyName = telemetrytypes.FieldKeyToMaterializedColumnName(key)
oldKeyNameExists := telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)
return fmt.Sprintf("multiIf(%s.`%s` IS NOT NULL, %s.`%s`::String, %s==true, %s, NULL)", column.Name, key.Name, column.Name, key.Name, oldKeyNameExists, oldKeyName), nil
}
return fmt.Sprintf("multiIf(%s.`%s` IS NOT NULL, %s.`%s`::String, mapContains(%s, '%s'), %s, NULL)", column.Name, key.Name, column.Name, key.Name, oldColumn.Name, key.Name, oldKeyName), nil
case telemetrytypes.FieldContextBody:
if querybuilder.BodyJSONQueryEnabled && (strings.Contains(key.Name, telemetrytypes.ArraySep) || strings.Contains(key.Name, telemetrytypes.ArrayAnyIndex)) {
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "FieldFor not supported for the Array Paths: %s", key.Name)
}
if key.JSONDataType == nil {
return "", qbtypes.ErrColumnNotFound
}
fieldExpr := BodyJSONColumnPrefix + fmt.Sprintf("`%s`", key.Name)
expr := fmt.Sprintf("dynamicElement(%s, '%s')", fieldExpr, key.JSONDataType.StringValue())
if key.Materialized {
promotedFieldExpr := BodyPromotedColumnPrefix + fmt.Sprintf("`%s`", key.Name)
expr = fmt.Sprintf("coalesce(%s, %s)", expr, fmt.Sprintf("dynamicElement(%s, '%s')", promotedFieldExpr, key.JSONDataType.StringValue()))
}
return expr, nil
default:
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource/body context fields are supported for json columns, got %s", key.FieldContext.String)
}
case schema.ColumnTypeEnumLowCardinality:
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {

View File

@@ -11,7 +11,7 @@ import (
// TestLikeAndILikeWithoutWildcards_Warns Tests that LIKE/ILIKE without wildcards add warnings and include docs URL
func TestLikeAndILikeWithoutWildcards_Warns(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
cb := NewConditionBuilder(fm, nil)
keys := buildCompleteFieldKeyMap()
@@ -33,7 +33,7 @@ func TestLikeAndILikeWithoutWildcards_Warns(t *testing.T) {
for _, expr := range tests {
t.Run(expr, func(t *testing.T) {
clause, err := querybuilder.PrepareWhereClause(expr, opts, 0, 0)
clause, err := querybuilder.PrepareWhereClause(expr, opts, 0, 0)
require.NoError(t, err)
require.NotNil(t, clause)
@@ -47,7 +47,7 @@ func TestLikeAndILikeWithoutWildcards_Warns(t *testing.T) {
// TestLikeAndILikeWithWildcards_NoWarn Tests that LIKE/ILIKE with wildcards do not add warnings
func TestLikeAndILikeWithWildcards_NoWarn(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
cb := NewConditionBuilder(fm, nil)
keys := buildCompleteFieldKeyMap()
@@ -69,7 +69,7 @@ func TestLikeAndILikeWithWildcards_NoWarn(t *testing.T) {
for _, expr := range tests {
t.Run(expr, func(t *testing.T) {
clause, err := querybuilder.PrepareWhereClause(expr, opts, 0, 0)
clause, err := querybuilder.PrepareWhereClause(expr, opts, 0, 0)
require.NoError(t, err)
require.NotNil(t, clause)

View File

@@ -7,6 +7,7 @@ import (
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
"github.com/huandu/go-sqlbuilder"
"github.com/stretchr/testify/require"
)
@@ -14,8 +15,7 @@ import (
// TestFilterExprLogsBodyJSON tests a comprehensive set of query patterns for body JSON search
func TestFilterExprLogsBodyJSON(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
cb := NewConditionBuilder(fm, telemetrytypestest.NewMockMetadataStore())
// Define a comprehensive set of field keys to support all test cases
keys := buildCompleteFieldKeyMap()

View File

@@ -16,7 +16,7 @@ import (
// TestFilterExprLogs tests a comprehensive set of query patterns for logs search
func TestFilterExprLogs(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
cb := NewConditionBuilder(fm, nil)
// Define a comprehensive set of field keys to support all test cases
keys := buildCompleteFieldKeyMap()
@@ -2423,7 +2423,7 @@ func TestFilterExprLogs(t *testing.T) {
// TestFilterExprLogs tests a comprehensive set of query patterns for logs search
func TestFilterExprLogsConflictNegation(t *testing.T) {
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
cb := NewConditionBuilder(fm, nil)
// Define a comprehensive set of field keys to support all test cases
keys := buildCompleteFieldKeyMap()

View File

@@ -19,18 +19,18 @@ type JSONAccessPlanBuilder struct {
value any
op qbtypes.FilterOperator
parts []string
getTypes func(ctx context.Context, path string) ([]telemetrytypes.JSONDataType, error)
isPromoted bool
typeCache map[string][]telemetrytypes.JSONDataType
}
// buildPlan recursively builds the path plan tree
func (pb *JSONAccessPlanBuilder) buildPlan(ctx context.Context, index int, parent *telemetrytypes.JSONAccessNode, isDynArrChild bool) (*telemetrytypes.JSONAccessNode, error) {
func (pb *JSONAccessPlanBuilder) buildPlan(index int, parent *telemetrytypes.JSONAccessNode, isDynArrChild bool) (*telemetrytypes.JSONAccessNode, error) {
if index >= len(pb.parts) {
return nil, errors.NewInvalidInputf(CodePlanIndexOutOfBounds, "index is out of bounds")
}
part := pb.parts[index]
pathSoFar := strings.Join(pb.parts[:index+1], ArraySep)
pathSoFar := strings.Join(pb.parts[:index+1], telemetrytypes.ArraySep)
isTerminal := index == len(pb.parts)-1
// Calculate progression parameters based on parent's values
@@ -52,10 +52,8 @@ func (pb *JSONAccessPlanBuilder) buildPlan(ctx context.Context, index int, paren
}
}
types, err := pb.getTypes(ctx, pathSoFar)
if err != nil {
return nil, err
}
// Use cached types from the batched metadata query
types := pb.typeCache[pathSoFar]
// Create node for this path segment
node := &telemetrytypes.JSONAccessNode{
@@ -80,14 +78,15 @@ func (pb *JSONAccessPlanBuilder) buildPlan(ctx context.Context, index int, paren
ValueType: telemetrytypes.MappingFieldDataTypeToJSONDataType[valueType],
}
} else {
var err error
if hasJSON {
node.Branches[telemetrytypes.BranchJSON], err = pb.buildPlan(ctx, index+1, node, false)
node.Branches[telemetrytypes.BranchJSON], err = pb.buildPlan(index+1, node, false)
if err != nil {
return nil, err
}
}
if hasDynamic {
node.Branches[telemetrytypes.BranchDynamic], err = pb.buildPlan(ctx, index+1, node, true)
node.Branches[telemetrytypes.BranchDynamic], err = pb.buildPlan(index+1, node, true)
if err != nil {
return nil, err
}
@@ -101,29 +100,58 @@ func (pb *JSONAccessPlanBuilder) buildPlan(ctx context.Context, index int, paren
// that precomputes all possible branches and their types
func PlanJSON(ctx context.Context, key *telemetrytypes.TelemetryFieldKey, op qbtypes.FilterOperator,
value any,
getTypes func(ctx context.Context, path string) ([]telemetrytypes.JSONDataType, error),
metadataStore telemetrytypes.MetadataStore,
) (telemetrytypes.JSONAccessPlan, error) {
// if path is empty, return nil
if key.Name == "" {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "path is empty")
}
// TODO: PlanJSON requires the Start and End of the Query to select correct column between promoted and body_json using
// creation time in distributed_promoted_paths
path := strings.ReplaceAll(key.Name, ArrayAnyIndex, ArraySep)
parts := strings.Split(path, ArraySep)
path := strings.ReplaceAll(key.Name, telemetrytypes.ArrayAnyIndex, telemetrytypes.ArraySep)
parts := strings.Split(path, telemetrytypes.ArraySep)
// Pre-fetch JSON types for all path prefixes in a single metadata call to avoid
// multiple small DB queries during plan construction.
// Extract all path prefixes that will be needed during recursive buildPlan calls
selectors := make([]*telemetrytypes.FieldKeySelector, 0, len(parts))
for i := range parts {
pathSoFar := strings.Join(parts[:i+1], telemetrytypes.ArraySep)
selectors = append(selectors, &telemetrytypes.FieldKeySelector{
Name: pathSoFar,
SelectorMatchType: telemetrytypes.FieldSelectorMatchTypeExact,
Signal: telemetrytypes.SignalLogs,
Limit: 1,
})
}
keys, _, err := metadataStore.GetKeysMulti(ctx, selectors)
if err != nil {
return nil, err
}
// Build type cache from the batched results
typeCache := make(map[string][]telemetrytypes.JSONDataType, len(keys))
for name, ks := range keys {
types := make([]telemetrytypes.JSONDataType, 0, len(ks))
for _, k := range ks {
if k.JSONDataType != nil {
types = append(types, *k.JSONDataType)
}
}
typeCache[name] = types
}
pb := &JSONAccessPlanBuilder{
key: key,
op: op,
value: value,
parts: parts,
getTypes: getTypes,
isPromoted: key.Materialized,
typeCache: typeCache,
}
plans := telemetrytypes.JSONAccessPlan{}
node, err := pb.buildPlan(ctx, 0,
node, err := pb.buildPlan(0,
telemetrytypes.NewRootJSONAccessNode(LogsV2BodyJSONColumn,
32, 0),
false,
@@ -133,8 +161,10 @@ func PlanJSON(ctx context.Context, key *telemetrytypes.TelemetryFieldKey, op qbt
}
plans = append(plans, node)
// TODO: PlanJSON requires the Start and End of the Query to select correct column between promoted and body_json using
// creation time in distributed_promoted_paths
if pb.isPromoted {
node, err := pb.buildPlan(ctx, 0,
node, err := pb.buildPlan(0,
telemetrytypes.NewRootJSONAccessNode(LogsV2BodyPromotedColumn,
32, 1024),
true,

View File

@@ -6,6 +6,7 @@ import (
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"
)
@@ -23,13 +24,6 @@ func makeKey(name string, dataType telemetrytypes.JSONDataType, materialized boo
}
}
// makeGetTypes creates a getTypes function from a map of path -> types
func makeGetTypes(typesMap map[string][]telemetrytypes.JSONDataType) func(ctx context.Context, path string) ([]telemetrytypes.JSONDataType, error) {
return func(_ context.Context, path string) ([]telemetrytypes.JSONDataType, error) {
return typesMap[path], nil
}
}
// ============================================================================
// Helper Functions for Node Validation
// ============================================================================
@@ -237,7 +231,7 @@ func TestNode_FieldPath(t *testing.T) {
// ============================================================================
func TestPlanJSON_BasicStructure(t *testing.T) {
_, getTypes := testTypeSet()
_, metadataStore := testTypeSet()
tests := []struct {
name string
@@ -292,7 +286,7 @@ func TestPlanJSON_BasicStructure(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
plans, err := PlanJSON(context.Background(), tt.key, qbtypes.FilterOperatorEqual, "John", getTypes)
plans, err := PlanJSON(context.Background(), tt.key, qbtypes.FilterOperatorEqual, "John", metadataStore)
if tt.expectErr {
require.Error(t, err)
require.Nil(t, plans)
@@ -306,7 +300,7 @@ func TestPlanJSON_BasicStructure(t *testing.T) {
}
func TestPlanJSON_ArrayPaths(t *testing.T) {
_, getTypes := testTypeSet()
_, metadataStore := testTypeSet()
tests := []struct {
name string
@@ -442,7 +436,7 @@ func TestPlanJSON_ArrayPaths(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
key := makeKey(tt.path, telemetrytypes.String, false)
plans, err := PlanJSON(context.Background(), key, qbtypes.FilterOperatorEqual, "John", getTypes)
plans, err := PlanJSON(context.Background(), key, qbtypes.FilterOperatorEqual, "John", metadataStore)
require.NoError(t, err)
require.NotNil(t, plans)
require.Len(t, plans, 1)
@@ -453,13 +447,13 @@ func TestPlanJSON_ArrayPaths(t *testing.T) {
}
func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) {
_, getTypes := testTypeSet()
_, metadataStore := testTypeSet()
path := "education[].awards[].type"
value := "sports"
t.Run("Non-promoted plan", func(t *testing.T) {
key := makeKey(path, telemetrytypes.String, false)
plans, err := PlanJSON(context.Background(), key, qbtypes.FilterOperatorEqual, value, getTypes)
plans, err := PlanJSON(context.Background(), key, qbtypes.FilterOperatorEqual, value, metadataStore)
require.NoError(t, err)
require.Len(t, plans, 1)
@@ -501,7 +495,7 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) {
t.Run("Promoted plan", func(t *testing.T) {
key := makeKey(path, telemetrytypes.String, true)
plans, err := PlanJSON(context.Background(), key, qbtypes.FilterOperatorEqual, value, getTypes)
plans, err := PlanJSON(context.Background(), key, qbtypes.FilterOperatorEqual, value, metadataStore)
require.NoError(t, err)
require.Len(t, plans, 2)
@@ -576,7 +570,7 @@ func TestPlanJSON_PromotedVsNonPromoted(t *testing.T) {
}
func TestPlanJSON_EdgeCases(t *testing.T) {
_, getTypes := testTypeSet()
_, metadataStore := testTypeSet()
tests := []struct {
name string
@@ -695,7 +689,7 @@ func TestPlanJSON_EdgeCases(t *testing.T) {
keyType = telemetrytypes.String
}
key := makeKey(tt.path, keyType, false)
plans, err := PlanJSON(context.Background(), key, qbtypes.FilterOperatorEqual, tt.value, getTypes)
plans, err := PlanJSON(context.Background(), key, qbtypes.FilterOperatorEqual, tt.value, metadataStore)
require.NoError(t, err)
got := plansToYAML(t, plans)
require.YAMLEq(t, tt.expectedYAML, got)
@@ -704,10 +698,10 @@ func TestPlanJSON_EdgeCases(t *testing.T) {
}
func TestPlanJSON_TreeStructure(t *testing.T) {
_, getTypes := testTypeSet()
_, metadataStore := testTypeSet()
path := "education[].awards[].participated[].team[].branch"
key := makeKey(path, telemetrytypes.String, false)
plans, err := PlanJSON(context.Background(), key, qbtypes.FilterOperatorEqual, "John", getTypes)
plans, err := PlanJSON(context.Background(), key, qbtypes.FilterOperatorEqual, "John", metadataStore)
require.NoError(t, err)
require.Len(t, plans, 1)
@@ -812,12 +806,9 @@ func TestPlanJSON_TreeStructure(t *testing.T) {
// Test Data Setup
// ============================================================================
// testTypeSet returns a map of path->types and a getTypes function for testing
// testTypeSet returns a map of path->types and a mock MetadataStore for testing
// This represents the type information available in the test JSON structure
//
// TODO(Piyush): Remove this unparam nolint
// nolint:unparam
func testTypeSet() (map[string][]telemetrytypes.JSONDataType, func(ctx context.Context, path string) ([]telemetrytypes.JSONDataType, error)) {
func testTypeSet() (map[string][]telemetrytypes.JSONDataType, telemetrytypes.MetadataStore) {
types := map[string][]telemetrytypes.JSONDataType{
"user.name": {telemetrytypes.String},
"user.age": {telemetrytypes.Int64, telemetrytypes.String},
@@ -876,5 +867,17 @@ func testTypeSet() (map[string][]telemetrytypes.JSONDataType, func(ctx context.C
"message": {telemetrytypes.String},
}
return types, makeGetTypes(types)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
for path, dataTypes := range types {
for _, dataType := range dataTypes {
mockMetadataStore.SetKey(&telemetrytypes.TelemetryFieldKey{
Name: path,
JSONDataType: &dataType,
Signal: telemetrytypes.SignalLogs,
FieldContext: telemetrytypes.FieldContextBody,
FieldDataType: telemetrytypes.MappingJSONDataTypeToFieldDataType[dataType],
})
}
}
return types, mockMetadataStore
}

View File

@@ -0,0 +1,479 @@
package telemetrylogs
import (
"context"
"fmt"
"slices"
"strings"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/huandu/go-sqlbuilder"
)
var (
CodeCurrentNodeNil = errors.MustNewCode("current_node_nil")
CodeNextNodeNil = errors.MustNewCode("next_node_nil")
CodeNestedExpressionsEmpty = errors.MustNewCode("nested_expressions_empty")
CodeGroupByPlanEmpty = errors.MustNewCode("group_by_plan_empty")
CodeArrayMapExpressionsEmpty = errors.MustNewCode("array_map_expressions_empty")
CodePromotedPlanMissing = errors.MustNewCode("promoted_plan_missing")
CodeArrayNavigationFailed = errors.MustNewCode("array_navigation_failed")
)
// BuildCondition builds the full WHERE condition for body_json JSON paths
func (c *conditionBuilder) buildJSONCondition(ctx context.Context, key *telemetrytypes.TelemetryFieldKey,
operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
plan, err := PlanJSON(ctx, key, operator, value, c.metadataStore)
if err != nil {
return "", err
}
conditions := []string{}
for _, plan := range plan {
condition, err := c.emitPlannedCondition(plan, operator, value, sb)
if err != nil {
return "", err
}
conditions = append(conditions, condition)
}
return sb.Or(conditions...), nil
}
// emitPlannedCondition handles paths with array traversal
func (c *conditionBuilder) emitPlannedCondition(plan *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
// Build traversal + terminal recursively per-hop
compiled, err := c.recurseArrayHops(plan, operator, value, sb)
if err != nil {
return "", err
}
return compiled, nil
}
// buildTerminalCondition creates the innermost condition
func (c *conditionBuilder) buildTerminalCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
if node.TerminalConfig.ElemType.IsArray {
conditions := []string{}
// if the value type is not an array
// TODO(piyush): Confirm the Query built for Array case and add testcases for it later
if !node.TerminalConfig.ValueType.IsArray {
// if operator is a String search Operator, then we need to build one more String comparison condition along with the Strict match condition
if operator.IsStringSearchOperator() {
formattedValue := querybuilder.FormatValueForContains(value)
arrayCond, err := c.buildArrayMembershipCondition(node, operator, formattedValue, sb)
if err != nil {
return "", err
}
conditions = append(conditions, arrayCond)
}
// switch operator for array membership checks
switch operator {
case qbtypes.FilterOperatorContains, qbtypes.FilterOperatorIn:
operator = qbtypes.FilterOperatorEqual
case qbtypes.FilterOperatorNotContains, qbtypes.FilterOperatorNotIn:
operator = qbtypes.FilterOperatorNotEqual
}
}
arrayCond, err := c.buildArrayMembershipCondition(node, operator, value, sb)
if err != nil {
return "", err
}
conditions = append(conditions, arrayCond)
// or the conditions together
return sb.Or(conditions...), nil
}
return c.buildPrimitiveTerminalCondition(node, operator, value, sb)
}
// buildPrimitiveTerminalCondition builds the condition if the terminal node is a primitive type
// it handles the data type collisions and utilizes indexes for the condition if available
func (c *conditionBuilder) buildPrimitiveTerminalCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
fieldPath := node.FieldPath()
conditions := []string{}
var formattedValue any = value
if operator.IsStringSearchOperator() {
formattedValue = querybuilder.FormatValueForContains(value)
}
elemType := node.TerminalConfig.ElemType
fieldExpr := fmt.Sprintf("dynamicElement(%s, '%s')", fieldPath, elemType.StringValue())
fieldExpr, formattedValue = querybuilder.DataTypeCollisionHandledFieldName(node.TerminalConfig.Key, formattedValue, fieldExpr, operator)
// utilize indexes for the condition if available
indexed := slices.ContainsFunc(node.TerminalConfig.Key.Indexes, func(index telemetrytypes.JSONDataTypeIndex) bool {
return index.Type == elemType && index.ColumnExpression == fieldPath
})
if elemType.IndexSupported && indexed {
indexedExpr := assumeNotNull(fieldPath, elemType)
emptyValue := func() any {
switch elemType {
case telemetrytypes.String:
return ""
case telemetrytypes.Int64, telemetrytypes.Float64, telemetrytypes.Bool:
return 0
default:
return nil
}
}()
// switch the operator and value for exists and not exists
switch operator {
case qbtypes.FilterOperatorExists:
operator = qbtypes.FilterOperatorNotEqual
value = emptyValue
case qbtypes.FilterOperatorNotExists:
operator = qbtypes.FilterOperatorEqual
value = emptyValue
default:
// do nothing
}
indexedExpr, indexedComparisonValue := querybuilder.DataTypeCollisionHandledFieldName(node.TerminalConfig.Key, formattedValue, indexedExpr, operator)
cond, err := c.applyOperator(sb, indexedExpr, operator, indexedComparisonValue)
if err != nil {
return "", err
}
// if qb has a definitive value, we can skip adding a condition to
// check the existence of the path in the json column
if value != emptyValue {
return cond, nil
}
conditions = append(conditions, cond)
// Switch operator to EXISTS since indexed paths on assumedNotNull, indexes will always have a default value
// So we flip the operator to Exists and filter the rows that actually have the value
operator = qbtypes.FilterOperatorExists
}
cond, err := c.applyOperator(sb, fieldExpr, operator, formattedValue)
if err != nil {
return "", err
}
conditions = append(conditions, cond)
if len(conditions) > 1 {
return sb.And(conditions...), nil
}
return conditions[0], nil
}
// buildArrayMembershipCondition handles array membership checks
func (c *conditionBuilder) buildArrayMembershipCondition(node *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
arrayPath := node.FieldPath()
localKeyCopy := *node.TerminalConfig.Key
// create typed array out of a dynamic array
filteredDynamicExpr := func() string {
// Change the field data type from []dynamic to the value type
// since we've filtered the value type out of the dynamic array, we need to change the field data corresponding to the value type
localKeyCopy.FieldDataType = telemetrytypes.MappingJSONDataTypeToFieldDataType[telemetrytypes.ScalerTypeToArrayType[node.TerminalConfig.ValueType]]
baseArrayDynamicExpr := fmt.Sprintf("dynamicElement(%s, 'Array(Dynamic)')", arrayPath)
return fmt.Sprintf("arrayMap(x->dynamicElement(x, '%s'), arrayFilter(x->(dynamicType(x) = '%s'), %s))",
node.TerminalConfig.ValueType.StringValue(),
node.TerminalConfig.ValueType.StringValue(),
baseArrayDynamicExpr)
}
typedArrayExpr := func() string {
return fmt.Sprintf("dynamicElement(%s, '%s')", arrayPath, node.TerminalConfig.ElemType.StringValue())
}
var arrayExpr string
if node.TerminalConfig.ElemType == telemetrytypes.ArrayDynamic {
arrayExpr = filteredDynamicExpr()
} else {
arrayExpr = typedArrayExpr()
}
fieldExpr, value := querybuilder.DataTypeCollisionHandledFieldName(&localKeyCopy, value, "x", operator)
op, err := c.applyOperator(sb, fieldExpr, operator, value)
if err != nil {
return "", err
}
return fmt.Sprintf("arrayExists(%s -> %s, %s)", fieldExpr, op, arrayExpr), nil
}
// recurseArrayHops recursively builds array traversal conditions
func (c *conditionBuilder) recurseArrayHops(current *telemetrytypes.JSONAccessNode, operator qbtypes.FilterOperator, value any, sb *sqlbuilder.SelectBuilder) (string, error) {
if current == nil {
return "", errors.NewInternalf(CodeArrayNavigationFailed, "navigation failed, current node is nil")
}
if current.IsTerminal {
terminalCond, err := c.buildTerminalCondition(current, operator, value, sb)
if err != nil {
return "", err
}
return terminalCond, nil
}
currAlias := current.Alias()
fieldPath := current.FieldPath()
// Determine availability of Array(JSON) and Array(Dynamic) at this hop
hasArrayJSON := current.Branches[telemetrytypes.BranchJSON] != nil
hasArrayDynamic := current.Branches[telemetrytypes.BranchDynamic] != nil
// Then, at this hop, compute child per branch and wrap
branches := make([]string, 0, 2)
if hasArrayJSON {
jsonArrayExpr := fmt.Sprintf("dynamicElement(%s, 'Array(JSON(max_dynamic_types=%d, max_dynamic_paths=%d))')", fieldPath, current.MaxDynamicTypes, current.MaxDynamicPaths)
childGroupJSON, err := c.recurseArrayHops(current.Branches[telemetrytypes.BranchJSON], operator, value, sb)
if err != nil {
return "", err
}
branches = append(branches, fmt.Sprintf("arrayExists(%s-> %s, %s)", currAlias, childGroupJSON, jsonArrayExpr))
}
if hasArrayDynamic {
dynBaseExpr := fmt.Sprintf("dynamicElement(%s, 'Array(Dynamic)')", fieldPath)
dynFilteredExpr := fmt.Sprintf("arrayMap(x->dynamicElement(x, 'JSON'), arrayFilter(x->(dynamicType(x) = 'JSON'), %s))", dynBaseExpr)
// Create the Query for Dynamic array
childGroupDyn, err := c.recurseArrayHops(current.Branches[telemetrytypes.BranchDynamic], operator, value, sb)
if err != nil {
return "", err
}
branches = append(branches, fmt.Sprintf("arrayExists(%s-> %s, %s)", currAlias, childGroupDyn, dynFilteredExpr))
}
if len(branches) == 1 {
return branches[0], nil
}
return sb.Or(branches...), nil
}
func (c *conditionBuilder) applyOperator(sb *sqlbuilder.SelectBuilder, fieldExpr string, operator qbtypes.FilterOperator, value any) (string, error) {
switch operator {
case qbtypes.FilterOperatorEqual:
return sb.E(fieldExpr, value), nil
case qbtypes.FilterOperatorNotEqual:
return sb.NE(fieldExpr, value), nil
case qbtypes.FilterOperatorGreaterThan:
return sb.G(fieldExpr, value), nil
case qbtypes.FilterOperatorGreaterThanOrEq:
return sb.GE(fieldExpr, value), nil
case qbtypes.FilterOperatorLessThan:
return sb.LT(fieldExpr, value), nil
case qbtypes.FilterOperatorLessThanOrEq:
return sb.LE(fieldExpr, value), nil
case qbtypes.FilterOperatorLike:
return sb.Like(fieldExpr, value), nil
case qbtypes.FilterOperatorNotLike:
return sb.NotLike(fieldExpr, value), nil
case qbtypes.FilterOperatorILike:
return sb.ILike(fieldExpr, value), nil
case qbtypes.FilterOperatorNotILike:
return sb.NotILike(fieldExpr, value), nil
case qbtypes.FilterOperatorRegexp:
return fmt.Sprintf("match(%s, %s)", fieldExpr, sb.Var(value)), nil
case qbtypes.FilterOperatorNotRegexp:
return fmt.Sprintf("NOT match(%s, %s)", fieldExpr, sb.Var(value)), nil
case qbtypes.FilterOperatorContains:
return sb.ILike(fieldExpr, fmt.Sprintf("%%%v%%", value)), nil
case qbtypes.FilterOperatorNotContains:
return sb.NotILike(fieldExpr, fmt.Sprintf("%%%v%%", value)), nil
case qbtypes.FilterOperatorIn, qbtypes.FilterOperatorNotIn:
// emulate IN/NOT IN using OR/AND over equals to leverage indexes consistently
values, ok := value.([]any)
if !ok {
values = []any{value}
}
conds := []string{}
for _, v := range values {
if operator == qbtypes.FilterOperatorIn {
conds = append(conds, sb.E(fieldExpr, v))
} else {
conds = append(conds, sb.NE(fieldExpr, v))
}
}
if operator == qbtypes.FilterOperatorIn {
return sb.Or(conds...), nil
}
return sb.And(conds...), nil
case qbtypes.FilterOperatorExists:
return fmt.Sprintf("%s IS NOT NULL", fieldExpr), nil
case qbtypes.FilterOperatorNotExists:
return fmt.Sprintf("%s IS NULL", fieldExpr), nil
default:
return "", qbtypes.ErrUnsupportedOperator
}
}
// GroupByArrayJoinInfo contains information about array joins needed for GroupBy
type GroupByArrayJoinInfo struct {
ArrayJoinClauses []string // ARRAY JOIN clauses to add to FROM clause
TerminalExpr string // Terminal field expression for SELECT/GROUP BY
}
// BuildGroupBy builds GroupBy information for body JSON fields using arrayConcat pattern
//
// BuildGroupBy was designed to be used for group by queries on body JSON fields existings inside arrays but
// currently it is not used anywhere, considering this case suits more to Data Engineering instead of Observability space.
// This code should be removed in future.
func (c *conditionBuilder) BuildGroupBy(ctx context.Context, key *telemetrytypes.TelemetryFieldKey) (*GroupByArrayJoinInfo, error) {
path := strings.TrimPrefix(key.Name, telemetrytypes.BodyJSONStringSearchPrefix)
plan, err := PlanJSON(ctx, key, qbtypes.FilterOperatorExists, nil, c.metadataStore)
if err != nil {
return nil, err
}
if len(plan) == 0 {
return nil, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput,
"Could not find any valid paths for: %s", path)
}
if plan[0].IsTerminal {
node := plan[0]
expr := fmt.Sprintf("dynamicElement(%s, '%s')", node.FieldPath(), node.TerminalConfig.ElemType.StringValue())
if key.Materialized {
if len(plan) < 2 {
return nil, errors.Newf(errors.TypeUnexpected, CodePromotedPlanMissing,
"plan length is less than 2 for promoted path: %s", path)
}
// promoted column first then body_json column
// TODO(Piyush): Change this in future for better performance
expr = fmt.Sprintf("coalesce(%s, %s)",
fmt.Sprintf("dynamicElement(%s, '%s')", plan[1].FieldPath(), plan[1].TerminalConfig.ElemType.StringValue()),
expr,
)
}
return &GroupByArrayJoinInfo{
ArrayJoinClauses: []string{},
TerminalExpr: expr,
}, nil
}
// Build arrayConcat pattern directly from the tree structure
arrayConcatExpr, err := c.buildArrayConcat(plan)
if err != nil {
return nil, err
}
// Create single ARRAY JOIN clause with arrayFlatten
arrayJoinClause := fmt.Sprintf("ARRAY JOIN %s AS `%s`", arrayConcatExpr, key.Name)
return &GroupByArrayJoinInfo{
ArrayJoinClauses: []string{arrayJoinClause},
TerminalExpr: fmt.Sprintf("`%s`", key.Name),
}, nil
}
// buildArrayConcat builds the arrayConcat pattern directly from the tree structure
func (c *conditionBuilder) buildArrayConcat(plan telemetrytypes.JSONAccessPlan) (string, error) {
if len(plan) == 0 {
return "", errors.Newf(errors.TypeInternal, CodeGroupByPlanEmpty, "group by plan is empty while building arrayConcat")
}
// Build arrayMap expressions for ALL available branches at the root level
var arrayMapExpressions []string
for _, node := range plan {
hasJSON := node.Branches[telemetrytypes.BranchJSON] != nil
hasDynamic := node.Branches[telemetrytypes.BranchDynamic] != nil
if hasJSON {
jsonExpr, err := c.buildArrayMap(node, telemetrytypes.BranchJSON)
if err != nil {
return "", err
}
arrayMapExpressions = append(arrayMapExpressions, jsonExpr)
}
if hasDynamic {
dynamicExpr, err := c.buildArrayMap(node, telemetrytypes.BranchDynamic)
if err != nil {
return "", err
}
arrayMapExpressions = append(arrayMapExpressions, dynamicExpr)
}
}
if len(arrayMapExpressions) == 0 {
return "", errors.Newf(errors.TypeInternal, CodeArrayMapExpressionsEmpty, "array map expressions are empty while building arrayConcat")
}
// Build the arrayConcat expression
arrayConcatExpr := fmt.Sprintf("arrayConcat(%s)", strings.Join(arrayMapExpressions, ", "))
// Wrap with arrayFlatten
arrayFlattenExpr := fmt.Sprintf("arrayFlatten(%s)", arrayConcatExpr)
return arrayFlattenExpr, nil
}
// buildArrayMap builds the arrayMap expression for a specific branch, handling all sub-branches
func (c *conditionBuilder) buildArrayMap(currentNode *telemetrytypes.JSONAccessNode, branchType telemetrytypes.JSONAccessBranchType) (string, error) {
if currentNode == nil {
return "", errors.Newf(errors.TypeInternal, CodeCurrentNodeNil, "current node is nil while building arrayMap")
}
nextNode := currentNode.Branches[branchType]
if nextNode == nil {
return "", errors.Newf(errors.TypeInternal, CodeNextNodeNil, "next node is nil while building arrayMap")
}
// Build the array expression for this level
var arrayExpr string
if branchType == telemetrytypes.BranchJSON {
// Array(JSON) branch
arrayExpr = fmt.Sprintf("dynamicElement(%s, 'Array(JSON(max_dynamic_types=%d, max_dynamic_paths=%d))')",
currentNode.FieldPath(), currentNode.MaxDynamicTypes, currentNode.MaxDynamicPaths)
} else {
// Array(Dynamic) branch - filter for JSON objects
dynBaseExpr := fmt.Sprintf("dynamicElement(%s, 'Array(Dynamic)')", currentNode.FieldPath())
arrayExpr = fmt.Sprintf("arrayMap(x->assumeNotNull(dynamicElement(x, 'JSON')), arrayFilter(x->(dynamicType(x) = 'JSON'), %s))", dynBaseExpr)
}
// If this is the terminal level, return the simple arrayMap
if nextNode.IsTerminal {
dynamicElementExpr := fmt.Sprintf("dynamicElement(%s, '%s')", nextNode.FieldPath(),
nextNode.TerminalConfig.ElemType.StringValue(),
)
return fmt.Sprintf("arrayMap(%s->%s, %s)", currentNode.Alias(), dynamicElementExpr, arrayExpr), nil
}
// For non-terminal nodes, we need to handle ALL possible branches at the next level
var nestedExpressions []string
hasJSON := nextNode.Branches[telemetrytypes.BranchJSON] != nil
hasDynamic := nextNode.Branches[telemetrytypes.BranchDynamic] != nil
if hasJSON {
jsonNested, err := c.buildArrayMap(nextNode, telemetrytypes.BranchJSON)
if err != nil {
return "", err
}
nestedExpressions = append(nestedExpressions, jsonNested)
}
if hasDynamic {
dynamicNested, err := c.buildArrayMap(nextNode, telemetrytypes.BranchDynamic)
if err != nil {
return "", err
}
nestedExpressions = append(nestedExpressions, dynamicNested)
}
// If we have multiple nested expressions, we need to concat them
var nestedExpr string
if len(nestedExpressions) == 1 {
nestedExpr = nestedExpressions[0]
} else if len(nestedExpressions) > 1 {
// This shouldn't happen in our current tree structure, but handle it just in case
nestedExpr = fmt.Sprintf("arrayConcat(%s)", strings.Join(nestedExpressions, ", "))
} else {
return "", errors.Newf(errors.TypeInternal, CodeNestedExpressionsEmpty, "nested expressions are empty while building arrayMap")
}
return fmt.Sprintf("arrayMap(%s->%s, %s)", currentNode.Alias(), nestedExpr, arrayExpr), nil
}
func assumeNotNull(column string, elemType telemetrytypes.JSONDataType) string {
return fmt.Sprintf("assumeNotNull(dynamicElement(%s, '%s'))", column, elemType.StringValue())
}

File diff suppressed because one or more lines are too long

View File

@@ -84,7 +84,6 @@ func getBodyJSONPath(key *telemetrytypes.TelemetryFieldKey) string {
}
func GetBodyJSONKey(_ context.Context, key *telemetrytypes.TelemetryFieldKey, operator qbtypes.FilterOperator, value any) (string, any) {
dataType, value := inferDataType(value, operator, key)
// for array types, we need to extract the value from the JSON_QUERY

View File

@@ -23,7 +23,6 @@ type logQueryStatementBuilder struct {
aggExprRewriter qbtypes.AggExprRewriter
fullTextColumn *telemetrytypes.TelemetryFieldKey
jsonBodyPrefix string
jsonKeyToKey qbtypes.JsonKeyToFieldFunc
}
@@ -37,7 +36,6 @@ func NewLogQueryStatementBuilder(
resourceFilterStmtBuilder qbtypes.StatementBuilder[qbtypes.LogAggregation],
aggExprRewriter qbtypes.AggExprRewriter,
fullTextColumn *telemetrytypes.TelemetryFieldKey,
jsonBodyPrefix string,
jsonKeyToKey qbtypes.JsonKeyToFieldFunc,
) *logQueryStatementBuilder {
logsSettings := factory.NewScopedProviderSettings(settings, "github.com/SigNoz/signoz/pkg/telemetrylogs")
@@ -50,7 +48,6 @@ func NewLogQueryStatementBuilder(
resourceFilterStmtBuilder: resourceFilterStmtBuilder,
aggExprRewriter: aggExprRewriter,
fullTextColumn: fullTextColumn,
jsonBodyPrefix: jsonBodyPrefix,
jsonKeyToKey: jsonKeyToKey,
}
}
@@ -171,6 +168,29 @@ func (b *logQueryStatementBuilder) adjustKeys(ctx context.Context, keys map[stri
overallMatch = overallMatch || findMatch(IntrinsicFields)
}
if strings.HasPrefix(k.Name, telemetrytypes.BodyJSONStringSearchPrefix) {
k.Name = strings.TrimPrefix(k.Name, telemetrytypes.BodyJSONStringSearchPrefix)
fieldKeys, found := keys[k.Name]
if found && len(fieldKeys) > 0 {
k.FieldContext = fieldKeys[0].FieldContext
k.FieldDataType = fieldKeys[0].FieldDataType
// only attach the JSON data type if there is only one key for the field so incase there are multiple keys.
// it's handled by the fallback expr logic
if len(fieldKeys) == 1 {
k.JSONDataType = fieldKeys[0].JSONDataType
k.Materialized = fieldKeys[0].Materialized
}
k.Indexes = fieldKeys[0].Indexes
overallMatch = true // because we found a match
} else {
b.logger.InfoContext(ctx, "overriding the field context and data type", "key", k.Name)
k.FieldContext = telemetrytypes.FieldContextBody
k.FieldDataType = telemetrytypes.FieldDataTypeString
k.JSONDataType = &telemetrytypes.String
}
}
if !overallMatch {
// check if all the key for the given field have been materialized, if so
// set the key to materialized
@@ -234,6 +254,10 @@ func (b *logQueryStatementBuilder) buildListQuery(
sb.SelectMore(LogsV2ScopeNameColumn)
sb.SelectMore(LogsV2ScopeVersionColumn)
sb.SelectMore(LogsV2BodyColumn)
if querybuilder.BodyJSONQueryEnabled {
sb.SelectMore(LogsV2BodyJSONColumn)
sb.SelectMore(LogsV2BodyPromotedColumn)
}
sb.SelectMore(LogsV2AttributesStringColumn)
sb.SelectMore(LogsV2AttributesNumberColumn)
sb.SelectMore(LogsV2AttributesBoolColumn)
@@ -246,6 +270,7 @@ func (b *logQueryStatementBuilder) buildListQuery(
if query.SelectFields[index].Name == LogsV2TimestampColumn || query.SelectFields[index].Name == LogsV2IDColumn {
continue
}
// get column expression for the field - use array index directly to avoid pointer to loop variable
colExpr, err := b.fm.ColumnExpressionFor(ctx, &query.SelectFields[index], keys)
if err != nil {
@@ -255,7 +280,6 @@ func (b *logQueryStatementBuilder) buildListQuery(
}
}
// From table
sb.From(fmt.Sprintf("%s.%s", DBName, LogsV2TableName))
// Add filter conditions
@@ -333,10 +357,11 @@ func (b *logQueryStatementBuilder) buildTimeSeriesQuery(
// Keep original column expressions so we can build the tuple
fieldNames := make([]string, 0, len(query.GroupBy))
for _, gb := range query.GroupBy {
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonBodyPrefix, b.jsonKeyToKey)
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonKeyToKey)
if err != nil {
return nil, err
}
colExpr := fmt.Sprintf("toString(%s) AS `%s`", expr, gb.TelemetryFieldKey.Name)
allGroupByArgs = append(allGroupByArgs, args...)
sb.SelectMore(colExpr)
@@ -358,7 +383,9 @@ func (b *logQueryStatementBuilder) buildTimeSeriesQuery(
sb.SelectMore(fmt.Sprintf("%s AS __result_%d", rewritten, i))
}
// Add FROM clause
sb.From(fmt.Sprintf("%s.%s", DBName, LogsV2TableName))
preparedWhereClause, err := b.addFilterCondition(ctx, sb, start, end, query, keys, variables)
if err != nil {
@@ -404,7 +431,6 @@ func (b *logQueryStatementBuilder) buildTimeSeriesQuery(
}
combinedArgs := append(allGroupByArgs, allAggChArgs...)
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)
// Stitch it all together: WITH … SELECT …
@@ -431,7 +457,6 @@ func (b *logQueryStatementBuilder) buildTimeSeriesQuery(
}
combinedArgs := append(allGroupByArgs, allAggChArgs...)
mainSQL, mainArgs := sb.BuildWithFlavor(sqlbuilder.ClickHouse, combinedArgs...)
// Stitch it all together: WITH … SELECT …
@@ -479,10 +504,11 @@ func (b *logQueryStatementBuilder) buildScalarQuery(
var allGroupByArgs []any
for _, gb := range query.GroupBy {
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonBodyPrefix, b.jsonKeyToKey)
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, b.jsonKeyToKey)
if err != nil {
return nil, err
}
colExpr := fmt.Sprintf("toString(%s) AS `%s`", expr, gb.TelemetryFieldKey.Name)
allGroupByArgs = append(allGroupByArgs, args...)
sb.SelectMore(colExpr)
@@ -508,7 +534,6 @@ func (b *logQueryStatementBuilder) buildScalarQuery(
}
}
// From table
sb.From(fmt.Sprintf("%s.%s", DBName, LogsV2TableName))
// Add filter conditions
@@ -654,7 +679,6 @@ func (b *logQueryStatementBuilder) buildResourceFilterCTE(
start, end uint64,
variables map[string]qbtypes.VariableItem,
) (*qbtypes.Statement, error) {
return b.resourceFilterStmtBuilder.Build(
ctx,
start,

View File

@@ -32,7 +32,6 @@ func resourceFilterStmtBuilder() qbtypes.StatementBuilder[qbtypes.LogAggregation
cb,
mockMetadataStore,
DefaultFullTextColumn,
BodyJSONStringSearchPrefix,
GetBodyJSONKey,
)
}
@@ -197,11 +196,11 @@ func TestStatementBuilderTimeSeries(t *testing.T) {
}
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
cb := NewConditionBuilder(fm, mockMetadataStore)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, "", nil)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
@@ -213,7 +212,6 @@ func TestStatementBuilderTimeSeries(t *testing.T) {
resourceFilterStmtBuilder,
aggExprRewriter,
DefaultFullTextColumn,
BodyJSONStringSearchPrefix,
GetBodyJSONKey,
)
@@ -318,11 +316,11 @@ func TestStatementBuilderListQuery(t *testing.T) {
}
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
cb := NewConditionBuilder(fm, mockMetadataStore)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, "", nil)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
@@ -334,7 +332,6 @@ func TestStatementBuilderListQuery(t *testing.T) {
resourceFilterStmtBuilder,
aggExprRewriter,
DefaultFullTextColumn,
BodyJSONStringSearchPrefix,
GetBodyJSONKey,
)
@@ -427,11 +424,11 @@ func TestStatementBuilderListQueryResourceTests(t *testing.T) {
}
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
cb := NewConditionBuilder(fm, mockMetadataStore)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, "", nil)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
@@ -443,10 +440,11 @@ func TestStatementBuilderListQueryResourceTests(t *testing.T) {
resourceFilterStmtBuilder,
aggExprRewriter,
DefaultFullTextColumn,
BodyJSONStringSearchPrefix,
GetBodyJSONKey,
)
//
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
@@ -491,7 +489,8 @@ func TestStatementBuilderTimeSeriesBodyGroupBy(t *testing.T) {
GroupBy: []qbtypes.GroupByKey{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
Name: "body.status",
Name: "status",
FieldContext: telemetrytypes.FieldContextBody,
},
},
},
@@ -501,11 +500,11 @@ func TestStatementBuilderTimeSeriesBodyGroupBy(t *testing.T) {
}
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
cb := NewConditionBuilder(fm, mockMetadataStore)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, "", nil)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
@@ -517,7 +516,6 @@ func TestStatementBuilderTimeSeriesBodyGroupBy(t *testing.T) {
resourceFilterStmtBuilder,
aggExprRewriter,
DefaultFullTextColumn,
BodyJSONStringSearchPrefix,
GetBodyJSONKey,
)
@@ -597,11 +595,11 @@ func TestStatementBuilderListQueryServiceCollision(t *testing.T) {
}
fm := NewFieldMapper()
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMapCollision()
cb := NewConditionBuilder(fm, mockMetadataStore)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, "", nil)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
@@ -613,7 +611,6 @@ func TestStatementBuilderListQueryServiceCollision(t *testing.T) {
resourceFilterStmtBuilder,
aggExprRewriter,
DefaultFullTextColumn,
BodyJSONStringSearchPrefix,
GetBodyJSONKey,
)

View File

@@ -47,17 +47,10 @@ var (
//
// searchOperator: LIKE for pattern matching, EQUAL for exact match
// Returns: (paths, error)
// TODO(Piyush): Remove this lint skip
//
// nolint:unused
func (t *telemetryMetaStore) getBodyJSONPaths(ctx context.Context,
fieldKeySelectors []*telemetrytypes.FieldKeySelector) ([]*telemetrytypes.TelemetryFieldKey, bool, error) {
query, args, limit, err := buildGetBodyJSONPathsQuery(fieldKeySelectors)
if err != nil {
return nil, false, err
}
query, args, limit := buildGetBodyJSONPathsQuery(fieldKeySelectors)
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
if err != nil {
return nil, false, errors.WrapInternalf(err, CodeFailExtractBodyJSONKeys, "failed to extract body JSON keys")
@@ -80,7 +73,8 @@ func (t *telemetryMetaStore) getBodyJSONPaths(ctx context.Context,
for _, typ := range typesArray {
mapping, found := telemetrytypes.MappingStringToJSONDataType[typ]
if !found {
return nil, false, errors.NewInternalf(CodeUnknownJSONDataType, "failed to map type string to JSON data type: %s", typ)
t.logger.ErrorContext(ctx, "failed to map type string to JSON data type", "type", typ, "path", path)
continue
}
fieldKeys = append(fieldKeys, &telemetrytypes.TelemetryFieldKey{
Name: path,
@@ -116,9 +110,9 @@ func (t *telemetryMetaStore) getBodyJSONPaths(ctx context.Context,
return fieldKeys, rowCount <= limit, nil
}
func buildGetBodyJSONPathsQuery(fieldKeySelectors []*telemetrytypes.FieldKeySelector) (string, []any, int, error) {
func buildGetBodyJSONPathsQuery(fieldKeySelectors []*telemetrytypes.FieldKeySelector) (string, []any, int) {
if len(fieldKeySelectors) == 0 {
return "", nil, defaultPathLimit, errors.NewInternalf(CodeFailBuildJSONPathsQuery, "no field key selectors provided")
return "", nil, defaultPathLimit
}
from := fmt.Sprintf("%s.%s", DBName, PathTypesTableName)
@@ -135,14 +129,14 @@ func buildGetBodyJSONPathsQuery(fieldKeySelectors []*telemetrytypes.FieldKeySele
orClauses := []string{}
for _, fieldKeySelector := range fieldKeySelectors {
// replace [*] with []
fieldKeySelector.Name = strings.ReplaceAll(fieldKeySelector.Name, telemetrylogs.ArrayAnyIndex, telemetrylogs.ArraySep)
fieldKeySelector.Name = strings.ReplaceAll(fieldKeySelector.Name, telemetrytypes.ArrayAnyIndex, telemetrytypes.ArraySep)
// Extract search text for body JSON keys
keyName := CleanPathPrefixes(fieldKeySelector.Name)
if fieldKeySelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact {
orClauses = append(orClauses, sb.Equal("path", keyName))
} else {
// Pattern matching for metadata API (defaults to LIKE behavior for other operators)
orClauses = append(orClauses, sb.Like("path", querybuilder.FormatValueForContains(keyName)))
orClauses = append(orClauses, sb.ILike("path", fmt.Sprintf("%%%s%%", querybuilder.FormatValueForContains(keyName))))
}
limit += fieldKeySelector.Limit
}
@@ -159,22 +153,20 @@ func buildGetBodyJSONPathsQuery(fieldKeySelectors []*telemetrytypes.FieldKeySele
sb.Limit(limit)
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
return query, args, limit, nil
return query, args, limit
}
// TODO(Piyush): Remove this lint skip
//
// nolint:unused
func (t *telemetryMetaStore) getJSONPathIndexes(ctx context.Context, paths ...string) (map[string][]telemetrytypes.JSONDataTypeIndex, error) {
filteredPaths := []string{}
for _, path := range paths {
if strings.Contains(path, telemetrylogs.ArraySep) || strings.Contains(path, telemetrylogs.ArrayAnyIndex) {
// skip array paths; since they don't have any indexes
if strings.Contains(path, telemetrytypes.ArraySep) || strings.Contains(path, telemetrytypes.ArrayAnyIndex) {
continue
}
filteredPaths = append(filteredPaths, path)
}
if len(filteredPaths) == 0 {
return nil, errors.NewInternalf(CodeNoPathsToQueryIndexes, "no paths to query indexes provided")
return make(map[string][]telemetrytypes.JSONDataTypeIndex), nil
}
// list indexes for the paths
@@ -194,7 +186,8 @@ func (t *telemetryMetaStore) getJSONPathIndexes(ctx context.Context, paths ...st
jsonDataType, found := telemetrytypes.MappingStringToJSONDataType[columnType]
if !found {
return nil, errors.NewInternalf(CodeUnknownJSONDataType, "failed to map column type to JSON data type: %s", columnType)
t.logger.ErrorContext(ctx, "failed to map column type to JSON data type", "column_type", columnType, "column_expr", columnExpr)
continue
}
if jsonDataType == telemetrytypes.String {
@@ -296,7 +289,7 @@ func (t *telemetryMetaStore) ListPromotedPaths(ctx context.Context, paths ...str
func (t *telemetryMetaStore) ListJSONValues(ctx context.Context, path string, limit int) (*telemetrytypes.TelemetryFieldValues, bool, error) {
path = CleanPathPrefixes(path)
if strings.Contains(path, telemetrylogs.ArraySep) || strings.Contains(path, telemetrylogs.ArrayAnyIndex) {
if strings.Contains(path, telemetrytypes.ArraySep) || strings.Contains(path, telemetrytypes.ArrayAnyIndex) {
return nil, false, errors.NewInvalidInputf(errors.CodeInvalidInput, "array paths are not supported")
}
@@ -456,7 +449,7 @@ func derefValue(v any) any {
// IsPathPromoted checks if a specific path is promoted
func (t *telemetryMetaStore) IsPathPromoted(ctx context.Context, path string) (bool, error) {
split := strings.Split(path, telemetrylogs.ArraySep)
split := strings.Split(path, telemetrytypes.ArraySep)
query := fmt.Sprintf("SELECT 1 FROM %s.%s WHERE path = ? LIMIT 1", DBName, PromotedPathsTableName)
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, split[0])
if err != nil {

View File

@@ -40,8 +40,8 @@ func TestBuildGetBodyJSONPathsQuery(t *testing.T) {
SelectorMatchType: telemetrytypes.FieldSelectorMatchTypeFuzzy,
},
},
expectedSQL: "SELECT path, groupArray(DISTINCT type) AS types, max(last_seen) AS last_seen FROM signoz_metadata.distributed_json_path_types WHERE (path LIKE ?) GROUP BY path ORDER BY last_seen DESC LIMIT ?",
expectedArgs: []any{"user", 100},
expectedSQL: "SELECT path, groupArray(DISTINCT type) AS types, max(last_seen) AS last_seen FROM signoz_metadata.distributed_json_path_types WHERE (LOWER(path) LIKE LOWER(?)) GROUP BY path ORDER BY last_seen DESC LIMIT ?",
expectedArgs: []any{"%user%", 100},
expectedLimit: 100,
},
{
@@ -72,8 +72,8 @@ func TestBuildGetBodyJSONPathsQuery(t *testing.T) {
SelectorMatchType: telemetrytypes.FieldSelectorMatchTypeFuzzy,
},
},
expectedSQL: "SELECT path, groupArray(DISTINCT type) AS types, max(last_seen) AS last_seen FROM signoz_metadata.distributed_json_path_types WHERE (path LIKE ? OR path LIKE ?) GROUP BY path ORDER BY last_seen DESC LIMIT ?",
expectedArgs: []any{"user", "admin", defaultPathLimit},
expectedSQL: "SELECT path, groupArray(DISTINCT type) AS types, max(last_seen) AS last_seen FROM signoz_metadata.distributed_json_path_types WHERE (LOWER(path) LIKE LOWER(?) OR LOWER(path) LIKE LOWER(?)) GROUP BY path ORDER BY last_seen DESC LIMIT ?",
expectedArgs: []any{"%user%", "%admin%", defaultPathLimit},
expectedLimit: defaultPathLimit,
},
{
@@ -84,17 +84,15 @@ func TestBuildGetBodyJSONPathsQuery(t *testing.T) {
SelectorMatchType: telemetrytypes.FieldSelectorMatchTypeFuzzy,
},
},
expectedSQL: "SELECT path, groupArray(DISTINCT type) AS types, max(last_seen) AS last_seen FROM signoz_metadata.distributed_json_path_types WHERE (path LIKE ?) GROUP BY path ORDER BY last_seen DESC LIMIT ?",
expectedArgs: []any{"test", defaultPathLimit},
expectedSQL: "SELECT path, groupArray(DISTINCT type) AS types, max(last_seen) AS last_seen FROM signoz_metadata.distributed_json_path_types WHERE (LOWER(path) LIKE LOWER(?)) GROUP BY path ORDER BY last_seen DESC LIMIT ?",
expectedArgs: []any{"%test%", defaultPathLimit},
expectedLimit: defaultPathLimit,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
query, args, limit, err := buildGetBodyJSONPathsQuery(tc.fieldKeySelectors)
require.NoError(t, err, "Error building query: %v", err)
query, args, limit := buildGetBodyJSONPathsQuery(tc.fieldKeySelectors)
require.Equal(t, tc.expectedSQL, query)
require.Equal(t, tc.expectedArgs, args)
require.Equal(t, tc.expectedLimit, limit)

View File

@@ -573,6 +573,14 @@ func (t *telemetryMetaStore) getLogsKeys(ctx context.Context, fieldKeySelectors
}
}
if querybuilder.BodyJSONQueryEnabled {
bodyJSONPaths, finished, err := t.getBodyJSONPaths(ctx, fieldKeySelectors) // LIKE for pattern matching
if err != nil {
t.logger.ErrorContext(ctx, "failed to extract body JSON paths", "error", err)
}
keys = append(keys, bodyJSONPaths...)
complete = complete && finished
}
return keys, complete, nil
}

View File

@@ -32,6 +32,8 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
options.MaxIdleConns = config.Connection.MaxIdleConns
options.MaxOpenConns = config.Connection.MaxOpenConns
options.DialTimeout = config.Connection.DialTimeout
// This is to avoid the driver decoding issues with JSON columns
options.Settings["output_format_native_write_json_as_string"] = 1
chConn, err := clickhouse.Open(options)
if err != nil {

View File

@@ -495,7 +495,7 @@ func (b *traceQueryStatementBuilder) buildTimeSeriesQuery(
// Keep original column expressions so we can build the tuple
fieldNames := make([]string, 0, len(query.GroupBy))
for _, gb := range query.GroupBy {
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, "", nil)
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, nil)
if err != nil {
return nil, err
}
@@ -637,7 +637,7 @@ func (b *traceQueryStatementBuilder) buildScalarQuery(
var allGroupByArgs []any
for _, gb := range query.GroupBy {
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, "", nil)
expr, args, err := querybuilder.CollisionHandledFinalExpr(ctx, &gb.TelemetryFieldKey, b.fm, b.cb, keys, telemetrytypes.FieldDataTypeString, nil)
if err != nil {
return nil, err
}
@@ -746,7 +746,7 @@ func (b *traceQueryStatementBuilder) addFilterCondition(
FieldKeys: keys,
SkipResourceFilter: true,
Variables: variables,
}, start, end)
}, start, end)
if err != nil {
return nil, err

View File

@@ -357,7 +357,7 @@ func TestStatementBuilder(t *testing.T) {
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, "", nil)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
@@ -525,7 +525,7 @@ func TestStatementBuilderListQuery(t *testing.T) {
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, "", nil)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
@@ -681,7 +681,7 @@ func TestStatementBuilderTraceQuery(t *testing.T) {
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, "", nil)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()

View File

@@ -237,7 +237,7 @@ func (b *traceOperatorCTEBuilder) buildQueryCTE(ctx context.Context, queryName s
ConditionBuilder: b.stmtBuilder.cb,
FieldKeys: keys,
SkipResourceFilter: true,
}, b.start, b.end,
}, b.start, b.end,
)
if err != nil {
b.stmtBuilder.logger.ErrorContext(ctx, "Failed to prepare where clause", "error", err, "filter", query.Filter.Expression)
@@ -552,7 +552,6 @@ func (b *traceOperatorCTEBuilder) buildTimeSeriesQuery(ctx context.Context, sele
b.stmtBuilder.cb,
keys,
telemetrytypes.FieldDataTypeString,
"",
nil,
)
if err != nil {
@@ -662,7 +661,6 @@ func (b *traceOperatorCTEBuilder) buildTraceQuery(ctx context.Context, selectFro
b.stmtBuilder.cb,
keys,
telemetrytypes.FieldDataTypeString,
"",
nil,
)
if err != nil {
@@ -802,7 +800,6 @@ func (b *traceOperatorCTEBuilder) buildScalarQuery(ctx context.Context, selectFr
b.stmtBuilder.cb,
keys,
telemetrytypes.FieldDataTypeString,
"",
nil,
)
if err != nil {

View File

@@ -390,7 +390,7 @@ func TestTraceOperatorStatementBuilder(t *testing.T) {
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, "", nil)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
traceStmtBuilder := NewTraceQueryStatementBuilder(
@@ -506,7 +506,7 @@ func TestTraceOperatorStatementBuilderErrors(t *testing.T) {
cb := NewConditionBuilder(fm)
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, "", nil)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
resourceFilterStmtBuilder := resourceFilterStmtBuilder()
traceStmtBuilder := NewTraceQueryStatementBuilder(

View File

@@ -44,7 +44,7 @@ func TestTraceTimeRangeOptimization(t *testing.T) {
mockMetadataStore,
)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, "", nil)
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil)
statementBuilder := NewTraceQueryStatementBuilder(
instrumentationtest.New().ToProviderSettings(),

View File

@@ -21,9 +21,9 @@ func NewStateStore() *StateStore {
}
}
func (s *StateStore) Set(ctx context.Context, orgID string, storeableState *alertmanagertypes.StoreableState) error {
func (s *StateStore) Set(ctx context.Context, storeableState *alertmanagertypes.StoreableState) error {
s.mtx.Lock()
s.states[orgID] = storeableState
s.states[storeableState.OrgID] = storeableState
s.mtx.Unlock()
return nil
}

View File

@@ -16,6 +16,7 @@ import (
var (
ErrCodeAlertmanagerChannelNotFound = errors.MustNewCode("alertmanager_channel_not_found")
ErrCodeAlertmanagerChannelNameMismatch = errors.MustNewCode("alertmanager_channel_name_mismatch")
ErrCodeAlertmanagerChannelInvalid = errors.MustNewCode("alertmanager_channel_invalid")
)
var (
@@ -41,9 +42,9 @@ type Channel struct {
// NewChannelFromReceiver creates a new Channel from a Receiver.
// It can return nil if the receiver is the default receiver.
func NewChannelFromReceiver(receiver config.Receiver, orgID string) *Channel {
func NewChannelFromReceiver(receiver config.Receiver, orgID string) (*Channel, error) {
if receiver.Name == DefaultReceiverName {
return nil
return nil, errors.Newf(errors.TypeInvalidInput, ErrCodeAlertmanagerChannelInvalid, "cannot use %s name as a channel name", receiver.Name)
}
// Initialize channel with common fields
@@ -98,7 +99,12 @@ func NewChannelFromReceiver(receiver config.Receiver, orgID string) *Channel {
break
}
return &channel
// If we were unable to find the channel type, return an error
if channel.Type == "" {
return nil, errors.Newf(errors.TypeInvalidInput, ErrCodeAlertmanagerChannelInvalid, "channel '%s' must have at least one notification configuration (e.g., email_configs, webhook_configs, slack_configs)", receiver.Name)
}
return &channel, nil
}
func NewConfigFromChannels(globalConfig GlobalConfig, routeConfig RouteConfig, channels Channels, orgID string) (*Config, error) {
@@ -163,9 +169,9 @@ func NewStatsFromChannels(channels Channels) map[string]any {
}
func (c *Channel) Update(receiver Receiver) error {
channel := NewChannelFromReceiver(receiver, c.OrgID)
if channel == nil {
return errors.Newf(errors.TypeInvalidInput, ErrCodeAlertmanagerChannelNotFound, "cannot find channel with id %s", c.ID.StringValue())
channel, err := NewChannelFromReceiver(receiver, c.OrgID)
if err != nil {
return err
}
if c.Name != channel.Name {

View File

@@ -2,6 +2,7 @@ package alertmanagertypes
import (
"encoding/json"
"net/url"
"testing"
"time"
@@ -228,3 +229,66 @@ func TestNewConfigFromChannels(t *testing.T) {
})
}
}
func TestNewChannelFromReceiver(t *testing.T) {
testCases := []struct {
name string
receiver config.Receiver
expected *Channel
pass bool
}{
{
name: "InvalidReceiver_OnlyName",
receiver: config.Receiver{
Name: "test-receiver",
},
expected: nil,
pass: false,
},
{
name: "InvalidReceiver_DefaultReceiver",
receiver: config.Receiver{
Name: DefaultReceiverName,
},
expected: nil,
pass: false,
},
{
name: "ValidReceiver_Slack",
receiver: config.Receiver{
Name: "test-receiver",
SlackConfigs: []*config.SlackConfig{
{
Channel: "#alerts",
APIURL: &config.SecretURL{URL: &url.URL{Scheme: "https", Host: "slack.com", Path: "/api/test"}},
NotifierConfig: config.NotifierConfig{
VSendResolved: true,
},
},
},
},
expected: &Channel{
Name: "test-receiver",
Type: "slack",
Data: `{"name":"test-receiver","slack_configs":[{"send_resolved":true,"api_url":"https://slack.com/api/test","channel":"#alerts"}]}`,
},
pass: true,
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
channel, err := NewChannelFromReceiver(testCase.receiver, "1")
if !testCase.pass {
assert.Error(t, err)
return
}
assert.NoError(t, err)
assert.Equal(t, testCase.expected.Name, channel.Name)
assert.Equal(t, testCase.expected.Type, channel.Type)
assert.Equal(t, testCase.expected.Data, channel.Data)
})
}
}

View File

@@ -104,7 +104,7 @@ type StateStore interface {
// The return type matches the return of `silence.Maintenance` or `nflog.Maintenance`.
// See https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/silence/silence.go#L217
// and https://github.com/prometheus/alertmanager/blob/3b06b97af4d146e141af92885a185891eb79a5b0/nflog/nflog.go#L94
Set(context.Context, string, *StoreableState) error
Set(context.Context, *StoreableState) error
// Gets the silence state or the notification log state as a string from the store. This is used as a snapshot to load the
// initial state of silences or notification log when starting the alertmanager.

View File

@@ -2,24 +2,44 @@ package alertmanagertypes
import (
"bytes"
"net/url"
tmplhtml "html/template"
tmpltext "text/template"
"github.com/SigNoz/signoz/pkg/errors"
alertmanagertemplate "github.com/prometheus/alertmanager/template"
)
// customTemplateOption returns an Option that adds custom functions to the template.
func customTemplateOption() alertmanagertemplate.Option {
return func(text *tmpltext.Template, html *tmplhtml.Template) {
funcs := tmpltext.FuncMap{
// urlescape escapes the string for use in a URL query parameter.
// It returns tmplhtml.HTML to prevent the template engine from escaping the already escaped string.
// url.QueryEscape escapes spaces as "+", and html/template escapes "+" as "&#43;" if tmplhtml.HTML is not used.
"urlescape": func(value string) tmplhtml.HTML {
return tmplhtml.HTML(url.QueryEscape(value))
},
}
text.Funcs(funcs)
html.Funcs(funcs)
}
}
// FromGlobs overrides the default alertmanager template to add a ruleIdPath template.
// This is used to generate a link to the rule in the alertmanager.
//
// It explicitly checks for a ruleId that is a number and then generates a path to the rule.
// It checks for a ruleId label and generates a path to the rule.
// If testAlert=true label is present, it adds isTestAlert=true query parameter to the URL.
func FromGlobs(paths []string) (*alertmanagertemplate.Template, error) {
t, err := alertmanagertemplate.FromGlobs(paths)
t, err := alertmanagertemplate.FromGlobs(paths, customTemplateOption())
if err != nil {
return nil, err
}
if err := t.Parse(bytes.NewReader([]byte(`
{{ define "__ruleIdPath" }}{{ range .CommonLabels.SortedPairs }}{{ if eq .Name "ruleId" }}{{ if match "^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$" .Value }}/edit?ruleId={{ .Value | urlquery }}{{ end }}{{ end }}{{ end }}{{ end }}
{{ define "__ruleIdPath" }}{{- $isTestAlert := "" -}}{{- range .CommonLabels.SortedPairs -}}{{- if eq .Name "testalert" -}}{{- if eq .Value "true" -}}{{- $isTestAlert = "true" -}}{{- end -}}{{- end -}}{{- end -}}{{- range .CommonLabels.SortedPairs -}}{{- if eq .Name "ruleId" -}}{{- if ne .Value "" -}}/edit?ruleId={{ .Value | urlescape }}{{- if $isTestAlert -}}&isTestAlert=true{{- end -}}{{- end -}}{{- end -}}{{- end -}}{{- end }}
{{ define "__alertmanagerURL" }}{{ .ExternalURL }}/alerts{{ template "__ruleIdPath" . }}{{ end }}
{{ define "msteamsv2.default.titleLink" }}{{ template "__alertmanagerURL" . }}{{ end }}
`))); err != nil {

View File

@@ -51,21 +51,6 @@ func TestFromGlobs(t *testing.T) {
},
expected: "http://localhost:8080/alerts/edit?ruleId=2d8edca5-4f24-4266-afd1-28cefadcfa88",
},
{
name: "SingleAlertWithInvalidRuleId",
alerts: []*types.Alert{
{
Alert: model.Alert{
Labels: model.LabelSet{
"ruleId": "43textabc",
},
},
UpdatedAt: time.Now(),
Timeout: false,
},
},
expected: "http://localhost:8080/alerts",
},
{
name: "MultipleAlertsWithMismatchingRuleId",
alerts: []*types.Alert{
@@ -138,6 +123,68 @@ func TestFromGlobs(t *testing.T) {
},
expected: "http://localhost:8080/alerts",
},
{
name: "TestAlertWithNoRuleId",
alerts: []*types.Alert{
{
Alert: model.Alert{
Labels: model.LabelSet{
"testalert": "true",
},
},
UpdatedAt: time.Now(),
Timeout: false,
},
},
expected: "http://localhost:8080/alerts",
},
{
name: "TestAlertWithRuleId",
alerts: []*types.Alert{
{
Alert: model.Alert{
Labels: model.LabelSet{
"testalert": "true",
"ruleId": "01961575-461c-7668-875f-05d374062bfc",
},
},
UpdatedAt: time.Now(),
Timeout: false,
},
},
expected: "http://localhost:8080/alerts/edit?ruleId=01961575-461c-7668-875f-05d374062bfc&isTestAlert=true",
},
{
name: "TestAlertWithRuleIdWithSpacesAndSymbol",
alerts: []*types.Alert{
{
Alert: model.Alert{
Labels: model.LabelSet{
"testalert": "true",
"ruleId": "Prom + Alert & Rule",
},
},
UpdatedAt: time.Now(),
Timeout: false,
},
},
expected: "http://localhost:8080/alerts/edit?ruleId=Prom+%2B+Alert+%26+Rule&isTestAlert=true",
},
{
name: "AlertWithBlankRuleId",
alerts: []*types.Alert{
{
Alert: model.Alert{
Labels: model.LabelSet{
"ruleId": "",
},
},
UpdatedAt: time.Now(),
Timeout: false,
},
},
expected: "http://localhost:8080/alerts",
},
}
for _, tc := range testCases {

View File

@@ -6,7 +6,6 @@ import (
"github.com/SigNoz/signoz-otel-collector/constants"
"github.com/SigNoz/signoz-otel-collector/pkg/keycheck"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/telemetrylogs"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
@@ -33,7 +32,7 @@ func (i *PromotePath) ValidateAndSetDefaults() error {
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "path cannot contain spaces")
}
if strings.Contains(i.Path, telemetrylogs.ArraySep) || strings.Contains(i.Path, telemetrylogs.ArrayAnyIndex) {
if strings.Contains(i.Path, telemetrytypes.ArraySep) || strings.Contains(i.Path, telemetrytypes.ArrayAnyIndex) {
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "array paths can not be promoted or indexed")
}
@@ -41,12 +40,12 @@ func (i *PromotePath) ValidateAndSetDefaults() error {
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "`%s`, `%s` don't add these prefixes to the path", constants.BodyJSONColumnPrefix, constants.BodyPromotedColumnPrefix)
}
if !strings.HasPrefix(i.Path, telemetrylogs.BodyJSONStringSearchPrefix) {
if !strings.HasPrefix(i.Path, telemetrytypes.BodyJSONStringSearchPrefix) {
return errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "path must start with `body.`")
}
// remove the "body." prefix from the path
i.Path = strings.TrimPrefix(i.Path, telemetrylogs.BodyJSONStringSearchPrefix)
i.Path = strings.TrimPrefix(i.Path, telemetrytypes.BodyJSONStringSearchPrefix)
isCardinal := keycheck.IsCardinal(i.Path)
if isCardinal {

View File

@@ -145,6 +145,20 @@ func (f FilterOperator) IsComparisonOperator() bool {
return false
}
func (f FilterOperator) IsStringSearchOperator() bool {
switch f {
case FilterOperatorContains,
FilterOperatorNotContains,
FilterOperatorILike,
FilterOperatorNotILike,
FilterOperatorLike,
FilterOperatorNotLike:
return true
default:
return false
}
}
type OrderDirection struct {
valuer.String
}

View File

@@ -23,9 +23,10 @@ type Result struct {
}
type ExecStats struct {
RowsScanned uint64 `json:"rowsScanned"`
BytesScanned uint64 `json:"bytesScanned"`
DurationMS uint64 `json:"durationMs"`
RowsScanned uint64 `json:"rowsScanned"`
BytesScanned uint64 `json:"bytesScanned"`
DurationMS uint64 `json:"durationMs"`
StepIntervals map[string]uint64 `json:"stepIntervals,omitempty"`
}
type TimeRange struct{ From, To uint64 } // ms since epoch

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"strings"
"github.com/SigNoz/signoz-otel-collector/exporter/jsontypeexporter"
"github.com/SigNoz/signoz/pkg/valuer"
)
@@ -17,9 +18,13 @@ var (
FieldSelectorMatchTypeFuzzy = FieldSelectorMatchType{valuer.NewString("fuzzy")}
)
// BodyJSONStringSearchPrefix is the prefix used for body JSON search queries
// e.g., "body.status" where "body." is the prefix
const BodyJSONStringSearchPrefix = `body.`
const (
// BodyJSONStringSearchPrefix is the prefix used for body JSON search queries
// e.g., "body.status" where "body." is the prefix
BodyJSONStringSearchPrefix = "body."
ArraySep = jsontypeexporter.ArraySeparator
ArrayAnyIndex = "[*]."
)
type TelemetryFieldKey struct {
Name string `json:"name"`
@@ -29,7 +34,7 @@ type TelemetryFieldKey struct {
FieldContext FieldContext `json:"fieldContext,omitempty"`
FieldDataType FieldDataType `json:"fieldDataType,omitempty"`
JSONDataType *JSONDataType `json:"-,omitempty"`
JSONDataType *JSONDataType `json:"-"`
Indexes []JSONDataTypeIndex `json:"-"`
Materialized bool `json:"-"` // refers to promoted in case of body.... fields
}