Compare commits

..

6 Commits

Author SHA1 Message Date
srikanthccv
f05c574606 chore: add initial version of query range design principles doc 2026-02-25 11:23:20 +05:30
Nageshbansal
4e4c9ce5af chore: enable metadataexporter in docker (#10409)
Some checks failed
build-staging / prepare (push) Has been cancelled
build-staging / js-build (push) Has been cancelled
build-staging / go-build (push) Has been cancelled
build-staging / staging (push) Has been cancelled
Release Drafter / update_release_draft (push) Has been cancelled
2026-02-25 03:13:27 +05:30
Srikanth Chekuri
7605775a38 chore: remove support for non v5 version in rules (#10406) 2026-02-24 23:16:21 +05:30
Vinicius Lourenço
cb1a2a8a13 perf(bundle-size): lazy load pages to reduce main bundle size (#10230)
Some checks failed
build-staging / prepare (push) Has been cancelled
build-staging / js-build (push) Has been cancelled
build-staging / go-build (push) Has been cancelled
build-staging / staging (push) Has been cancelled
Release Drafter / update_release_draft (push) Has been cancelled
2026-02-24 10:41:40 +00:00
Nikhil Soni
1a5d37b25a fix: add missing filtering for ip address for scalar data (#10264)
* fix: add missing filtering for ip address for scalar data

In domain listing api for external api monitoring,
we have option to filter out the IP address but
it only handles timeseries and raw type data while
domain list handler returns scalar data.

* fix: switch to new derived attributes for ip filtering

---------

Co-authored-by: Nityananda Gohain <nityanandagohain@gmail.com>
2026-02-24 10:26:10 +00:00
Piyush Singariya
bc4273f2f8 chore: test clickhouse version 25.12.5 (#10402) 2026-02-24 14:55:51 +05:30
25 changed files with 647 additions and 545 deletions

View File

@@ -54,7 +54,7 @@ jobs:
- sqlite
clickhouse-version:
- 25.5.6
- 25.10.5
- 25.12.5
schema-migrator-version:
- v0.142.0
postgres-version:

View File

@@ -82,6 +82,12 @@ exporters:
timeout: 45s
sending_queue:
enabled: false
metadataexporter:
cache:
provider: in_memory
dsn: tcp://clickhouse:9000/signoz_metadata
enabled: true
timeout: 45s
service:
telemetry:
logs:
@@ -93,19 +99,19 @@ service:
traces:
receivers: [otlp]
processors: [signozspanmetrics/delta, batch]
exporters: [clickhousetraces, signozmeter]
exporters: [clickhousetraces, metadataexporter, signozmeter]
metrics:
receivers: [otlp]
processors: [batch]
exporters: [signozclickhousemetrics, signozmeter]
exporters: [signozclickhousemetrics, metadataexporter, signozmeter]
metrics/prometheus:
receivers: [prometheus]
processors: [batch]
exporters: [signozclickhousemetrics, signozmeter]
exporters: [signozclickhousemetrics, metadataexporter, signozmeter]
logs:
receivers: [otlp]
processors: [batch]
exporters: [clickhouselogsexporter, signozmeter]
exporters: [clickhouselogsexporter, metadataexporter, signozmeter]
metrics/meter:
receivers: [signozmeter]
processors: [batch/meter]

View File

@@ -82,6 +82,12 @@ exporters:
timeout: 45s
sending_queue:
enabled: false
metadataexporter:
cache:
provider: in_memory
dsn: tcp://clickhouse:9000/signoz_metadata
enabled: true
timeout: 45s
service:
telemetry:
logs:
@@ -93,19 +99,19 @@ service:
traces:
receivers: [otlp]
processors: [signozspanmetrics/delta, batch]
exporters: [clickhousetraces, signozmeter]
exporters: [clickhousetraces, metadataexporter, signozmeter]
metrics:
receivers: [otlp]
processors: [batch]
exporters: [signozclickhousemetrics, signozmeter]
exporters: [signozclickhousemetrics, metadataexporter, signozmeter]
metrics/prometheus:
receivers: [prometheus]
processors: [batch]
exporters: [signozclickhousemetrics, signozmeter]
exporters: [signozclickhousemetrics, metadataexporter, signozmeter]
logs:
receivers: [otlp]
processors: [batch]
exporters: [clickhouselogsexporter, signozmeter]
exporters: [clickhouselogsexporter, metadataexporter, signozmeter]
metrics/meter:
receivers: [signozmeter]
processors: [batch/meter]

View File

@@ -0,0 +1,216 @@
# Query Range v5 — Design Principles & Architectural Contracts
## Purpose of This Document
This document defines the design principles, invariants, and architectural contracts of the Query Range v5 system. It is intended for the authors working on the querier and querier related parts codebase. Any change to the system must align with the principles described here. If a change would violate a principle, it must be flagged and discussed.
---
## Core Architectural Principle
**The user speaks OpenTelemetry. The storage speaks ClickHouse. The system translates between them. These two worlds must never leak into each other.**
Every design choice in Query Range flows from this separation. The user-facing API surface deals exclusively in `TelemetryFieldKey`: a representation of fields as they exist in the OpenTelemetry data model. The storage layer deals in ClickHouse column expressions, table names, and SQL fragments. The translation between them is mediated by a small set of composable abstractions with strict boundaries.
---
## The Central Type: `TelemetryFieldKey`
`TelemetryFieldKey` is the atomic unit of the entire query system. Every filter, aggregation, group-by, order-by, and select operation is expressed in terms of field keys. Understanding its design contracts is non-negotiable.
### Identity
A field key is identified by three dimensions:
- **Name** — the field name as the user knows it (`service.name`, `http.method`, `trace_id`)
- **FieldContext** — where the field lives in the OTel model (`resource`, `attribute`, `span`, `log`, `body`, `scope`, `event`, `metric`)
- **FieldDataType** — the data type (`string`, `bool`, `number`/`float64`/`int64`, array variants)
### Invariant: Same name does not mean same field
`status` as an attribute, `status` as a body JSON key, and `status` as a span-level field are **three different fields**. The context disambiguates. Code that resolves or compares field keys must always consider all three dimensions, never just the name.
### Invariant: Normalization happens once, at the boundary
`TelemetryFieldKey.Normalize()` is called during JSON unmarshaling. After normalization, the text representation `resource.service.name:string` and the programmatic construction `{Name: "service.name", FieldContext: Resource, FieldDataType: String}` are identical.
**Consequence:** Downstream code must never re-parse or re-normalize field keys. If you find yourself splitting on `.` or `:` deep in the pipeline, something is wrong — the normalization should have already happened.
### Invariant: The text format is `context.name:datatype`
Parsing rules (implemented in `GetFieldKeyFromKeyText` and `Normalize`):
1. Data type is extracted from the right, after the last `:`.
2. Field context is extracted from the left, before the first `.`, if it matches a known context prefix.
3. Everything remaining is the name.
Special case: `log.body.X` normalizes to `{FieldContext: body, Name: X}` — the `log.body.` prefix collapses because body fields under log are a nested context.
### Invariant: Historical aliases must be preserved
The `fieldContexts` map includes aliases (`tag` -> `attribute`, `spanfield` -> `span`, `logfield` -> `log`). These exist because older database entries use these names. Removing or changing these aliases will break existing saved queries and dashboard configurations.
---
## The Abstraction Stack
The query pipeline is built from four interfaces that compose vertically. Each layer has a single responsibility. Each layer depends only on the layers below it. This layering is intentional and must be preserved.
```
StatementBuilder <- Orchestrates everything into executable SQL
├── AggExprRewriter <- Rewrites aggregation expressions (maps field refs to columns)
├── ConditionBuilder <- Builds WHERE predicates (field + operator + value -> SQL)
└── FieldMapper <- Maps TelemetryFieldKey -> ClickHouse column expression
```
### FieldMapper
**Contract:** Given a `TelemetryFieldKey`, return a ClickHouse column expression that yields the value for that field when used in a SELECT.
**Principle:** This is the *only* place where field-to-column translation happens. No other layer should contain knowledge of how fields map to storage. If you need a column expression, go through the FieldMapper.
**Why:** The user says `http.request.method`. ClickHouse might store it as `attributes_string['http.request.method']`, or as a materialized column `` `attribute_string_http$$request$$method` ``, or via a JSON access path in a body column. This variation is entirely contained within the FieldMapper. Everything above it is storage-agnostic.
### ConditionBuilder
**Contract:** Given a field key, an operator, and a value, produce a valid SQL predicate for a WHERE clause.
**Dependency:** Uses FieldMapper for the left-hand side of the condition.
**Principle:** The ConditionBuilder owns all the complexity of operator semantics, i.e type casting, array operators (`hasAny`/`hasAll` vs `=`), existence checks, and negative operator behavior. This complexity must not leak upward into the StatementBuilder.
### AggExprRewriter
**Contract:** Given a user-facing aggregation expression like `sum(duration_nano)`, resolve field references within it and produce valid ClickHouse SQL.
**Dependency:** Uses FieldMapper to resolve field names within expressions.
**Principle:** Aggregation expressions are user-authored strings that contain field references. The rewriter parses them, identifies field references, resolves each through the FieldMapper, and reassembles the expression.
### StatementBuilder
**Contract:** Given a complete `QueryBuilderQuery`, a time range, and a request type, produces an executable SQL statement.
**Dependency:** Uses all three abstractions above.
**Principle:** This is the composition layer. It does not contain field mapping logic, condition building logic, or expression rewriting logic. It orchestrates the other abstractions. If you find storage-specific logic creeping into the StatementBuilder, push it down into the appropriate abstraction.
### Invariant: No layer skipping
The StatementBuilder must not call FieldMapper directly to build conditions, it goes through the ConditionBuilder. The AggExprRewriter must not hardcode column names, it goes through the FieldMapper. Skipping layers creates hidden coupling and makes the system fragile to storage changes.
---
## Design Decisions as Constraints
### Constraint: Formula evaluation happens in Go, not in ClickHouse
Formulas (`A + B`, `A / B`, `sqrt(A*A + B*B)`) are evaluated application-side by `FormulaEvaluator`, not via ClickHouse JOINs.
**Why this is a constraint, not just an implementation choice:** The original JOIN-based approach was abandoned because ClickHouse evaluates joins right-to-left, serializing execution unnecessarily. Running queries independently allows parallelism and caching of intermediate results. Any future optimization must not reintroduce the JOIN pattern without solving the serialization problem.
**Consequence:** Individual query results must be independently cacheable. Formula evaluation must handle label matching, timestamp alignment, and missing values without requiring the queries to coordinate at the SQL level.
### Constraint: Zero-defaulting is aggregation-dependent
Only additive/counting aggregations (`count`, `count_distinct`, `sum`, `rate`) default missing values to zero. Statistical aggregations (`avg`, `min`, `max`, percentiles) must show gaps.
**Why:** Absence of data has different meanings. No error requests in a time bucket means error count = 0. No requests at all means average latency is *unknown*, not 0. Conflating these is a correctness bug, not a display preference.
**Enforcement:** `GetQueriesSupportingZeroDefault` determines which queries can default to zero. The `FormulaEvaluator` consumes this via `canDefaultZero`. Changes to aggregation handling must preserve this distinction.
### Constraint: Existence semantics differ for positive vs negative operators
- **Positive operators** (`=`, `>`, `LIKE`, `IN`, etc.) implicitly assert field existence. `http.method = GET` means "the field exists AND equals GET".
- **Negative operators** (`!=`, `NOT IN`, `NOT LIKE`, etc.) do **not** add an existence check. `http.method != GET` includes records where the field doesn't exist at all.
**Why:** The user's intent with negative operators is ambiguous. Rather than guess, we take the broader interpretation. Users can add an explicit `EXISTS` filter if they want the narrower one. This is documented in `AddDefaultExistsFilter`.
**Consequence:** Any new operator must declare its existence behavior in `AddDefaultExistsFilter`. Do not add operators without considering this.
### Constraint: Post-processing functions operate on result sets, not in SQL
Functions like `cutOffMin`, `ewma`, `median`, `timeShift`, `fillZero`, `runningDiff`, and `cumulativeSum` are applied in Go on the returned time series, not pushed into ClickHouse SQL.
**Why:** These are sequential time-series transformations that require complete, ordered result sets. Pushing them into SQL would complicate query generation, prevent caching of raw results, and make the functions harder to test. They are applied via `ApplyFunctions` after query execution.
**Consequence:** New time-series transformation functions should follow this pattern i.e implement them as Go functions on `*TimeSeries`, not as SQL modifications.
### Constraint: The API surface rejects unknown fields with suggestions
All request types use custom `UnmarshalJSON` that calls `DisallowUnknownFields`. Unknown fields trigger error messages with Levenshtein-based suggestions ("did you mean: 'groupBy'?").
**Why:** Silent acceptance of unknown fields causes subtle bugs. A misspelled `groupBy` results in ungrouped data with no indication of what went wrong. Failing fast with suggestions turns errors into actionable feedback.
**Consequence:** Any new request type or query spec struct must implement custom unmarshaling with `UnmarshalJSONWithContext`. Do not use default `json.Unmarshal` for user-facing types.
### Constraint: Validation is context-sensitive to request type
What's valid depends on the `RequestType`. For aggregation requests (`time_series`, `scalar`, `distribution`), fields like `groupBy`, `aggregations`, `having`, and aggregation-referenced `orderBy` are validated. For non-aggregation requests (`raw`, `raw_stream`, `trace`), these fields are ignored.
**Why:** A raw log query doesn't have aggregations, so requiring `aggregations` would be wrong. But a time-series query without aggregations is meaningless. The validation rules are request-type-aware to avoid both false positives and false negatives.
**Consequence:** When adding new fields to query specs, consider which request types they apply to and gate validation accordingly.
---
## The Composite Query Model
### Structure
A `QueryRangeRequest` contains a `CompositeQuery` which holds `[]QueryEnvelope`. Each envelope is a discriminated union: a `Type` field determines how `Spec` is decoded.
### Invariant: Query names are unique within a composite query
Builder queries must have unique names. Formulas reference queries by name (`A`, `B`, `A.0`, `A.my_alias`). Duplicate names would make formula evaluation ambiguous.
### Invariant: Multi-aggregation uses indexed or aliased references
A single builder query can have multiple aggregations. They are accessed in formulas via:
- Index: `A.0`, `A.1` (zero-based)
- Alias: `A.total`, `A.error_count`
The default (just `A`) resolves to index 0. This is the formula evaluation contract and must be preserved.
### Invariant: Type-specific decoding through signal detection
Builder queries are decoded by first peeking at the `signal` field in the raw JSON, then unmarshaling into the appropriate generic type (`QueryBuilderQuery[TraceAggregation]`, `QueryBuilderQuery[LogAggregation]`, `QueryBuilderQuery[MetricAggregation]`). This two-pass decoding is intentional — it allows each signal to have its own aggregation schema while sharing the query structure.
---
## The Metadata Layer
### MetadataStore
The `MetadataStore` interface provides runtime field discovery and type resolution. It answers questions like "what fields exist for this signal?" and "what are the data types of field X?".
### Principle: Fields can be ambiguous until resolved
The same name can map to multiple `TelemetryFieldKey` variants (different contexts, different types). The metadata store returns *all* variants. Resolution to a single field happens during query building, using the query's signal and any explicit context/type hints from the user.
**Consequence:** Code that calls `GetKey` or `GetKeys` must handle multiple results. Do not assume a name maps to a single field.
### Principle: Materialized fields are a performance optimization, not a semantic distinction
A materialized field and its non-materialized equivalent represent the same logical field. The `Materialized` flag tells the FieldMapper to generate a simpler column expression. The user should never need to know whether a field is materialized.
### Principle: JSON body fields require access plans
Fields inside JSON body columns (`body.response.errors[].code`) need pre-computed `JSONAccessPlan` trees that encode the traversal path, including branching at array boundaries between `Array(JSON)` and `Array(Dynamic)` representations. These plans are computed during metadata resolution, not during query execution.
---
## Summary of Inviolable Rules
1. **User-facing types never contain ClickHouse column names or SQL fragments.**
2. **Field-to-column translation only happens in FieldMapper.**
3. **Normalization happens once at the API boundary, never deeper.**
4. **Historical aliases in fieldContexts and fieldDataTypes must not be removed.**
5. **Formula evaluation stays in Go — do not push it into ClickHouse JOINs.**
6. **Zero-defaulting is aggregation-type-dependent — do not universally default to zero.**
7. **Positive operators imply existence, negative operators do not.**
8. **Post-processing functions operate on Go result sets, not in SQL.**
9. **All user-facing types reject unknown JSON fields with suggestions.**
10. **Validation rules are gated by request type.**
11. **Query names must be unique within a composite query.**
12. **The four-layer abstraction stack (FieldMapper -> ConditionBuilder -> AggExprRewriter -> StatementBuilder) must not be bypassed or flattened.**

View File

@@ -308,3 +308,15 @@ export const PublicDashboardPage = Loadable(
/* webpackChunkName: "Public Dashboard Page" */ 'pages/PublicDashboard'
),
);
export const AlertTypeSelectionPage = Loadable(
() =>
import(
/* webpackChunkName: "Alert Type Selection Page" */ 'pages/AlertTypeSelection'
),
);
export const MeterExplorerPage = Loadable(
() =>
import(/* webpackChunkName: "Meter Explorer Page" */ 'pages/MeterExplorer'),
);

View File

@@ -1,12 +1,10 @@
import { RouteProps } from 'react-router-dom';
import ROUTES from 'constants/routes';
import AlertTypeSelectionPage from 'pages/AlertTypeSelection';
import MessagingQueues from 'pages/MessagingQueues';
import MeterExplorer from 'pages/MeterExplorer';
import {
AlertHistory,
AlertOverview,
AlertTypeSelectionPage,
AllAlertChannels,
AllErrors,
ApiMonitoring,
@@ -29,6 +27,8 @@ import {
LogsExplorer,
LogsIndexToFields,
LogsSaveViews,
MessagingQueuesMainPage,
MeterExplorerPage,
MetricsExplorer,
OldLogsExplorer,
Onboarding,
@@ -399,28 +399,28 @@ const routes: AppRoutes[] = [
{
path: ROUTES.MESSAGING_QUEUES_KAFKA,
exact: true,
component: MessagingQueues,
component: MessagingQueuesMainPage,
key: 'MESSAGING_QUEUES_KAFKA',
isPrivate: true,
},
{
path: ROUTES.MESSAGING_QUEUES_CELERY_TASK,
exact: true,
component: MessagingQueues,
component: MessagingQueuesMainPage,
key: 'MESSAGING_QUEUES_CELERY_TASK',
isPrivate: true,
},
{
path: ROUTES.MESSAGING_QUEUES_OVERVIEW,
exact: true,
component: MessagingQueues,
component: MessagingQueuesMainPage,
key: 'MESSAGING_QUEUES_OVERVIEW',
isPrivate: true,
},
{
path: ROUTES.MESSAGING_QUEUES_KAFKA_DETAIL,
exact: true,
component: MessagingQueues,
component: MessagingQueuesMainPage,
key: 'MESSAGING_QUEUES_KAFKA_DETAIL',
isPrivate: true,
},
@@ -463,21 +463,21 @@ const routes: AppRoutes[] = [
{
path: ROUTES.METER,
exact: true,
component: MeterExplorer,
component: MeterExplorerPage,
key: 'METER',
isPrivate: true,
},
{
path: ROUTES.METER_EXPLORER,
exact: true,
component: MeterExplorer,
component: MeterExplorerPage,
key: 'METER_EXPLORER',
isPrivate: true,
},
{
path: ROUTES.METER_EXPLORER_VIEWS,
exact: true,
component: MeterExplorer,
component: MeterExplorerPage,
key: 'METER_EXPLORER_VIEWS',
isPrivate: true,
},

View File

@@ -1,12 +0,0 @@
package signozapiserver
import (
"net/http"
"github.com/gorilla/mux"
)
func (provider *provider) addLogspipelineRoutes(router *mux.Router) error {
router.HandleFunc("/api/v2/pipelines", provider.logspipelineHandler.ListPipelines).Methods(http.MethodGet)
return nil
}

View File

@@ -14,7 +14,6 @@ import (
"github.com/SigNoz/signoz/pkg/modules/authdomain"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/modules/fields"
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/preference"
@@ -49,7 +48,6 @@ type provider struct {
authzHandler authz.Handler
zeusHandler zeus.Handler
querierHandler querier.Handler
logspipelineHandler logspipeline.Handler
}
func NewFactory(
@@ -71,7 +69,6 @@ func NewFactory(
authzHandler authz.Handler,
zeusHandler zeus.Handler,
querierHandler querier.Handler,
logspipelineHandler logspipeline.Handler,
) factory.ProviderFactory[apiserver.APIServer, apiserver.Config] {
return factory.NewProviderFactory(factory.MustNewName("signoz"), func(ctx context.Context, providerSettings factory.ProviderSettings, config apiserver.Config) (apiserver.APIServer, error) {
return newProvider(
@@ -96,7 +93,6 @@ func NewFactory(
authzHandler,
zeusHandler,
querierHandler,
logspipelineHandler,
)
})
}
@@ -123,7 +119,6 @@ func newProvider(
authzHandler authz.Handler,
zeusHandler zeus.Handler,
querierHandler querier.Handler,
logspipelineHandler logspipeline.Handler,
) (apiserver.APIServer, error) {
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/apiserver/signozapiserver")
router := mux.NewRouter().UseEncodedPath()
@@ -148,7 +143,6 @@ func newProvider(
authzHandler: authzHandler,
zeusHandler: zeusHandler,
querierHandler: querierHandler,
logspipelineHandler: logspipelineHandler,
}
provider.authZ = middleware.NewAuthZ(settings.Logger(), orgGetter, authz)
@@ -229,14 +223,9 @@ func (provider *provider) AddToRouter(router *mux.Router) error {
return err
}
if err := provider.addLogspipelineRoutes(router); err != nil {
return err
}
return nil
}
func newSecuritySchemes(role types.Role) []handler.OpenAPISecurityScheme {
return []handler.OpenAPISecurityScheme{
{Name: ctxtypes.AuthTypeAPIKey.StringValue(), Scopes: []string{role.String()}},

View File

@@ -1,91 +0,0 @@
package impllogspipeline
import (
"net/http"
"strconv"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/gorilla/mux"
)
type handler struct {
module logspipeline.Module
}
func NewHandler(module logspipeline.Module) logspipeline.Handler {
return &handler{module: module}
}
func (h *handler) ListPipelines(w http.ResponseWriter, r *http.Request) {
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(w, err)
return
}
orgID, errv2 := valuer.NewUUID(claims.OrgID)
if errv2 != nil {
render.Error(w, errv2)
return
}
version, err := ParseAgentConfigVersion(r)
if err != nil {
render.Error(w, err)
return
}
if version != -1 {
pipelines, err := h.module.ListPipelinesByVersion(r.Context(), orgID, version)
if err != nil {
render.Error(w, err)
return
}
render.Success(w, http.StatusOK, pipelines)
return
}
pipelines, err := h.module.ListPipelines(r.Context(), orgID)
if err != nil {
render.Error(w, err)
return
}
render.Success(w, http.StatusOK, pipelines)
}
func (h *handler) GetPipeline(w http.ResponseWriter, r *http.Request) {
}
func (h *handler) CreatePipeline(w http.ResponseWriter, r *http.Request) {
}
func (h *handler) UpdatePipeline(w http.ResponseWriter, r *http.Request) {
}
func (h *handler) DeletePipeline(w http.ResponseWriter, r *http.Request) {
}
func ParseAgentConfigVersion(r *http.Request) (int, error) {
versionString := mux.Vars(r)["version"]
if versionString == "latest" {
return -1, nil
}
version64, err := strconv.ParseInt(versionString, 0, 8)
if err != nil {
return 0, errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid version number")
}
if version64 <= 0 {
return 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid version number")
}
return int(version64), nil
}

View File

@@ -1,322 +0,0 @@
package impllogspipeline
import (
"context"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
"go.uber.org/zap"
)
type module struct {
sqlstore sqlstore.SQLStore
}
func NewModule(sqlstore sqlstore.SQLStore) logspipeline.Module {
return &module{sqlstore: sqlstore}
}
func (m *module) ListPipelines(ctx context.Context, orgID valuer.UUID) ([]pipelinetypes.GettablePipeline, error) {
latestVersion := -1
// get latest agent config
lastestConfig, err := agentConf.GetLatestVersion(ctx, orgID, opamptypes.ElementTypeLogPipelines)
if err != nil && !errors.Ast(err, errors.TypeNotFound) {
return nil, err
}
if lastestConfig != nil {
latestVersion = lastestConfig.Version
}
return m.ListPipelinesByVersion(ctx, orgID, latestVersion)
}
func (m *module) ListPipelinesByVersion(ctx context.Context, orgID valuer.UUID, version int) ([]pipelinetypes.GettablePipeline, error) {
var stored []pipelinetypes.StoreablePipeline
err := m.sqlstore.BunDB().NewSelect().
Model(&stored).
Join("JOIN agent_config_element e ON p.id = e.element_id").
Join("JOIN agent_config_version v ON v.id = e.version_id").
Where("e.element_type = ?", opamptypes.ElementTypeLogPipelines.StringValue()).
Where("v.version = ?", version).
Where("v.org_id = ?", orgID.StringValue()).
Order("p.order_id ASC").
Scan(ctx)
if err != nil {
return nil, err
}
pipelines := make([]pipelinetypes.GettablePipeline, len(stored))
if len(stored) == 0 {
return pipelines, nil
}
for i := range stored {
pipelines[i].StoreablePipeline = stored[i]
if err := pipelines[i].ParseRawConfig(); err != nil {
return nil, err
}
if err := pipelines[i].ParseFilter(); err != nil {
return nil, err
}
}
return pipelines, nil
}
func (m *module) GetPipeline(ctx context.Context, orgID valuer.UUID, id string) (*pipelinetypes.GettablePipeline, error) {
return nil, nil
}
func (m *module) CreatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error) {
storeable, err := pipeline.ToStoreablePipeline()
if err != nil {
return nil, err
}
// regenerate the id and set other fields
storeable.Identifiable.ID = valuer.GenerateUUID()
storeable.OrgID = orgID.String()
storeable.TimeAuditable = types.TimeAuditable{
CreatedAt: time.Now(),
}
storeable.UserAuditable = types.UserAuditable{
CreatedBy: claims.Email,
}
_, err = m.sqlstore.BunDB().NewInsert().
Model(&storeable).
Exec(ctx)
if err != nil {
zap.L().Error("error in inserting pipeline data", zap.Error(err))
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to insert pipeline")
}
return &pipelinetypes.GettablePipeline{
StoreablePipeline: *storeable,
Filter: pipeline.Filter,
Config: pipeline.Config,
}, nil
}
func (m *module) UpdatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error) {
if err := pipeline.IsValid(); err != nil {
return nil, err
}
storeable, err := pipeline.ToStoreablePipeline()
if err != nil {
return nil, err
}
storeable.OrgID = orgID.String()
storeable.TimeAuditable = types.TimeAuditable{
UpdatedAt: time.Now(),
}
storeable.UserAuditable = types.UserAuditable{
UpdatedBy: claims.Email,
}
// get id from storeable pipeline
id := storeable.ID.StringValue()
// depending on the order_id update the rest of the table
// example 1: total available pipelines are 6, and order_id 5 is moved to 2, then we need to update the rest of the table
// old: 1, 2, 3, 4, 5, 6
// ^ |
// |_________|
// So pipelines starting from 2nd position till 4th position shift to right (or increase their order_id) by 1 position
// example 2: total available pipelines are 6, and order_id 2 is moved to 4, then we need to update the rest of the table
// old: 1, 2, 3, 4, 5, 6
// | ^
// |_____|
// So pipelines starting from 3rd position till 4th position shift to left (or decrease their order_id) by 1 position
if err := m.sqlstore.RunInTxCtx(ctx, nil, func(ctx context.Context) error {
db := m.sqlstore.BunDBCtx(ctx)
var existing pipelinetypes.StoreablePipeline
if err := db.NewSelect().
Column("order_id", "enabled").
Model(&existing).
Where("id = ?", id).
Where("org_id = ?", orgID.StringValue()).
Scan(ctx); err != nil {
return m.sqlstore.WrapNotFoundErrf(
err,
errors.CodeNotFound,
"pipeline with id %s does not exist in org %s",
id,
orgID.StringValue(),
)
}
oldOrderID := existing.OrderID
newOrderID := storeable.OrderID
// Reorder other pipelines if the order has changed.
if newOrderID != oldOrderID {
if err := reorderPipelinesInTx(ctx, db, orgID.StringValue(), oldOrderID, newOrderID); err != nil {
return err
}
}
// Preserve primary key and immutable fields.
storeable.ID = existing.ID
// Persist the updated pipeline (including its new order).
if _, err := db.NewUpdate().
Model(storeable).
Where("id = ?", id).
Where("org_id = ?", orgID.StringValue()).
Exec(ctx); err != nil {
return err
}
// Apply pipelines if the enabled state has changed
if existing.Enabled != storeable.Enabled {
if err := m.applyPipelinesInTx(ctx, db, orgID, claims); err != nil {
return err
}
}
return nil
}); err != nil {
return nil, err
}
return &pipelinetypes.GettablePipeline{
StoreablePipeline: *storeable,
Filter: pipeline.Filter,
Config: pipeline.Config,
}, nil
}
func (m *module) applyPipelinesInTx(ctx context.Context, tx bun.IDB, orgID valuer.UUID, claims *authtypes.Claims) error {
// Get ids pipelines for the given org
var pipelines []pipelinetypes.StoreablePipeline
if err := tx.NewSelect().
Column("id").
Model(&pipelines).
Where("org_id = ?", orgID.StringValue()).
Scan(ctx); err != nil {
return m.sqlstore.WrapNotFoundErrf(
err,
errors.CodeNotFound,
"no pipelines found for org %s",
orgID.StringValue(),
)
}
// prepare config elements
elements := make([]string, len(pipelines))
for i, p := range pipelines {
elements[i] = p.ID.StringValue()
}
cfg, err := agentConf.StartNewVersion(ctx, tx, orgID, claims, opamptypes.ElementTypeLogPipelines, elements)
if err != nil || cfg == nil {
return errors.WithAdditionalf(err, "failed to start new version for org %s", orgID.StringValue())
}
return nil
}
// reorderPipelinesInTx updates order_id of other pipelines in a transaction-aware way.
// It assumes that all pipelines for a given org have consecutive order_id values starting from 1.
// The logic is:
// - When moving a pipeline from a higher position to a lower position (e.g., 5 -> 2),
// all pipelines in [newOrderID, oldOrderID) are shifted right by +1.
// - When moving from a lower position to a higher position (e.g., 2 -> 4),
// all pipelines in (oldOrderID, newOrderID] are shifted left by -1.
func reorderPipelinesInTx(ctx context.Context, tx bun.IDB, orgID string, oldOrderID, newOrderID int) error {
switch {
case newOrderID < oldOrderID:
// Move up: shift affected pipelines down (order_id + 1).
_, err := tx.NewUpdate().
Model((*pipelinetypes.StoreablePipeline)(nil)).
Set("order_id = order_id + 1").
Where("org_id = ?", orgID).
Where("order_id >= ?", newOrderID).
Where("order_id < ?", oldOrderID).
Exec(ctx)
return err
case newOrderID > oldOrderID:
// Move down: shift affected pipelines up (order_id - 1).
_, err := tx.NewUpdate().
Model((*pipelinetypes.StoreablePipeline)(nil)).
Set("order_id = order_id - 1").
Where("org_id = ?", orgID).
Where("order_id > ?", oldOrderID).
Where("order_id <= ?", newOrderID).
Exec(ctx)
return err
default:
return nil
}
}
func (m *module) DeletePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) error {
if err := pipeline.IsValid(); err != nil {
return err
}
if err := m.sqlstore.RunInTxCtx(ctx, nil, func(ctx context.Context) error {
db := m.sqlstore.BunDBCtx(ctx)
// Fetch existing pipeline to determine its current order_id.
var existing pipelinetypes.StoreablePipeline
if err := db.NewSelect().
Model(&existing).
Column("order_id", "enabled").
Where("id = ?", pipeline.ID).
Where("org_id = ?", orgID.StringValue()).
Scan(ctx); err != nil {
return m.sqlstore.WrapNotFoundErrf(
err,
errors.CodeNotFound,
"pipeline with id %s does not exist in org %s",
pipeline.ID,
orgID.StringValue(),
)
}
if _, err := db.NewDelete().
Model((*pipelinetypes.StoreablePipeline)(nil)).
Where("id = ?", pipeline.ID).
Where("org_id = ?", orgID.StringValue()).
Exec(ctx); err != nil {
return err
}
// Set order_ids of other pipelines by collapsing the gap left by the deleted pipeline.
if _, err := db.NewUpdate().
Model((*pipelinetypes.StoreablePipeline)(nil)).
Set("order_id = order_id - 1").
Where("org_id = ?", orgID.StringValue()).
Where("order_id > ?", existing.OrderID).
Exec(ctx); err != nil {
return err
}
// Apply pipelines if the deleted pipeline was enabled
if existing.Enabled {
if err := m.applyPipelinesInTx(ctx, db, orgID, claims); err != nil {
return err
}
}
return nil
}); err != nil {
return err
}
return nil
}

View File

@@ -1,27 +0,0 @@
package logspipeline
import (
"context"
"net/http"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
type Module interface {
ListPipelines(ctx context.Context, orgID valuer.UUID) ([]pipelinetypes.GettablePipeline, error)
ListPipelinesByVersion(ctx context.Context, orgID valuer.UUID, version int) ([]pipelinetypes.GettablePipeline, error)
GetPipeline(ctx context.Context, orgID valuer.UUID, id string) (*pipelinetypes.GettablePipeline, error)
CreatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error)
UpdatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error)
DeletePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) error
}
type Handler interface {
ListPipelines(w http.ResponseWriter, r *http.Request)
GetPipeline(w http.ResponseWriter, r *http.Request)
CreatePipeline(w http.ResponseWriter, r *http.Request)
UpdatePipeline(w http.ResponseWriter, r *http.Request)
DeletePipeline(w http.ResponseWriter, r *http.Request)
}

View File

@@ -120,6 +120,8 @@ func FilterResponse(results []*qbtypes.QueryRangeResponse) []*qbtypes.QueryRange
}
}
resultData.Rows = filteredRows
case *qbtypes.ScalarData:
resultData.Data = filterScalarDataIPs(resultData.Columns, resultData.Data)
}
filteredData = append(filteredData, result)
@@ -145,6 +147,39 @@ func shouldIncludeSeries(series *qbtypes.TimeSeries) bool {
return true
}
func filterScalarDataIPs(columns []*qbtypes.ColumnDescriptor, data [][]any) [][]any {
// Find column indices for server address fields
serverColIndices := make([]int, 0)
for i, col := range columns {
if col.Name == derivedKeyHTTPHost {
serverColIndices = append(serverColIndices, i)
}
}
if len(serverColIndices) == 0 {
return data
}
filtered := make([][]any, 0, len(data))
for _, row := range data {
includeRow := true
for _, colIdx := range serverColIndices {
if colIdx < len(row) {
if strVal, ok := row[colIdx].(string); ok {
if net.ParseIP(strVal) != nil {
includeRow = false
break
}
}
}
}
if includeRow {
filtered = append(filtered, row)
}
}
return filtered
}
func shouldIncludeRow(row *qbtypes.RawRow) bool {
if row.Data != nil {
if domainVal, ok := row.Data[derivedKeyHTTPHost]; ok {

View File

@@ -117,6 +117,59 @@ func TestFilterResponse(t *testing.T) {
},
},
},
{
name: "should filter out IP addresses from scalar data",
input: []*qbtypes.QueryRangeResponse{
{
Data: qbtypes.QueryData{
Results: []any{
&qbtypes.ScalarData{
QueryName: "endpoints",
Columns: []*qbtypes.ColumnDescriptor{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: derivedKeyHTTPHost},
Type: qbtypes.ColumnTypeGroup,
},
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "endpoints"},
Type: qbtypes.ColumnTypeAggregation,
},
},
Data: [][]any{
{"192.168.1.1", 10},
{"example.com", 20},
{"10.0.0.1", 5},
},
},
},
},
},
},
expected: []*qbtypes.QueryRangeResponse{
{
Data: qbtypes.QueryData{
Results: []any{
&qbtypes.ScalarData{
QueryName: "endpoints",
Columns: []*qbtypes.ColumnDescriptor{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: derivedKeyHTTPHost},
Type: qbtypes.ColumnTypeGroup,
},
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "endpoints"},
Type: qbtypes.ColumnTypeAggregation,
},
},
Data: [][]any{
{"example.com", 20},
},
},
},
},
},
},
},
}
for _, tt := range tests {

View File

@@ -13,7 +13,6 @@ import (
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)
@@ -89,10 +88,10 @@ func (r *Repo) GetConfigVersion(
}
func (r *Repo) GetLatestVersion(
ctx context.Context, tx bun.IDB, orgId valuer.UUID, typ opamptypes.ElementType,
ctx context.Context, orgId valuer.UUID, typ opamptypes.ElementType,
) (*opamptypes.AgentConfigVersion, error) {
var c opamptypes.AgentConfigVersion
err := tx.NewSelect().
err := r.store.BunDB().NewSelect().
Model(&c).
ColumnExpr("id, version, element_type, deploy_status, deploy_result, created_at").
ColumnExpr("COALESCE(created_by, '') as created_by").
@@ -112,8 +111,8 @@ func (r *Repo) GetLatestVersion(
return &c, nil
}
func (r *Repo) insertConfigInTx(
ctx context.Context, tx bun.IDB, orgId valuer.UUID, c *opamptypes.AgentConfigVersion, elements []string,
func (r *Repo) insertConfig(
ctx context.Context, orgId valuer.UUID, userId valuer.UUID, c *opamptypes.AgentConfigVersion, elements []string,
) error {
if c.ElementType.StringValue() == "" {
@@ -122,6 +121,7 @@ func (r *Repo) insertConfigInTx(
// allowing empty elements for logs - use case is deleting all pipelines
if len(elements) == 0 && c.ElementType != opamptypes.ElementTypeLogPipelines {
zap.L().Error("insert config called with no elements ", zap.String("ElementType", c.ElementType.StringValue()))
return errors.NewInvalidInputf(CodeConfigElementsRequired, "config must have atleast one element")
}
@@ -129,11 +129,13 @@ func (r *Repo) insertConfigInTx(
// the version can not be set by the user, we want to auto-assign the versions
// in a monotonically increasing order starting with 1. hence, we reject insert
// requests with version anything other than 0. here, 0 indicates un-assigned
zap.L().Error("invalid version assignment while inserting agent config", zap.Int("version", c.Version), zap.String("ElementType", c.ElementType.StringValue()))
return errors.NewInvalidInputf(errors.CodeInvalidInput, "user defined versions are not supported in the agent config")
}
configVersion, err := r.GetLatestVersion(ctx, tx, orgId, c.ElementType)
configVersion, err := r.GetLatestVersion(ctx, orgId, c.ElementType)
if err != nil && !errors.Ast(err, errors.TypeNotFound) {
zap.L().Error("failed to fetch latest config version", zap.Error(err))
return err
}
@@ -144,10 +146,31 @@ func (r *Repo) insertConfigInTx(
c.Version = 1
}
_, dbErr := tx.NewInsert().
// Track whether we've successfully finished the insert operation
success := false
defer func() {
if !success {
// remove all the damage (invalid rows from db)
// Delete elements first, then version (to respect potential foreign key constraints)
_, delErr := r.store.BunDB().NewDelete().Model(new(opamptypes.AgentConfigElement)).Where("version_id = ?", c.ID).Exec(ctx)
if delErr != nil {
zap.L().Error("failed to delete config elements during cleanup", zap.Error(delErr), zap.String("version_id", c.ID.String()))
}
_, delErr = r.store.BunDB().NewDelete().Model(new(opamptypes.AgentConfigVersion)).Where("id = ?", c.ID).Where("org_id = ?", orgId).Exec(ctx)
if delErr != nil {
zap.L().Error("failed to delete config version during cleanup", zap.Error(delErr), zap.String("version_id", c.ID.String()))
}
}
}()
_, dbErr := r.store.
BunDB().
NewInsert().
Model(c).
Exec(ctx)
if dbErr != nil {
zap.L().Error("error in inserting config version: ", zap.Error(dbErr))
return errors.WrapInternalf(dbErr, CodeConfigVersionInsertFailed, "failed to insert config version")
}
@@ -162,12 +185,13 @@ func (r *Repo) insertConfigInTx(
ElementType: c.ElementType.StringValue(),
ElementID: e,
}
_, dbErr = tx.NewInsert().Model(agentConfigElement).Exec(ctx)
_, dbErr = r.store.BunDB().NewInsert().Model(agentConfigElement).Exec(ctx)
if dbErr != nil {
return errors.WrapInternalf(dbErr, CodeConfigElementInsertFailed, "failed to insert config element")
}
}
success = true
return nil
}

View File

@@ -14,11 +14,9 @@ import (
tsp "github.com/SigNoz/signoz/pkg/query-service/app/opamp/otelconfig/tailsampler"
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/google/uuid"
"github.com/uptrace/bun"
"go.uber.org/zap"
yaml "gopkg.in/yaml.v3"
)
@@ -183,7 +181,7 @@ func (m *Manager) ReportConfigDeploymentStatus(
func GetLatestVersion(
ctx context.Context, orgId valuer.UUID, elementType opamptypes.ElementType,
) (*opamptypes.AgentConfigVersion, error) {
return m.GetLatestVersion(ctx, m.store.BunDBCtx(ctx), orgId, elementType)
return m.GetLatestVersion(ctx, orgId, elementType)
}
func GetConfigVersion(
@@ -200,14 +198,14 @@ func GetConfigHistory(
// StartNewVersion launches a new config version for given set of elements
func StartNewVersion(
ctx context.Context, tx bun.IDB, orgId valuer.UUID, claims *authtypes.Claims, eleType opamptypes.ElementType, elementIds []string,
ctx context.Context, orgId valuer.UUID, userId valuer.UUID, eleType opamptypes.ElementType, elementIds []string,
) (*opamptypes.AgentConfigVersion, error) {
// create a new version
cfg := opamptypes.NewAgentConfigVersion(orgId, claims, eleType)
cfg := opamptypes.NewAgentConfigVersion(orgId, userId, eleType)
// insert new config and elements into database
err := m.insertConfigInTx(ctx, tx, orgId, cfg, elementIds)
err := m.insertConfig(ctx, orgId, userId, cfg, elementIds)
if err != nil {
return nil, err
}

View File

@@ -9,7 +9,6 @@ import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/modules/logspipeline/impllogspipeline"
"github.com/SigNoz/signoz/pkg/modules/thirdpartyapi"
"github.com/SigNoz/signoz/pkg/queryparser"
@@ -4049,6 +4048,26 @@ func (aH *APIHandler) logAggregate(w http.ResponseWriter, r *http.Request) {
aH.WriteJSON(w, r, model.GetLogsAggregatesResponse{})
}
func parseAgentConfigVersion(r *http.Request) (int, error) {
versionString := mux.Vars(r)["version"]
if versionString == "latest" {
return -1, nil
}
version64, err := strconv.ParseInt(versionString, 0, 8)
if err != nil {
return 0, errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid version number")
}
if version64 <= 0 {
return 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid version number")
}
return int(version64), nil
}
func (aH *APIHandler) PreviewLogsPipelinesHandler(w http.ResponseWriter, r *http.Request) {
req := logparsingpipeline.PipelinesPreviewRequest{}
@@ -4079,7 +4098,7 @@ func (aH *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Re
return
}
version, err := impllogspipeline.ParseAgentConfigVersion(r)
version, err := parseAgentConfigVersion(r)
if err != nil {
render.Error(w, err)
return
@@ -4162,6 +4181,11 @@ func (aH *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request)
render.Error(w, errv2)
return
}
userID, errv2 := valuer.NewUUID(claims.UserID)
if errv2 != nil {
render.Error(w, errv2)
return
}
req := pipelinetypes.PostablePipelines{}
@@ -4183,7 +4207,7 @@ func (aH *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request)
return nil, err
}
return aH.LogsParsingPipelineController.ApplyPipelines(ctx, orgID, &claims, postable)
return aH.LogsParsingPipelineController.ApplyPipelines(ctx, orgID, userID, postable)
}
res, err := createPipeline(r.Context(), req.Pipelines)

View File

@@ -16,7 +16,6 @@ import (
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
"github.com/SigNoz/signoz/pkg/valuer"
@@ -59,7 +58,7 @@ type PipelinesResponse struct {
func (ic *LogParsingPipelineController) ApplyPipelines(
ctx context.Context,
orgID valuer.UUID,
claims *authtypes.Claims,
userID valuer.UUID,
postable []pipelinetypes.PostablePipeline,
) (*PipelinesResponse, error) {
var pipelines []pipelinetypes.GettablePipeline
@@ -90,7 +89,7 @@ func (ic *LogParsingPipelineController) ApplyPipelines(
elements[i] = p.ID.StringValue()
}
cfg, err := agentConf.StartNewVersion(ctx, ic.sqlStore.BunDBCtx(ctx), orgID, claims, opamptypes.ElementTypeLogPipelines, elements)
cfg, err := agentConf.StartNewVersion(ctx, orgID, userID, opamptypes.ElementTypeLogPipelines, elements)
if err != nil || cfg == nil {
return nil, model.InternalError(fmt.Errorf("failed to start new version: %w", err))
}

View File

@@ -13,8 +13,6 @@ import (
"github.com/SigNoz/signoz/pkg/modules/authdomain"
"github.com/SigNoz/signoz/pkg/modules/authdomain/implauthdomain"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
"github.com/SigNoz/signoz/pkg/modules/logspipeline/impllogspipeline"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer/implmetricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/organization"
@@ -68,7 +66,6 @@ type Modules struct {
SpanPercentile spanpercentile.Module
MetricsExplorer metricsexplorer.Module
Promote promote.Module
LogsPipeline logspipeline.Module
}
func NewModules(
@@ -113,6 +110,5 @@ func NewModules(
Services: implservices.NewModule(querier, telemetryStore),
MetricsExplorer: implmetricsexplorer.NewModule(telemetryStore, telemetryMetadataStore, cache, ruleStore, dashboard, providerSettings, config.MetricsExplorer),
Promote: implpromote.NewModule(telemetryMetadataStore, telemetryStore),
LogsPipeline: impllogspipeline.NewModule(sqlstore),
}
}

View File

@@ -18,7 +18,6 @@ import (
"github.com/SigNoz/signoz/pkg/modules/authdomain"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/modules/fields"
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/preference"
@@ -60,7 +59,6 @@ func NewOpenAPI(ctx context.Context, instrumentation instrumentation.Instrumenta
struct{ authz.Handler }{},
struct{ zeus.Handler }{},
struct{ querier.Handler }{},
struct{ logspipeline.Handler }{},
).New(ctx, instrumentation.ToProviderSettings(), apiserver.Config{})
if err != nil {
return nil, err

View File

@@ -23,7 +23,6 @@ import (
"github.com/SigNoz/signoz/pkg/global"
"github.com/SigNoz/signoz/pkg/global/signozglobal"
"github.com/SigNoz/signoz/pkg/modules/authdomain/implauthdomain"
"github.com/SigNoz/signoz/pkg/modules/logspipeline/impllogspipeline"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
"github.com/SigNoz/signoz/pkg/modules/preference/implpreference"
@@ -170,6 +169,7 @@ func NewSQLMigrationProviderFactories(
sqlmigration.NewAddAnonymousPublicDashboardTransactionFactory(sqlstore),
sqlmigration.NewAddRootUserFactory(sqlstore, sqlschema),
sqlmigration.NewAddUserEmailOrgIDIndexFactory(sqlstore, sqlschema),
sqlmigration.NewMigrateRulesV4ToV5Factory(sqlstore, telemetryStore),
)
}
@@ -255,7 +255,6 @@ func NewAPIServerProviderFactories(orgGetter organization.Getter, authz authz.Au
handlers.AuthzHandler,
handlers.ZeusHandler,
handlers.QuerierHandler,
impllogspipeline.NewHandler(modules.LogsPipeline),
),
)
}

View File

@@ -0,0 +1,209 @@
package sqlmigration
import (
"context"
"database/sql"
"encoding/json"
"log/slog"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/transition"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
)
type migrateRulesV4ToV5 struct {
store sqlstore.SQLStore
telemetryStore telemetrystore.TelemetryStore
logger *slog.Logger
}
func NewMigrateRulesV4ToV5Factory(
store sqlstore.SQLStore,
telemetryStore telemetrystore.TelemetryStore,
) factory.ProviderFactory[SQLMigration, Config] {
return factory.NewProviderFactory(
factory.MustNewName("migrate_rules_post_deprecation"),
func(ctx context.Context, ps factory.ProviderSettings, c Config) (SQLMigration, error) {
return &migrateRulesV4ToV5{
store: store,
telemetryStore: telemetryStore,
logger: ps.Logger,
}, nil
})
}
func (migration *migrateRulesV4ToV5) Register(migrations *migrate.Migrations) error {
if err := migrations.Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
func (migration *migrateRulesV4ToV5) getLogDuplicateKeys(ctx context.Context) ([]string, error) {
query := `
SELECT name
FROM (
SELECT DISTINCT name FROM signoz_logs.distributed_logs_attribute_keys
INTERSECT
SELECT DISTINCT name FROM signoz_logs.distributed_logs_resource_keys
)
ORDER BY name
`
rows, err := migration.telemetryStore.ClickhouseDB().Query(ctx, query)
if err != nil {
migration.logger.WarnContext(ctx, "failed to query log duplicate keys", "error", err)
return nil, nil
}
defer rows.Close()
var keys []string
for rows.Next() {
var key string
if err := rows.Scan(&key); err != nil {
migration.logger.WarnContext(ctx, "failed to scan log duplicate key", "error", err)
continue
}
keys = append(keys, key)
}
return keys, nil
}
func (migration *migrateRulesV4ToV5) getTraceDuplicateKeys(ctx context.Context) ([]string, error) {
query := `
SELECT tagKey
FROM signoz_traces.distributed_span_attributes_keys
WHERE tagType IN ('tag', 'resource')
GROUP BY tagKey
HAVING COUNT(DISTINCT tagType) > 1
ORDER BY tagKey
`
rows, err := migration.telemetryStore.ClickhouseDB().Query(ctx, query)
if err != nil {
migration.logger.WarnContext(ctx, "failed to query trace duplicate keys", "error", err)
return nil, nil
}
defer rows.Close()
var keys []string
for rows.Next() {
var key string
if err := rows.Scan(&key); err != nil {
migration.logger.WarnContext(ctx, "failed to scan trace duplicate key", "error", err)
continue
}
keys = append(keys, key)
}
return keys, nil
}
func (migration *migrateRulesV4ToV5) Up(ctx context.Context, db *bun.DB) error {
logsKeys, err := migration.getLogDuplicateKeys(ctx)
if err != nil {
return err
}
tracesKeys, err := migration.getTraceDuplicateKeys(ctx)
if err != nil {
return err
}
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer func() {
_ = tx.Rollback()
}()
var rules []struct {
ID string `bun:"id"`
Data map[string]any `bun:"data"`
}
err = tx.NewSelect().
Table("rule").
Column("id", "data").
Scan(ctx, &rules)
if err != nil {
if err == sql.ErrNoRows {
return nil
}
return err
}
alertsMigrator := transition.NewAlertMigrateV5(migration.logger, logsKeys, tracesKeys)
count := 0
for _, rule := range rules {
version, _ := rule.Data["version"].(string)
if version == "v5" {
continue
}
if version == "" {
migration.logger.WarnContext(ctx, "unexpected empty version for rule", "rule_id", rule.ID)
}
migration.logger.InfoContext(ctx, "migrating rule v4 to v5", "rule_id", rule.ID, "current_version", version)
// Check if the queries envelope already exists and is non-empty
hasQueriesEnvelope := false
if condition, ok := rule.Data["condition"].(map[string]any); ok {
if compositeQuery, ok := condition["compositeQuery"].(map[string]any); ok {
if queries, ok := compositeQuery["queries"].([]any); ok && len(queries) > 0 {
hasQueriesEnvelope = true
}
}
}
if hasQueriesEnvelope {
// already has queries envelope, just bump version
// this is because user made a mistake of choosing version
migration.logger.InfoContext(ctx, "rule already has queries envelope, bumping version", "rule_id", rule.ID)
rule.Data["version"] = "v5"
} else {
// old format, run full migration
migration.logger.InfoContext(ctx, "rule has old format, running full migration", "rule_id", rule.ID)
updated := alertsMigrator.Migrate(ctx, rule.Data)
if !updated {
migration.logger.WarnContext(ctx, "expected updated to be true but got false", "rule_id", rule.ID)
continue
}
rule.Data["version"] = "v5"
}
dataJSON, err := json.Marshal(rule.Data)
if err != nil {
return err
}
_, err = tx.NewUpdate().
Table("rule").
Set("data = ?", string(dataJSON)).
Where("id = ?", rule.ID).
Exec(ctx)
if err != nil {
return err
}
count++
}
if count != 0 {
migration.logger.InfoContext(ctx, "migrate v4 alerts", "count", count)
}
return tx.Commit()
}
func (migration *migrateRulesV4ToV5) Down(ctx context.Context, db *bun.DB) error {
return nil
}

View File

@@ -5,7 +5,6 @@ import (
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
)
@@ -99,13 +98,13 @@ type AgentConfigVersion struct {
Config string `json:"config" bun:"config,type:text"`
}
func NewAgentConfigVersion(orgId valuer.UUID, claims *authtypes.Claims, elementType ElementType) *AgentConfigVersion {
func NewAgentConfigVersion(orgId valuer.UUID, userId valuer.UUID, elementType ElementType) *AgentConfigVersion {
return &AgentConfigVersion{
TimeAuditable: types.TimeAuditable{
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
},
UserAuditable: types.UserAuditable{CreatedBy: claims.Email, UpdatedBy: claims.Email},
UserAuditable: types.UserAuditable{CreatedBy: userId.String(), UpdatedBy: userId.String()},
OrgID: orgId,
Identifiable: types.Identifiable{ID: valuer.GenerateUUID()},
ElementType: elementType,

View File

@@ -10,7 +10,6 @@ import (
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/queryBuilderToExpr"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
)
@@ -267,37 +266,6 @@ func (p *PostablePipeline) IsValid() error {
return nil
}
func (p *PostablePipeline) ToStoreablePipeline() (*StoreablePipeline, error) {
rawConfig, err := json.Marshal(p.Config)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to unmarshal postable pipeline config")
}
filter, err := json.Marshal(p.Filter)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to marshal postable pipeline filter")
}
identifier := valuer.GenerateUUID()
if p.ID != "" {
identifier, err = valuer.NewUUID(p.ID)
if err != nil {
return nil, errors.WithAdditionalf(err, "failed to parse postable pipeline id")
}
}
return &StoreablePipeline{
Identifiable: types.Identifiable{
ID: identifier,
},
OrderID: p.OrderID,
Enabled: p.Enabled,
Name: p.Name,
Alias: p.Alias,
Description: p.Description,
FilterString: string(filter),
ConfigJSON: string(rawConfig),
}, nil
}
func isValidOperator(op PipelineOperator) error {
if op.ID == "" {
return errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, "PipelineOperator.ID is required")

View File

@@ -355,6 +355,10 @@ func (r *PostableRule) validate() error {
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "composite query is required"))
}
if r.Version != "v5" {
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "only version v5 is supported, got %q", r.Version))
}
if isAllQueriesDisabled(r.RuleCondition.CompositeQuery) {
errs = append(errs, signozError.NewInvalidInputf(signozError.CodeInvalidInput, "all queries are disabled in rule condition"))
}

View File

@@ -108,6 +108,7 @@ func TestParseIntoRule(t *testing.T) {
"ruleType": "threshold_rule",
"evalWindow": "5m",
"frequency": "1m",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -150,6 +151,7 @@ func TestParseIntoRule(t *testing.T) {
content: []byte(`{
"alert": "DefaultsRule",
"ruleType": "threshold_rule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -187,6 +189,7 @@ func TestParseIntoRule(t *testing.T) {
initRule: PostableRule{},
content: []byte(`{
"alert": "PromQLRule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "promql",
@@ -256,6 +259,7 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
content: []byte(`{
"alert": "SeverityLabelTest",
"schemaVersion": "v1",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -344,6 +348,7 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
content: []byte(`{
"alert": "NoLabelsTest",
"schemaVersion": "v1",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -384,6 +389,7 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
content: []byte(`{
"alert": "OverwriteTest",
"schemaVersion": "v1",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -474,6 +480,7 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
content: []byte(`{
"alert": "V2Test",
"schemaVersion": "v2",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -517,6 +524,7 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
initRule: PostableRule{},
content: []byte(`{
"alert": "DefaultSchemaTest",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -569,6 +577,7 @@ func TestParseIntoRuleSchemaVersioning(t *testing.T) {
func TestParseIntoRuleThresholdGeneration(t *testing.T) {
content := []byte(`{
"alert": "TestThresholds",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -639,6 +648,7 @@ func TestParseIntoRuleMultipleThresholds(t *testing.T) {
"schemaVersion": "v2",
"alert": "MultiThresholdAlert",
"ruleType": "threshold_rule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -732,6 +742,7 @@ func TestAnomalyNegationEval(t *testing.T) {
ruleJSON: []byte(`{
"alert": "AnomalyBelowTest",
"ruleType": "anomaly_rule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -766,6 +777,7 @@ func TestAnomalyNegationEval(t *testing.T) {
ruleJSON: []byte(`{
"alert": "AnomalyBelowTest",
"ruleType": "anomaly_rule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -799,6 +811,7 @@ func TestAnomalyNegationEval(t *testing.T) {
ruleJSON: []byte(`{
"alert": "AnomalyAboveTest",
"ruleType": "anomaly_rule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -833,6 +846,7 @@ func TestAnomalyNegationEval(t *testing.T) {
ruleJSON: []byte(`{
"alert": "AnomalyAboveTest",
"ruleType": "anomaly_rule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -866,6 +880,7 @@ func TestAnomalyNegationEval(t *testing.T) {
ruleJSON: []byte(`{
"alert": "AnomalyBelowAllTest",
"ruleType": "anomaly_rule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -901,6 +916,7 @@ func TestAnomalyNegationEval(t *testing.T) {
ruleJSON: []byte(`{
"alert": "AnomalyBelowAllTest",
"ruleType": "anomaly_rule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -935,6 +951,7 @@ func TestAnomalyNegationEval(t *testing.T) {
ruleJSON: []byte(`{
"alert": "AnomalyOutOfBoundsTest",
"ruleType": "anomaly_rule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -969,6 +986,7 @@ func TestAnomalyNegationEval(t *testing.T) {
ruleJSON: []byte(`{
"alert": "ThresholdTest",
"ruleType": "threshold_rule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",
@@ -1003,6 +1021,7 @@ func TestAnomalyNegationEval(t *testing.T) {
ruleJSON: []byte(`{
"alert": "ThresholdTest",
"ruleType": "threshold_rule",
"version": "v5",
"condition": {
"compositeQuery": {
"queryType": "builder",