Compare commits

..

9 Commits

Author SHA1 Message Date
Naman Verma
860c5e42cb feat: agg tables should use the basic query with check for restarts 2026-02-24 19:32:33 +05:30
Naman Verma
7e6ce8dea2 fix: rate expr was missing an OVER rate_window 2026-02-24 19:32:00 +05:30
Naman Verma
c948b04fa5 Merge branch 'main' into nv/7069 2026-02-24 17:57:04 +05:30
Vinicius Lourenço
cb1a2a8a13 perf(bundle-size): lazy load pages to reduce main bundle size (#10230)
Some checks are pending
build-staging / prepare (push) Waiting to run
build-staging / js-build (push) Blocked by required conditions
build-staging / go-build (push) Blocked by required conditions
build-staging / staging (push) Blocked by required conditions
Release Drafter / update_release_draft (push) Waiting to run
2026-02-24 10:41:40 +00:00
Nikhil Soni
1a5d37b25a fix: add missing filtering for ip address for scalar data (#10264)
* fix: add missing filtering for ip address for scalar data

In domain listing api for external api monitoring,
we have option to filter out the IP address but
it only handles timeseries and raw type data while
domain list handler returns scalar data.

* fix: switch to new derived attributes for ip filtering

---------

Co-authored-by: Nityananda Gohain <nityanandagohain@gmail.com>
2026-02-24 10:26:10 +00:00
Piyush Singariya
bc4273f2f8 chore: test clickhouse version 25.12.5 (#10402) 2026-02-24 14:55:51 +05:30
Naman Verma
d6ba98148a fix: use column for latest val based on table being used 2026-02-23 20:26:55 +05:30
Naman Verma
953b9e8216 feat: proper expr for rate 2026-02-23 20:25:36 +05:30
Naman Verma
212f5f4120 feat: use start time in temporal aggregation 2026-02-23 17:06:23 +05:30
16 changed files with 263 additions and 427 deletions

View File

@@ -54,7 +54,7 @@ jobs:
- sqlite
clickhouse-version:
- 25.5.6
- 25.10.5
- 25.12.5
schema-migrator-version:
- v0.142.0
postgres-version:

View File

@@ -308,3 +308,15 @@ export const PublicDashboardPage = Loadable(
/* webpackChunkName: "Public Dashboard Page" */ 'pages/PublicDashboard'
),
);
export const AlertTypeSelectionPage = Loadable(
() =>
import(
/* webpackChunkName: "Alert Type Selection Page" */ 'pages/AlertTypeSelection'
),
);
export const MeterExplorerPage = Loadable(
() =>
import(/* webpackChunkName: "Meter Explorer Page" */ 'pages/MeterExplorer'),
);

View File

@@ -1,12 +1,10 @@
import { RouteProps } from 'react-router-dom';
import ROUTES from 'constants/routes';
import AlertTypeSelectionPage from 'pages/AlertTypeSelection';
import MessagingQueues from 'pages/MessagingQueues';
import MeterExplorer from 'pages/MeterExplorer';
import {
AlertHistory,
AlertOverview,
AlertTypeSelectionPage,
AllAlertChannels,
AllErrors,
ApiMonitoring,
@@ -29,6 +27,8 @@ import {
LogsExplorer,
LogsIndexToFields,
LogsSaveViews,
MessagingQueuesMainPage,
MeterExplorerPage,
MetricsExplorer,
OldLogsExplorer,
Onboarding,
@@ -399,28 +399,28 @@ const routes: AppRoutes[] = [
{
path: ROUTES.MESSAGING_QUEUES_KAFKA,
exact: true,
component: MessagingQueues,
component: MessagingQueuesMainPage,
key: 'MESSAGING_QUEUES_KAFKA',
isPrivate: true,
},
{
path: ROUTES.MESSAGING_QUEUES_CELERY_TASK,
exact: true,
component: MessagingQueues,
component: MessagingQueuesMainPage,
key: 'MESSAGING_QUEUES_CELERY_TASK',
isPrivate: true,
},
{
path: ROUTES.MESSAGING_QUEUES_OVERVIEW,
exact: true,
component: MessagingQueues,
component: MessagingQueuesMainPage,
key: 'MESSAGING_QUEUES_OVERVIEW',
isPrivate: true,
},
{
path: ROUTES.MESSAGING_QUEUES_KAFKA_DETAIL,
exact: true,
component: MessagingQueues,
component: MessagingQueuesMainPage,
key: 'MESSAGING_QUEUES_KAFKA_DETAIL',
isPrivate: true,
},
@@ -463,21 +463,21 @@ const routes: AppRoutes[] = [
{
path: ROUTES.METER,
exact: true,
component: MeterExplorer,
component: MeterExplorerPage,
key: 'METER',
isPrivate: true,
},
{
path: ROUTES.METER_EXPLORER,
exact: true,
component: MeterExplorer,
component: MeterExplorerPage,
key: 'METER_EXPLORER',
isPrivate: true,
},
{
path: ROUTES.METER_EXPLORER_VIEWS,
exact: true,
component: MeterExplorer,
component: MeterExplorerPage,
key: 'METER_EXPLORER_VIEWS',
isPrivate: true,
},

View File

@@ -1,12 +0,0 @@
package signozapiserver
import (
"net/http"
"github.com/gorilla/mux"
)
func (provider *provider) addLogspipelineRoutes(router *mux.Router) error {
router.HandleFunc("/api/v2/pipelines", provider.logspipelineHandler.ListPipelines).Methods(http.MethodGet)
return nil
}

View File

@@ -14,7 +14,6 @@ import (
"github.com/SigNoz/signoz/pkg/modules/authdomain"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/modules/fields"
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/preference"
@@ -49,7 +48,6 @@ type provider struct {
authzHandler authz.Handler
zeusHandler zeus.Handler
querierHandler querier.Handler
logspipelineHandler logspipeline.Handler
}
func NewFactory(
@@ -71,7 +69,6 @@ func NewFactory(
authzHandler authz.Handler,
zeusHandler zeus.Handler,
querierHandler querier.Handler,
logspipelineHandler logspipeline.Handler,
) factory.ProviderFactory[apiserver.APIServer, apiserver.Config] {
return factory.NewProviderFactory(factory.MustNewName("signoz"), func(ctx context.Context, providerSettings factory.ProviderSettings, config apiserver.Config) (apiserver.APIServer, error) {
return newProvider(
@@ -96,7 +93,6 @@ func NewFactory(
authzHandler,
zeusHandler,
querierHandler,
logspipelineHandler,
)
})
}
@@ -123,7 +119,6 @@ func newProvider(
authzHandler authz.Handler,
zeusHandler zeus.Handler,
querierHandler querier.Handler,
logspipelineHandler logspipeline.Handler,
) (apiserver.APIServer, error) {
settings := factory.NewScopedProviderSettings(providerSettings, "github.com/SigNoz/signoz/pkg/apiserver/signozapiserver")
router := mux.NewRouter().UseEncodedPath()
@@ -148,7 +143,6 @@ func newProvider(
authzHandler: authzHandler,
zeusHandler: zeusHandler,
querierHandler: querierHandler,
logspipelineHandler: logspipelineHandler,
}
provider.authZ = middleware.NewAuthZ(settings.Logger(), orgGetter, authz)
@@ -229,14 +223,9 @@ func (provider *provider) AddToRouter(router *mux.Router) error {
return err
}
if err := provider.addLogspipelineRoutes(router); err != nil {
return err
}
return nil
}
func newSecuritySchemes(role types.Role) []handler.OpenAPISecurityScheme {
return []handler.OpenAPISecurityScheme{
{Name: ctxtypes.AuthTypeAPIKey.StringValue(), Scopes: []string{role.String()}},

View File

@@ -1,91 +0,0 @@
package impllogspipeline
import (
"net/http"
"strconv"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/gorilla/mux"
)
type handler struct {
module logspipeline.Module
}
func NewHandler(module logspipeline.Module) logspipeline.Handler {
return &handler{module: module}
}
func (h *handler) ListPipelines(w http.ResponseWriter, r *http.Request) {
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(w, err)
return
}
orgID, errv2 := valuer.NewUUID(claims.OrgID)
if errv2 != nil {
render.Error(w, errv2)
return
}
version, err := ParseAgentConfigVersion(r)
if err != nil {
render.Error(w, err)
return
}
if version != -1 {
pipelines, err := h.module.ListPipelinesByVersion(r.Context(), orgID, version)
if err != nil {
render.Error(w, err)
return
}
render.Success(w, http.StatusOK, pipelines)
return
}
pipelines, err := h.module.ListPipelines(r.Context(), orgID)
if err != nil {
render.Error(w, err)
return
}
render.Success(w, http.StatusOK, pipelines)
}
func (h *handler) GetPipeline(w http.ResponseWriter, r *http.Request) {
}
func (h *handler) CreatePipeline(w http.ResponseWriter, r *http.Request) {
}
func (h *handler) UpdatePipeline(w http.ResponseWriter, r *http.Request) {
}
func (h *handler) DeletePipeline(w http.ResponseWriter, r *http.Request) {
}
func ParseAgentConfigVersion(r *http.Request) (int, error) {
versionString := mux.Vars(r)["version"]
if versionString == "latest" {
return -1, nil
}
version64, err := strconv.ParseInt(versionString, 0, 8)
if err != nil {
return 0, errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid version number")
}
if version64 <= 0 {
return 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid version number")
}
return int(version64), nil
}

View File

@@ -1,230 +0,0 @@
package impllogspipeline
import (
"context"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
"go.uber.org/zap"
)
type module struct {
sqlstore sqlstore.SQLStore
}
func NewModule(sqlstore sqlstore.SQLStore) logspipeline.Module {
return &module{sqlstore: sqlstore}
}
func (m *module) ListPipelines(ctx context.Context, orgID valuer.UUID) ([]pipelinetypes.GettablePipeline, error) {
latestVersion := -1
// get latest agent config
lastestConfig, err := agentConf.GetLatestVersion(ctx, orgID, opamptypes.ElementTypeLogPipelines)
if err != nil && !errors.Ast(err, errors.TypeNotFound) {
return nil, err
}
if lastestConfig != nil {
latestVersion = lastestConfig.Version
}
return m.ListPipelinesByVersion(ctx, orgID, latestVersion)
}
func (m *module) ListPipelinesByVersion(ctx context.Context, orgID valuer.UUID, version int) ([]pipelinetypes.GettablePipeline, error) {
var stored []pipelinetypes.StoreablePipeline
err := m.sqlstore.BunDB().NewSelect().
Model(&stored).
Join("JOIN agent_config_element e ON p.id = e.element_id").
Join("JOIN agent_config_version v ON v.id = e.version_id").
Where("e.element_type = ?", opamptypes.ElementTypeLogPipelines.StringValue()).
Where("v.version = ?", version).
Where("v.org_id = ?", orgID.StringValue()).
Order("p.order_id ASC").
Scan(ctx)
if err != nil {
return nil, err
}
pipelines := make([]pipelinetypes.GettablePipeline, len(stored))
if len(stored) == 0 {
return pipelines, nil
}
for i := range stored {
pipelines[i].StoreablePipeline = stored[i]
if err := pipelines[i].ParseRawConfig(); err != nil {
return nil, err
}
if err := pipelines[i].ParseFilter(); err != nil {
return nil, err
}
}
return pipelines, nil
}
func (m *module) GetPipeline(ctx context.Context, orgID valuer.UUID, id string) (*pipelinetypes.GettablePipeline, error) {
return nil, nil
}
func (m *module) CreatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error) {
storeable, err := pipeline.ToStoreablePipeline()
if err != nil {
return nil, err
}
// regenerate the id and set other fields
storeable.Identifiable.ID = valuer.GenerateUUID()
storeable.OrgID = orgID.String()
storeable.TimeAuditable = types.TimeAuditable{
CreatedAt: time.Now(),
}
storeable.UserAuditable = types.UserAuditable{
CreatedBy: claims.Email,
}
_, err = m.sqlstore.BunDB().NewInsert().
Model(&storeable).
Exec(ctx)
if err != nil {
zap.L().Error("error in inserting pipeline data", zap.Error(err))
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to insert pipeline")
}
return &pipelinetypes.GettablePipeline{
StoreablePipeline: *storeable,
Filter: pipeline.Filter,
Config: pipeline.Config,
}, nil
}
func (m *module) UpdatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error) {
if err := pipeline.IsValid(); err != nil {
return nil, err
}
storeable, err := pipeline.ToStoreablePipeline()
if err != nil {
return nil, err
}
storeable.OrgID = orgID.String()
storeable.TimeAuditable = types.TimeAuditable{
UpdatedAt: time.Now(),
}
storeable.UserAuditable = types.UserAuditable{
UpdatedBy: claims.Email,
}
// get id from storeable pipeline
id := storeable.ID.StringValue()
// depending on the order_id update the rest of the table
// example 1: total available pipelines are 6, and order_id 5 is moved to 2, then we need to update the rest of the table
// old: 1, 2, 3, 4, 5, 6
// ^ |
// |_________|
// So pipelines starting from 2nd position till 4th position shift to right (or increase their order_id) by 1 position
// example 2: total available pipelines are 6, and order_id 2 is moved to 4, then we need to update the rest of the table
// old: 1, 2, 3, 4, 5, 6
// | ^
// |_____|
// So pipelines starting from 3rd position till 4th position shift to left (or decrease their order_id) by 1 position
if err := m.sqlstore.RunInTxCtx(ctx, nil, func(ctx context.Context) error {
db := m.sqlstore.BunDBCtx(ctx)
var existing pipelinetypes.StoreablePipeline
if err := db.NewSelect().
Model(&existing).
Where("id = ?", id).
Where("org_id = ?", orgID.StringValue()).
Scan(ctx); err != nil {
return m.sqlstore.WrapNotFoundErrf(
err,
errors.CodeNotFound,
"pipeline with id %s does not exist in org %s",
id,
orgID.StringValue(),
)
}
oldOrderID := existing.OrderID
newOrderID := storeable.OrderID
// Reorder other pipelines if the order has changed.
if newOrderID != oldOrderID {
if err := reorderPipelinesInTx(ctx, db, orgID.StringValue(), oldOrderID, newOrderID); err != nil {
return err
}
}
// Preserve primary key and immutable fields.
storeable.ID = existing.ID
// Persist the updated pipeline (including its new order).
if _, err := db.NewUpdate().
Model(storeable).
Where("id = ?", id).
Where("org_id = ?", orgID.StringValue()).
Exec(ctx); err != nil {
return err
}
return nil
}); err != nil {
return nil, err
}
return &pipelinetypes.GettablePipeline{
StoreablePipeline: *storeable,
Filter: pipeline.Filter,
Config: pipeline.Config,
}, nil
}
// reorderPipelinesInTx updates order_id of other pipelines in a transaction-aware way.
// It assumes that all pipelines for a given org have consecutive order_id values starting from 1.
// The logic is:
// - When moving a pipeline from a higher position to a lower position (e.g., 5 -> 2),
// all pipelines in [newOrderID, oldOrderID) are shifted right by +1.
// - When moving from a lower position to a higher position (e.g., 2 -> 4),
// all pipelines in (oldOrderID, newOrderID] are shifted left by -1.
func reorderPipelinesInTx(ctx context.Context, tx bun.IDB, orgID string, oldOrderID, newOrderID int) error {
switch {
case newOrderID < oldOrderID:
// Move up: shift affected pipelines down (order_id + 1).
_, err := tx.NewUpdate().
Model((*pipelinetypes.StoreablePipeline)(nil)).
Set("order_id = order_id + 1").
Where("org_id = ?", orgID).
Where("order_id >= ?", newOrderID).
Where("order_id < ?", oldOrderID).
Exec(ctx)
return err
case newOrderID > oldOrderID:
// Move down: shift affected pipelines up (order_id - 1).
_, err := tx.NewUpdate().
Model((*pipelinetypes.StoreablePipeline)(nil)).
Set("order_id = order_id - 1").
Where("org_id = ?", orgID).
Where("order_id > ?", oldOrderID).
Where("order_id <= ?", newOrderID).
Exec(ctx)
return err
default:
return nil
}
}
func (m *module) DeletePipeline(ctx context.Context, orgID valuer.UUID, id string) error {
return nil
}

View File

@@ -1,27 +0,0 @@
package logspipeline
import (
"context"
"net/http"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
type Module interface {
ListPipelines(ctx context.Context, orgID valuer.UUID) ([]pipelinetypes.GettablePipeline, error)
ListPipelinesByVersion(ctx context.Context, orgID valuer.UUID, version int) ([]pipelinetypes.GettablePipeline, error)
GetPipeline(ctx context.Context, orgID valuer.UUID, id string) (*pipelinetypes.GettablePipeline, error)
CreatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error)
UpdatePipeline(ctx context.Context, orgID valuer.UUID, claims *authtypes.Claims, pipeline *pipelinetypes.PostablePipeline) (*pipelinetypes.GettablePipeline, error)
DeletePipeline(ctx context.Context, orgID valuer.UUID, id string) error
}
type Handler interface {
ListPipelines(w http.ResponseWriter, r *http.Request)
GetPipeline(w http.ResponseWriter, r *http.Request)
CreatePipeline(w http.ResponseWriter, r *http.Request)
UpdatePipeline(w http.ResponseWriter, r *http.Request)
DeletePipeline(w http.ResponseWriter, r *http.Request)
}

View File

@@ -120,6 +120,8 @@ func FilterResponse(results []*qbtypes.QueryRangeResponse) []*qbtypes.QueryRange
}
}
resultData.Rows = filteredRows
case *qbtypes.ScalarData:
resultData.Data = filterScalarDataIPs(resultData.Columns, resultData.Data)
}
filteredData = append(filteredData, result)
@@ -145,6 +147,39 @@ func shouldIncludeSeries(series *qbtypes.TimeSeries) bool {
return true
}
func filterScalarDataIPs(columns []*qbtypes.ColumnDescriptor, data [][]any) [][]any {
// Find column indices for server address fields
serverColIndices := make([]int, 0)
for i, col := range columns {
if col.Name == derivedKeyHTTPHost {
serverColIndices = append(serverColIndices, i)
}
}
if len(serverColIndices) == 0 {
return data
}
filtered := make([][]any, 0, len(data))
for _, row := range data {
includeRow := true
for _, colIdx := range serverColIndices {
if colIdx < len(row) {
if strVal, ok := row[colIdx].(string); ok {
if net.ParseIP(strVal) != nil {
includeRow = false
break
}
}
}
}
if includeRow {
filtered = append(filtered, row)
}
}
return filtered
}
func shouldIncludeRow(row *qbtypes.RawRow) bool {
if row.Data != nil {
if domainVal, ok := row.Data[derivedKeyHTTPHost]; ok {

View File

@@ -117,6 +117,59 @@ func TestFilterResponse(t *testing.T) {
},
},
},
{
name: "should filter out IP addresses from scalar data",
input: []*qbtypes.QueryRangeResponse{
{
Data: qbtypes.QueryData{
Results: []any{
&qbtypes.ScalarData{
QueryName: "endpoints",
Columns: []*qbtypes.ColumnDescriptor{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: derivedKeyHTTPHost},
Type: qbtypes.ColumnTypeGroup,
},
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "endpoints"},
Type: qbtypes.ColumnTypeAggregation,
},
},
Data: [][]any{
{"192.168.1.1", 10},
{"example.com", 20},
{"10.0.0.1", 5},
},
},
},
},
},
},
expected: []*qbtypes.QueryRangeResponse{
{
Data: qbtypes.QueryData{
Results: []any{
&qbtypes.ScalarData{
QueryName: "endpoints",
Columns: []*qbtypes.ColumnDescriptor{
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: derivedKeyHTTPHost},
Type: qbtypes.ColumnTypeGroup,
},
{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "endpoints"},
Type: qbtypes.ColumnTypeAggregation,
},
},
Data: [][]any{
{"example.com", 20},
},
},
},
},
},
},
},
}
for _, tt := range tests {

View File

@@ -9,7 +9,6 @@ import (
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/flagger"
"github.com/SigNoz/signoz/pkg/modules/logspipeline/impllogspipeline"
"github.com/SigNoz/signoz/pkg/modules/thirdpartyapi"
"github.com/SigNoz/signoz/pkg/queryparser"
@@ -4049,6 +4048,26 @@ func (aH *APIHandler) logAggregate(w http.ResponseWriter, r *http.Request) {
aH.WriteJSON(w, r, model.GetLogsAggregatesResponse{})
}
func parseAgentConfigVersion(r *http.Request) (int, error) {
versionString := mux.Vars(r)["version"]
if versionString == "latest" {
return -1, nil
}
version64, err := strconv.ParseInt(versionString, 0, 8)
if err != nil {
return 0, errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid version number")
}
if version64 <= 0 {
return 0, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid version number")
}
return int(version64), nil
}
func (aH *APIHandler) PreviewLogsPipelinesHandler(w http.ResponseWriter, r *http.Request) {
req := logparsingpipeline.PipelinesPreviewRequest{}
@@ -4079,7 +4098,7 @@ func (aH *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Re
return
}
version, err := impllogspipeline.ParseAgentConfigVersion(r)
version, err := parseAgentConfigVersion(r)
if err != nil {
render.Error(w, err)
return

View File

@@ -13,8 +13,6 @@ import (
"github.com/SigNoz/signoz/pkg/modules/authdomain"
"github.com/SigNoz/signoz/pkg/modules/authdomain/implauthdomain"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
"github.com/SigNoz/signoz/pkg/modules/logspipeline/impllogspipeline"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer/implmetricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/organization"
@@ -68,7 +66,6 @@ type Modules struct {
SpanPercentile spanpercentile.Module
MetricsExplorer metricsexplorer.Module
Promote promote.Module
LogsPipeline logspipeline.Module
}
func NewModules(
@@ -113,6 +110,5 @@ func NewModules(
Services: implservices.NewModule(querier, telemetryStore),
MetricsExplorer: implmetricsexplorer.NewModule(telemetryStore, telemetryMetadataStore, cache, ruleStore, dashboard, providerSettings, config.MetricsExplorer),
Promote: implpromote.NewModule(telemetryMetadataStore, telemetryStore),
LogsPipeline: impllogspipeline.NewModule(sqlstore),
}
}

View File

@@ -18,7 +18,6 @@ import (
"github.com/SigNoz/signoz/pkg/modules/authdomain"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/modules/fields"
"github.com/SigNoz/signoz/pkg/modules/logspipeline"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/preference"
@@ -60,7 +59,6 @@ func NewOpenAPI(ctx context.Context, instrumentation instrumentation.Instrumenta
struct{ authz.Handler }{},
struct{ zeus.Handler }{},
struct{ querier.Handler }{},
struct{ logspipeline.Handler }{},
).New(ctx, instrumentation.ToProviderSettings(), apiserver.Config{})
if err != nil {
return nil, err

View File

@@ -23,7 +23,6 @@ import (
"github.com/SigNoz/signoz/pkg/global"
"github.com/SigNoz/signoz/pkg/global/signozglobal"
"github.com/SigNoz/signoz/pkg/modules/authdomain/implauthdomain"
"github.com/SigNoz/signoz/pkg/modules/logspipeline/impllogspipeline"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
"github.com/SigNoz/signoz/pkg/modules/preference/implpreference"
@@ -255,7 +254,6 @@ func NewAPIServerProviderFactories(orgGetter organization.Getter, authz authz.Au
handlers.AuthzHandler,
handlers.ZeusHandler,
handlers.QuerierHandler,
impllogspipeline.NewHandler(modules.LogsPipeline),
),
)
}

View File

@@ -26,6 +26,9 @@ const (
IncreaseMultiTemporality = `IF(LOWER(temporality) LIKE LOWER('delta'), %s, multiIf(row_number() OVER rate_window = 1, nan, (%s - lagInFrame(%s, 1) OVER rate_window) < 0, %s, (%s - lagInFrame(%s, 1) OVER rate_window))) AS per_series_value`
OthersMultiTemporality = `IF(LOWER(temporality) LIKE LOWER('delta'), %s, %s) AS per_series_value`
RateWithStartTs = "multiIf(row_number() OVER rate_window = 1 AND earliest_start_ts < %d, NAN, row_number() OVER rate_window = 1, sum_all_values / (ts - earliest_start_ts), earliest_start_ts = lagInFrame(latest_start_ts, 1) OVER rate_window, (sum_all_values - lagInFrame(latest_value, 1) OVER rate_window) / (ts - lagInFrame(ts, 1) OVER rate_window), sum_all_values / (ts - lagInFrame(ts, 1) OVER rate_window)) AS per_series_value"
IncreaseWithStartTs = "multiIf(row_number() OVER rate_window = 1 AND earliest_start_ts < %d, NAN, row_number() OVER rate_window = 1, sum_all_values, earliest_start_ts = lagInFrame(latest_start_ts, 1) OVER rate_window, sum_all_values - lagInFrame(latest_value, 1) OVER rate_window, sum_all_values) AS per_series_value"
)
type MetricQueryStatementBuilder struct {
@@ -330,6 +333,9 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggregationCTE(
if query.Aggregations[0].Temporality == metrictypes.Delta {
return b.buildTemporalAggDelta(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
} else if query.Aggregations[0].Temporality != metrictypes.Multiple {
if query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationIncrease || query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationRate {
return b.buildTemporalAggCumulativeOrUnspecifiedWithStartTs(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
}
return b.buildTemporalAggCumulativeOrUnspecified(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
}
return b.buildTemporalAggForMultipleTemporalities(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
@@ -382,6 +388,115 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggDelta(
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
}
func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecifiedWithStartTs(
ctx context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
timeSeriesCTE string,
timeSeriesCTEArgs []any,
) (string, []any, error) {
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
if tbl == SamplesV4TableName {
return b.buildTemporalAggCumulativeOrUnspecifiedWithStartTsForSamplesTbl(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
}
return b.buildTemporalAggCumulativeOrUnspecified(ctx, start, end, query, timeSeriesCTE, timeSeriesCTEArgs)
}
func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecifiedWithStartTsForSamplesTbl(
_ context.Context,
start, end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
timeSeriesCTE string,
timeSeriesCTEArgs []any,
) (string, []any, error) {
stepSec := int64(query.StepInterval.Seconds())
moreInfoQueryBuilder := sqlbuilder.NewSelectBuilder()
moreInfoQueryBuilder.Select("fingerprint")
moreInfoQueryBuilder.SelectMore(fmt.Sprintf(
"toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(%d)) AS ts",
stepSec,
))
moreInfoQueryBuilder.SelectMore("toDateTime(intDiv(start_timestamp_unix_milli, 1000)) AS start_ts")
for _, g := range query.GroupBy {
moreInfoQueryBuilder.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
aggCol, err := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
if err != nil {
return "", nil, err
}
moreInfoQueryBuilder.SelectMore(fmt.Sprintf("%s AS max_value", aggCol))
colWithLatestValue, err := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, metrictypes.TimeAggregationLatest, query.Aggregations[0].TableHints)
if err != nil {
return "", nil, err
}
moreInfoQueryBuilder.SelectMore(fmt.Sprintf("%s AS latest_value", colWithLatestValue))
moreInfoQueryBuilder.SelectMore("max(unix_milli) AS latest_timestamp")
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
moreInfoQueryBuilder.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
moreInfoQueryBuilder.JoinWithOption(sqlbuilder.InnerJoin, timeSeriesCTE, "points.fingerprint = filtered_time_series.fingerprint")
moreInfoQueryBuilder.Where(
moreInfoQueryBuilder.In("metric_name", query.Aggregations[0].MetricName),
moreInfoQueryBuilder.GTE("unix_milli", start),
moreInfoQueryBuilder.LT("unix_milli", end),
)
moreInfoQueryBuilder.GroupBy("fingerprint", "ts", "start_ts")
moreInfoQueryBuilder.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
moreInfoPerRowQuery, moreInfoPerRowArgs := moreInfoQueryBuilder.BuildWithFlavor(sqlbuilder.ClickHouse, timeSeriesCTEArgs...)
innerQueryBuilder := sqlbuilder.NewSelectBuilder()
innerQueryBuilder.Select("fingerprint")
innerQueryBuilder.SelectMore("ts")
for _, g := range query.GroupBy {
innerQueryBuilder.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
innerQueryBuilder.SelectMore("max(start_ts) AS latest_start_ts")
innerQueryBuilder.SelectMore("min(start_ts) AS earliest_start_ts")
innerQueryBuilder.SelectMore("sum(max_value) AS sum_all_values")
innerQueryBuilder.SelectMore("argMax(latest_value, latest_timestamp) AS latest_value")
innerQueryBuilder.From(fmt.Sprintf("(%s)", moreInfoPerRowQuery))
innerQueryBuilder.GroupBy("fingerprint", "ts")
innerQueryBuilder.GroupBy(querybuilder.GroupByKeys(query.GroupBy)...)
innerQuery, innerQueryArgs := innerQueryBuilder.BuildWithFlavor(sqlbuilder.ClickHouse, moreInfoPerRowArgs...)
switch query.Aggregations[0].TimeAggregation {
case metrictypes.TimeAggregationRate:
rateExpr := fmt.Sprintf(RateWithStartTs, start)
wrapped := sqlbuilder.NewSelectBuilder()
wrapped.Select("ts")
wrapped.SelectMore("latest_value")
for _, g := range query.GroupBy {
wrapped.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
wrapped.SelectMore(rateExpr)
wrapped.From(fmt.Sprintf("(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery))
q, args := wrapped.BuildWithFlavor(sqlbuilder.ClickHouse, moreInfoPerRowArgs...)
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
case metrictypes.TimeAggregationIncrease:
incExpr := fmt.Sprintf(IncreaseWithStartTs, start)
wrapped := sqlbuilder.NewSelectBuilder()
wrapped.Select("ts")
wrapped.SelectMore("latest_value")
for _, g := range query.GroupBy {
wrapped.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
wrapped.SelectMore(incExpr)
wrapped.From(fmt.Sprintf("(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery))
q, args := wrapped.BuildWithFlavor(sqlbuilder.ClickHouse, moreInfoPerRowArgs...)
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", q), args, nil
default:
return fmt.Sprintf("__temporal_aggregation_cte AS (%s)", innerQuery), innerQueryArgs, nil
}
}
func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
_ context.Context,
start, end uint64,
@@ -389,6 +504,7 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
timeSeriesCTE string,
timeSeriesCTEArgs []any,
) (string, []any, error) {
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
stepSec := int64(query.StepInterval.Seconds())
baseSb := sqlbuilder.NewSelectBuilder()
@@ -400,6 +516,9 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
for _, g := range query.GroupBy {
baseSb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
if tbl != SamplesV4TableName {
baseSb.SelectMore("uniqMerge(num_start_timestamps) AS num_start_timestamps")
}
aggCol, err := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
if err != nil {
@@ -407,7 +526,6 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
}
baseSb.SelectMore(fmt.Sprintf("%s AS per_series_value", aggCol))
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
baseSb.From(fmt.Sprintf("%s.%s AS points", DBName, tbl))
baseSb.JoinWithOption(sqlbuilder.InnerJoin, timeSeriesCTE, "points.fingerprint = filtered_time_series.fingerprint")
baseSb.Where(
@@ -428,6 +546,9 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
for _, g := range query.GroupBy {
wrapped.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
if tbl != SamplesV4TableName {
wrapped.SelectMore("num_start_timestamps > 1 AS has_restarts")
}
wrapped.SelectMore(fmt.Sprintf("%s AS per_series_value", RateTmpl))
wrapped.From(fmt.Sprintf("(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery))
q, args := wrapped.BuildWithFlavor(sqlbuilder.ClickHouse, innerArgs...)
@@ -439,6 +560,9 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
for _, g := range query.GroupBy {
wrapped.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
if tbl != SamplesV4TableName {
wrapped.SelectMore("num_start_timestamps > 1 AS has_restarts")
}
wrapped.SelectMore(fmt.Sprintf("%s AS per_series_value", IncreaseTmpl))
wrapped.From(fmt.Sprintf("(%s) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)", innerQuery))
q, args := wrapped.BuildWithFlavor(sqlbuilder.ClickHouse, innerArgs...)
@@ -515,11 +639,12 @@ func (b *MetricQueryStatementBuilder) buildTemporalAggForMultipleTemporalities(
func (b *MetricQueryStatementBuilder) buildSpatialAggregationCTE(
_ context.Context,
_ uint64,
_ uint64,
start uint64,
end uint64,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
_ map[string][]*telemetrytypes.TelemetryFieldKey,
) (string, []any, error) {
tbl := WhichSamplesTableToUse(start, end, query.Aggregations[0].Type, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
if query.Aggregations[0].SpaceAggregation.IsZero() {
return "", nil, errors.Newf(
errors.TypeInvalidInput,
@@ -534,6 +659,9 @@ func (b *MetricQueryStatementBuilder) buildSpatialAggregationCTE(
sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
}
sb.SelectMore(fmt.Sprintf("%s(per_series_value) AS value", query.Aggregations[0].SpaceAggregation.StringValue()))
if tbl != SamplesV4TableName {
sb.SelectMore("max(has_restarts) AS has_restarts")
}
sb.From("__temporal_aggregation_cte")
sb.Where(sb.EQ("isNaN(per_series_value)", 0))
if query.Aggregations[0].ValueFilter != nil {

View File

@@ -10,7 +10,6 @@ import (
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/queryBuilderToExpr"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
)
@@ -267,37 +266,6 @@ func (p *PostablePipeline) IsValid() error {
return nil
}
func (p *PostablePipeline) ToStoreablePipeline() (*StoreablePipeline, error) {
rawConfig, err := json.Marshal(p.Config)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to unmarshal postable pipeline config")
}
filter, err := json.Marshal(p.Filter)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to marshal postable pipeline filter")
}
identifier := valuer.GenerateUUID()
if p.ID != "" {
identifier, err = valuer.NewUUID(p.ID)
if err != nil {
return nil, errors.WithAdditionalf(err, "failed to parse postable pipeline id")
}
}
return &StoreablePipeline{
Identifiable: types.Identifiable{
ID: identifier,
},
OrderID: p.OrderID,
Enabled: p.Enabled,
Name: p.Name,
Alias: p.Alias,
Description: p.Description,
FilterString: string(filter),
ConfigJSON: string(rawConfig),
}, nil
}
func isValidOperator(op PipelineOperator) error {
if op.ID == "" {
return errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, "PipelineOperator.ID is required")