Compare commits

...

6 Commits

Author SHA1 Message Date
grandwizard28
037d8e84cb refactor: fix lint 2026-03-16 15:20:24 +05:30
grandwizard28
5c37be16ce refactor: replace zap logger with slog across codebase 2026-03-16 15:12:58 +05:30
Ashwin Bhatkal
9689b847f0 chore: add slack notification on dequeue from merge queue (#10580)
* chore: add slack notification on merge queue failure

* chore: break type

* chore: update yaml

* chore: update yaml

* chore: update yaml

* chore: update yaml

* chore: update yaml

* chore: update yaml

* chore: update yaml

* chore: resolve comments
2026-03-16 07:12:19 +00:00
Vishal Sharma
15e5938e95 fix: add allInOneLightMode SVG for light mode (#10589) 2026-03-16 06:59:28 +00:00
Abhi kumar
c5ef455283 fix: added fix for panel setting scrollbar issue (#10587)
Some checks failed
build-staging / prepare (push) Has been cancelled
Release Drafter / update_release_draft (push) Has been cancelled
build-staging / js-build (push) Has been cancelled
build-staging / go-build (push) Has been cancelled
build-staging / staging (push) Has been cancelled
* fix: added fix for panel setting scrollbar issue

* fix: added changes for panel switch
2026-03-13 19:30:49 +00:00
Ishan
2316b5be83 Sig 3634 revert (#10578)
Some checks failed
build-staging / prepare (push) Has been cancelled
build-staging / js-build (push) Has been cancelled
build-staging / go-build (push) Has been cancelled
build-staging / staging (push) Has been cancelled
Release Drafter / update_release_draft (push) Has been cancelled
* Revert "Revert "feat: Option to zoom out OR reset zoom in the explorer pages (#10464)" (#10574)"

This reverts commit 5b8d5fbfd3.

* fix: stop bubble
2026-03-13 15:29:28 +00:00
77 changed files with 3750 additions and 893 deletions

59
.github/workflows/mergequeueci.yaml vendored Normal file
View File

@@ -0,0 +1,59 @@
name: mergequeueci
on:
pull_request:
types:
- dequeued
jobs:
notify:
runs-on: ubuntu-latest
steps:
- name: alert
uses: slackapi/slack-github-action@v2.1.1
with:
webhook: ${{ secrets.SLACK_MERGE_QUEUE_WEBHOOK }}
webhook-type: incoming-webhook
payload: |
{
"text": ":x: PR removed from merge queue",
"blocks": [
{
"type": "header",
"text": {
"type": "plain_text",
"text": ":x: PR Removed from Merge Queue"
}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*<${{ github.event.pull_request.html_url }}|PR #${{ github.event.pull_request.number }}: ${{ github.event.pull_request.title }}>*"
}
},
{
"type": "divider"
},
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": "*Author*\n@${{ github.event.pull_request.user.login }}"
}
]
}
]
}
- name: comment
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
PR_NUMBER: ${{ github.event.pull_request.number }}
PR_AUTHOR: ${{ github.event.pull_request.user.login }}
PR_URL: ${{ github.event.pull_request.html_url }}
run: |
gh api repos/${{ github.repository }}/issues/$PR_NUMBER/comments \
-f body="> :x: **PR removed from merge queue**
>
> @$PR_AUTHOR your PR was removed from the merge queue. Fix the issue and re-queue when ready."

View File

@@ -6,7 +6,6 @@ import (
"github.com/SigNoz/signoz/pkg/version"
"github.com/spf13/cobra"
"go.uber.org/zap" //nolint:depguard
)
var RootCmd = &cobra.Command{
@@ -19,12 +18,6 @@ var RootCmd = &cobra.Command{
}
func Execute(logger *slog.Logger) {
zapLogger := newZapLogger()
zap.ReplaceGlobals(zapLogger)
defer func() {
_ = zapLogger.Sync()
}()
err := RootCmd.Execute()
if err != nil {
logger.ErrorContext(RootCmd.Context(), "error running command", "error", err)

View File

@@ -1,110 +0,0 @@
package cmd
import (
"context"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"go.uber.org/zap" //nolint:depguard
"go.uber.org/zap/zapcore" //nolint:depguard
)
// Deprecated: Use `NewLogger` from `pkg/instrumentation` instead.
func newZapLogger() *zap.Logger {
config := zap.NewProductionConfig()
config.EncoderConfig.TimeKey = "timestamp"
config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
// Extract sampling config before building the logger.
// We need to disable sampling in the config and apply it manually later
// to ensure correct core ordering. See filteringCore documentation for details.
samplerConfig := config.Sampling
config.Sampling = nil
logger, _ := config.Build()
// Wrap with custom core wrapping to filter certain log entries.
// The order of wrapping is important:
// 1. First wrap with filteringCore
// 2. Then wrap with sampler
//
// This creates the call chain: sampler -> filteringCore -> ioCore
//
// During logging:
// - sampler.Check decides whether to sample the log entry
// - If sampled, filteringCore.Check is called
// - filteringCore adds itself to CheckedEntry.cores
// - All cores in CheckedEntry.cores have their Write method called
// - filteringCore.Write can now filter the entry before passing to ioCore
//
// If we didn't disable the sampler above, filteringCore would have wrapped
// sampler. By calling sampler.Check we would have allowed it to call
// ioCore.Check that adds itself to CheckedEntry.cores. Then ioCore.Write
// would have bypassed our checks, making filtering impossible.
return logger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
core = &filteringCore{core}
if samplerConfig != nil {
core = zapcore.NewSamplerWithOptions(
core,
time.Second,
samplerConfig.Initial,
samplerConfig.Thereafter,
)
}
return core
}))
}
// filteringCore wraps a zapcore.Core to filter out log entries based on a
// custom logic.
//
// Note: This core must be positioned before the sampler in the core chain
// to ensure Write is called. See newZapLogger for ordering details.
type filteringCore struct {
zapcore.Core
}
// filter determines whether a log entry should be written based on its fields.
// Returns false if the entry should be suppressed, true otherwise.
//
// Current filters:
// - context.Canceled: These are expected errors from cancelled operations,
// and create noise in logs.
func (c *filteringCore) filter(fields []zapcore.Field) bool {
for _, field := range fields {
if field.Type == zapcore.ErrorType {
if loggedErr, ok := field.Interface.(error); ok {
// Suppress logs containing context.Canceled errors
if errors.Is(loggedErr, context.Canceled) {
return false
}
}
}
}
return true
}
// With implements zapcore.Core.With
// It returns a new copy with the added context.
func (c *filteringCore) With(fields []zapcore.Field) zapcore.Core {
return &filteringCore{c.Core.With(fields)}
}
// Check implements zapcore.Core.Check.
// It adds this core to the CheckedEntry if the log level is enabled,
// ensuring that Write will be called for this entry.
func (c *filteringCore) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
if c.Enabled(ent.Level) {
return ce.AddCore(ent, c)
}
return ce
}
// Write implements zapcore.Core.Write.
// It filters log entries based on their fields before delegating to the wrapped core.
func (c *filteringCore) Write(ent zapcore.Entry, fields []zapcore.Field) error {
if !c.filter(fields) {
return nil
}
return c.Core.Write(ent, fields)
}

View File

@@ -2,6 +2,7 @@ package anomaly
import (
"context"
"log/slog"
"math"
"time"
@@ -13,7 +14,6 @@ import (
"github.com/SigNoz/signoz/pkg/types/ctxtypes"
"github.com/SigNoz/signoz/pkg/types/instrumentationtypes"
"github.com/SigNoz/signoz/pkg/valuer"
"go.uber.org/zap"
)
var (
@@ -67,7 +67,7 @@ func (p *BaseSeasonalProvider) getResults(ctx context.Context, orgID valuer.UUID
instrumentationtypes.CodeNamespace: "anomaly",
instrumentationtypes.CodeFunctionName: "getResults",
})
zap.L().Info("fetching results for current period", zap.Any("currentPeriodQuery", params.CurrentPeriodQuery))
slog.InfoContext(ctx, "fetching results for current period", "current_period_query", params.CurrentPeriodQuery)
currentPeriodResults, _, err := p.querierV2.QueryRange(ctx, orgID, params.CurrentPeriodQuery)
if err != nil {
return nil, err
@@ -78,7 +78,7 @@ func (p *BaseSeasonalProvider) getResults(ctx context.Context, orgID valuer.UUID
return nil, err
}
zap.L().Info("fetching results for past period", zap.Any("pastPeriodQuery", params.PastPeriodQuery))
slog.InfoContext(ctx, "fetching results for past period", "past_period_query", params.PastPeriodQuery)
pastPeriodResults, _, err := p.querierV2.QueryRange(ctx, orgID, params.PastPeriodQuery)
if err != nil {
return nil, err
@@ -89,7 +89,7 @@ func (p *BaseSeasonalProvider) getResults(ctx context.Context, orgID valuer.UUID
return nil, err
}
zap.L().Info("fetching results for current season", zap.Any("currentSeasonQuery", params.CurrentSeasonQuery))
slog.InfoContext(ctx, "fetching results for current season", "current_season_query", params.CurrentSeasonQuery)
currentSeasonResults, _, err := p.querierV2.QueryRange(ctx, orgID, params.CurrentSeasonQuery)
if err != nil {
return nil, err
@@ -100,7 +100,7 @@ func (p *BaseSeasonalProvider) getResults(ctx context.Context, orgID valuer.UUID
return nil, err
}
zap.L().Info("fetching results for past season", zap.Any("pastSeasonQuery", params.PastSeasonQuery))
slog.InfoContext(ctx, "fetching results for past season", "past_season_query", params.PastSeasonQuery)
pastSeasonResults, _, err := p.querierV2.QueryRange(ctx, orgID, params.PastSeasonQuery)
if err != nil {
return nil, err
@@ -111,7 +111,7 @@ func (p *BaseSeasonalProvider) getResults(ctx context.Context, orgID valuer.UUID
return nil, err
}
zap.L().Info("fetching results for past 2 season", zap.Any("past2SeasonQuery", params.Past2SeasonQuery))
slog.InfoContext(ctx, "fetching results for past 2 season", "past_2_season_query", params.Past2SeasonQuery)
past2SeasonResults, _, err := p.querierV2.QueryRange(ctx, orgID, params.Past2SeasonQuery)
if err != nil {
return nil, err
@@ -122,7 +122,7 @@ func (p *BaseSeasonalProvider) getResults(ctx context.Context, orgID valuer.UUID
return nil, err
}
zap.L().Info("fetching results for past 3 season", zap.Any("past3SeasonQuery", params.Past3SeasonQuery))
slog.InfoContext(ctx, "fetching results for past 3 season", "past_3_season_query", params.Past3SeasonQuery)
past3SeasonResults, _, err := p.querierV2.QueryRange(ctx, orgID, params.Past3SeasonQuery)
if err != nil {
return nil, err
@@ -235,17 +235,17 @@ func (p *BaseSeasonalProvider) getPredictedSeries(
if predictedValue < 0 {
// this should not happen (except when the data has extreme outliers)
// we will use the moving avg of the previous period series in this case
zap.L().Warn("predictedValue is less than 0", zap.Float64("predictedValue", predictedValue), zap.Any("labels", series.Labels))
slog.Warn("predicted value is less than 0", "predicted_value", predictedValue, "labels", series.Labels)
predictedValue = p.getMovingAvg(prevSeries, movingAvgWindowSize, idx)
}
zap.L().Debug("predictedSeries",
zap.Float64("movingAvg", movingAvg),
zap.Float64("avg", avg),
zap.Float64("mean", mean),
zap.Any("labels", series.Labels),
zap.Float64("predictedValue", predictedValue),
zap.Float64("curr", curr.Value),
slog.Debug("predicted series",
"moving_avg", movingAvg,
"avg", avg,
"mean", mean,
"labels", series.Labels,
"predicted_value", predictedValue,
"curr", curr.Value,
)
predictedSeries.Points = append(predictedSeries.Points, v3.Point{
Timestamp: curr.Timestamp,
@@ -418,7 +418,7 @@ func (p *BaseSeasonalProvider) getAnomalies(ctx context.Context, orgID valuer.UU
for _, series := range result.Series {
stdDev := p.getStdDev(series)
zap.L().Info("stdDev", zap.Float64("stdDev", stdDev), zap.Any("labels", series.Labels))
slog.InfoContext(ctx, "computed standard deviation", "std_dev", stdDev, "labels", series.Labels)
pastPeriodSeries := p.getMatchingSeries(pastPeriodResult, series)
currentSeasonSeries := p.getMatchingSeries(currentSeasonResult, series)
@@ -431,7 +431,7 @@ func (p *BaseSeasonalProvider) getAnomalies(ctx context.Context, orgID valuer.UU
pastSeasonSeriesAvg := p.getAvg(pastSeasonSeries)
past2SeasonSeriesAvg := p.getAvg(past2SeasonSeries)
past3SeasonSeriesAvg := p.getAvg(past3SeasonSeries)
zap.L().Info("getAvg", zap.Float64("prevSeriesAvg", prevSeriesAvg), zap.Float64("currentSeasonSeriesAvg", currentSeasonSeriesAvg), zap.Float64("pastSeasonSeriesAvg", pastSeasonSeriesAvg), zap.Float64("past2SeasonSeriesAvg", past2SeasonSeriesAvg), zap.Float64("past3SeasonSeriesAvg", past3SeasonSeriesAvg), zap.Any("labels", series.Labels))
slog.InfoContext(ctx, "computed averages", "prev_series_avg", prevSeriesAvg, "current_season_series_avg", currentSeasonSeriesAvg, "past_season_series_avg", pastSeasonSeriesAvg, "past_2_season_series_avg", past2SeasonSeriesAvg, "past_3_season_series_avg", past3SeasonSeriesAvg, "labels", series.Labels)
predictedSeries := p.getPredictedSeries(
series,

View File

@@ -18,7 +18,7 @@ import (
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/gorilla/mux"
"go.uber.org/zap"
"log/slog"
)
type CloudIntegrationConnectionParamsResponse struct {
@@ -71,7 +71,7 @@ func (ah *APIHandler) CloudIntegrationsGenerateConnectionParams(w http.ResponseW
// Return the API Key (PAT) even if the rest of the params can not be deduced.
// Params not returned from here will be requested from the user via form inputs.
// This enables gracefully degraded but working experience even for non-cloud deployments.
zap.L().Info("ingestion params and signoz api url can not be deduced since no license was found")
slog.InfoContext(r.Context(), "ingestion params and signoz api url can not be deduced since no license was found")
ah.Respond(w, result)
return
}
@@ -103,7 +103,7 @@ func (ah *APIHandler) CloudIntegrationsGenerateConnectionParams(w http.ResponseW
result.IngestionKey = ingestionKey
} else {
zap.L().Info("ingestion key can't be deduced since no gateway url has been configured")
slog.InfoContext(r.Context(), "ingestion key can't be deduced since no gateway url has been configured")
}
ah.Respond(w, result)
@@ -138,9 +138,8 @@ func (ah *APIHandler) getOrCreateCloudIntegrationPAT(ctx context.Context, orgId
}
}
zap.L().Info(
"no PAT found for cloud integration, creating a new one",
zap.String("cloudProvider", cloudProvider),
slog.InfoContext(ctx, "no PAT found for cloud integration, creating a new one",
"cloud_provider", cloudProvider,
)
newPAT, err := types.NewStorableAPIKey(
@@ -287,9 +286,8 @@ func getOrCreateCloudProviderIngestionKey(
}
}
zap.L().Info(
"no existing ingestion key found for cloud integration, creating a new one",
zap.String("cloudProvider", cloudProvider),
slog.InfoContext(ctx, "no existing ingestion key found for cloud integration, creating a new one",
"cloud_provider", cloudProvider,
)
createKeyResult, apiErr := requestGateway[createIngestionKeyResponse](
ctx, gatewayUrl, licenseKey, "/v1/workspaces/me/keys",

View File

@@ -15,7 +15,7 @@ import (
"github.com/SigNoz/signoz/pkg/types/featuretypes"
"github.com/SigNoz/signoz/pkg/types/licensetypes"
"github.com/SigNoz/signoz/pkg/valuer"
"go.uber.org/zap"
"log/slog"
)
func (ah *APIHandler) getFeatureFlags(w http.ResponseWriter, r *http.Request) {
@@ -35,23 +35,23 @@ func (ah *APIHandler) getFeatureFlags(w http.ResponseWriter, r *http.Request) {
}
if constants.FetchFeatures == "true" {
zap.L().Debug("fetching license")
slog.DebugContext(ctx, "fetching license")
license, err := ah.Signoz.Licensing.GetActive(ctx, orgID)
if err != nil {
zap.L().Error("failed to fetch license", zap.Error(err))
slog.ErrorContext(ctx, "failed to fetch license", "error", err)
} else if license == nil {
zap.L().Debug("no active license found")
slog.DebugContext(ctx, "no active license found")
} else {
licenseKey := license.Key
zap.L().Debug("fetching zeus features")
slog.DebugContext(ctx, "fetching zeus features")
zeusFeatures, err := fetchZeusFeatures(constants.ZeusFeaturesURL, licenseKey)
if err == nil {
zap.L().Debug("fetched zeus features", zap.Any("features", zeusFeatures))
slog.DebugContext(ctx, "fetched zeus features", "features", zeusFeatures)
// merge featureSet and zeusFeatures in featureSet with higher priority to zeusFeatures
featureSet = MergeFeatureSets(zeusFeatures, featureSet)
} else {
zap.L().Error("failed to fetch zeus features", zap.Error(err))
slog.ErrorContext(ctx, "failed to fetch zeus features", "error", err)
}
}
}

View File

@@ -14,7 +14,7 @@ import (
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/valuer"
"go.uber.org/zap"
"log/slog"
)
func (aH *APIHandler) queryRangeV4(w http.ResponseWriter, r *http.Request) {
@@ -35,7 +35,7 @@ func (aH *APIHandler) queryRangeV4(w http.ResponseWriter, r *http.Request) {
queryRangeParams, apiErrorObj := baseapp.ParseQueryRangeParams(r)
if apiErrorObj != nil {
zap.L().Error("error parsing metric query range params", zap.Error(apiErrorObj.Err))
slog.ErrorContext(r.Context(), "error parsing metric query range params", "error", apiErrorObj.Err)
RespondError(w, apiErrorObj, nil)
return
}
@@ -44,7 +44,7 @@ func (aH *APIHandler) queryRangeV4(w http.ResponseWriter, r *http.Request) {
// add temporality for each metric
temporalityErr := aH.PopulateTemporality(r.Context(), orgID, queryRangeParams)
if temporalityErr != nil {
zap.L().Error("Error while adding temporality for metrics", zap.Error(temporalityErr))
slog.ErrorContext(r.Context(), "error while adding temporality for metrics", "error", temporalityErr)
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: temporalityErr}, nil)
return
}

View File

@@ -47,7 +47,7 @@ import (
baseint "github.com/SigNoz/signoz/pkg/query-service/interfaces"
baserules "github.com/SigNoz/signoz/pkg/query-service/rules"
"github.com/SigNoz/signoz/pkg/query-service/utils"
"go.uber.org/zap"
"log/slog"
)
// Server runs HTTP, Mux and a grpc server
@@ -83,6 +83,7 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
}
reader := clickhouseReader.NewReader(
signoz.Instrumentation.Logger(),
signoz.SQLStore,
signoz.TelemetryStore,
signoz.Prometheus,
@@ -278,7 +279,7 @@ func (s *Server) initListeners() error {
return err
}
zap.L().Info(fmt.Sprintf("Query server started listening on %s...", s.httpHostPort))
slog.Info(fmt.Sprintf("Query server started listening on %s...", s.httpHostPort))
return nil
}
@@ -298,31 +299,31 @@ func (s *Server) Start(ctx context.Context) error {
}
go func() {
zap.L().Info("Starting HTTP server", zap.Int("port", httpPort), zap.String("addr", s.httpHostPort))
slog.Info("Starting HTTP server", "port", httpPort, "addr", s.httpHostPort)
switch err := s.httpServer.Serve(s.httpConn); err {
case nil, http.ErrServerClosed, cmux.ErrListenerClosed:
// normal exit, nothing to do
default:
zap.L().Error("Could not start HTTP server", zap.Error(err))
slog.Error("Could not start HTTP server", "error", err)
}
s.unavailableChannel <- healthcheck.Unavailable
}()
go func() {
zap.L().Info("Starting pprof server", zap.String("addr", baseconst.DebugHttpPort))
slog.Info("Starting pprof server", "addr", baseconst.DebugHttpPort)
err = http.ListenAndServe(baseconst.DebugHttpPort, nil)
if err != nil {
zap.L().Error("Could not start pprof server", zap.Error(err))
slog.Error("Could not start pprof server", "error", err)
}
}()
go func() {
zap.L().Info("Starting OpAmp Websocket server", zap.String("addr", baseconst.OpAmpWsEndpoint))
slog.Info("Starting OpAmp Websocket server", "addr", baseconst.OpAmpWsEndpoint)
err := s.opampServer.Start(baseconst.OpAmpWsEndpoint)
if err != nil {
zap.L().Error("opamp ws server failed to start", zap.Error(err))
slog.Error("opamp ws server failed to start", "error", err)
s.unavailableChannel <- healthcheck.Unavailable
}
}()
@@ -358,10 +359,9 @@ func makeRulesManager(ch baseint.Reader, cache cache.Cache, alertmanager alertma
MetadataStore: metadataStore,
Prometheus: prometheus,
Context: context.Background(),
Logger: zap.L(),
Reader: ch,
Querier: querier,
SLogger: providerSettings.Logger,
Logger: providerSettings.Logger,
Cache: cache,
EvalDelay: baseconst.GetEvalDelay(),
PrepareTaskFunc: rules.PrepareTaskFunc,
@@ -380,7 +380,7 @@ func makeRulesManager(ch baseint.Reader, cache cache.Cache, alertmanager alertma
return nil, fmt.Errorf("rule manager error: %v", err)
}
zap.L().Info("rules manager is ready")
slog.Info("rules manager is ready")
return manager, nil
}

View File

@@ -2,6 +2,7 @@ package rules
import (
"context"
"log/slog"
"testing"
"time"
@@ -116,7 +117,7 @@ func TestAnomalyRule_NoData_AlertOnAbsent(t *testing.T) {
telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, nil)
options := clickhouseReader.NewOptions("primaryNamespace")
reader := clickhouseReader.NewReader(nil, telemetryStore, nil, "", time.Second, nil, nil, options)
reader := clickhouseReader.NewReader(slog.Default(), nil, telemetryStore, nil, "", time.Second, nil, nil, options)
rule, err := NewAnomalyRule(
"test-anomaly-rule",
@@ -247,7 +248,7 @@ func TestAnomalyRule_NoData_AbsentFor(t *testing.T) {
telemetryStore := telemetrystoretest.New(telemetrystore.Config{}, nil)
options := clickhouseReader.NewOptions("primaryNamespace")
reader := clickhouseReader.NewReader(nil, telemetryStore, nil, "", time.Second, nil, nil, options)
reader := clickhouseReader.NewReader(slog.Default(), nil, telemetryStore, nil, "", time.Second, nil, nil, options)
rule, err := NewAnomalyRule("test-anomaly-rule", valuer.GenerateUUID(), &postableRule, reader, nil, logger, nil)
require.NoError(t, err)

View File

@@ -13,7 +13,7 @@ import (
"github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/google/uuid"
"go.uber.org/zap"
"log/slog"
)
func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error) {
@@ -34,7 +34,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
opts.Rule,
opts.Reader,
opts.Querier,
opts.SLogger,
opts.Logger,
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
baserules.WithSQLStore(opts.SQLStore),
baserules.WithQueryParser(opts.ManagerOpts.QueryParser),
@@ -57,7 +57,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
ruleId,
opts.OrgID,
opts.Rule,
opts.SLogger,
opts.Logger,
opts.Reader,
opts.ManagerOpts.Prometheus,
baserules.WithSQLStore(opts.SQLStore),
@@ -82,7 +82,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
opts.Rule,
opts.Reader,
opts.Querier,
opts.SLogger,
opts.Logger,
opts.Cache,
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
baserules.WithSQLStore(opts.SQLStore),
@@ -142,7 +142,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
parsedRule,
opts.Reader,
opts.Querier,
opts.SLogger,
opts.Logger,
baserules.WithSendAlways(),
baserules.WithSendUnmatched(),
baserules.WithSQLStore(opts.SQLStore),
@@ -151,7 +151,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
)
if err != nil {
zap.L().Error("failed to prepare a new threshold rule for test", zap.String("name", alertname), zap.Error(err))
slog.Error("failed to prepare a new threshold rule for test", "name", alertname, "error", err)
return 0, basemodel.BadRequest(err)
}
@@ -162,7 +162,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
alertname,
opts.OrgID,
parsedRule,
opts.SLogger,
opts.Logger,
opts.Reader,
opts.ManagerOpts.Prometheus,
baserules.WithSendAlways(),
@@ -173,7 +173,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
)
if err != nil {
zap.L().Error("failed to prepare a new promql rule for test", zap.String("name", alertname), zap.Error(err))
slog.Error("failed to prepare a new promql rule for test", "name", alertname, "error", err)
return 0, basemodel.BadRequest(err)
}
} else if parsedRule.RuleType == ruletypes.RuleTypeAnomaly {
@@ -184,7 +184,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
parsedRule,
opts.Reader,
opts.Querier,
opts.SLogger,
opts.Logger,
opts.Cache,
baserules.WithSendAlways(),
baserules.WithSendUnmatched(),
@@ -193,7 +193,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
baserules.WithMetadataStore(opts.ManagerOpts.MetadataStore),
)
if err != nil {
zap.L().Error("failed to prepare a new anomaly rule for test", zap.String("name", alertname), zap.Error(err))
slog.Error("failed to prepare a new anomaly rule for test", "name", alertname, "error", err)
return 0, basemodel.BadRequest(err)
}
} else {
@@ -205,7 +205,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
alertsFound, err := rule.Eval(ctx, ts)
if err != nil {
zap.L().Error("evaluating rule failed", zap.String("rule", rule.Name()), zap.Error(err))
slog.Error("evaluating rule failed", "rule", rule.Name(), "error", err)
return 0, basemodel.InternalError(fmt.Errorf("rule evaluation failed"))
}
rule.SendAlerts(ctx, ts, 0, time.Minute, opts.NotifyFunc)

View File

@@ -8,12 +8,12 @@ import (
"sync/atomic"
"time"
"log/slog"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/go-co-op/gocron"
"github.com/google/uuid"
"go.uber.org/zap"
"github.com/SigNoz/signoz/ee/query-service/model"
"github.com/SigNoz/signoz/pkg/licensing"
"github.com/SigNoz/signoz/pkg/modules/organization"
@@ -76,19 +76,19 @@ func (lm *Manager) Start(ctx context.Context) error {
func (lm *Manager) UploadUsage(ctx context.Context) {
organizations, err := lm.orgGetter.ListByOwnedKeyRange(ctx)
if err != nil {
zap.L().Error("failed to get organizations", zap.Error(err))
slog.ErrorContext(ctx, "failed to get organizations", "error", err)
return
}
for _, organization := range organizations {
// check if license is present or not
license, err := lm.licenseService.GetActive(ctx, organization.ID)
if err != nil {
zap.L().Error("failed to get active license", zap.Error(err))
slog.ErrorContext(ctx, "failed to get active license", "error", err)
return
}
if license == nil {
// we will not start the usage reporting if license is not present.
zap.L().Info("no license present, skipping usage reporting")
slog.InfoContext(ctx, "no license present, skipping usage reporting")
return
}
@@ -115,7 +115,7 @@ func (lm *Manager) UploadUsage(ctx context.Context) {
dbusages := []model.UsageDB{}
err := lm.clickhouseConn.Select(ctx, &dbusages, fmt.Sprintf(query, db, db), time.Now().Add(-(24 * time.Hour)))
if err != nil && !strings.Contains(err.Error(), "doesn't exist") {
zap.L().Error("failed to get usage from clickhouse: %v", zap.Error(err))
slog.ErrorContext(ctx, "failed to get usage from clickhouse", "error", err)
return
}
for _, u := range dbusages {
@@ -125,24 +125,24 @@ func (lm *Manager) UploadUsage(ctx context.Context) {
}
if len(usages) <= 0 {
zap.L().Info("no snapshots to upload, skipping.")
slog.InfoContext(ctx, "no snapshots to upload, skipping")
return
}
zap.L().Info("uploading usage data")
slog.InfoContext(ctx, "uploading usage data")
usagesPayload := []model.Usage{}
for _, usage := range usages {
usageDataBytes, err := encryption.Decrypt([]byte(usage.ExporterID[:32]), []byte(usage.Data))
if err != nil {
zap.L().Error("error while decrypting usage data: %v", zap.Error(err))
slog.ErrorContext(ctx, "error while decrypting usage data", "error", err)
return
}
usageData := model.Usage{}
err = json.Unmarshal(usageDataBytes, &usageData)
if err != nil {
zap.L().Error("error while unmarshalling usage data: %v", zap.Error(err))
slog.ErrorContext(ctx, "error while unmarshalling usage data", "error", err)
return
}
@@ -163,13 +163,13 @@ func (lm *Manager) UploadUsage(ctx context.Context) {
body, errv2 := json.Marshal(payload)
if errv2 != nil {
zap.L().Error("error while marshalling usage payload: %v", zap.Error(errv2))
slog.ErrorContext(ctx, "error while marshalling usage payload", "error", errv2)
return
}
errv2 = lm.zeus.PutMeters(ctx, payload.LicenseKey.String(), body)
if errv2 != nil {
zap.L().Error("failed to upload usage: %v", zap.Error(errv2))
slog.ErrorContext(ctx, "failed to upload usage", "error", errv2)
// not returning error here since it is captured in the failed count
return
}
@@ -179,7 +179,7 @@ func (lm *Manager) UploadUsage(ctx context.Context) {
func (lm *Manager) Stop(ctx context.Context) {
lm.scheduler.Stop()
zap.L().Info("sending usage data before shutting down")
slog.InfoContext(ctx, "sending usage data before shutting down")
// send usage before shutting down
lm.UploadUsage(ctx)
atomic.StoreUint32(&locker, stateUnlocked)

File diff suppressed because it is too large Load Diff

After

Width:  |  Height:  |  Size: 214 KiB

View File

@@ -1,6 +1,31 @@
.custom-time-picker {
display: flex;
flex-direction: column;
flex-direction: row;
align-items: center;
gap: 4px;
.zoom-out-btn {
display: flex;
align-items: center;
justify-content: center;
flex-shrink: 0;
color: var(--foreground);
border: 1px solid var(--border);
border-radius: 2px;
box-shadow: none;
padding: 10px;
height: 33px;
&:hover:not(:disabled) {
color: var(--bg-vanilla-100);
background: var(--primary);
}
&:disabled {
opacity: 0.5;
cursor: not-allowed;
}
}
.timeSelection-input {
&:hover {

View File

@@ -16,6 +16,15 @@ jest.mock('react-router-dom', () => {
};
});
jest.mock('react-redux', () => ({
...jest.requireActual('react-redux'),
useDispatch: jest.fn(() => jest.fn()),
useSelector: jest.fn(() => ({
minTime: 0,
maxTime: Date.now(),
})),
}));
jest.mock('providers/Timezone', () => {
const actual = jest.requireActual('providers/Timezone');

View File

@@ -7,9 +7,11 @@ import {
useState,
} from 'react';
import { useLocation } from 'react-router-dom';
import { Button } from '@signozhq/button';
import { Input, InputRef, Popover, Tooltip } from 'antd';
import cx from 'classnames';
import { DATE_TIME_FORMATS } from 'constants/dateTimeFormats';
import { QueryParams } from 'constants/query';
import { DateTimeRangeType } from 'container/TopNav/CustomDateTimeModal';
import {
FixedDurationSuggestionOptions,
@@ -17,9 +19,11 @@ import {
RelativeDurationSuggestionOptions,
} from 'container/TopNav/DateTimeSelectionV2/constants';
import dayjs from 'dayjs';
import { useZoomOut } from 'hooks/useZoomOut';
import { isValidShortHandDateTimeFormat } from 'lib/getMinMax';
import { isZoomOutDisabled } from 'lib/zoomOutUtils';
import { defaultTo, isFunction, noop } from 'lodash-es';
import { ChevronDown, ChevronUp } from 'lucide-react';
import { ChevronDown, ChevronUp, ZoomOut } from 'lucide-react';
import { useTimezone } from 'providers/Timezone';
import { getTimeDifference, validateEpochRange } from 'utils/epochUtils';
import { popupContainer } from 'utils/selectPopupContainer';
@@ -66,6 +70,8 @@ interface CustomTimePickerProps {
showRecentlyUsed?: boolean;
minTime: number;
maxTime: number;
/** When true, zoom-out button is hidden (e.g. in drawer/modal time selection) */
isModalTimeSelection?: boolean;
}
function CustomTimePicker({
@@ -88,6 +94,7 @@ function CustomTimePicker({
showRecentlyUsed = true,
minTime,
maxTime,
isModalTimeSelection = false,
}: CustomTimePickerProps): JSX.Element {
const [
selectedTimePlaceholderValue,
@@ -116,6 +123,14 @@ function CustomTimePicker({
const [isOpenedFromFooter, setIsOpenedFromFooter] = useState(false);
const durationMs = (maxTime - minTime) / 1e6;
const zoomOutDisabled = showLiveLogs || isZoomOutDisabled(durationMs);
const handleZoomOut = useZoomOut({
isDisabled: zoomOutDisabled,
urlParamsToDelete: [QueryParams.activeLogId],
});
// function to get selected time in Last 1m, Last 2h, Last 3d, Last 4w format
// 1m, 2h, 3d, 4w -> Last 1 minute, Last 2 hours, Last 3 days, Last 4 weeks
const getSelectedTimeRangeLabelInRelativeFormat = (
@@ -282,7 +297,11 @@ function CustomTimePicker({
resetErrorStatus();
};
const handleInputPressEnter = (): void => {
const handleInputPressEnter = (
event?: React.KeyboardEvent<HTMLInputElement>,
): void => {
event?.preventDefault();
event?.stopPropagation();
// check if the entered time is in the format of 1m, 2h, 3d, 4w
const isTimeDurationShortHandFormat = /^(\d+)([mhdw])$/.test(inputValue);
@@ -631,6 +650,23 @@ function CustomTimePicker({
/>
</Popover>
</Tooltip>
{!showLiveLogs && !isModalTimeSelection && (
<Tooltip
title={
zoomOutDisabled ? 'Zoom out time range is limited to 1 month' : 'Zoom out'
}
>
<span>
<Button
className="zoom-out-btn"
onClick={handleZoomOut}
disabled={zoomOutDisabled}
data-testid="zoom-out-btn"
prefixIcon={<ZoomOut size={14} />}
/>
</span>
</Tooltip>
)}
</div>
);
}

View File

@@ -0,0 +1,169 @@
import { render, screen } from '@testing-library/react';
import userEvent from '@testing-library/user-event';
import { QueryParams } from 'constants/query';
import { GlobalReducer } from 'types/reducer/globalTime';
import CustomTimePicker from '../CustomTimePicker';
const MS_PER_MIN = 60 * 1000;
const NOW_MS = 1705312800000;
const mockDispatch = jest.fn();
const mockSafeNavigate = jest.fn();
const mockUrlQueryDelete = jest.fn();
const mockUrlQuerySet = jest.fn();
interface MockAppState {
globalTime: Pick<GlobalReducer, 'minTime' | 'maxTime'>;
}
jest.mock('react-redux', () => ({
useDispatch: (): jest.Mock => mockDispatch,
useSelector: (selector: (state: MockAppState) => unknown): unknown => {
const mockState: MockAppState = {
globalTime: {
minTime: (NOW_MS - 15 * MS_PER_MIN) * 1e6,
maxTime: NOW_MS * 1e6,
},
};
return selector(mockState);
},
}));
jest.mock('hooks/useSafeNavigate', () => ({
useSafeNavigate: (): { safeNavigate: jest.Mock } => ({
safeNavigate: mockSafeNavigate,
}),
}));
interface MockUrlQuery {
delete: typeof mockUrlQueryDelete;
set: typeof mockUrlQuerySet;
get: () => null;
toString: () => string;
}
jest.mock('hooks/useUrlQuery', () => ({
__esModule: true,
default: (): MockUrlQuery => ({
delete: mockUrlQueryDelete,
set: mockUrlQuerySet,
get: (): null => null,
toString: (): string => 'relativeTime=45m',
}),
}));
jest.mock('providers/Timezone', () => ({
useTimezone: (): { timezone: { value: string; offset: string } } => ({
timezone: { value: 'UTC', offset: 'UTC' },
}),
}));
jest.mock('react-router-dom', () => ({
useLocation: (): { pathname: string } => ({ pathname: '/logs-explorer' }),
}));
const MS_PER_DAY = 24 * 60 * 60 * 1000;
const now = Date.now();
const defaultProps = {
onSelect: jest.fn(),
onError: jest.fn(),
selectedValue: '15m',
selectedTime: '15m',
onValidCustomDateChange: jest.fn(),
open: false,
setOpen: jest.fn(),
items: [
{ value: '15m', label: 'Last 15 minutes' },
{ value: '1h', label: 'Last 1 hour' },
],
minTime: (now - 15 * 60 * 1000) * 1e6,
maxTime: now * 1e6,
};
describe('CustomTimePicker - zoom out button', () => {
beforeEach(() => {
jest.clearAllMocks();
jest.spyOn(Date, 'now').mockReturnValue(NOW_MS);
});
afterEach(() => {
jest.restoreAllMocks();
});
it('should render zoom out button when showLiveLogs is false', () => {
render(<CustomTimePicker {...defaultProps} showLiveLogs={false} />);
expect(screen.getByTestId('zoom-out-btn')).toBeInTheDocument();
});
it('should not render zoom out button when showLiveLogs is true', () => {
render(<CustomTimePicker {...defaultProps} showLiveLogs={true} />);
expect(screen.queryByTestId('zoom-out-btn')).not.toBeInTheDocument();
});
it('should not render zoom out button when isModalTimeSelection is true', () => {
render(
<CustomTimePicker
{...defaultProps}
showLiveLogs={false}
isModalTimeSelection={true}
/>,
);
expect(screen.queryByTestId('zoom-out-btn')).not.toBeInTheDocument();
});
it('should call handleZoomOut when zoom out button is clicked', async () => {
render(<CustomTimePicker {...defaultProps} showLiveLogs={false} />);
const zoomOutBtn = screen.getByTestId('zoom-out-btn');
await userEvent.click(zoomOutBtn);
expect(mockDispatch).toHaveBeenCalled();
expect(mockUrlQuerySet).toHaveBeenCalledWith(QueryParams.relativeTime, '45m');
expect(mockSafeNavigate).toHaveBeenCalledWith(
expect.stringMatching(/\/logs-explorer\?relativeTime=45m/),
);
});
it('should use real ladder logic: 15m range zooms to 45m preset and updates URL', async () => {
render(<CustomTimePicker {...defaultProps} showLiveLogs={false} />);
const zoomOutBtn = screen.getByTestId('zoom-out-btn');
await userEvent.click(zoomOutBtn);
expect(mockUrlQueryDelete).toHaveBeenCalledWith(QueryParams.startTime);
expect(mockUrlQueryDelete).toHaveBeenCalledWith(QueryParams.endTime);
expect(mockUrlQuerySet).toHaveBeenCalledWith(QueryParams.relativeTime, '45m');
expect(mockSafeNavigate).toHaveBeenCalledWith(
expect.stringMatching(/\/logs-explorer\?relativeTime=45m/),
);
expect(mockDispatch).toHaveBeenCalled();
});
it('should delete activeLogId when zoom out is clicked', async () => {
render(<CustomTimePicker {...defaultProps} showLiveLogs={false} />);
const zoomOutBtn = screen.getByTestId('zoom-out-btn');
await userEvent.click(zoomOutBtn);
expect(mockUrlQueryDelete).toHaveBeenCalledWith(QueryParams.activeLogId);
});
it('should disable zoom button when time range is >= 1 month', () => {
const now = Date.now();
render(
<CustomTimePicker
{...defaultProps}
minTime={(now - 31 * MS_PER_DAY) * 1e6}
maxTime={now * 1e6}
showLiveLogs={false}
/>,
);
const zoomOutBtn = screen.getByTestId('zoom-out-btn');
expect(zoomOutBtn).toBeDisabled();
});
});

View File

@@ -15,6 +15,7 @@ import ROUTES from 'constants/routes';
import { getMetricsListQuery } from 'container/MetricsExplorer/Summary/utils';
import { useGetMetricsList } from 'hooks/metricsExplorer/useGetMetricsList';
import { useGetQueryRange } from 'hooks/queryBuilder/useGetQueryRange';
import { useIsDarkMode } from 'hooks/useDarkMode';
import history from 'lib/history';
import cloneDeep from 'lodash-es/cloneDeep';
import { AnimatePresence } from 'motion/react';
@@ -43,6 +44,7 @@ const homeInterval = 30 * 60 * 1000;
// eslint-disable-next-line sonarjs/cognitive-complexity
export default function Home(): JSX.Element {
const { user } = useAppContext();
const isDarkMode = useIsDarkMode();
const [startTime, setStartTime] = useState<number | null>(null);
const [endTime, setEndTime] = useState<number | null>(null);
@@ -680,7 +682,11 @@ export default function Home(): JSX.Element {
<div className="checklist-img-container">
<img
src="/Images/allInOne.svg"
src={
isDarkMode
? '/Images/allInOne.svg'
: '/Images/allInOneLightMode.svg'
}
alt="checklist-img"
className="checklist-img"
/>

View File

@@ -1,6 +1,7 @@
.right-container {
display: flex;
flex-direction: column;
padding-bottom: 48px;
.header {
display: flex;

View File

@@ -835,56 +835,54 @@ function NewWidget({
</LeftContainerWrapper>
<RightContainerWrapper>
<OverlayScrollbar>
<RightContainer
setGraphHandler={setGraphHandler}
title={title}
setTitle={setTitle}
description={description}
setDescription={setDescription}
stackedBarChart={stackedBarChart}
setStackedBarChart={setStackedBarChart}
opacity={opacity}
yAxisUnit={yAxisUnit}
columnUnits={columnUnits}
setColumnUnits={setColumnUnits}
bucketCount={bucketCount}
bucketWidth={bucketWidth}
combineHistogram={combineHistogram}
setCombineHistogram={setCombineHistogram}
setBucketWidth={setBucketWidth}
setBucketCount={setBucketCount}
setOpacity={setOpacity}
selectedNullZeroValue={selectedNullZeroValue}
setSelectedNullZeroValue={setSelectedNullZeroValue}
selectedGraph={graphType}
setSelectedTime={setSelectedTime}
selectedTime={selectedTime}
setYAxisUnit={setYAxisUnit}
decimalPrecision={decimalPrecision}
setDecimalPrecision={setDecimalPrecision}
thresholds={thresholds}
setThresholds={setThresholds}
selectedWidget={selectedWidget}
isFillSpans={isFillSpans}
setIsFillSpans={setIsFillSpans}
isLogScale={isLogScale}
setIsLogScale={setIsLogScale}
legendPosition={legendPosition}
setLegendPosition={setLegendPosition}
customLegendColors={customLegendColors}
setCustomLegendColors={setCustomLegendColors}
queryResponse={queryResponse}
softMin={softMin}
setSoftMin={setSoftMin}
softMax={softMax}
setSoftMax={setSoftMax}
contextLinks={contextLinks}
setContextLinks={setContextLinks}
enableDrillDown={enableDrillDown}
isNewDashboard={isNewDashboard}
/>
</OverlayScrollbar>
<RightContainer
setGraphHandler={setGraphHandler}
title={title}
setTitle={setTitle}
description={description}
setDescription={setDescription}
stackedBarChart={stackedBarChart}
setStackedBarChart={setStackedBarChart}
opacity={opacity}
yAxisUnit={yAxisUnit}
columnUnits={columnUnits}
setColumnUnits={setColumnUnits}
bucketCount={bucketCount}
bucketWidth={bucketWidth}
combineHistogram={combineHistogram}
setCombineHistogram={setCombineHistogram}
setBucketWidth={setBucketWidth}
setBucketCount={setBucketCount}
setOpacity={setOpacity}
selectedNullZeroValue={selectedNullZeroValue}
setSelectedNullZeroValue={setSelectedNullZeroValue}
selectedGraph={graphType}
setSelectedTime={setSelectedTime}
selectedTime={selectedTime}
setYAxisUnit={setYAxisUnit}
decimalPrecision={decimalPrecision}
setDecimalPrecision={setDecimalPrecision}
thresholds={thresholds}
setThresholds={setThresholds}
selectedWidget={selectedWidget}
isFillSpans={isFillSpans}
setIsFillSpans={setIsFillSpans}
isLogScale={isLogScale}
setIsLogScale={setIsLogScale}
legendPosition={legendPosition}
setLegendPosition={setLegendPosition}
customLegendColors={customLegendColors}
setCustomLegendColors={setCustomLegendColors}
queryResponse={queryResponse}
softMin={softMin}
setSoftMin={setSoftMin}
softMax={softMax}
setSoftMax={setSoftMax}
contextLinks={contextLinks}
setContextLinks={setContextLinks}
enableDrillDown={enableDrillDown}
isNewDashboard={isNewDashboard}
/>
</RightContainerWrapper>
</PanelContainer>
<Modal

View File

@@ -15,7 +15,14 @@ export const RightContainerWrapper = styled(Col)`
overflow-y: auto;
}
&::-webkit-scrollbar {
width: 0rem;
width: 0.3rem;
}
&::-webkit-scrollbar-thumb {
background: rgb(136, 136, 136);
border-radius: 0.625rem;
}
&::-webkit-scrollbar-track {
background: transparent;
}
`;

View File

@@ -30,6 +30,7 @@ import { AppState } from 'store/reducers';
import AppActions from 'types/actions';
import { GlobalReducer } from 'types/reducer/globalTime';
import { addCustomTimeRange } from 'utils/customTimeRangeUtils';
import { persistTimeDurationForRoute } from 'utils/metricsTimeStorageUtils';
import { normalizeTimeToMs } from 'utils/timeUtils';
import { v4 as uuid } from 'uuid';
@@ -234,20 +235,7 @@ function DateTimeSelection({
const updateLocalStorageForRoutes = useCallback(
(value: Time | string): void => {
const preRoutes = getLocalStorageKey(LOCALSTORAGE.METRICS_TIME_IN_DURATION);
if (preRoutes !== null) {
const preRoutesObject = JSON.parse(preRoutes);
const preRoute = {
...preRoutesObject,
};
preRoute[location.pathname] = value;
setLocalStorageKey(
LOCALSTORAGE.METRICS_TIME_IN_DURATION,
JSON.stringify(preRoute),
);
}
persistTimeDurationForRoute(location.pathname, String(value));
},
[location.pathname],
);
@@ -738,6 +726,7 @@ function DateTimeSelection({
showRecentlyUsed={showRecentlyUsed}
minTime={minTimeForDateTimePicker}
maxTime={maxTimeForDateTimePicker}
isModalTimeSelection={isModalTimeSelection}
/>
{showAutoRefresh && selectedTime !== 'custom' && (

View File

@@ -0,0 +1,160 @@
import { act, renderHook } from '@testing-library/react';
import { QueryParams } from 'constants/query';
import { GlobalReducer } from 'types/reducer/globalTime';
import { useZoomOut } from '../useZoomOut';
const mockDispatch = jest.fn();
const mockSafeNavigate = jest.fn();
const mockUrlQueryDelete = jest.fn();
const mockUrlQuerySet = jest.fn();
const mockUrlQueryToString = jest.fn(() => '');
interface MockAppState {
globalTime: Pick<GlobalReducer, 'minTime' | 'maxTime'>;
}
jest.mock('react-redux', () => ({
useDispatch: (): jest.Mock => mockDispatch,
useSelector: <T>(selector: (state: MockAppState) => T): T => {
const mockState: MockAppState = {
globalTime: {
minTime: 15 * 60 * 1000 * 1e6, // 15 min in nanoseconds
maxTime: 30 * 60 * 1000 * 1e6, // 30 min in nanoseconds (mock for getNextZoomOutRange)
},
};
return selector(mockState);
},
}));
jest.mock('react-router-dom', () => ({
useLocation: (): { pathname: string } => ({ pathname: '/logs-explorer' }),
}));
jest.mock('hooks/useSafeNavigate', () => ({
useSafeNavigate: (): { safeNavigate: typeof mockSafeNavigate } => ({
safeNavigate: mockSafeNavigate,
}),
}));
interface MockUrlQuery {
delete: typeof mockUrlQueryDelete;
set: typeof mockUrlQuerySet;
get: () => null;
toString: typeof mockUrlQueryToString;
}
jest.mock('hooks/useUrlQuery', () => ({
__esModule: true,
default: (): MockUrlQuery => ({
delete: mockUrlQueryDelete,
set: mockUrlQuerySet,
get: (): null => null,
toString: mockUrlQueryToString,
}),
}));
const mockGetNextZoomOutRange = jest.fn();
jest.mock('lib/zoomOutUtils', () => ({
getNextZoomOutRange: (
...args: unknown[]
): ReturnType<typeof mockGetNextZoomOutRange> =>
mockGetNextZoomOutRange(...args),
}));
describe('useZoomOut', () => {
beforeEach(() => {
jest.clearAllMocks();
mockUrlQueryToString.mockReturnValue('relativeTime=45m');
});
it('should do nothing when isDisabled is true', () => {
const { result } = renderHook(() => useZoomOut({ isDisabled: true }));
act(() => {
result.current();
});
expect(mockGetNextZoomOutRange).not.toHaveBeenCalled();
expect(mockDispatch).not.toHaveBeenCalled();
expect(mockSafeNavigate).not.toHaveBeenCalled();
});
it('should do nothing when getNextZoomOutRange returns null', () => {
mockGetNextZoomOutRange.mockReturnValue(null);
const { result } = renderHook(() => useZoomOut());
act(() => {
result.current();
});
expect(mockGetNextZoomOutRange).toHaveBeenCalled();
expect(mockDispatch).not.toHaveBeenCalled();
expect(mockSafeNavigate).not.toHaveBeenCalled();
});
it('should dispatch preset and update URL when result has preset', () => {
mockGetNextZoomOutRange.mockReturnValue({
range: [1000, 2000],
preset: '45m',
});
const { result } = renderHook(() => useZoomOut());
act(() => {
result.current();
});
expect(mockDispatch).toHaveBeenCalledWith(expect.any(Function));
expect(mockUrlQueryDelete).toHaveBeenCalledWith(QueryParams.startTime);
expect(mockUrlQueryDelete).toHaveBeenCalledWith(QueryParams.endTime);
expect(mockUrlQuerySet).toHaveBeenCalledWith(QueryParams.relativeTime, '45m');
expect(mockSafeNavigate).toHaveBeenCalledWith(
expect.stringContaining('/logs-explorer'),
);
});
it('should dispatch custom range and update URL when result has no preset', () => {
mockGetNextZoomOutRange.mockReturnValue({
range: [1000000, 2000000],
preset: null,
});
const { result } = renderHook(() => useZoomOut());
act(() => {
result.current();
});
expect(mockDispatch).toHaveBeenCalledWith(expect.any(Function));
expect(mockUrlQuerySet).toHaveBeenCalledWith(
QueryParams.startTime,
'1000000',
);
expect(mockUrlQuerySet).toHaveBeenCalledWith(QueryParams.endTime, '2000000');
expect(mockUrlQueryDelete).toHaveBeenCalledWith(QueryParams.relativeTime);
expect(mockSafeNavigate).toHaveBeenCalledWith(
expect.stringContaining('/logs-explorer'),
);
});
it('should delete urlParamsToDelete when provided', () => {
mockGetNextZoomOutRange.mockReturnValue({
range: [1000, 2000],
preset: '45m',
});
const { result } = renderHook(() =>
useZoomOut({
urlParamsToDelete: [QueryParams.activeLogId],
}),
);
act(() => {
result.current();
});
expect(mockUrlQueryDelete).toHaveBeenCalledWith(QueryParams.activeLogId);
});
});

View File

@@ -0,0 +1,79 @@
import { useCallback, useRef } from 'react';
// eslint-disable-next-line no-restricted-imports
import { useDispatch, useSelector } from 'react-redux';
import { useLocation } from 'react-router-dom';
import { QueryParams } from 'constants/query';
import { useSafeNavigate } from 'hooks/useSafeNavigate';
import useUrlQuery from 'hooks/useUrlQuery';
import { getNextZoomOutRange } from 'lib/zoomOutUtils';
import { UpdateTimeInterval } from 'store/actions';
import { AppState } from 'store/reducers';
import { GlobalReducer } from 'types/reducer/globalTime';
import { persistTimeDurationForRoute } from 'utils/metricsTimeStorageUtils';
export interface UseZoomOutOptions {
/** When true, the zoom out handler does nothing (e.g. when live logs are enabled) */
isDisabled?: boolean;
/** URL params to delete when zooming out (e.g. [QueryParams.activeLogId] for logs) */
urlParamsToDelete?: string[];
}
/**
* Reusable hook for zoom-out functionality in explorers (logs, traces, etc.).
* Computes the next time range using the zoom-out ladder, updates Redux global time,
* and navigates with the new URL params.
*/
const EMPTY_PARAMS: string[] = [];
export function useZoomOut(options: UseZoomOutOptions = {}): () => void {
const { isDisabled = false, urlParamsToDelete = EMPTY_PARAMS } = options;
const urlParamsToDeleteRef = useRef(urlParamsToDelete);
urlParamsToDeleteRef.current = urlParamsToDelete;
const dispatch = useDispatch();
const { minTime, maxTime } = useSelector<AppState, GlobalReducer>(
(state) => state.globalTime,
);
const urlQuery = useUrlQuery();
const location = useLocation();
const { safeNavigate } = useSafeNavigate();
return useCallback((): void => {
if (isDisabled) {
return;
}
const minMs = Math.floor((minTime ?? 0) / 1e6);
const maxMs = Math.floor((maxTime ?? 0) / 1e6);
const result = getNextZoomOutRange(minMs, maxMs);
if (!result) {
return;
}
const [newStartMs, newEndMs] = result.range;
const { preset } = result;
if (preset) {
dispatch(UpdateTimeInterval(preset));
urlQuery.delete(QueryParams.startTime);
urlQuery.delete(QueryParams.endTime);
urlQuery.set(QueryParams.relativeTime, preset);
persistTimeDurationForRoute(location.pathname, preset);
} else {
dispatch(UpdateTimeInterval('custom', [newStartMs, newEndMs]));
urlQuery.set(QueryParams.startTime, String(newStartMs));
urlQuery.set(QueryParams.endTime, String(newEndMs));
urlQuery.delete(QueryParams.relativeTime);
}
for (const param of urlParamsToDeleteRef.current) {
urlQuery.delete(param);
}
safeNavigate(`${location.pathname}?${urlQuery.toString()}`);
}, [
dispatch,
isDisabled,
location.pathname,
maxTime,
minTime,
safeNavigate,
urlQuery,
]);
}

View File

@@ -0,0 +1,147 @@
import {
getNextDurationInLadder,
getNextZoomOutRange,
isZoomOutDisabled,
ZoomOutResult,
} from '../zoomOutUtils';
const MS_PER_MIN = 60 * 1000;
const MS_PER_HOUR = 60 * MS_PER_MIN;
const MS_PER_DAY = 24 * MS_PER_HOUR;
const MS_PER_WEEK = 7 * MS_PER_DAY;
// Fixed "now" for deterministic tests: 2024-01-15 12:00:00 UTC
const NOW_MS = 1705312800000;
describe('zoomOutUtils', () => {
beforeEach(() => {
jest.spyOn(Date, 'now').mockReturnValue(NOW_MS);
});
afterEach(() => {
jest.restoreAllMocks();
});
describe('getNextDurationInLadder', () => {
it('should use 3x zoom out below 15m until reaching 15m', () => {
expect(getNextDurationInLadder(1 * MS_PER_MIN)).toBe(3 * MS_PER_MIN);
expect(getNextDurationInLadder(2 * MS_PER_MIN)).toBe(6 * MS_PER_MIN);
expect(getNextDurationInLadder(3 * MS_PER_MIN)).toBe(9 * MS_PER_MIN);
expect(getNextDurationInLadder(4 * MS_PER_MIN)).toBe(12 * MS_PER_MIN);
expect(getNextDurationInLadder(5 * MS_PER_MIN)).toBe(15 * MS_PER_MIN); // cap at 15m
expect(getNextDurationInLadder(6 * MS_PER_MIN)).toBe(15 * MS_PER_MIN); // 18m capped
});
it('should return next step for each ladder rung from 15m onward', () => {
expect(getNextDurationInLadder(10 * MS_PER_MIN)).toBe(15 * MS_PER_MIN);
expect(getNextDurationInLadder(15 * MS_PER_MIN)).toBe(45 * MS_PER_MIN);
expect(getNextDurationInLadder(45 * MS_PER_MIN)).toBe(2 * MS_PER_HOUR);
expect(getNextDurationInLadder(2 * MS_PER_HOUR)).toBe(7 * MS_PER_HOUR);
expect(getNextDurationInLadder(7 * MS_PER_HOUR)).toBe(21 * MS_PER_HOUR);
expect(getNextDurationInLadder(21 * MS_PER_HOUR)).toBe(1 * MS_PER_DAY);
expect(getNextDurationInLadder(1 * MS_PER_DAY)).toBe(2 * MS_PER_DAY);
expect(getNextDurationInLadder(2 * MS_PER_DAY)).toBe(3 * MS_PER_DAY);
expect(getNextDurationInLadder(3 * MS_PER_DAY)).toBe(1 * MS_PER_WEEK);
expect(getNextDurationInLadder(1 * MS_PER_WEEK)).toBe(2 * MS_PER_WEEK);
expect(getNextDurationInLadder(2 * MS_PER_WEEK)).toBe(30 * MS_PER_DAY);
});
it('should return MAX when at or past 1 month (no wrap)', () => {
expect(getNextDurationInLadder(30 * MS_PER_DAY)).toBe(30 * MS_PER_DAY);
expect(getNextDurationInLadder(31 * MS_PER_DAY)).toBe(30 * MS_PER_DAY);
});
it('should return next step for duration between ladder rungs', () => {
expect(getNextDurationInLadder(1 * MS_PER_HOUR)).toBe(2 * MS_PER_HOUR);
expect(getNextDurationInLadder(5 * MS_PER_HOUR)).toBe(7 * MS_PER_HOUR);
expect(getNextDurationInLadder(12 * MS_PER_HOUR)).toBe(21 * MS_PER_HOUR);
});
});
describe('getNextZoomOutRange', () => {
it('should return null when duration is zero or negative', () => {
expect(getNextZoomOutRange(NOW_MS, NOW_MS)).toBeNull();
expect(getNextZoomOutRange(NOW_MS, NOW_MS - 1000)).toBeNull();
});
it('should return center-anchored range and preset=null when new end does not exceed now (Phase 1)', () => {
// 15m range centered well before now so zoom to 45m keeps end <= now
// Center at now-30m: end = center + 22.5m = now - 7.5m <= now
const centerMs = NOW_MS - 30 * MS_PER_MIN;
const start15m = centerMs - 7.5 * MS_PER_MIN;
const end15m = centerMs + 7.5 * MS_PER_MIN;
const result = getNextZoomOutRange(start15m, end15m) as ZoomOutResult;
expect(result).not.toBeNull();
expect(result.preset).toBeNull(); // Phase 1: preserve center-anchored range, avoid GetMinMax "last X from now"
const [newStart, newEnd] = result.range;
expect(newEnd - newStart).toBe(45 * MS_PER_MIN);
const newCenter = (newStart + newEnd) / 2;
expect(Math.abs(newCenter - centerMs)).toBeLessThan(2000);
expect(newEnd).toBeLessThanOrEqual(NOW_MS + 1000);
});
it('should return end-anchored range when new end would exceed now (Phase 2)', () => {
// 22hr range ending at now - zoom to 1d (24hr) would push end past now
// Next ladder step from 22hr is 1d
const start22h = NOW_MS - 22 * MS_PER_HOUR;
const end22h = NOW_MS;
const result = getNextZoomOutRange(start22h, end22h) as ZoomOutResult;
expect(result).not.toBeNull();
expect(result.preset).toBe('1d');
const [newStart, newEnd] = result.range;
expect(newEnd).toBe(NOW_MS); // End anchored at now
expect(newStart).toBe(NOW_MS - 1 * MS_PER_DAY);
});
it('should return correct preset for each ladder step', () => {
const presets: [number, number, string][] = [
[15 * MS_PER_MIN, 0, '45m'],
[45 * MS_PER_MIN, 0, '2h'],
[2 * MS_PER_HOUR, 0, '7h'],
[7 * MS_PER_HOUR, 0, '21h'],
[21 * MS_PER_HOUR, 0, '1d'],
[1 * MS_PER_DAY, 0, '2d'],
[2 * MS_PER_DAY, 0, '3d'],
[3 * MS_PER_DAY, 0, '1w'],
[1 * MS_PER_WEEK, 0, '2w'],
[2 * MS_PER_WEEK, 0, '1month'],
];
presets.forEach(([durationMs, offset, expectedPreset]) => {
const end = NOW_MS - offset;
const start = end - durationMs;
const result = getNextZoomOutRange(start, end);
expect(result?.preset).toBe(expectedPreset);
});
});
it('isZoomOutDisabled returns true when duration >= 1 month', () => {
expect(isZoomOutDisabled(30 * MS_PER_DAY)).toBe(true);
expect(isZoomOutDisabled(31 * MS_PER_DAY)).toBe(true);
expect(isZoomOutDisabled(29 * MS_PER_DAY)).toBe(false);
expect(isZoomOutDisabled(15 * MS_PER_MIN)).toBe(false);
});
it('should return null when at 1 month (no zoom out beyond max)', () => {
const start1m = NOW_MS - 30 * MS_PER_DAY;
const end1m = NOW_MS;
const result = getNextZoomOutRange(start1m, end1m);
expect(result).toBeNull();
});
it('should zoom out 3x from 5m range to 15m then continue with ladder', () => {
// 5m range ending at now → 3x = 15m
const start5m = NOW_MS - 5 * MS_PER_MIN;
const end5m = NOW_MS;
const result = getNextZoomOutRange(start5m, end5m) as ZoomOutResult;
expect(result).not.toBeNull();
expect(result.preset).toBe('15m');
const [newStart, newEnd] = result.range;
expect(newEnd - newStart).toBe(15 * MS_PER_MIN);
});
});
});

View File

@@ -0,0 +1,139 @@
/**
* Custom Time Picker zoom-out ladder:
* - Until 1 day: 15m → 45m → 2hr → 7hr → 21hr
* - Then fixed: 1d → 2d → 3d → 1w → 2w → 1m
* - At 1 month: zoom out is disabled (max range)
*/
import type {
CustomTimeType,
Time,
} from 'container/TopNav/DateTimeSelectionV2/types';
const MS_PER_MIN = 60 * 1000;
const MS_PER_HOUR = 60 * MS_PER_MIN;
const MS_PER_DAY = 24 * MS_PER_HOUR;
const MS_PER_WEEK = 7 * MS_PER_DAY;
const ZOOM_OUT_LADDER_MS: number[] = [
15 * MS_PER_MIN, // 15m
45 * MS_PER_MIN, // 45m
2 * MS_PER_HOUR, // 2hr
7 * MS_PER_HOUR, // 7hr
21 * MS_PER_HOUR, // 21hr
1 * MS_PER_DAY, // 1d
2 * MS_PER_DAY, // 2d
3 * MS_PER_DAY, // 3d
1 * MS_PER_WEEK, // 1w
2 * MS_PER_WEEK, // 2w
30 * MS_PER_DAY, // 1m
];
const LADDER_LAST_INDEX = ZOOM_OUT_LADDER_MS.length - 1;
const MAX_DURATION = ZOOM_OUT_LADDER_MS[LADDER_LAST_INDEX];
const MIN_LADDER_DURATION_MS = ZOOM_OUT_LADDER_MS[0]; // 15m - below this we use 3x
export const MAX_ZOOM_OUT_DURATION_MS = MAX_DURATION;
/** Returns true when zoom out should be disabled (range at or beyond 1 month) */
export function isZoomOutDisabled(durationMs: number): boolean {
return durationMs >= MAX_ZOOM_OUT_DURATION_MS;
}
/** Preset labels for ladder steps supported by GetMinMax (shows "Last 15 minutes" etc. instead of "Custom") */
const PRESET_FOR_DURATION_MS: Record<number, Time | CustomTimeType> = {
[15 * MS_PER_MIN]: '15m',
[45 * MS_PER_MIN]: '45m',
[2 * MS_PER_HOUR]: '2h',
[7 * MS_PER_HOUR]: '7h',
[21 * MS_PER_HOUR]: '21h',
[1 * MS_PER_DAY]: '1d',
[2 * MS_PER_DAY]: '2d',
[3 * MS_PER_DAY]: '3d',
[1 * MS_PER_WEEK]: '1w',
[2 * MS_PER_WEEK]: '2w',
[30 * MS_PER_DAY]: '1month',
};
/**
* Returns the next duration in the zoom-out ladder for the given current duration.
* Below 15m: zoom out 3x until we reach 15m, then continue with the ladder.
* If at or past 1 month, returns MAX_DURATION (no zoom out - button is disabled).
*/
export function getNextDurationInLadder(durationMs: number): number {
if (durationMs >= MAX_DURATION) {
return MAX_DURATION; // No zoom out beyond 1 month
}
// Below 15m: zoom out 3x until we reach 15m
if (durationMs < MIN_LADDER_DURATION_MS) {
const next = durationMs * 3;
return Math.min(next, MIN_LADDER_DURATION_MS);
}
// At or above 15m: use the fixed ladder
for (let i = 0; i < ZOOM_OUT_LADDER_MS.length; i++) {
if (ZOOM_OUT_LADDER_MS[i] > durationMs) {
return ZOOM_OUT_LADDER_MS[i];
}
}
return MAX_DURATION;
}
export interface ZoomOutResult {
range: [number, number];
/** Preset key (e.g. '15m') when range matches a preset - use for display instead of "Custom Date Range" */
preset: Time | CustomTimeType | null;
}
/**
* Computes the next zoomed-out time range.
* Phase 1 (center-anchored): While new end <= now, expand from center.
* Phase 2 (end-anchored at now): When new end would exceed now, anchor end at now and move start backward.
*
* @returns ZoomOutResult with range and preset (or null if no change)
*/
export function getNextZoomOutRange(
startMs: number,
endMs: number,
): ZoomOutResult | null {
const nowMs = Date.now();
const durationMs = endMs - startMs;
if (durationMs <= 0) {
return null;
}
const newDurationMs = getNextDurationInLadder(durationMs);
// No zoom out when already at max (1 month)
if (newDurationMs <= durationMs) {
return null;
}
const centerMs = startMs + durationMs / 2;
const computedEndMs = centerMs + newDurationMs / 2;
let newStartMs: number;
let newEndMs: number;
const isPhase1 = computedEndMs <= nowMs;
if (isPhase1) {
// Phase 1: center-anchored (historical range not ending at now)
newStartMs = centerMs - newDurationMs / 2;
newEndMs = computedEndMs;
} else {
// Phase 2: end-anchored at now
newStartMs = nowMs - newDurationMs;
newEndMs = nowMs;
}
// Phase 2 only: use preset so GetMinMax produces "last X from now".
// Phase 1: preset=null so the center-anchored range is preserved (GetMinMax would discard it).
const preset = isPhase1 ? null : PRESET_FOR_DURATION_MS[newDurationMs] ?? null;
return {
range: [Math.round(newStartMs), Math.round(newEndMs)],
preset,
};
}

View File

@@ -1,4 +1,5 @@
import { Route } from 'react-router-dom';
import * as getDashboardModule from 'api/v1/dashboards/id/get';
import { PANEL_TYPES } from 'constants/queryBuilder';
import { rest, server } from 'mocks-server/server';
import { render, screen, waitFor } from 'tests/test-utils';
@@ -47,35 +48,33 @@ jest.mock('container/NewWidget', () => ({
default: (): JSX.Element => <div data-testid="new-widget">NewWidget</div>,
}));
// nuqs's useQueryState doesn't read from MemoryRouter, so we mock it to return
// controlled values via the `mockQueryState` map below.
const mockQueryState: Record<string, string | null> = {};
jest.mock('nuqs', () => ({
...jest.requireActual('nuqs'),
useQueryState: (key: string): [string | null, jest.Mock] => [
mockQueryState[key] ?? null,
jest.fn(),
],
}));
// Wrap component in a Route so useParams can resolve dashboardId
// Wrap component in a Route so useParams can resolve dashboardId.
// Query params are passed via the URL so useUrlQuery (react-router) can read them.
function renderAtRoute(
queryState: Record<string, string | null> = {},
): ReturnType<typeof render> {
Object.assign(mockQueryState, queryState);
const params = new URLSearchParams();
Object.entries(queryState).forEach(([k, v]) => {
if (v !== null) {
params.set(k, v);
}
});
const search = params.toString() ? `?${params.toString()}` : '';
return render(
<Route path="/dashboard/:dashboardId/new">
<DashboardWidget />
</Route>,
undefined,
{ initialRoute: `/dashboard/${DASHBOARD_ID}/new` },
{ initialRoute: `/dashboard/${DASHBOARD_ID}/new${search}` },
);
}
beforeEach(() => {
mockSafeNavigate.mockClear();
Object.keys(mockQueryState).forEach((k) => delete mockQueryState[k]);
});
afterEach(() => {
jest.restoreAllMocks();
});
describe('DashboardWidget', () => {
@@ -102,12 +101,10 @@ describe('DashboardWidget', () => {
});
it('shows spinner while dashboard is loading', () => {
server.use(
rest.get(
`http://localhost/api/v1/dashboards/${DASHBOARD_ID}`,
(_req, res, ctx) => res(ctx.delay('infinite')),
),
);
// Spy instead of MSW delay('infinite') to avoid leaving an open network handle.
jest
.spyOn(getDashboardModule, 'default')
.mockReturnValue(new Promise(() => {}));
renderAtRoute({ widgetId: WIDGET_ID, graphType: PANEL_TYPES.TIME_SERIES });

View File

@@ -1,10 +1,11 @@
import { useEffect, useState } from 'react';
import { useEffect, useMemo, useState } from 'react';
import { useQuery } from 'react-query';
import { generatePath, useParams } from 'react-router-dom';
import { Card, Typography } from 'antd';
import getDashboard from 'api/v1/dashboards/id/get';
import Spinner from 'components/Spinner';
import { SOMETHING_WENT_WRONG } from 'constants/api';
import { QueryParams } from 'constants/query';
import { PANEL_TYPES } from 'constants/queryBuilder';
import { DASHBOARD_CACHE_TIME } from 'constants/queryCacheTime';
import { REACT_QUERY_KEY } from 'constants/reactQueryKeys';
@@ -13,7 +14,7 @@ import NewWidget from 'container/NewWidget';
import { isDrilldownEnabled } from 'container/QueryTable/Drilldown/drilldownUtils';
import { useTransformDashboardVariables } from 'hooks/dashboard/useTransformDashboardVariables';
import { useSafeNavigate } from 'hooks/useSafeNavigate';
import { parseAsStringEnum, useQueryState } from 'nuqs';
import useUrlQuery from 'hooks/useUrlQuery';
import { setDashboardVariablesStore } from 'providers/Dashboard/store/dashboardVariables/dashboardVariablesStore';
import { Dashboard } from 'types/api/dashboard/getAll';
@@ -21,11 +22,13 @@ function DashboardWidget(): JSX.Element | null {
const { dashboardId } = useParams<{
dashboardId: string;
}>();
const [widgetId] = useQueryState('widgetId');
const [graphType] = useQueryState(
'graphType',
parseAsStringEnum<PANEL_TYPES>(Object.values(PANEL_TYPES)),
);
const query = useUrlQuery();
const { graphType, widgetId } = useMemo(() => {
return {
graphType: query.get(QueryParams.graphType) as PANEL_TYPES,
widgetId: query.get(QueryParams.widgetId),
};
}, [query]);
const { safeNavigate } = useSafeNavigate();

View File

@@ -0,0 +1,28 @@
import getLocalStorageKey from 'api/browser/localstorage/get';
import setLocalStorageKey from 'api/browser/localstorage/set';
import { LOCALSTORAGE } from 'constants/localStorage';
/**
* Updates the stored time duration for a route in localStorage.
* Used by both DateTimeSelectionV2 (manual time pick) and useZoomOut (zoom out button).
*
* @param pathname - The route path (e.g. /infrastructure-monitoring/hosts)
* @param value - The time value to store (preset string like '1w' or JSON string for custom range)
*/
export function persistTimeDurationForRoute(
pathname: string,
value: string,
): void {
const preRoutes = getLocalStorageKey(LOCALSTORAGE.METRICS_TIME_IN_DURATION);
let preRoutesObject: Record<string, string> = {};
try {
preRoutesObject = preRoutes ? JSON.parse(preRoutes) : {};
} catch {
preRoutesObject = {};
}
const preRoute = { ...preRoutesObject, [pathname]: value };
setLocalStorageKey(
LOCALSTORAGE.METRICS_TIME_IN_DURATION,
JSON.stringify(preRoute),
);
}

View File

@@ -0,0 +1,46 @@
package loghandler
import (
"context"
"log/slog"
"github.com/SigNoz/signoz/pkg/errors"
)
type filtering struct{}
func NewFiltering() *filtering {
return &filtering{}
}
func (h *filtering) Wrap(next LogHandler) LogHandler {
return LogHandlerFunc(func(ctx context.Context, record slog.Record) error {
if !filterRecord(record) {
return nil
}
return next.Handle(ctx, record)
})
}
// filterRecord determines whether a log record should be written.
// Returns false if the record should be suppressed, true otherwise.
//
// Current filters:
// - context.Canceled: These are expected errors from cancelled operations,
// and create noise in logs.
func filterRecord(record slog.Record) bool {
suppress := false
record.Attrs(func(a slog.Attr) bool {
if a.Value.Kind() == slog.KindAny {
if err, ok := a.Value.Any().(error); ok {
if errors.Is(err, context.Canceled) {
suppress = true
return false
}
}
}
return true
})
return !suppress
}

View File

@@ -0,0 +1,64 @@
package loghandler
import (
"bytes"
"context"
"encoding/json"
"log/slog"
"testing"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestFiltering_SuppressesContextCanceled(t *testing.T) {
filtering := NewFiltering()
buf := bytes.NewBuffer(nil)
logger := slog.New(&handler{base: slog.NewJSONHandler(buf, &slog.HandlerOptions{Level: slog.LevelDebug}), wrappers: []Wrapper{filtering}})
logger.ErrorContext(context.Background(), "operation failed", "error", context.Canceled)
assert.Empty(t, buf.String(), "log with context.Canceled should be suppressed")
}
func TestFiltering_SuppressesWrappedContextCanceled(t *testing.T) {
filtering := NewFiltering()
buf := bytes.NewBuffer(nil)
logger := slog.New(&handler{base: slog.NewJSONHandler(buf, &slog.HandlerOptions{Level: slog.LevelDebug}), wrappers: []Wrapper{filtering}})
wrappedErr := errors.Wrapf(context.Canceled, errors.CodeInternal, "wrapped")
logger.ErrorContext(context.Background(), "operation failed", "error", wrappedErr)
assert.Empty(t, buf.String(), "log with wrapped context.Canceled should be suppressed")
}
func TestFiltering_AllowsOtherErrors(t *testing.T) {
filtering := NewFiltering()
buf := bytes.NewBuffer(nil)
logger := slog.New(&handler{base: slog.NewJSONHandler(buf, &slog.HandlerOptions{Level: slog.LevelDebug}), wrappers: []Wrapper{filtering}})
logger.ErrorContext(context.Background(), "operation failed", "error", errors.New(errors.TypeInternal, errors.CodeInternal, "some other error"))
m := make(map[string]any)
err := json.Unmarshal(buf.Bytes(), &m)
require.NoError(t, err)
assert.Equal(t, "operation failed", m["msg"])
}
func TestFiltering_AllowsLogsWithoutErrors(t *testing.T) {
filtering := NewFiltering()
buf := bytes.NewBuffer(nil)
logger := slog.New(&handler{base: slog.NewJSONHandler(buf, &slog.HandlerOptions{Level: slog.LevelDebug}), wrappers: []Wrapper{filtering}})
logger.InfoContext(context.Background(), "normal log", "key", "value")
m := make(map[string]any)
err := json.Unmarshal(buf.Bytes(), &m)
require.NoError(t, err)
assert.Equal(t, "normal log", m["msg"])
}

View File

@@ -116,7 +116,7 @@ func New(ctx context.Context, cfg Config, build version.Build, serviceName strin
meterProvider: meterProvider,
meterProviderShutdownFunc: meterProviderShutdownFunc,
prometheusRegistry: prometheusRegistry,
logger: NewLogger(cfg, loghandler.NewCorrelation()),
logger: NewLogger(cfg, loghandler.NewCorrelation(), loghandler.NewFiltering()),
startCh: make(chan struct{}),
}, nil
}

View File

@@ -7,13 +7,14 @@ import (
"strings"
"time"
"log/slog"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/valuer"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)
@@ -121,7 +122,7 @@ func (r *Repo) insertConfig(
// allowing empty elements for logs - use case is deleting all pipelines
if len(elements) == 0 && c.ElementType != opamptypes.ElementTypeLogPipelines {
zap.L().Error("insert config called with no elements ", zap.String("ElementType", c.ElementType.StringValue()))
slog.ErrorContext(ctx, "insert config called with no elements", "element_type", c.ElementType.StringValue())
return errors.NewInvalidInputf(CodeConfigElementsRequired, "config must have atleast one element")
}
@@ -129,13 +130,13 @@ func (r *Repo) insertConfig(
// the version can not be set by the user, we want to auto-assign the versions
// in a monotonically increasing order starting with 1. hence, we reject insert
// requests with version anything other than 0. here, 0 indicates un-assigned
zap.L().Error("invalid version assignment while inserting agent config", zap.Int("version", c.Version), zap.String("ElementType", c.ElementType.StringValue()))
slog.ErrorContext(ctx, "invalid version assignment while inserting agent config", "version", c.Version, "element_type", c.ElementType.StringValue())
return errors.NewInvalidInputf(errors.CodeInvalidInput, "user defined versions are not supported in the agent config")
}
configVersion, err := r.GetLatestVersion(ctx, orgId, c.ElementType)
if err != nil && !errors.Ast(err, errors.TypeNotFound) {
zap.L().Error("failed to fetch latest config version", zap.Error(err))
slog.ErrorContext(ctx, "failed to fetch latest config version", "error", err)
return err
}
@@ -155,11 +156,11 @@ func (r *Repo) insertConfig(
// Delete elements first, then version (to respect potential foreign key constraints)
_, delErr := r.store.BunDB().NewDelete().Model(new(opamptypes.AgentConfigElement)).Where("version_id = ?", c.ID).Exec(ctx)
if delErr != nil {
zap.L().Error("failed to delete config elements during cleanup", zap.Error(delErr), zap.String("version_id", c.ID.String()))
slog.ErrorContext(ctx, "failed to delete config elements during cleanup", "error", delErr, "version_id", c.ID.String())
}
_, delErr = r.store.BunDB().NewDelete().Model(new(opamptypes.AgentConfigVersion)).Where("id = ?", c.ID).Where("org_id = ?", orgId).Exec(ctx)
if delErr != nil {
zap.L().Error("failed to delete config version during cleanup", zap.Error(delErr), zap.String("version_id", c.ID.String()))
slog.ErrorContext(ctx, "failed to delete config version during cleanup", "error", delErr, "version_id", c.ID.String())
}
}
}()
@@ -170,7 +171,7 @@ func (r *Repo) insertConfig(
Model(c).
Exec(ctx)
if dbErr != nil {
zap.L().Error("error in inserting config version: ", zap.Error(dbErr))
slog.ErrorContext(ctx, "error in inserting config version", "error", dbErr)
return errors.WrapInternalf(dbErr, CodeConfigVersionInsertFailed, "failed to insert config version")
}
@@ -221,7 +222,7 @@ func (r *Repo) updateDeployStatus(ctx context.Context,
Where("org_id = ?", orgId).
Exec(ctx)
if err != nil {
zap.L().Error("failed to update deploy status", zap.Error(err))
slog.ErrorContext(ctx, "failed to update deploy status", "error", err)
return model.BadRequest(fmt.Errorf("failed to update deploy status"))
}
@@ -239,7 +240,7 @@ func (r *Repo) updateDeployStatusByHash(
Where("org_id = ?", orgId).
Exec(ctx)
if err != nil {
zap.L().Error("failed to update deploy status", zap.Error(err))
slog.ErrorContext(ctx, "failed to update deploy status", "error", err)
return errors.WrapInternalf(err, CodeConfigDeployStatusUpdateFailed, "failed to update deploy status")
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"crypto/sha256"
"fmt"
"log/slog"
"strings"
"sync"
"sync/atomic"
@@ -17,7 +18,6 @@ import (
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/google/uuid"
"go.uber.org/zap"
yaml "gopkg.in/yaml.v3"
)
@@ -36,7 +36,8 @@ type AgentFeatureType string
type Manager struct {
Repo
// lock to make sure only one update is sent to remote agents at a time
lock uint32
lock uint32
logger *slog.Logger
// For AgentConfigProvider implementation
agentFeatures []AgentFeature
@@ -67,6 +68,7 @@ func Initiate(options *ManagerOptions) (*Manager, error) {
m = &Manager{
Repo: Repo{options.Store},
logger: slog.Default(),
agentFeatures: options.AgentFeatures,
configSubscribers: map[string]func(){},
}
@@ -222,19 +224,19 @@ func NotifyConfigUpdate(ctx context.Context) {
func Redeploy(ctx context.Context, orgId valuer.UUID, typ opamptypes.ElementType, version int) error {
configVersion, err := GetConfigVersion(ctx, orgId, typ, version)
if err != nil {
zap.L().Error("failed to fetch config version during redeploy", zap.Error(err))
slog.ErrorContext(ctx, "failed to fetch config version during redeploy", "error", err)
return err
}
if configVersion == nil || (configVersion != nil && configVersion.Config == "") {
zap.L().Debug("config version has no conf yaml", zap.Any("configVersion", configVersion))
slog.DebugContext(ctx, "config version has no conf yaml", "config_version", configVersion)
return errors.NewInvalidInputf(CodeConfigVersionNoConfig, "the config version can not be redeployed")
}
switch typ {
case opamptypes.ElementTypeSamplingRules:
var config *tsp.Config
if err := yaml.Unmarshal([]byte(configVersion.Config), &config); err != nil {
zap.L().Debug("failed to read last conf correctly", zap.Error(err))
slog.DebugContext(ctx, "failed to read last conf correctly", "error", err)
return model.BadRequest(fmt.Errorf("failed to read the stored config correctly"))
}
@@ -246,7 +248,7 @@ func Redeploy(ctx context.Context, orgId valuer.UUID, typ opamptypes.ElementType
opamp.AddToTracePipelineSpec("signoz_tail_sampling")
configHash, err := opamp.UpsertControlProcessors(ctx, "traces", processorConf, m.OnConfigUpdate)
if err != nil {
zap.L().Error("failed to call agent config update for trace processor", zap.Error(err))
slog.ErrorContext(ctx, "failed to call agent config update for trace processor", "error", err)
return errors.WithAdditionalf(err, "failed to deploy the config")
}
@@ -254,7 +256,7 @@ func Redeploy(ctx context.Context, orgId valuer.UUID, typ opamptypes.ElementType
case opamptypes.ElementTypeDropRules:
var filterConfig *filterprocessor.Config
if err := yaml.Unmarshal([]byte(configVersion.Config), &filterConfig); err != nil {
zap.L().Error("failed to read last conf correctly", zap.Error(err))
slog.ErrorContext(ctx, "failed to read last conf correctly", "error", err)
return model.InternalError(fmt.Errorf("failed to read the stored config correctly"))
}
processorConf := map[string]interface{}{
@@ -264,7 +266,7 @@ func Redeploy(ctx context.Context, orgId valuer.UUID, typ opamptypes.ElementType
opamp.AddToMetricsPipelineSpec("filter")
configHash, err := opamp.UpsertControlProcessors(ctx, "metrics", processorConf, m.OnConfigUpdate)
if err != nil {
zap.L().Error("failed to call agent config update for trace processor", zap.Error(err))
slog.ErrorContext(ctx, "failed to call agent config update for trace processor", "error", err)
return err
}
@@ -290,13 +292,13 @@ func UpsertFilterProcessor(ctx context.Context, orgId valuer.UUID, version int,
opamp.AddToMetricsPipelineSpec("filter")
configHash, err := opamp.UpsertControlProcessors(ctx, "metrics", processorConf, m.OnConfigUpdate)
if err != nil {
zap.L().Error("failed to call agent config update for trace processor", zap.Error(err))
slog.ErrorContext(ctx, "failed to call agent config update for trace processor", "error", err)
return err
}
processorConfYaml, yamlErr := yaml.Marshal(config)
if yamlErr != nil {
zap.L().Warn("unexpected error while transforming processor config to yaml", zap.Error(yamlErr))
slog.WarnContext(ctx, "unexpected error while transforming processor config to yaml", "error", yamlErr)
}
m.updateDeployStatus(ctx, orgId, opamptypes.ElementTypeDropRules, version, opamptypes.DeployInitiated.StringValue(), "Deployment started", configHash, string(processorConfYaml))
@@ -315,7 +317,7 @@ func (m *Manager) OnConfigUpdate(orgId valuer.UUID, agentId string, hash string,
message := "Deployment was successful"
defer func() {
zap.L().Info(status, zap.String("agentId", agentId), zap.String("agentResponse", message))
m.logger.Info(status, "agent_id", agentId, "agent_response", message)
}()
if err != nil {
@@ -341,13 +343,13 @@ func UpsertSamplingProcessor(ctx context.Context, orgId valuer.UUID, version int
opamp.AddToTracePipelineSpec("signoz_tail_sampling")
configHash, err := opamp.UpsertControlProcessors(ctx, "traces", processorConf, m.OnConfigUpdate)
if err != nil {
zap.L().Error("failed to call agent config update for trace processor", zap.Error(err))
slog.ErrorContext(ctx, "failed to call agent config update for trace processor", "error", err)
return err
}
processorConfYaml, yamlErr := yaml.Marshal(config)
if yamlErr != nil {
zap.L().Warn("unexpected error while transforming processor config to yaml", zap.Error(yamlErr))
slog.WarnContext(ctx, "unexpected error while transforming processor config to yaml", "error", yamlErr)
}
m.updateDeployStatus(ctx, orgId, opamptypes.ElementTypeSamplingRules, version, opamptypes.DeployInitiated.StringValue(), "Deployment started", configHash, string(processorConfYaml))

View File

@@ -11,7 +11,6 @@ import (
"github.com/SigNoz/signoz-otel-collector/utils/fingerprint"
"github.com/SigNoz/signoz/pkg/query-service/model"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"go.uber.org/zap"
)
func (r *ClickHouseReader) GetQBFilterSuggestionsForLogs(
@@ -79,7 +78,7 @@ func (r *ClickHouseReader) GetQBFilterSuggestionsForLogs(
)
if err != nil {
// Do not fail the entire request if only example query generation fails
zap.L().Error("could not find attribute values for creating example query", zap.Error(err))
r.logger.ErrorContext(ctx, "could not find attribute values for creating example query", "error", err)
} else {
// add example queries for as many attributes as possible.
@@ -159,10 +158,7 @@ func (r *ClickHouseReader) getValuesForLogAttributes(
*/
if len(attributes) > 10 {
zap.L().Error(
"log attribute values requested for too many attributes. This can lead to slow and costly queries",
zap.Int("count", len(attributes)),
)
r.logger.ErrorContext(ctx, "log attribute values requested for too many attributes. This can lead to slow and costly queries", "count", len(attributes))
attributes = attributes[:10]
}
@@ -187,7 +183,7 @@ func (r *ClickHouseReader) getValuesForLogAttributes(
rows, err := r.db.Query(ctx, query, tagKeyQueryArgs...)
if err != nil {
zap.L().Error("couldn't query attrib values for suggestions", zap.Error(err))
r.logger.ErrorContext(ctx, "couldn't query attrib values for suggestions", "error", err)
return nil, model.InternalError(fmt.Errorf(
"couldn't query attrib values for suggestions: %w", err,
))

View File

@@ -2,17 +2,18 @@ package queryprogress
import (
"fmt"
"log/slog"
"sync"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/google/uuid"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)
// tracks progress and manages subscriptions for all queries
type inMemoryQueryProgressTracker struct {
logger *slog.Logger
queries map[string]*queryTracker
lock sync.RWMutex
}
@@ -30,7 +31,7 @@ func (tracker *inMemoryQueryProgressTracker) ReportQueryStarted(
))
}
tracker.queries[queryId] = newQueryTracker(queryId)
tracker.queries[queryId] = newQueryTracker(tracker.logger, queryId)
return func() {
tracker.onQueryFinished(queryId)
@@ -93,6 +94,7 @@ func (tracker *inMemoryQueryProgressTracker) getQueryTracker(
// Tracks progress and manages subscriptions for a single query
type queryTracker struct {
logger *slog.Logger
queryId string
isFinished bool
@@ -102,8 +104,9 @@ type queryTracker struct {
lock sync.Mutex
}
func newQueryTracker(queryId string) *queryTracker {
func newQueryTracker(logger *slog.Logger, queryId string) *queryTracker {
return &queryTracker{
logger: logger,
queryId: queryId,
subscriptions: map[string]*queryProgressSubscription{},
}
@@ -114,10 +117,7 @@ func (qt *queryTracker) handleProgressUpdate(p *clickhouse.Progress) {
defer qt.lock.Unlock()
if qt.isFinished {
zap.L().Warn(
"received clickhouse progress update for finished query",
zap.String("queryId", qt.queryId), zap.Any("progress", p),
)
qt.logger.Warn("received clickhouse progress update for finished query", "queryId", qt.queryId, "progress", p)
return
}
@@ -146,7 +146,7 @@ func (qt *queryTracker) subscribe() (
}
subscriberId := uuid.NewString()
subscription := newQueryProgressSubscription()
subscription := newQueryProgressSubscription(qt.logger)
qt.subscriptions[subscriberId] = subscription
if qt.progress != nil {
@@ -163,11 +163,7 @@ func (qt *queryTracker) unsubscribe(subscriberId string) {
defer qt.lock.Unlock()
if qt.isFinished {
zap.L().Debug(
"received unsubscribe request after query finished",
zap.String("subscriber", subscriberId),
zap.String("queryId", qt.queryId),
)
qt.logger.Debug("received unsubscribe request after query finished", "subscriber", subscriberId, "queryId", qt.queryId)
return
}
@@ -183,10 +179,7 @@ func (qt *queryTracker) onFinished() {
defer qt.lock.Unlock()
if qt.isFinished {
zap.L().Warn(
"receiver query finish report after query finished",
zap.String("queryId", qt.queryId),
)
qt.logger.Warn("receiver query finish report after query finished", "queryId", qt.queryId)
return
}
@@ -199,15 +192,17 @@ func (qt *queryTracker) onFinished() {
}
type queryProgressSubscription struct {
logger *slog.Logger
ch chan model.QueryProgress
isClosed bool
lock sync.Mutex
}
func newQueryProgressSubscription() *queryProgressSubscription {
func newQueryProgressSubscription(logger *slog.Logger) *queryProgressSubscription {
ch := make(chan model.QueryProgress, 1000)
return &queryProgressSubscription{
ch: ch,
logger: logger,
ch: ch,
}
}
@@ -217,10 +212,7 @@ func (ch *queryProgressSubscription) send(progress model.QueryProgress) {
defer ch.lock.Unlock()
if ch.isClosed {
zap.L().Error(
"can't send query progress: channel already closed.",
zap.Any("progress", progress),
)
ch.logger.Error("can't send query progress: channel already closed.", "progress", progress)
return
}
@@ -228,12 +220,9 @@ func (ch *queryProgressSubscription) send(progress model.QueryProgress) {
// blocking while sending doesn't happen in the happy path
select {
case ch.ch <- progress:
zap.L().Debug("published query progress", zap.Any("progress", progress))
ch.logger.Debug("published query progress", "progress", progress)
default:
zap.L().Error(
"couldn't publish query progress. dropping update.",
zap.Any("progress", progress),
)
ch.logger.Error("couldn't publish query progress. dropping update.", "progress", progress)
}
}

View File

@@ -1,6 +1,8 @@
package queryprogress
import (
"log/slog"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/SigNoz/signoz/pkg/query-service/model"
)
@@ -21,10 +23,11 @@ type QueryProgressTracker interface {
SubscribeToQueryProgress(queryId string) (ch <-chan model.QueryProgress, unsubscribe func(), apiErr *model.ApiError)
}
func NewQueryProgressTracker() QueryProgressTracker {
func NewQueryProgressTracker(logger *slog.Logger) QueryProgressTracker {
// InMemory tracker is useful only for single replica query service setups.
// Multi replica setups must use a centralized store for tracking and subscribing to query progress
return &inMemoryQueryProgressTracker{
logger: logger,
queries: map[string]*queryTracker{},
}
}

View File

@@ -1,6 +1,7 @@
package queryprogress
import (
"log/slog"
"testing"
"time"
@@ -12,7 +13,7 @@ import (
func TestQueryProgressTracking(t *testing.T) {
require := require.New(t)
tracker := NewQueryProgressTracker()
tracker := NewQueryProgressTracker(slog.Default())
testQueryId := "test-query"

File diff suppressed because it is too large Load Diff

View File

@@ -13,6 +13,7 @@ import (
"github.com/SigNoz/signoz/pkg/queryparser"
"io"
"log/slog"
"math"
"net/http"
"regexp"
@@ -73,8 +74,6 @@ import (
"github.com/SigNoz/signoz/pkg/types/ruletypes"
traceFunnels "github.com/SigNoz/signoz/pkg/types/tracefunneltypes"
"go.uber.org/zap"
"github.com/SigNoz/signoz/pkg/query-service/app/integrations/messagingQueues/kafka"
"github.com/SigNoz/signoz/pkg/query-service/app/logparsingpipeline"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
@@ -97,6 +96,7 @@ func NewRouter() *mux.Router {
// APIHandler implements the query service public API
type APIHandler struct {
logger *slog.Logger
reader interfaces.Reader
ruleManager *rules.Manager
querier interfaces.Querier
@@ -212,6 +212,7 @@ func NewAPIHandler(opts APIHandlerOpts, config signoz.Config) (*APIHandler, erro
//quickFilterModule := quickfilter.NewAPI(opts.QuickFilterModule)
aH := &APIHandler{
logger: slog.Default(),
reader: opts.Reader,
temporalityMap: make(map[string]map[v3.Temporality]bool),
ruleManager: opts.RuleManager,
@@ -251,13 +252,13 @@ func NewAPIHandler(opts APIHandlerOpts, config signoz.Config) (*APIHandler, erro
// TODO(nitya): remote this in later for multitenancy.
orgs, err := opts.Signoz.Modules.OrgGetter.ListByOwnedKeyRange(context.Background())
if err != nil {
zap.L().Warn("unexpected error while fetching orgs while initializing base api handler", zap.Error(err))
aH.logger.Warn("unexpected error while fetching orgs while initializing base api handler", "error", err)
}
// if the first org with the first user is created then the setup is complete.
if len(orgs) == 1 {
count, err := opts.Signoz.Modules.UserGetter.CountByOrgID(context.Background(), orgs[0].ID)
if err != nil {
zap.L().Warn("unexpected error while fetch user count while initializing base api handler", zap.Error(err))
aH.logger.Warn("unexpected error while fetching user count while initializing base api handler", "error", err)
}
if count > 0 {
@@ -312,7 +313,7 @@ func RespondError(w http.ResponseWriter, apiErr model.BaseApiError, data interfa
Data: data,
})
if err != nil {
zap.L().Error("error marshalling json response", zap.Error(err))
slog.Error("error marshalling json response", "error", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
@@ -344,7 +345,7 @@ func RespondError(w http.ResponseWriter, apiErr model.BaseApiError, data interfa
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(code)
if n, err := w.Write(b); err != nil {
zap.L().Error("error writing response", zap.Int("bytesWritten", n), zap.Error(err))
slog.Error("error writing response", "bytes_written", n, "error", err)
}
}
@@ -356,7 +357,7 @@ func writeHttpResponse(w http.ResponseWriter, data interface{}) {
Data: data,
})
if err != nil {
zap.L().Error("error marshalling json response", zap.Error(err))
slog.Error("error marshalling json response", "error", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
@@ -364,7 +365,7 @@ func writeHttpResponse(w http.ResponseWriter, data interface{}) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
if n, err := w.Write(b); err != nil {
zap.L().Error("error writing response", zap.Int("bytesWritten", n), zap.Error(err))
slog.Error("error writing response", "bytes_written", n, "error", err)
}
}
@@ -936,14 +937,14 @@ func (aH *APIHandler) metaForLinks(ctx context.Context, rule *ruletypes.Gettable
}
keys = model.GetLogFieldsV3(ctx, params, logFields)
} else {
zap.L().Error("failed to get log fields using empty keys; the link might not work as expected", zap.Error(apiErr))
aH.logger.ErrorContext(ctx, "failed to get log fields using empty keys", "error", apiErr)
}
} else if rule.AlertType == ruletypes.AlertTypeTraces {
traceFields, err := aH.reader.GetSpanAttributeKeysByNames(ctx, logsv3.GetFieldNames(rule.PostableRule.RuleCondition.CompositeQuery))
if err == nil {
keys = traceFields
} else {
zap.L().Error("failed to get span attributes using empty keys; the link might not work as expected", zap.Error(err))
aH.logger.ErrorContext(ctx, "failed to get span attributes using empty keys", "error", err)
}
}
@@ -1276,14 +1277,14 @@ func (aH *APIHandler) List(rw http.ResponseWriter, r *http.Request) {
installedIntegrationDashboards, apiErr := aH.IntegrationsController.GetDashboardsForInstalledIntegrations(ctx, orgID)
if apiErr != nil {
zap.L().Error("failed to get dashboards for installed integrations", zap.Error(apiErr))
aH.logger.ErrorContext(ctx, "failed to get dashboards for installed integrations", "error", apiErr)
} else {
dashboards = append(dashboards, installedIntegrationDashboards...)
}
cloudIntegrationDashboards, apiErr := aH.CloudIntegrationsController.AvailableDashboards(ctx, orgID)
if apiErr != nil {
zap.L().Error("failed to get dashboards for cloud integrations", zap.Error(apiErr))
aH.logger.ErrorContext(ctx, "failed to get dashboards for cloud integrations", "error", apiErr)
} else {
dashboards = append(dashboards, cloudIntegrationDashboards...)
}
@@ -1325,7 +1326,7 @@ func (aH *APIHandler) testRule(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
body, err := io.ReadAll(r.Body)
if err != nil {
zap.L().Error("Error in getting req body in test rule API", zap.Error(err))
aH.logger.ErrorContext(r.Context(), "error reading request body for test rule", "error", err)
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
@@ -1377,7 +1378,7 @@ func (aH *APIHandler) patchRule(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
body, err := io.ReadAll(r.Body)
if err != nil {
zap.L().Error("error in getting req body of patch rule API\n", zap.Error(err))
aH.logger.ErrorContext(r.Context(), "error reading request body for patch rule", "error", err)
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
@@ -1407,7 +1408,7 @@ func (aH *APIHandler) editRule(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
body, err := io.ReadAll(r.Body)
if err != nil {
zap.L().Error("error in getting req body of edit rule API", zap.Error(err))
aH.logger.ErrorContext(r.Context(), "error reading request body for edit rule", "error", err)
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
@@ -1432,7 +1433,7 @@ func (aH *APIHandler) createRule(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
body, err := io.ReadAll(r.Body)
if err != nil {
zap.L().Error("Error in getting req body for create rule API", zap.Error(err))
aH.logger.ErrorContext(r.Context(), "error reading request body for create rule", "error", err)
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
@@ -1456,7 +1457,7 @@ func (aH *APIHandler) queryRangeMetrics(w http.ResponseWriter, r *http.Request)
return
}
// zap.L().Info(query, apiError)
// TODO: add structured logging for query and apiError if needed
ctx := r.Context()
if to := r.FormValue("timeout"); to != "" {
@@ -1478,7 +1479,7 @@ func (aH *APIHandler) queryRangeMetrics(w http.ResponseWriter, r *http.Request)
}
if res.Err != nil {
zap.L().Error("error in query range metrics", zap.Error(res.Err))
aH.logger.ErrorContext(r.Context(), "error in query range metrics", "error", res.Err)
}
if res.Err != nil {
@@ -1511,7 +1512,7 @@ func (aH *APIHandler) queryMetrics(w http.ResponseWriter, r *http.Request) {
return
}
// zap.L().Info(query, apiError)
// TODO: add structured logging for query and apiError if needed
ctx := r.Context()
if to := r.FormValue("timeout"); to != "" {
@@ -1533,7 +1534,7 @@ func (aH *APIHandler) queryMetrics(w http.ResponseWriter, r *http.Request) {
}
if res.Err != nil {
zap.L().Error("error in query range metrics", zap.Error(res.Err))
aH.logger.ErrorContext(r.Context(), "error in query range metrics", "error", res.Err)
}
if res.Err != nil {
@@ -1636,7 +1637,7 @@ func (aH *APIHandler) getServicesTopLevelOps(w http.ResponseWriter, r *http.Requ
var params topLevelOpsParams
err := json.NewDecoder(r.Body).Decode(&params)
if err != nil {
zap.L().Error("Error in getting req body for get top operations API", zap.Error(err))
aH.logger.ErrorContext(r.Context(), "error reading request body for get top operations", "error", err)
}
if params.Service != "" {
@@ -2058,7 +2059,7 @@ func (aH *APIHandler) HandleError(w http.ResponseWriter, err error, statusCode i
return false
}
if statusCode == http.StatusInternalServerError {
zap.L().Error("HTTP handler, Internal Server Error", zap.Error(err))
aH.logger.Error("internal server error in http handler", "error", err)
}
structuredResp := structuredResponse{
Errors: []structuredError{
@@ -2152,7 +2153,7 @@ func (aH *APIHandler) onboardProducers(
) {
messagingQueue, apiErr := ParseKafkaQueueBody(r)
if apiErr != nil {
zap.L().Error(apiErr.Err.Error())
aH.logger.ErrorContext(r.Context(), "failed to parse kafka queue body", "error", apiErr.Err)
RespondError(w, apiErr, nil)
return
}
@@ -2160,7 +2161,7 @@ func (aH *APIHandler) onboardProducers(
chq, err := kafka.BuildClickHouseQuery(messagingQueue, kafka.KafkaQueue, "onboard_producers")
if err != nil {
zap.L().Error(err.Error())
aH.logger.ErrorContext(r.Context(), "failed to build clickhouse query for onboard producers", "error", err)
RespondError(w, apiErr, nil)
return
}
@@ -2254,7 +2255,7 @@ func (aH *APIHandler) onboardConsumers(
) {
messagingQueue, apiErr := ParseKafkaQueueBody(r)
if apiErr != nil {
zap.L().Error(apiErr.Err.Error())
aH.logger.ErrorContext(r.Context(), "failed to parse kafka queue body", "error", apiErr.Err)
RespondError(w, apiErr, nil)
return
}
@@ -2262,7 +2263,7 @@ func (aH *APIHandler) onboardConsumers(
chq, err := kafka.BuildClickHouseQuery(messagingQueue, kafka.KafkaQueue, "onboard_consumers")
if err != nil {
zap.L().Error(err.Error())
aH.logger.ErrorContext(r.Context(), "failed to build clickhouse query for onboard consumers", "error", err)
RespondError(w, apiErr, nil)
return
}
@@ -2401,7 +2402,7 @@ func (aH *APIHandler) onboardKafka(w http.ResponseWriter, r *http.Request) {
messagingQueue, apiErr := ParseKafkaQueueBody(r)
if apiErr != nil {
zap.L().Error(apiErr.Err.Error())
aH.logger.ErrorContext(r.Context(), "failed to parse kafka queue body", "error", apiErr.Err)
RespondError(w, apiErr, nil)
return
}
@@ -2409,7 +2410,7 @@ func (aH *APIHandler) onboardKafka(w http.ResponseWriter, r *http.Request) {
queryRangeParams, err := kafka.BuildBuilderQueriesKafkaOnboarding(messagingQueue)
if err != nil {
zap.L().Error(err.Error())
aH.logger.ErrorContext(r.Context(), "failed to build kafka onboarding queries", "error", err)
RespondError(w, apiErr, nil)
return
}
@@ -2511,19 +2512,19 @@ func (aH *APIHandler) getNetworkData(w http.ResponseWriter, r *http.Request) {
messagingQueue, apiErr := ParseKafkaQueueBody(r)
if apiErr != nil {
zap.L().Error(apiErr.Err.Error())
aH.logger.ErrorContext(r.Context(), "failed to parse kafka queue body", "error", apiErr.Err)
RespondError(w, apiErr, nil)
return
}
queryRangeParams, err := kafka.BuildQRParamsWithCache(messagingQueue, "throughput", attributeCache)
if err != nil {
zap.L().Error(err.Error())
aH.logger.ErrorContext(r.Context(), "failed to build query range params for throughput", "error", err)
RespondError(w, apiErr, nil)
return
}
if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
zap.L().Error(err.Error())
aH.logger.ErrorContext(r.Context(), "failed to validate query range params", "error", err)
RespondError(w, apiErr, nil)
return
}
@@ -2562,12 +2563,12 @@ func (aH *APIHandler) getNetworkData(w http.ResponseWriter, r *http.Request) {
queryRangeParams, err = kafka.BuildQRParamsWithCache(messagingQueue, "fetch-latency", attributeCache)
if err != nil {
zap.L().Error(err.Error())
aH.logger.ErrorContext(r.Context(), "failed to build query range params for fetch latency", "error", err)
RespondError(w, apiErr, nil)
return
}
if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
zap.L().Error(err.Error())
aH.logger.ErrorContext(r.Context(), "failed to validate query range params for fetch latency", "error", err)
RespondError(w, apiErr, nil)
return
}
@@ -2622,7 +2623,7 @@ func (aH *APIHandler) getProducerData(w http.ResponseWriter, r *http.Request) {
messagingQueue, apiErr := ParseKafkaQueueBody(r)
if apiErr != nil {
zap.L().Error(apiErr.Err.Error())
aH.logger.ErrorContext(r.Context(), "failed to parse kafka queue body", "error", apiErr.Err)
RespondError(w, apiErr, nil)
return
}
@@ -2636,13 +2637,13 @@ func (aH *APIHandler) getProducerData(w http.ResponseWriter, r *http.Request) {
queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "producer", kafkaSpanEval)
if err != nil {
zap.L().Error(err.Error())
aH.logger.ErrorContext(r.Context(), "failed to build query range params for producer", "error", err)
RespondError(w, apiErr, nil)
return
}
if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
zap.L().Error(err.Error())
aH.logger.ErrorContext(r.Context(), "failed to validate query range params for producer", "error", err)
RespondError(w, apiErr, nil)
return
}
@@ -2679,7 +2680,7 @@ func (aH *APIHandler) getConsumerData(w http.ResponseWriter, r *http.Request) {
messagingQueue, apiErr := ParseKafkaQueueBody(r)
if apiErr != nil {
zap.L().Error(apiErr.Err.Error())
aH.logger.ErrorContext(r.Context(), "failed to parse kafka queue body", "error", apiErr.Err)
RespondError(w, apiErr, nil)
return
}
@@ -2689,13 +2690,13 @@ func (aH *APIHandler) getConsumerData(w http.ResponseWriter, r *http.Request) {
queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "consumer", kafkaSpanEval)
if err != nil {
zap.L().Error(err.Error())
aH.logger.ErrorContext(r.Context(), "failed to build query range params for consumer", "error", err)
RespondError(w, apiErr, nil)
return
}
if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
zap.L().Error(err.Error())
aH.logger.ErrorContext(r.Context(), "failed to validate query range params for consumer", "error", err)
RespondError(w, apiErr, nil)
return
}
@@ -2737,7 +2738,7 @@ func (aH *APIHandler) getPartitionOverviewLatencyData(w http.ResponseWriter, r *
messagingQueue, apiErr := ParseKafkaQueueBody(r)
if apiErr != nil {
zap.L().Error(apiErr.Err.Error())
aH.logger.ErrorContext(r.Context(), "failed to parse kafka queue body", "error", apiErr.Err)
RespondError(w, apiErr, nil)
return
}
@@ -2747,13 +2748,13 @@ func (aH *APIHandler) getPartitionOverviewLatencyData(w http.ResponseWriter, r *
queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "producer-topic-throughput", kafkaSpanEval)
if err != nil {
zap.L().Error(err.Error())
aH.logger.ErrorContext(r.Context(), "failed to build query range params for producer topic throughput", "error", err)
RespondError(w, apiErr, nil)
return
}
if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
zap.L().Error(err.Error())
aH.logger.ErrorContext(r.Context(), "failed to validate query range params for producer topic throughput", "error", err)
RespondError(w, apiErr, nil)
return
}
@@ -2795,7 +2796,7 @@ func (aH *APIHandler) getConsumerPartitionLatencyData(w http.ResponseWriter, r *
messagingQueue, apiErr := ParseKafkaQueueBody(r)
if apiErr != nil {
zap.L().Error(apiErr.Err.Error())
aH.logger.ErrorContext(r.Context(), "failed to parse kafka queue body", "error", apiErr.Err)
RespondError(w, apiErr, nil)
return
}
@@ -2805,13 +2806,13 @@ func (aH *APIHandler) getConsumerPartitionLatencyData(w http.ResponseWriter, r *
queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "consumer_partition_latency", kafkaSpanEval)
if err != nil {
zap.L().Error(err.Error())
aH.logger.ErrorContext(r.Context(), "failed to build query range params for consumer partition latency", "error", err)
RespondError(w, apiErr, nil)
return
}
if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
zap.L().Error(err.Error())
aH.logger.ErrorContext(r.Context(), "failed to validate query range params for consumer partition latency", "error", err)
RespondError(w, apiErr, nil)
return
}
@@ -2855,7 +2856,7 @@ func (aH *APIHandler) getProducerThroughputOverview(w http.ResponseWriter, r *ht
messagingQueue, apiErr := ParseKafkaQueueBody(r)
if apiErr != nil {
zap.L().Error(apiErr.Err.Error())
aH.logger.ErrorContext(r.Context(), "failed to parse kafka queue body", "error", apiErr.Err)
RespondError(w, apiErr, nil)
return
}
@@ -2866,13 +2867,13 @@ func (aH *APIHandler) getProducerThroughputOverview(w http.ResponseWriter, r *ht
producerQueryRangeParams, err := kafka.BuildQRParamsWithCache(messagingQueue, "producer-throughput-overview", attributeCache)
if err != nil {
zap.L().Error(err.Error())
aH.logger.ErrorContext(r.Context(), "failed to build query range params for producer throughput overview", "error", err)
RespondError(w, apiErr, nil)
return
}
if err := validateQueryRangeParamsV3(producerQueryRangeParams); err != nil {
zap.L().Error(err.Error())
aH.logger.ErrorContext(r.Context(), "failed to validate query range params for producer throughput overview", "error", err)
RespondError(w, apiErr, nil)
return
}
@@ -2908,12 +2909,12 @@ func (aH *APIHandler) getProducerThroughputOverview(w http.ResponseWriter, r *ht
queryRangeParams, err := kafka.BuildQRParamsWithCache(messagingQueue, "producer-throughput-overview-byte-rate", attributeCache)
if err != nil {
zap.L().Error(err.Error())
aH.logger.ErrorContext(r.Context(), "failed to build query range params for producer throughput byte rate", "error", err)
RespondError(w, apiErr, nil)
return
}
if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
zap.L().Error(err.Error())
aH.logger.ErrorContext(r.Context(), "failed to validate query range params for producer throughput byte rate", "error", err)
RespondError(w, apiErr, nil)
return
}
@@ -2971,7 +2972,7 @@ func (aH *APIHandler) getProducerThroughputDetails(w http.ResponseWriter, r *htt
messagingQueue, apiErr := ParseKafkaQueueBody(r)
if apiErr != nil {
zap.L().Error(apiErr.Err.Error())
aH.logger.ErrorContext(r.Context(), "failed to parse kafka queue body", "error", apiErr.Err)
RespondError(w, apiErr, nil)
return
}
@@ -2981,13 +2982,13 @@ func (aH *APIHandler) getProducerThroughputDetails(w http.ResponseWriter, r *htt
queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "producer-throughput-details", kafkaSpanEval)
if err != nil {
zap.L().Error(err.Error())
aH.logger.ErrorContext(r.Context(), "failed to build query range params for producer throughput details", "error", err)
RespondError(w, apiErr, nil)
return
}
if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
zap.L().Error(err.Error())
aH.logger.ErrorContext(r.Context(), "failed to validate query range params for producer throughput details", "error", err)
RespondError(w, apiErr, nil)
return
}
@@ -3029,7 +3030,7 @@ func (aH *APIHandler) getConsumerThroughputOverview(w http.ResponseWriter, r *ht
messagingQueue, apiErr := ParseKafkaQueueBody(r)
if apiErr != nil {
zap.L().Error(apiErr.Err.Error())
aH.logger.ErrorContext(r.Context(), "failed to parse kafka queue body", "error", apiErr.Err)
RespondError(w, apiErr, nil)
return
}
@@ -3039,13 +3040,13 @@ func (aH *APIHandler) getConsumerThroughputOverview(w http.ResponseWriter, r *ht
queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "consumer-throughput-overview", kafkaSpanEval)
if err != nil {
zap.L().Error(err.Error())
aH.logger.ErrorContext(r.Context(), "failed to build query range params for consumer throughput overview", "error", err)
RespondError(w, apiErr, nil)
return
}
if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
zap.L().Error(err.Error())
aH.logger.ErrorContext(r.Context(), "failed to validate query range params for consumer throughput overview", "error", err)
RespondError(w, apiErr, nil)
return
}
@@ -3087,7 +3088,7 @@ func (aH *APIHandler) getConsumerThroughputDetails(w http.ResponseWriter, r *htt
messagingQueue, apiErr := ParseKafkaQueueBody(r)
if apiErr != nil {
zap.L().Error(apiErr.Err.Error())
aH.logger.ErrorContext(r.Context(), "failed to parse kafka queue body", "error", apiErr.Err)
RespondError(w, apiErr, nil)
return
}
@@ -3097,13 +3098,13 @@ func (aH *APIHandler) getConsumerThroughputDetails(w http.ResponseWriter, r *htt
queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "consumer-throughput-details", kafkaSpanEval)
if err != nil {
zap.L().Error(err.Error())
aH.logger.ErrorContext(r.Context(), "failed to build query range params for consumer throughput details", "error", err)
RespondError(w, apiErr, nil)
return
}
if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
zap.L().Error(err.Error())
aH.logger.ErrorContext(r.Context(), "failed to validate query range params for consumer throughput details", "error", err)
RespondError(w, apiErr, nil)
return
}
@@ -3148,7 +3149,7 @@ func (aH *APIHandler) getProducerConsumerEval(w http.ResponseWriter, r *http.Req
messagingQueue, apiErr := ParseKafkaQueueBody(r)
if apiErr != nil {
zap.L().Error(apiErr.Err.Error())
aH.logger.ErrorContext(r.Context(), "failed to parse kafka queue body", "error", apiErr.Err)
RespondError(w, apiErr, nil)
return
}
@@ -3158,7 +3159,7 @@ func (aH *APIHandler) getProducerConsumerEval(w http.ResponseWriter, r *http.Req
queryRangeParams, err := kafka.BuildQueryRangeParams(messagingQueue, "producer-consumer-eval", kafkaSpanEval)
if err != nil {
zap.L().Error(err.Error())
aH.logger.ErrorContext(r.Context(), "failed to build query range params for producer consumer eval", "error", err)
RespondError(w, &model.ApiError{
Typ: model.ErrorBadData,
Err: err,
@@ -3167,7 +3168,7 @@ func (aH *APIHandler) getProducerConsumerEval(w http.ResponseWriter, r *http.Req
}
if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
zap.L().Error(err.Error())
aH.logger.ErrorContext(r.Context(), "failed to validate query range params for producer consumer eval", "error", err)
RespondError(w, apiErr, nil)
return
}
@@ -4255,7 +4256,7 @@ func (aH *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request)
postable []pipelinetypes.PostablePipeline,
) (*logparsingpipeline.PipelinesResponse, error) {
if len(postable) == 0 {
zap.L().Warn("found no pipelines in the http request, this will delete all the pipelines")
aH.logger.WarnContext(r.Context(), "found no pipelines in the http request, this will delete all the pipelines")
}
err := aH.LogsParsingPipelineController.ValidatePipelines(ctx, postable)
@@ -4453,7 +4454,7 @@ func (aH *APIHandler) QueryRangeV3Format(w http.ResponseWriter, r *http.Request)
queryRangeParams, apiErrorObj := ParseQueryRangeParams(r)
if apiErrorObj != nil {
zap.L().Error(apiErrorObj.Err.Error())
aH.logger.ErrorContext(r.Context(), "error parsing query range params", "error", apiErrorObj.Err)
RespondError(w, apiErrorObj, nil)
return
}
@@ -4515,13 +4516,13 @@ func (aH *APIHandler) queryRangeV3(ctx context.Context, queryRangeParams *v3.Que
// check if traceID is used as filter (with equal/similar operator) in traces query if yes add timestamp filter to queryRange params
isUsed, traceIDs := tracesV3.TraceIdFilterUsedWithEqual(queryRangeParams)
if isUsed && len(traceIDs) > 0 {
zap.L().Debug("traceID used as filter in traces query")
aH.logger.DebugContext(ctx, "trace_id used as filter in traces query")
// query signoz_spans table with traceID to get min and max timestamp
min, max, err := aH.reader.GetMinAndMaxTimestampForTraceID(ctx, traceIDs)
if err == nil {
// add timestamp filter to queryRange params
tracesV3.AddTimestampFilters(min, max, queryRangeParams)
zap.L().Debug("post adding timestamp filter in traces query", zap.Any("queryRangeParams", queryRangeParams))
aH.logger.DebugContext(ctx, "post adding timestamp filter in traces query", "query_range_params", queryRangeParams)
}
}
}
@@ -4532,9 +4533,8 @@ func (aH *APIHandler) queryRangeV3(ctx context.Context, queryRangeParams *v3.Que
onQueryFinished, apiErr := aH.reader.ReportQueryStartForProgressTracking(queryIdHeader)
if apiErr != nil {
zap.L().Error(
"couldn't report query start for progress tracking",
zap.String("queryId", queryIdHeader), zap.Error(apiErr),
aH.logger.ErrorContext(ctx, "failed to report query start for progress tracking",
"query_id", queryIdHeader, "error", apiErr,
)
} else {
@@ -4709,7 +4709,7 @@ func (aH *APIHandler) QueryRangeV3(w http.ResponseWriter, r *http.Request) {
queryRangeParams, apiErrorObj := ParseQueryRangeParams(r)
if apiErrorObj != nil {
zap.L().Error("error parsing metric query range params", zap.Error(apiErrorObj.Err))
aH.logger.ErrorContext(r.Context(), "error parsing metric query range params", "error", apiErrorObj.Err)
RespondError(w, apiErrorObj, nil)
return
}
@@ -4717,7 +4717,7 @@ func (aH *APIHandler) QueryRangeV3(w http.ResponseWriter, r *http.Request) {
// add temporality for each metric
temporalityErr := aH.PopulateTemporality(r.Context(), orgID, queryRangeParams)
if temporalityErr != nil {
zap.L().Error("Error while adding temporality for metrics", zap.Error(temporalityErr))
aH.logger.ErrorContext(r.Context(), "error adding temporality for metrics", "error", temporalityErr)
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: temporalityErr}, nil)
return
}
@@ -4761,9 +4761,8 @@ func (aH *APIHandler) GetQueryProgressUpdates(w http.ResponseWriter, r *http.Req
progressCh, unsubscribe, apiErr := aH.reader.SubscribeToQueryProgress(queryId)
if apiErr != nil {
// Shouldn't happen unless query progress requested after query finished
zap.L().Warn(
"couldn't subscribe to query progress",
zap.String("queryId", queryId), zap.Any("error", apiErr),
aH.logger.WarnContext(r.Context(), "failed to subscribe to query progress",
"query_id", queryId, "error", apiErr,
)
return
}
@@ -4772,25 +4771,22 @@ func (aH *APIHandler) GetQueryProgressUpdates(w http.ResponseWriter, r *http.Req
for queryProgress := range progressCh {
msg, err := json.Marshal(queryProgress)
if err != nil {
zap.L().Error(
"failed to serialize progress message",
zap.String("queryId", queryId), zap.Any("progress", queryProgress), zap.Error(err),
aH.logger.ErrorContext(r.Context(), "failed to serialize progress message",
"query_id", queryId, "progress", queryProgress, "error", err,
)
continue
}
err = c.WriteMessage(websocket.TextMessage, msg)
if err != nil {
zap.L().Error(
"failed to write progress msg to websocket",
zap.String("queryId", queryId), zap.String("msg", string(msg)), zap.Error(err),
aH.logger.ErrorContext(r.Context(), "failed to write progress message to websocket",
"query_id", queryId, "msg", string(msg), "error", err,
)
break
} else {
zap.L().Debug(
"wrote progress msg to websocket",
zap.String("queryId", queryId), zap.String("msg", string(msg)), zap.Error(err),
aH.logger.DebugContext(r.Context(), "wrote progress message to websocket",
"query_id", queryId, "msg", string(msg),
)
}
}
@@ -4874,13 +4870,13 @@ func (aH *APIHandler) queryRangeV4(ctx context.Context, queryRangeParams *v3.Que
// check if traceID is used as filter (with equal/similar operator) in traces query if yes add timestamp filter to queryRange params
isUsed, traceIDs := tracesV3.TraceIdFilterUsedWithEqual(queryRangeParams)
if isUsed && len(traceIDs) > 0 {
zap.L().Debug("traceID used as filter in traces query")
aH.logger.DebugContext(ctx, "trace_id used as filter in traces query")
// query signoz_spans table with traceID to get min and max timestamp
min, max, err := aH.reader.GetMinAndMaxTimestampForTraceID(ctx, traceIDs)
if err == nil {
// add timestamp filter to queryRange params
tracesV3.AddTimestampFilters(min, max, queryRangeParams)
zap.L().Debug("post adding timestamp filter in traces query", zap.Any("queryRangeParams", queryRangeParams))
aH.logger.DebugContext(ctx, "post adding timestamp filter in traces query", "query_range_params", queryRangeParams)
}
}
}
@@ -4932,7 +4928,7 @@ func (aH *APIHandler) QueryRangeV4(w http.ResponseWriter, r *http.Request) {
queryRangeParams, apiErrorObj := ParseQueryRangeParams(r)
if apiErrorObj != nil {
zap.L().Error("error parsing metric query range params", zap.Error(apiErrorObj.Err))
aH.logger.ErrorContext(r.Context(), "error parsing metric query range params", "error", apiErrorObj.Err)
RespondError(w, apiErrorObj, nil)
return
}
@@ -4941,7 +4937,7 @@ func (aH *APIHandler) QueryRangeV4(w http.ResponseWriter, r *http.Request) {
// add temporality for each metric
temporalityErr := aH.PopulateTemporality(r.Context(), orgID, queryRangeParams)
if temporalityErr != nil {
zap.L().Error("Error while adding temporality for metrics", zap.Error(temporalityErr))
aH.logger.ErrorContext(r.Context(), "error adding temporality for metrics", "error", temporalityErr)
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: temporalityErr}, nil)
return
}
@@ -4990,7 +4986,7 @@ func (aH *APIHandler) getQueueOverview(w http.ResponseWriter, r *http.Request) {
queueListRequest, apiErr := ParseQueueBody(r)
if apiErr != nil {
zap.L().Error(apiErr.Err.Error())
aH.logger.ErrorContext(r.Context(), "failed to parse queue body", "error", apiErr.Err)
RespondError(w, apiErr, nil)
return
}
@@ -4998,7 +4994,7 @@ func (aH *APIHandler) getQueueOverview(w http.ResponseWriter, r *http.Request) {
chq, err := queues2.BuildOverviewQuery(queueListRequest)
if err != nil {
zap.L().Error(err.Error())
aH.logger.ErrorContext(r.Context(), "failed to build queue overview query", "error", err)
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil)
return
}
@@ -5029,7 +5025,7 @@ func (aH *APIHandler) getDomainList(w http.ResponseWriter, r *http.Request) {
// Parse the request body to get third-party query parameters
thirdPartyQueryRequest, apiErr := ParseRequestBody(r)
if apiErr != nil {
zap.L().Error("Failed to parse request body", zap.Error(apiErr))
aH.logger.ErrorContext(r.Context(), "failed to parse request body", "error", apiErr)
render.Error(w, errorsV2.New(errorsV2.TypeInvalidInput, errorsV2.CodeInvalidInput, apiErr.Error()))
return
}
@@ -5037,7 +5033,7 @@ func (aH *APIHandler) getDomainList(w http.ResponseWriter, r *http.Request) {
// Build the v5 query range request for domain listing
queryRangeRequest, err := thirdpartyapi.BuildDomainList(thirdPartyQueryRequest)
if err != nil {
zap.L().Error("Failed to build domain list query", zap.Error(err))
aH.logger.ErrorContext(r.Context(), "failed to build domain list query", "error", err)
apiErrObj := errorsV2.New(errorsV2.TypeInvalidInput, errorsV2.CodeInvalidInput, err.Error())
render.Error(w, apiErrObj)
return
@@ -5050,7 +5046,7 @@ func (aH *APIHandler) getDomainList(w http.ResponseWriter, r *http.Request) {
// Execute the query using the v5 querier
result, err := aH.Signoz.Querier.QueryRange(ctx, orgID, queryRangeRequest)
if err != nil {
zap.L().Error("Query execution failed", zap.Error(err))
aH.logger.ErrorContext(r.Context(), "query execution failed", "error", err)
apiErrObj := errorsV2.New(errorsV2.TypeInvalidInput, errorsV2.CodeInvalidInput, err.Error())
render.Error(w, apiErrObj)
return
@@ -5089,7 +5085,7 @@ func (aH *APIHandler) getDomainInfo(w http.ResponseWriter, r *http.Request) {
// Parse the request body to get third-party query parameters
thirdPartyQueryRequest, apiErr := ParseRequestBody(r)
if apiErr != nil {
zap.L().Error("Failed to parse request body", zap.Error(apiErr))
aH.logger.ErrorContext(r.Context(), "failed to parse request body", "error", apiErr)
render.Error(w, errorsV2.New(errorsV2.TypeInvalidInput, errorsV2.CodeInvalidInput, apiErr.Error()))
return
}
@@ -5097,7 +5093,7 @@ func (aH *APIHandler) getDomainInfo(w http.ResponseWriter, r *http.Request) {
// Build the v5 query range request for domain info
queryRangeRequest, err := thirdpartyapi.BuildDomainInfo(thirdPartyQueryRequest)
if err != nil {
zap.L().Error("Failed to build domain info query", zap.Error(err))
aH.logger.ErrorContext(r.Context(), "failed to build domain info query", "error", err)
apiErrObj := errorsV2.New(errorsV2.TypeInvalidInput, errorsV2.CodeInvalidInput, err.Error())
render.Error(w, apiErrObj)
return
@@ -5110,7 +5106,7 @@ func (aH *APIHandler) getDomainInfo(w http.ResponseWriter, r *http.Request) {
// Execute the query using the v5 querier
result, err := aH.Signoz.Querier.QueryRange(ctx, orgID, queryRangeRequest)
if err != nil {
zap.L().Error("Query execution failed", zap.Error(err))
aH.logger.ErrorContext(r.Context(), "query execution failed", "error", err)
apiErrObj := errorsV2.New(errorsV2.TypeInvalidInput, errorsV2.CodeInvalidInput, err.Error())
render.Error(w, apiErrObj)
return

View File

@@ -17,9 +17,10 @@ import (
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/postprocess"
"github.com/SigNoz/signoz/pkg/types/ctxtypes"
"log/slog"
"github.com/SigNoz/signoz/pkg/types/instrumentationtypes"
"github.com/SigNoz/signoz/pkg/valuer"
"go.uber.org/zap"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
)
@@ -427,7 +428,7 @@ func (h *HostsRepo) GetHostList(ctx context.Context, orgID valuer.UUID, req mode
step := int64(math.Max(float64(common.MinAllowedStepInterval(req.Start, req.End)), 60))
if step <= 0 {
zap.L().Error("step is less than or equal to 0", zap.Int64("step", step))
slog.ErrorContext(ctx, "step is less than or equal to 0", "step", step)
return resp, errors.New("step is less than or equal to 0")
}

View File

@@ -8,10 +8,11 @@ import (
"gopkg.in/yaml.v3"
"log/slog"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/constants"
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
"go.uber.org/zap"
)
var lockLogsPipelineSpec sync.RWMutex
@@ -154,14 +155,14 @@ func buildCollectorPipelineProcessorsList(
func checkDuplicateString(pipeline []string) bool {
exists := make(map[string]bool, len(pipeline))
zap.L().Debug("checking duplicate processors in the pipeline:", zap.Any("pipeline", pipeline))
slog.Debug("checking duplicate processors in the pipeline", "pipeline", pipeline)
for _, processor := range pipeline {
name := processor
if _, ok := exists[name]; ok {
zap.L().Error(
slog.Error(
"duplicate processor name detected in generated collector config for log pipelines",
zap.String("processor", processor),
zap.Any("pipeline", pipeline),
"processor", processor,
"pipeline", pipeline,
)
return true
}

View File

@@ -21,7 +21,7 @@ import (
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/google/uuid"
"go.uber.org/zap"
"log/slog"
)
var (
@@ -175,7 +175,7 @@ func (ic *LogParsingPipelineController) getEffectivePipelinesByVersion(
if version >= 0 {
savedPipelines, err := ic.getPipelinesByVersion(ctx, orgID.String(), version)
if err != nil {
zap.L().Error("failed to get pipelines for version", zap.Int("version", version), zap.Error(err))
slog.ErrorContext(ctx, "failed to get pipelines for version", "version", version, "error", err)
return nil, err
}
result = savedPipelines
@@ -227,7 +227,7 @@ func (ic *LogParsingPipelineController) GetPipelinesByVersion(
) (*PipelinesResponse, error) {
pipelines, err := ic.getEffectivePipelinesByVersion(ctx, orgId, version)
if err != nil {
zap.L().Error("failed to get pipelines for version", zap.Int("version", version), zap.Error(err))
slog.ErrorContext(ctx, "failed to get pipelines for version", "version", version, "error", err)
return nil, err
}
@@ -235,7 +235,7 @@ func (ic *LogParsingPipelineController) GetPipelinesByVersion(
if version >= 0 {
cv, err := agentConf.GetConfigVersion(ctx, orgId, opamptypes.ElementTypeLogPipelines, version)
if err != nil {
zap.L().Error("failed to get config for version", zap.Int("version", version), zap.Error(err))
slog.ErrorContext(ctx, "failed to get config for version", "version", version, "error", err)
return nil, err
}
configVersion = cv

View File

@@ -6,6 +6,8 @@ import (
"fmt"
"time"
"log/slog"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/sqlstore"
@@ -13,7 +15,6 @@ import (
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
"github.com/SigNoz/signoz/pkg/valuer"
"go.uber.org/zap"
)
// Repo handles DDL and DML ops on ingestion pipeline
@@ -80,7 +81,7 @@ func (r *Repo) insertPipeline(
Model(&insertRow.StoreablePipeline).
Exec(ctx)
if err != nil {
zap.L().Error("error in inserting pipeline data", zap.Error(err))
slog.ErrorContext(ctx, "error in inserting pipeline data", "error", err)
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to insert pipeline")
}
@@ -136,12 +137,12 @@ func (r *Repo) GetPipeline(
Where("org_id = ?", orgID).
Scan(ctx)
if err != nil {
zap.L().Error("failed to get ingestion pipeline from db", zap.Error(err))
slog.ErrorContext(ctx, "failed to get ingestion pipeline from db", "error", err)
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to get ingestion pipeline from db")
}
if len(storablePipelines) == 0 {
zap.L().Warn("No row found for ingestion pipeline id", zap.String("id", id))
slog.WarnContext(ctx, "no row found for ingestion pipeline id", "id", id)
return nil, errors.NewNotFoundf(errors.CodeNotFound, "no row found for ingestion pipeline id %v", id)
}
@@ -149,11 +150,11 @@ func (r *Repo) GetPipeline(
gettablePipeline := pipelinetypes.GettablePipeline{}
gettablePipeline.StoreablePipeline = storablePipelines[0]
if err := gettablePipeline.ParseRawConfig(); err != nil {
zap.L().Error("invalid pipeline config found", zap.String("id", id), zap.Error(err))
slog.ErrorContext(ctx, "invalid pipeline config found", "id", id, "error", err)
return nil, err
}
if err := gettablePipeline.ParseFilter(); err != nil {
zap.L().Error("invalid pipeline filter found", zap.String("id", id), zap.Error(err))
slog.ErrorContext(ctx, "invalid pipeline filter found", "id", id, "error", err)
return nil, err
}
return &gettablePipeline, nil

View File

@@ -2,12 +2,12 @@ package metrics
import (
"fmt"
"log/slog"
"reflect"
"strconv"
"strings"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"go.uber.org/zap"
)
func AddMetricValueFilter(mq *v3.BuilderQuery) *v3.MetricValueFilter {
@@ -69,7 +69,7 @@ func AddMetricValueFilter(mq *v3.BuilderQuery) *v3.MetricValueFilter {
case string:
numericValue, err := strconv.ParseFloat(v, 64)
if err != nil {
zap.L().Warn("invalid type for metric value filter, ignoring", zap.Any("type", reflect.TypeOf(v)), zap.String("value", v))
slog.Warn("invalid type for metric value filter, ignoring", "type", reflect.TypeOf(v), "value", v)
continue
}
metricValueFilter = &v3.MetricValueFilter{
@@ -111,11 +111,11 @@ func FormattedValue(v interface{}) string {
case int, float32, float64, bool:
return strings.Join(strings.Fields(fmt.Sprint(x)), ",")
default:
zap.L().Error("invalid type for formatted value", zap.Any("type", reflect.TypeOf(x[0])))
slog.Error("invalid type for formatted value", "type", reflect.TypeOf(x[0]))
return ""
}
default:
zap.L().Error("invalid type for formatted value", zap.Any("type", reflect.TypeOf(x)))
slog.Error("invalid type for formatted value", "type", reflect.TypeOf(x))
return ""
}
}
@@ -144,11 +144,11 @@ func PromFormattedValue(v interface{}) string {
}
return strings.Join(str, "|")
default:
zap.L().Error("invalid type for prom formatted value", zap.Any("type", reflect.TypeOf(x[0])))
slog.Error("invalid type for prom formatted value", "type", reflect.TypeOf(x[0]))
return ""
}
default:
zap.L().Error("invalid type for prom formatted value", zap.Any("type", reflect.TypeOf(x)))
slog.Error("invalid type for prom formatted value", "type", reflect.TypeOf(x))
return ""
}
}

View File

@@ -8,7 +8,7 @@ import (
"strings"
"time"
"go.uber.org/zap"
"log/slog"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
@@ -173,14 +173,14 @@ func (receiver *SummaryService) GetMetricsSummary(ctx context.Context, orgID val
if data != nil {
jsonData, err := json.Marshal(data)
if err != nil {
zap.L().Error("Error marshalling data:", zap.Error(err))
slog.Error("error marshalling data", "error", err)
return &model.ApiError{Typ: "MarshallingErr", Err: err}
}
var dashboards map[string][]metrics_explorer.Dashboard
err = json.Unmarshal(jsonData, &dashboards)
if err != nil {
zap.L().Error("Error unmarshalling data:", zap.Error(err))
slog.Error("error unmarshalling data", "error", err)
return &model.ApiError{Typ: "UnMarshallingErr", Err: err}
}
if _, ok := dashboards[metricName]; ok {
@@ -264,7 +264,7 @@ func (receiver *SummaryService) GetRelatedMetrics(ctx context.Context, params *m
if err != nil {
// If we hit a deadline exceeded error, proceed with only name similarity
if errors.Is(err.Err, context.DeadlineExceeded) {
zap.L().Warn("Attribute similarity calculation timed out, proceeding with name similarity only")
slog.Warn("attribute similarity calculation timed out, proceeding with name similarity only")
attrSimilarityScores = make(map[string]metrics_explorer.RelatedMetricsScore)
} else {
return nil, err
@@ -350,12 +350,12 @@ func (receiver *SummaryService) GetRelatedMetrics(ctx context.Context, params *m
if names != nil {
jsonData, err := json.Marshal(names)
if err != nil {
zap.L().Error("Error marshalling dashboard data", zap.Error(err))
slog.Error("error marshalling dashboard data", "error", err)
return &model.ApiError{Typ: "MarshallingErr", Err: err}
}
err = json.Unmarshal(jsonData, &dashboardsRelatedData)
if err != nil {
zap.L().Error("Error unmarshalling dashboard data", zap.Error(err))
slog.Error("error unmarshalling dashboard data", "error", err)
return &model.ApiError{Typ: "UnMarshallingErr", Err: err}
}
}

View File

@@ -3,6 +3,7 @@ package opamp
import (
"context"
"crypto/sha256"
"log/slog"
"github.com/SigNoz/signoz/pkg/errors"
model "github.com/SigNoz/signoz/pkg/query-service/app/opamp/model"
@@ -10,7 +11,6 @@ import (
"github.com/knadh/koanf/parsers/yaml"
"github.com/open-telemetry/opamp-go/protobufs"
"go.opentelemetry.io/collector/confmap"
"go.uber.org/zap"
)
var (
@@ -29,10 +29,10 @@ func UpsertControlProcessors(ctx context.Context, signal string,
// AddToTracePipeline() or RemoveFromTracesPipeline() prior to calling
// this method
zap.L().Debug("initiating ingestion rules deployment config", zap.String("signal", signal), zap.Any("processors", processors))
slog.Debug("initiating ingestion rules deployment config", "signal", signal, "processors", processors)
if signal != string(Metrics) && signal != string(Traces) {
zap.L().Error("received invalid signal int UpsertControlProcessors", zap.String("signal", signal))
slog.Error("received invalid signal in UpsertControlProcessors", "signal", signal)
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "signal not supported in ingestion rules: %s", signal)
}
@@ -46,14 +46,14 @@ func UpsertControlProcessors(ctx context.Context, signal string,
}
if len(agents) > 1 && signal == string(Traces) {
zap.L().Debug("found multiple agents. this feature is not supported for traces pipeline (sampling rules)")
slog.Debug("found multiple agents, this feature is not supported for traces pipeline (sampling rules)")
return "", errors.NewInvalidInputf(CodeMultipleAgentsNotSupported, "multiple agents not supported in sampling rules")
}
hash := ""
for _, agent := range agents {
agenthash, err := addIngestionControlToAgent(agent, signal, processors, false)
if err != nil {
zap.L().Error("failed to push ingestion rules config to agent", zap.String("agentID", agent.AgentID), zap.Error(err))
slog.Error("failed to push ingestion rules config to agent", "agent_id", agent.AgentID, "error", err)
continue
}
@@ -82,7 +82,7 @@ func addIngestionControlToAgent(agent *model.Agent, signal string, processors ma
// add ingestion control spec
err = makeIngestionControlSpec(agentConf, Signal(signal), processors)
if err != nil {
zap.L().Error("failed to prepare ingestion control processors for agent", zap.String("agentID", agent.AgentID), zap.Error(err))
slog.Error("failed to prepare ingestion control processors for agent", "agent_id", agent.AgentID, "error", err)
return confHash, err
}
@@ -92,7 +92,7 @@ func addIngestionControlToAgent(agent *model.Agent, signal string, processors ma
return confHash, err
}
zap.L().Debug("sending new config", zap.String("config", string(configR)))
slog.Debug("sending new config", "config", string(configR))
hash := sha256.New()
_, err = hash.Write(configR)
if err != nil {
@@ -133,7 +133,7 @@ func makeIngestionControlSpec(agentConf *confmap.Conf, signal Signal, processors
// merge tracesPipelinePlan with current pipeline
mergedPipeline, err := buildPipeline(signal, currentPipeline)
if err != nil {
zap.L().Error("failed to build pipeline", zap.String("signal", string(signal)), zap.Error(err))
slog.Error("failed to build pipeline", "signal", string(signal), "error", err)
return err
}

View File

@@ -11,7 +11,6 @@ import (
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/valuer"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"github.com/open-telemetry/opamp-go/protobufs"
@@ -305,7 +304,7 @@ func (agent *Agent) processStatusUpdate(
func (agent *Agent) updateRemoteConfig(configProvider AgentConfigProvider) bool {
recommendedConfig, confId, err := configProvider.RecommendAgentConfig(agent.OrgID, []byte(agent.Config))
if err != nil {
zap.L().Error("could not generate config recommendation for agent", zap.String("agentID", agent.AgentID), zap.Error(err))
agent.logger.Error("could not generate config recommendation for agent", "agent_id", agent.AgentID, "error", err)
return false
}
@@ -322,7 +321,7 @@ func (agent *Agent) updateRemoteConfig(configProvider AgentConfigProvider) bool
if len(confId) < 1 {
// Should never happen. Handle gracefully if it does by some chance.
zap.L().Error("config provider recommended a config with empty confId. Using content hash for configId")
agent.logger.Error("config provider recommended a config with empty conf_id, using content hash for config_id")
hash := sha256.New()
for k, v := range cfg.Config.ConfigMap {

View File

@@ -14,7 +14,6 @@ import (
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/open-telemetry/opamp-go/server/types"
"github.com/pkg/errors"
"go.uber.org/zap"
)
var AllAgents = Agents{
@@ -135,8 +134,8 @@ func (agents *Agents) RecommendLatestConfigToAll(
// Recommendation is same as current config
if string(newConfig) == agent.Config {
zap.L().Info(
"Recommended config same as current effective config for agent", zap.String("agentID", agent.AgentID),
agents.logger.Info(
"recommended config same as current effective config for agent", "agent_id", agent.AgentID,
)
return nil
}

View File

@@ -2,6 +2,7 @@ package opamp
import (
"context"
"log/slog"
"net/http"
"time"
@@ -11,8 +12,6 @@ import (
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/open-telemetry/opamp-go/server"
"github.com/open-telemetry/opamp-go/server/types"
"go.uber.org/zap"
)
var opAmpServer *Server
@@ -20,6 +19,7 @@ var opAmpServer *Server
type Server struct {
server server.OpAMPServer
agents *model.Agents
logger *slog.Logger
agentConfigProvider AgentConfigProvider
@@ -43,6 +43,7 @@ func InitializeServer(
opAmpServer = &Server{
agents: agents,
agentConfigProvider: agentConfigProvider,
logger: instrumentation.Logger(),
}
opAmpServer.server = server.New(wrappedLogger(instrumentation.Logger()))
return opAmpServer
@@ -70,8 +71,8 @@ func (srv *Server) Start(listener string) error {
unsubscribe := srv.agentConfigProvider.SubscribeToConfigUpdates(func() {
err := srv.agents.RecommendLatestConfigToAll(srv.agentConfigProvider)
if err != nil {
zap.L().Error(
"could not roll out latest config recommendation to connected agents", zap.Error(err),
srv.logger.Error(
"could not roll out latest config recommendation to connected agents", "error", err,
)
}
})
@@ -114,7 +115,7 @@ func (srv *Server) OnMessage(ctx context.Context, conn types.Connection, msg *pr
// agents sends the effective config when we processStatusUpdate.
agent, created, err := srv.agents.FindOrCreateAgent(agentID.String(), conn, orgID)
if err != nil {
zap.L().Error("Failed to find or create agent", zap.String("agentID", agentID.String()), zap.Error(err))
srv.logger.Error("failed to find or create agent", "agent_id", agentID.String(), "error", err)
// Return error response according to OpAMP protocol
return &protobufs.ServerToAgent{
@@ -134,10 +135,10 @@ func (srv *Server) OnMessage(ctx context.Context, conn types.Connection, msg *pr
if created {
agent.CanLB = model.ExtractLbFlag(msg.AgentDescription)
zap.L().Debug(
"New agent added", zap.Bool("canLb", agent.CanLB),
zap.String("agentID", agent.AgentID),
zap.Any("status", agent.Status),
srv.logger.Debug(
"new agent added", "can_lb", agent.CanLB,
"agent_id", agent.AgentID,
"status", agent.Status,
)
}
@@ -158,7 +159,7 @@ func Ready() bool {
return false
}
if opAmpServer.agents.Count() == 0 {
zap.L().Warn("no agents available, all agent config requests will be rejected")
slog.Warn("no agents available, all agent config requests will be rejected")
return false
}
return true

View File

@@ -2,9 +2,8 @@ package opamp
import (
"fmt"
"log/slog"
"sync"
"go.uber.org/zap"
)
var lockTracesPipelineSpec sync.RWMutex
@@ -89,7 +88,7 @@ func RemoveFromMetricsPipelineSpec(name string) {
func checkDuplicates(pipeline []interface{}) bool {
exists := make(map[string]bool, len(pipeline))
zap.L().Debug("checking duplicate processors in the pipeline", zap.Any("pipeline", pipeline))
slog.Debug("checking duplicate processors in the pipeline", "pipeline", pipeline)
for _, processor := range pipeline {
name := processor.(string)
if _, ok := exists[name]; ok {
@@ -149,7 +148,7 @@ func buildPipeline(signal Signal, current []interface{}) ([]interface{}, error)
currentPos := loc + inserts
// if disabled then remove from the pipeline
if !m.Enabled {
zap.L().Debug("build_pipeline: found a disabled item, removing from pipeline at position", zap.Int("position", currentPos-1), zap.String("processor", m.Name))
slog.Debug("build_pipeline: found a disabled item, removing from pipeline at position", "position", currentPos-1, "processor", m.Name)
if currentPos-1 <= 0 {
pipeline = pipeline[currentPos+1:]
} else {
@@ -170,10 +169,10 @@ func buildPipeline(signal Signal, current []interface{}) ([]interface{}, error)
// right after last matched processsor (e.g. insert filters after tail_sampling for existing list of [batch, tail_sampling])
if lastMatched <= 0 {
zap.L().Debug("build_pipeline: found a new item to be inserted, inserting at position 0", zap.String("processor", m.Name))
slog.Debug("build_pipeline: found a new item to be inserted, inserting at position 0", "processor", m.Name)
pipeline = append([]interface{}{m.Name}, pipeline[lastMatched+1:]...)
} else {
zap.L().Debug("build_pipeline: found a new item to be inserted, inserting at position", zap.Int("position", lastMatched), zap.String("processor", m.Name))
slog.Debug("build_pipeline: found a new item to be inserted, inserting at position", "position", lastMatched, "processor", m.Name)
prior := make([]interface{}, len(pipeline[:lastMatched]))
next := make([]interface{}, len(pipeline[lastMatched:]))
copy(prior, pipeline[:lastMatched])

View File

@@ -19,10 +19,11 @@ import (
"github.com/SigNoz/govaluate"
"github.com/SigNoz/signoz/pkg/query-service/app/integrations/messagingQueues/kafka"
queues2 "github.com/SigNoz/signoz/pkg/query-service/app/integrations/messagingQueues/queues"
"log/slog"
"github.com/gorilla/mux"
promModel "github.com/prometheus/common/model"
"go.uber.org/multierr"
"go.uber.org/zap"
errorsV2 "github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/app/metrics"
@@ -740,9 +741,9 @@ func chTransformQuery(query string, variables map[string]interface{}) {
transformer := chVariables.NewQueryTransformer(query, varsForTransform)
transformedQuery, err := transformer.Transform()
if err != nil {
zap.L().Warn("failed to transform clickhouse query", zap.String("query", query), zap.Error(err))
slog.Warn("failed to transform clickhouse query", "query", query, "error", err)
}
zap.L().Info("transformed clickhouse query", zap.String("transformedQuery", transformedQuery), zap.String("originalQuery", query))
slog.Info("transformed clickhouse query", "transformed_query", transformedQuery, "original_query", query)
}
func ParseQueryRangeParams(r *http.Request) (*v3.QueryRangeParamsV3, *model.ApiError) {

View File

@@ -15,7 +15,6 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/postprocess"
"github.com/SigNoz/signoz/pkg/query-service/querycache"
"github.com/SigNoz/signoz/pkg/valuer"
"go.uber.org/zap"
)
func prepareLogsQuery(
@@ -97,7 +96,7 @@ func (q *querier) runBuilderQuery(
var query string
var err error
if _, ok := cacheKeys[queryName]; !ok || params.NoCache {
zap.L().Info("skipping cache for logs query", zap.String("queryName", queryName), zap.Int64("start", start), zap.Int64("end", end), zap.Int64("step", builderQuery.StepInterval), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName]))
q.logger.InfoContext(ctx, "skipping cache for logs query", "query_name", queryName, "start", start, "end", end, "step", builderQuery.StepInterval, "no_cache", params.NoCache, "cache_key", cacheKeys[queryName])
query, err = prepareLogsQuery(ctx, start, end, builderQuery, params)
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
@@ -109,7 +108,7 @@ func (q *querier) runBuilderQuery(
}
misses := q.queryCache.FindMissingTimeRanges(orgID, start, end, builderQuery.StepInterval, cacheKeys[queryName])
zap.L().Info("cache misses for logs query", zap.Any("misses", misses))
q.logger.InfoContext(ctx, "cache misses for logs query", "misses", misses)
missedSeries := make([]querycache.CachedSeriesData, 0)
filteredMissedSeries := make([]querycache.CachedSeriesData, 0)
for _, miss := range misses {
@@ -217,7 +216,7 @@ func (q *querier) runBuilderQuery(
// We are only caching the graph panel queries. A non-existant cache key means that the query is not cached.
// If the query is not cached, we execute the query and return the result without caching it.
if _, ok := cacheKeys[queryName]; !ok || params.NoCache {
zap.L().Info("skipping cache for metrics query", zap.String("queryName", queryName), zap.Int64("start", start), zap.Int64("end", end), zap.Int64("step", builderQuery.StepInterval), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName]))
q.logger.InfoContext(ctx, "skipping cache for metrics query", "query_name", queryName, "start", start, "end", end, "step", builderQuery.StepInterval, "no_cache", params.NoCache, "cache_key", cacheKeys[queryName])
query, err := metricsV3.PrepareMetricQuery(start, end, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, metricsV3.Options{})
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
@@ -230,7 +229,7 @@ func (q *querier) runBuilderQuery(
cacheKey := cacheKeys[queryName]
misses := q.queryCache.FindMissingTimeRanges(orgID, start, end, builderQuery.StepInterval, cacheKey)
zap.L().Info("cache misses for metrics query", zap.Any("misses", misses))
q.logger.InfoContext(ctx, "cache misses for metrics query", "misses", misses)
missedSeries := make([]querycache.CachedSeriesData, 0)
for _, miss := range misses {
query, err := metricsV3.PrepareMetricQuery(
@@ -297,7 +296,7 @@ func (q *querier) runBuilderExpression(
}
if _, ok := cacheKeys[queryName]; !ok || params.NoCache {
zap.L().Info("skipping cache for expression query", zap.String("queryName", queryName), zap.Int64("start", params.Start), zap.Int64("end", params.End), zap.Int64("step", params.Step), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName]))
q.logger.InfoContext(ctx, "skipping cache for expression query", "query_name", queryName, "start", params.Start, "end", params.End, "step", params.Step, "no_cache", params.NoCache, "cache_key", cacheKeys[queryName])
query := queries[queryName]
series, err := q.execClickHouseQuery(ctx, query)
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series}
@@ -307,7 +306,7 @@ func (q *querier) runBuilderExpression(
cacheKey := cacheKeys[queryName]
step := postprocess.StepIntervalForFunction(params, queryName)
misses := q.queryCache.FindMissingTimeRanges(orgID, params.Start, params.End, step, cacheKey)
zap.L().Info("cache misses for expression query", zap.Any("misses", misses))
q.logger.InfoContext(ctx, "cache misses for expression query", "misses", misses)
missedSeries := make([]querycache.CachedSeriesData, 0)
for _, miss := range misses {
missQueries, _ := q.builder.PrepareQueries(&v3.QueryRangeParamsV3{

View File

@@ -18,12 +18,13 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/utils"
"github.com/SigNoz/signoz/pkg/valuer"
"log/slog"
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
"github.com/SigNoz/signoz/pkg/query-service/model"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"go.uber.org/multierr"
"go.uber.org/zap"
)
type channelResult struct {
@@ -44,6 +45,8 @@ type querier struct {
builder *queryBuilder.QueryBuilder
logger *slog.Logger
// used for testing
// TODO(srikanthccv): remove this once we have a proper mock
testingMode bool
@@ -85,6 +88,8 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier {
BuildMetricQuery: metricsV3.PrepareMetricQuery,
}),
logger: slog.Default(),
testingMode: opts.TestingMode,
returnedSeries: opts.ReturnedSeries,
returnedErr: opts.ReturnedErr,
@@ -113,7 +118,7 @@ func (q *querier) execClickHouseQuery(ctx context.Context, query string) ([]*v3.
series.Points = points
}
if pointsWithNegativeTimestamps > 0 {
zap.L().Error("found points with negative timestamps for query", zap.String("query", query))
q.logger.ErrorContext(ctx, "found points with negative timestamps for query", "query", query)
}
return result, err
}
@@ -206,14 +211,14 @@ func (q *querier) runPromQueries(ctx context.Context, orgID valuer.UUID, params
cacheKey, ok := cacheKeys[queryName]
if !ok || params.NoCache {
zap.L().Info("skipping cache for metrics prom query", zap.String("queryName", queryName), zap.Int64("start", params.Start), zap.Int64("end", params.End), zap.Int64("step", params.Step), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName]))
q.logger.InfoContext(ctx, "skipping cache for metrics prom query", "query_name", queryName, "start", params.Start, "end", params.End, "step", params.Step, "no_cache", params.NoCache, "cache_key", cacheKeys[queryName])
query := metricsV3.BuildPromQuery(promQuery, params.Step, params.Start, params.End)
series, err := q.execPromQuery(ctx, query)
channelResults <- channelResult{Err: err, Name: queryName, Query: query.Query, Series: series}
return
}
misses := q.queryCache.FindMissingTimeRanges(orgID, params.Start, params.End, params.Step, cacheKey)
zap.L().Info("cache misses for metrics prom query", zap.Any("misses", misses))
q.logger.InfoContext(ctx, "cache misses for metrics prom query", "misses", misses)
missedSeries := make([]querycache.CachedSeriesData, 0)
for _, miss := range misses {
query := metricsV3.BuildPromQuery(promQuery, params.Step, miss.Start, miss.End)

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"math"
"strings"
"log/slog"
"testing"
"time"
@@ -1403,6 +1404,7 @@ func Test_querier_Traces_runWindowBasedListQueryDesc(t *testing.T) {
// Create reader and querier
reader := clickhouseReader.NewReader(
slog.Default(),
nil,
telemetryStore,
prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore),
@@ -1628,6 +1630,7 @@ func Test_querier_Traces_runWindowBasedListQueryAsc(t *testing.T) {
// Create reader and querier
reader := clickhouseReader.NewReader(
slog.Default(),
nil,
telemetryStore,
prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore),
@@ -1928,6 +1931,7 @@ func Test_querier_Logs_runWindowBasedListQueryDesc(t *testing.T) {
// Create reader and querier
reader := clickhouseReader.NewReader(
slog.Default(),
nil,
telemetryStore,
prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore),
@@ -2155,6 +2159,7 @@ func Test_querier_Logs_runWindowBasedListQueryAsc(t *testing.T) {
// Create reader and querier
reader := clickhouseReader.NewReader(
slog.Default(),
nil,
telemetryStore,
prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore),

View File

@@ -16,7 +16,6 @@ import (
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/querycache"
"github.com/SigNoz/signoz/pkg/valuer"
"go.uber.org/zap"
)
func prepareLogsQuery(
@@ -99,7 +98,7 @@ func (q *querier) runBuilderQuery(
var query string
var err error
if _, ok := cacheKeys[queryName]; !ok || params.NoCache {
zap.L().Info("skipping cache for logs query", zap.String("queryName", queryName), zap.Int64("start", params.Start), zap.Int64("end", params.End), zap.Int64("step", params.Step), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName]))
q.logger.InfoContext(ctx, "skipping cache for logs query", "query_name", queryName, "start", params.Start, "end", params.End, "step", params.Step, "no_cache", params.NoCache, "cache_key", cacheKeys[queryName])
query, err = prepareLogsQuery(ctx, start, end, builderQuery, params)
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
@@ -110,7 +109,7 @@ func (q *querier) runBuilderQuery(
return
}
misses := q.queryCache.FindMissingTimeRangesV2(orgID, start, end, builderQuery.StepInterval, cacheKeys[queryName])
zap.L().Info("cache misses for logs query", zap.Any("misses", misses))
q.logger.InfoContext(ctx, "cache misses for logs query", "misses", misses)
missedSeries := make([]querycache.CachedSeriesData, 0)
filteredMissedSeries := make([]querycache.CachedSeriesData, 0)
for _, miss := range misses {
@@ -219,7 +218,7 @@ func (q *querier) runBuilderQuery(
// We are only caching the graph panel queries. A non-existant cache key means that the query is not cached.
// If the query is not cached, we execute the query and return the result without caching it.
if _, ok := cacheKeys[queryName]; !ok || params.NoCache {
zap.L().Info("skipping cache for metrics query", zap.String("queryName", queryName), zap.Int64("start", params.Start), zap.Int64("end", params.End), zap.Int64("step", params.Step), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName]))
q.logger.InfoContext(ctx, "skipping cache for metrics query", "query_name", queryName, "start", params.Start, "end", params.End, "step", params.Step, "no_cache", params.NoCache, "cache_key", cacheKeys[queryName])
query, err := metricsV4.PrepareMetricQuery(start, end, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, metricsV3.Options{})
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
@@ -231,7 +230,7 @@ func (q *querier) runBuilderQuery(
}
misses := q.queryCache.FindMissingTimeRanges(orgID, start, end, builderQuery.StepInterval, cacheKeys[queryName])
zap.L().Info("cache misses for metrics query", zap.Any("misses", misses))
q.logger.InfoContext(ctx, "cache misses for metrics query", "misses", misses)
missedSeries := make([]querycache.CachedSeriesData, 0)
for _, miss := range misses {
query, err := metricsV4.PrepareMetricQuery(
@@ -286,7 +285,7 @@ func (q *querier) ValidateMetricNames(ctx context.Context, query *v3.CompositeQu
for _, query := range query.PromQueries {
expr, err := parser.ParseExpr(query.Query)
if err != nil {
zap.L().Debug("error parsing promQL expression", zap.String("query", query.Query), zap.Error(err))
q.logger.DebugContext(ctx, "error parsing promql expression", "query", query.Query, "error", err)
continue
}
parser.Inspect(expr, func(node parser.Node, path []parser.Node) error {
@@ -302,14 +301,14 @@ func (q *querier) ValidateMetricNames(ctx context.Context, query *v3.CompositeQu
}
metrics, err := q.reader.GetNormalizedStatus(ctx, orgID, metricNames)
if err != nil {
zap.L().Debug("error getting corresponding normalized metrics", zap.Error(err))
q.logger.DebugContext(ctx, "error getting corresponding normalized metrics", "error", err)
return
}
for metricName, metricPresent := range metrics {
if metricPresent {
continue
} else {
zap.L().Warn("using normalized metric name", zap.String("metrics", metricName))
q.logger.WarnContext(ctx, "using normalized metric name", "metrics", metricName)
continue
}
}
@@ -320,14 +319,14 @@ func (q *querier) ValidateMetricNames(ctx context.Context, query *v3.CompositeQu
}
metrics, err := q.reader.GetNormalizedStatus(ctx, orgID, metricNames)
if err != nil {
zap.L().Debug("error getting corresponding normalized metrics", zap.Error(err))
q.logger.DebugContext(ctx, "error getting corresponding normalized metrics", "error", err)
return
}
for metricName, metricPresent := range metrics {
if metricPresent {
continue
} else {
zap.L().Warn("using normalized metric name", zap.String("metrics", metricName))
q.logger.WarnContext(ctx, "using normalized metric name", "metrics", metricName)
continue
}
}

View File

@@ -18,12 +18,13 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/utils"
"github.com/SigNoz/signoz/pkg/valuer"
"log/slog"
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/query-service/interfaces"
"github.com/SigNoz/signoz/pkg/query-service/model"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"go.uber.org/multierr"
"go.uber.org/zap"
)
type channelResult struct {
@@ -44,6 +45,8 @@ type querier struct {
builder *queryBuilder.QueryBuilder
logger *slog.Logger
// used for testing
// TODO(srikanthccv): remove this once we have a proper mock
testingMode bool
@@ -85,6 +88,8 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier {
BuildMetricQuery: metricsV4.PrepareMetricQuery,
}),
logger: slog.Default(),
testingMode: opts.TestingMode,
returnedSeries: opts.ReturnedSeries,
returnedErr: opts.ReturnedErr,
@@ -115,7 +120,7 @@ func (q *querier) execClickHouseQuery(ctx context.Context, query string) ([]*v3.
series.Points = points
}
if pointsWithNegativeTimestamps > 0 {
zap.L().Error("found points with negative timestamps for query", zap.String("query", query))
q.logger.ErrorContext(ctx, "found points with negative timestamps for query", "query", query)
}
return result, err
}
@@ -167,7 +172,7 @@ func (q *querier) runBuilderQueries(ctx context.Context, orgID valuer.UUID, para
wg.Wait()
close(ch)
zap.L().Info("time taken to run builder queries", zap.Duration("multiQueryDuration", time.Since(now)), zap.Int("num_queries", len(params.CompositeQuery.BuilderQueries)))
q.logger.InfoContext(ctx, "time taken to run builder queries", "multi_query_duration", time.Since(now), "num_queries", len(params.CompositeQuery.BuilderQueries))
results := make([]*v3.Result, 0)
errQueriesByName := make(map[string]error)
@@ -208,14 +213,14 @@ func (q *querier) runPromQueries(ctx context.Context, orgID valuer.UUID, params
cacheKey, ok := cacheKeys[queryName]
if !ok || params.NoCache {
zap.L().Info("skipping cache for metrics prom query", zap.String("queryName", queryName), zap.Int64("start", params.Start), zap.Int64("end", params.End), zap.Int64("step", params.Step), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName]))
q.logger.InfoContext(ctx, "skipping cache for metrics prom query", "query_name", queryName, "start", params.Start, "end", params.End, "step", params.Step, "no_cache", params.NoCache, "cache_key", cacheKeys[queryName])
query := metricsV4.BuildPromQuery(promQuery, params.Step, params.Start, params.End)
series, err := q.execPromQuery(ctx, query)
channelResults <- channelResult{Err: err, Name: queryName, Query: query.Query, Series: series}
return
}
misses := q.queryCache.FindMissingTimeRanges(orgID, params.Start, params.End, params.Step, cacheKey)
zap.L().Info("cache misses for metrics prom query", zap.Any("misses", misses))
q.logger.InfoContext(ctx, "cache misses for metrics prom query", "misses", misses)
missedSeries := make([]querycache.CachedSeriesData, 0)
for _, miss := range misses {
query := metricsV4.BuildPromQuery(promQuery, params.Step, miss.Start, miss.End)

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"math"
"strings"
"log/slog"
"testing"
"time"
@@ -1455,6 +1456,7 @@ func Test_querier_Traces_runWindowBasedListQueryDesc(t *testing.T) {
// Create reader and querier
reader := clickhouseReader.NewReader(
slog.Default(),
nil,
telemetryStore,
prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore),
@@ -1680,6 +1682,7 @@ func Test_querier_Traces_runWindowBasedListQueryAsc(t *testing.T) {
// Create reader and querier
reader := clickhouseReader.NewReader(
slog.Default(),
nil,
telemetryStore,
prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore),
@@ -1979,6 +1982,7 @@ func Test_querier_Logs_runWindowBasedListQueryDesc(t *testing.T) {
// Create reader and querier
reader := clickhouseReader.NewReader(
slog.Default(),
nil,
telemetryStore,
prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore),
@@ -2206,6 +2210,7 @@ func Test_querier_Logs_runWindowBasedListQueryAsc(t *testing.T) {
// Create reader and querier
reader := clickhouseReader.NewReader(
slog.Default(),
nil,
telemetryStore,
prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore),

View File

@@ -4,12 +4,13 @@ import (
"fmt"
"strings"
"log/slog"
"github.com/SigNoz/govaluate"
"github.com/SigNoz/signoz/pkg/cache"
metricsV3 "github.com/SigNoz/signoz/pkg/query-service/app/metrics/v3"
"github.com/SigNoz/signoz/pkg/query-service/constants"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"go.uber.org/zap"
)
var SupportedFunctions = []string{
@@ -238,7 +239,7 @@ func (qb *QueryBuilder) PrepareQueries(params *v3.QueryRangeParamsV3) (map[strin
}
queries[queryName] = queryString
default:
zap.L().Error("Unknown data source", zap.String("dataSource", string(query.DataSource)))
slog.Error("unknown data source", "data_source", string(query.DataSource))
}
}
}

View File

@@ -44,7 +44,7 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/utils"
"go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux"
"go.opentelemetry.io/otel/propagation"
"go.uber.org/zap"
"log/slog"
)
// Server runs HTTP, Mux and a grpc server
@@ -87,6 +87,7 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
}
reader := clickhouseReader.NewReader(
signoz.Instrumentation.Logger(),
signoz.SQLStore,
signoz.TelemetryStore,
signoz.Prometheus,
@@ -259,7 +260,7 @@ func (s *Server) initListeners() error {
return err
}
zap.L().Info(fmt.Sprintf("Query server started listening on %s...", s.httpHostPort))
slog.Info(fmt.Sprintf("Query server started listening on %s...", s.httpHostPort))
return nil
}
@@ -279,31 +280,31 @@ func (s *Server) Start(ctx context.Context) error {
}
go func() {
zap.L().Info("Starting HTTP server", zap.Int("port", httpPort), zap.String("addr", s.httpHostPort))
slog.Info("Starting HTTP server", "port", httpPort, "addr", s.httpHostPort)
switch err := s.httpServer.Serve(s.httpConn); err {
case nil, http.ErrServerClosed, cmux.ErrListenerClosed:
// normal exit, nothing to do
default:
zap.L().Error("Could not start HTTP server", zap.Error(err))
slog.Error("Could not start HTTP server", "error", err)
}
s.unavailableChannel <- healthcheck.Unavailable
}()
go func() {
zap.L().Info("Starting pprof server", zap.String("addr", constants.DebugHttpPort))
slog.Info("Starting pprof server", "addr", constants.DebugHttpPort)
err = http.ListenAndServe(constants.DebugHttpPort, nil)
if err != nil {
zap.L().Error("Could not start pprof server", zap.Error(err))
slog.Error("Could not start pprof server", "error", err)
}
}()
go func() {
zap.L().Info("Starting OpAmp Websocket server", zap.String("addr", constants.OpAmpWsEndpoint))
slog.Info("Starting OpAmp Websocket server", "addr", constants.OpAmpWsEndpoint)
err := s.opampServer.Start(constants.OpAmpWsEndpoint)
if err != nil {
zap.L().Info("opamp ws server failed to start", zap.Error(err))
slog.Error("opamp ws server failed to start", "error", err)
s.unavailableChannel <- healthcheck.Unavailable
}
}()
@@ -348,10 +349,9 @@ func makeRulesManager(
MetadataStore: metadataStore,
Prometheus: prometheus,
Context: context.Background(),
Logger: zap.L(),
Reader: ch,
Querier: querier,
SLogger: providerSettings.Logger,
Logger: providerSettings.Logger,
Cache: cache,
EvalDelay: constants.GetEvalDelay(),
OrgGetter: orgGetter,
@@ -368,7 +368,7 @@ func makeRulesManager(
return nil, fmt.Errorf("rule manager error: %v", err)
}
zap.L().Info("rules manager is ready")
slog.Info("rules manager is ready")
return manager, nil
}

View File

@@ -11,8 +11,9 @@ import (
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/gorilla/mux"
"log/slog"
explorer "github.com/SigNoz/signoz/pkg/query-service/app/metricsexplorer"
"go.uber.org/zap"
)
func (aH *APIHandler) FilterKeysSuggestion(w http.ResponseWriter, r *http.Request) {
@@ -22,13 +23,13 @@ func (aH *APIHandler) FilterKeysSuggestion(w http.ResponseWriter, r *http.Reques
r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
params, apiError := explorer.ParseFilterKeySuggestions(r)
if apiError != nil {
zap.L().Error("error parsing summary filter keys request", zap.Error(apiError.Err))
slog.ErrorContext(ctx, "error parsing summary filter keys request", "error", apiError.Err)
RespondError(w, apiError, nil)
return
}
keys, apiError := aH.SummaryService.FilterKeys(ctx, params)
if apiError != nil {
zap.L().Error("error getting filter keys", zap.Error(apiError.Err))
slog.ErrorContext(ctx, "error getting filter keys", "error", apiError.Err)
RespondError(w, apiError, nil)
return
}
@@ -52,14 +53,14 @@ func (aH *APIHandler) FilterValuesSuggestion(w http.ResponseWriter, r *http.Requ
r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
params, apiError := explorer.ParseFilterValueSuggestions(r)
if apiError != nil {
zap.L().Error("error parsing summary filter values request", zap.Error(apiError.Err))
slog.ErrorContext(ctx, "error parsing summary filter values request", "error", apiError.Err)
RespondError(w, apiError, nil)
return
}
values, apiError := aH.SummaryService.FilterValues(ctx, orgID, params)
if apiError != nil {
zap.L().Error("error getting filter values", zap.Error(apiError.Err))
slog.ErrorContext(ctx, "error getting filter values", "error", apiError.Err)
RespondError(w, apiError, nil)
return
}
@@ -82,7 +83,7 @@ func (aH *APIHandler) GetMetricsDetails(w http.ResponseWriter, r *http.Request)
metricName := mux.Vars(r)["metric_name"]
metricsDetail, apiError := aH.SummaryService.GetMetricsSummary(ctx, orgID, metricName)
if apiError != nil {
zap.L().Error("error getting metrics summary error", zap.Error(apiError.Err))
slog.ErrorContext(ctx, "error getting metrics summary", "error", apiError.Err)
RespondError(w, apiError, nil)
return
}
@@ -106,14 +107,14 @@ func (aH *APIHandler) ListMetrics(w http.ResponseWriter, r *http.Request) {
r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
params, apiErr := explorer.ParseSummaryListMetricsParams(r)
if apiErr != nil {
zap.L().Error("error parsing metric list metric summary api request", zap.Error(apiErr.Err))
slog.ErrorContext(ctx, "error parsing metric list metric summary api request", "error", apiErr.Err)
RespondError(w, model.BadRequest(apiErr), nil)
return
}
slmr, apiErr := aH.SummaryService.ListMetricsWithSummary(ctx, orgID, params)
if apiErr != nil {
zap.L().Error("error in getting list metrics summary", zap.Error(apiErr.Err))
slog.ErrorContext(ctx, "error in getting list metrics summary", "error", apiErr.Err)
RespondError(w, apiErr, nil)
return
}
@@ -126,13 +127,13 @@ func (aH *APIHandler) GetTreeMap(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
params, apiError := explorer.ParseTreeMapMetricsParams(r)
if apiError != nil {
zap.L().Error("error parsing tree map metric params", zap.Error(apiError.Err))
slog.ErrorContext(ctx, "error parsing tree map metric params", "error", apiError.Err)
RespondError(w, apiError, nil)
return
}
result, apiError := aH.SummaryService.GetMetricsTreemap(ctx, params)
if apiError != nil {
zap.L().Error("error getting tree map data", zap.Error(apiError.Err))
slog.ErrorContext(ctx, "error getting tree map data", "error", apiError.Err)
RespondError(w, apiError, nil)
return
}
@@ -146,13 +147,13 @@ func (aH *APIHandler) GetRelatedMetrics(w http.ResponseWriter, r *http.Request)
ctx := r.Context()
params, apiError := explorer.ParseRelatedMetricsParams(r)
if apiError != nil {
zap.L().Error("error parsing related metric params", zap.Error(apiError.Err))
slog.ErrorContext(ctx, "error parsing related metric params", "error", apiError.Err)
RespondError(w, apiError, nil)
return
}
result, apiError := aH.SummaryService.GetRelatedMetrics(ctx, params)
if apiError != nil {
zap.L().Error("error getting related metrics", zap.Error(apiError.Err))
slog.ErrorContext(ctx, "error getting related metrics", "error", apiError.Err)
RespondError(w, apiError, nil)
return
}
@@ -166,13 +167,13 @@ func (aH *APIHandler) GetInspectMetricsData(w http.ResponseWriter, r *http.Reque
ctx := r.Context()
params, apiError := explorer.ParseInspectMetricsParams(r)
if apiError != nil {
zap.L().Error("error parsing inspect metric params", zap.Error(apiError.Err))
slog.ErrorContext(ctx, "error parsing inspect metric params", "error", apiError.Err)
RespondError(w, apiError, nil)
return
}
result, apiError := aH.SummaryService.GetInspectMetrics(ctx, params)
if apiError != nil {
zap.L().Error("error getting inspect metrics data", zap.Error(apiError.Err))
slog.ErrorContext(ctx, "error getting inspect metrics data", "error", apiError.Err)
RespondError(w, apiError, nil)
return
}
@@ -197,13 +198,13 @@ func (aH *APIHandler) UpdateMetricsMetadata(w http.ResponseWriter, r *http.Reque
r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
params, apiError := explorer.ParseUpdateMetricsMetadataParams(r)
if apiError != nil {
zap.L().Error("error parsing update metrics metadata params", zap.Error(apiError.Err))
slog.ErrorContext(ctx, "error parsing update metrics metadata params", "error", apiError.Err)
RespondError(w, apiError, nil)
return
}
apiError = aH.SummaryService.UpdateMetricsMetadata(ctx, orgID, params)
if apiError != nil {
zap.L().Error("error updating metrics metadata", zap.Error(apiError.Err))
slog.ErrorContext(ctx, "error updating metrics metadata", "error", apiError.Err)
RespondError(w, apiError, nil)
return
}

View File

@@ -2,10 +2,10 @@ package smart
import (
"errors"
"log/slog"
"strconv"
basemodel "github.com/SigNoz/signoz/pkg/query-service/model"
"go.uber.org/zap"
)
// SmartTraceAlgorithm is an algorithm to find the target span and build a tree of spans around it with the given levelUp and levelDown parameters and the given spanLimit
@@ -53,7 +53,7 @@ func SmartTraceAlgorithm(payload []basemodel.SearchSpanResponseItem, targetSpanI
break
}
if err != nil {
zap.L().Error("Error during BreadthFirstSearch()", zap.Error(err))
slog.Error("error during breadth first search", "error", err)
return nil, err
}
}
@@ -191,7 +191,7 @@ func buildSpanTrees(spansPtr *[]*SpanForTraceDetails) ([]*SpanForTraceDetails, e
// If the parent span is not found, add current span to list of roots
if parent == nil {
// zap.L().Debug("Parent Span not found parent_id: ", span.ParentID)
// slog.Debug("parent span not found", "parent_id", span.ParentID)
roots = append(roots, span)
span.ParentID = ""
continue

View File

@@ -1,11 +1,11 @@
package v3
import (
"log/slog"
"strconv"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/utils"
"go.uber.org/zap"
)
var TracesListViewDefaultSelectedColumns = []v3.AttributeKey{
@@ -68,7 +68,7 @@ func TraceIdFilterUsedWithEqual(params *v3.QueryRangeParamsV3) (bool, []string)
val := item.Value
val, err = utils.ValidateAndCastValue(val, item.Key.DataType)
if err != nil {
zap.L().Error("invalid value for key", zap.String("key", item.Key.Key), zap.Error(err))
slog.Error("invalid value for key", "key", item.Key.Key, "error", err)
return false, []string{}
}
if val != nil {
@@ -81,7 +81,7 @@ func TraceIdFilterUsedWithEqual(params *v3.QueryRangeParamsV3) (bool, []string)
}
zap.L().Debug("traceIds", zap.Any("traceIds", traceIds))
slog.Debug("trace_ids", "trace_ids", traceIds)
return traceIdFilterUsed, traceIds
}

View File

@@ -9,9 +9,10 @@ import (
"strings"
"time"
"log/slog"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/pkg/errors"
"go.uber.org/zap"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
)
@@ -937,7 +938,7 @@ func (b *BuilderQuery) SetShiftByFromFunc() {
} else if shift, ok := function.Args[0].(string); ok {
shiftBy, err := strconv.ParseFloat(shift, 64)
if err != nil {
zap.L().Error("failed to parse time shift by", zap.String("shift", shift), zap.Error(err))
slog.Error("failed to parse time shift by", "shift", shift, "error", err)
}
timeShiftBy = int64(shiftBy)
}

View File

@@ -1,10 +1,11 @@
package postprocess
import (
"log/slog"
"github.com/SigNoz/govaluate"
"github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"go.uber.org/zap"
)
// postProcessResult applies having clause, metric limit, reduce function to the result
@@ -55,12 +56,12 @@ func PostProcessResult(result []*v3.Result, queryRangeParams *v3.QueryRangeParam
expression, err := govaluate.NewEvaluableExpressionWithFunctions(query.Expression, EvalFuncs())
// This shouldn't happen here, because it should have been caught earlier in validation
if err != nil {
zap.L().Error("error in expression", zap.Error(err))
slog.Error("error in expression", "error", err)
return nil, err
}
formulaResult, err := processResults(result, expression, canDefaultZero)
if err != nil {
zap.L().Error("error in expression", zap.Error(err))
slog.Error("error in expression", "error", err)
return nil, err
}
formulaResult.QueryName = query.QueryName

View File

@@ -5,10 +5,11 @@ import (
"reflect"
"strings"
"log/slog"
"github.com/SigNoz/signoz/pkg/errors"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
expr "github.com/antonmedv/expr"
"go.uber.org/zap"
)
var (
@@ -159,11 +160,11 @@ func exprFormattedValue(v interface{}) string {
case uint8, uint16, uint32, uint64, int, int8, int16, int32, int64, float32, float64, bool:
return strings.Join(strings.Fields(fmt.Sprint(x)), ",")
default:
zap.L().Error("invalid type for formatted value", zap.Any("type", reflect.TypeOf(x[0])))
slog.Error("invalid type for formatted value", "type", reflect.TypeOf(x[0]))
return ""
}
default:
zap.L().Error("invalid type for formatted value", zap.Any("type", reflect.TypeOf(x)))
slog.Error("invalid type for formatted value", "type", reflect.TypeOf(x))
return ""
}
}

View File

@@ -10,10 +10,11 @@ import (
"github.com/SigNoz/signoz/pkg/cache"
"github.com/SigNoz/signoz/pkg/errors"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"log/slog"
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"github.com/SigNoz/signoz/pkg/types/cachetypes"
"github.com/SigNoz/signoz/pkg/valuer"
"go.uber.org/zap"
)
type queryCache struct {
@@ -87,7 +88,7 @@ func (q *queryCache) FindMissingTimeRangesV2(orgID valuer.UUID, start, end int64
return cachedSeriesDataList[i].Start < cachedSeriesDataList[j].Start
})
zap.L().Info("Number of non-overlapping cached series data", zap.Int("count", len(cachedSeriesDataList)))
slog.Info("number of non-overlapping cached series data", "count", len(cachedSeriesDataList))
// Exclude the flux interval from the cached end time
@@ -180,7 +181,7 @@ func (q *queryCache) FindMissingTimeRanges(orgID valuer.UUID, start, end, step i
return cachedSeriesDataList[i].Start < cachedSeriesDataList[j].Start
})
zap.L().Info("Number of non-overlapping cached series data", zap.Int("count", len(cachedSeriesDataList)))
slog.Info("number of non-overlapping cached series data", "count", len(cachedSeriesDataList))
// Exclude the flux interval from the cached end time
@@ -291,7 +292,7 @@ func (q *queryCache) storeMergedData(orgID valuer.UUID, cacheKey string, mergedD
cacheableSeriesData := CacheableSeriesData{Series: mergedData}
err := q.cache.Set(context.TODO(), orgID, cacheKey, &cacheableSeriesData, 0)
if err != nil {
zap.L().Error("error storing merged data", zap.Error(err))
slog.Error("error storing merged data", "error", err)
}
}

View File

@@ -20,7 +20,6 @@ import (
"github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
"go.uber.org/zap"
)
// BaseRule contains common fields and methods for all rule types
@@ -400,7 +399,7 @@ func (r *BaseRule) ForEachActiveAlert(f func(*ruletypes.Alert)) {
}
func (r *BaseRule) RecordRuleStateHistory(ctx context.Context, prevState, currentState model.AlertState, itemsToAdd []model.RuleStateHistory) error {
zap.L().Debug("recording rule state history", zap.String("ruleid", r.ID()), zap.Any("prevState", prevState), zap.Any("currentState", currentState), zap.Any("itemsToAdd", itemsToAdd))
r.logger.DebugContext(ctx, "recording rule state history", "ruleid", r.ID(), "prevState", prevState, "currentState", currentState, "itemsToAdd", itemsToAdd)
revisedItemsToAdd := map[uint64]model.RuleStateHistory{}
lastSavedState, err := r.reader.GetLastSavedRuleStateHistory(ctx, r.ID())
@@ -410,7 +409,7 @@ func (r *BaseRule) RecordRuleStateHistory(ctx context.Context, prevState, curren
// if the query-service has been restarted, or the rule has been modified (which re-initializes the rule),
// the state would reset so we need to add the corresponding state changes to previously saved states
if !r.handledRestart && len(lastSavedState) > 0 {
zap.L().Debug("handling restart", zap.String("ruleid", r.ID()), zap.Any("lastSavedState", lastSavedState))
r.logger.DebugContext(ctx, "handling restart", "ruleid", r.ID(), "lastSavedState", lastSavedState)
l := map[uint64]model.RuleStateHistory{}
for _, item := range itemsToAdd {
l[item.Fingerprint] = item
@@ -442,7 +441,7 @@ func (r *BaseRule) RecordRuleStateHistory(ctx context.Context, prevState, curren
// do not add this item to revisedItemsToAdd as it is already processed
shouldSkip[item.Fingerprint] = true
}
zap.L().Debug("after lastSavedState loop", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd))
r.logger.DebugContext(ctx, "after lastSavedState loop", "ruleid", r.ID(), "revisedItemsToAdd", revisedItemsToAdd)
// if there are any new state changes that were not saved, add them to the revised items
for _, item := range itemsToAdd {
@@ -450,7 +449,7 @@ func (r *BaseRule) RecordRuleStateHistory(ctx context.Context, prevState, curren
revisedItemsToAdd[item.Fingerprint] = item
}
}
zap.L().Debug("after itemsToAdd loop", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd))
r.logger.DebugContext(ctx, "after itemsToAdd loop", "ruleid", r.ID(), "revisedItemsToAdd", revisedItemsToAdd)
newState := model.StateInactive
for _, item := range revisedItemsToAdd {
@@ -459,7 +458,7 @@ func (r *BaseRule) RecordRuleStateHistory(ctx context.Context, prevState, curren
break
}
}
zap.L().Debug("newState", zap.String("ruleid", r.ID()), zap.Any("newState", newState))
r.logger.DebugContext(ctx, "newState", "ruleid", r.ID(), "newState", newState)
// if there is a change in the overall state, update the overall state
if lastSavedState[0].OverallState != newState {
@@ -469,7 +468,7 @@ func (r *BaseRule) RecordRuleStateHistory(ctx context.Context, prevState, curren
revisedItemsToAdd[fingerprint] = item
}
}
zap.L().Debug("revisedItemsToAdd after newState", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd))
r.logger.DebugContext(ctx, "revisedItemsToAdd after newState", "ruleid", r.ID(), "revisedItemsToAdd", revisedItemsToAdd)
} else {
for _, item := range itemsToAdd {
@@ -478,7 +477,7 @@ func (r *BaseRule) RecordRuleStateHistory(ctx context.Context, prevState, curren
}
if len(revisedItemsToAdd) > 0 && r.reader != nil {
zap.L().Debug("writing rule state history", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd))
r.logger.DebugContext(ctx, "writing rule state history", "ruleid", r.ID(), "revisedItemsToAdd", revisedItemsToAdd)
entries := make([]model.RuleStateHistory, 0, len(revisedItemsToAdd))
for _, item := range revisedItemsToAdd {
@@ -486,7 +485,7 @@ func (r *BaseRule) RecordRuleStateHistory(ctx context.Context, prevState, curren
}
err := r.reader.AddRuleStateHistory(ctx, entries)
if err != nil {
zap.L().Error("error while inserting rule state history", zap.Error(err), zap.Any("itemsToAdd", itemsToAdd))
r.logger.ErrorContext(ctx, "error while inserting rule state history", "error", err, "itemsToAdd", itemsToAdd)
}
}
r.handledRestart = true

View File

@@ -2,6 +2,7 @@ package rules
import (
"context"
"log/slog"
"testing"
"time"
@@ -694,6 +695,7 @@ func TestBaseRule_FilterNewSeries(t *testing.T) {
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
reader := clickhouseReader.NewReader(
slog.Default(),
nil,
telemetryStore,
prometheustest.New(context.Background(), settings, prometheus.Config{}, telemetryStore),

View File

@@ -13,8 +13,6 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"github.com/SigNoz/signoz/pkg/queryparser"
"go.uber.org/zap"
"github.com/go-openapi/strfmt"
"github.com/SigNoz/signoz/pkg/alertmanager"
@@ -40,10 +38,9 @@ type PrepareTaskOptions struct {
TaskName string
RuleStore ruletypes.RuleStore
MaintenanceStore ruletypes.MaintenanceStore
Logger *zap.Logger
Reader interfaces.Reader
Querier querierV5.Querier
SLogger *slog.Logger
Logger *slog.Logger
Cache cache.Cache
ManagerOpts *ManagerOptions
NotifyFunc NotifyFunc
@@ -55,10 +52,9 @@ type PrepareTestRuleOptions struct {
Rule *ruletypes.PostableRule
RuleStore ruletypes.RuleStore
MaintenanceStore ruletypes.MaintenanceStore
Logger *zap.Logger
Reader interfaces.Reader
Querier querierV5.Querier
SLogger *slog.Logger
Logger *slog.Logger
Cache cache.Cache
ManagerOpts *ManagerOptions
NotifyFunc NotifyFunc
@@ -90,11 +86,10 @@ type ManagerOptions struct {
Prometheus prometheus.Prometheus
Context context.Context
Logger *zap.Logger
ResendDelay time.Duration
Reader interfaces.Reader
Querier querierV5.Querier
SLogger *slog.Logger
Logger *slog.Logger
Cache cache.Cache
EvalDelay valuer.TextDuration
@@ -120,7 +115,7 @@ type Manager struct {
ruleStore ruletypes.RuleStore
maintenanceStore ruletypes.MaintenanceStore
logger *zap.Logger
logger *slog.Logger
reader interfaces.Reader
cache cache.Cache
prepareTaskFunc func(opts PrepareTaskOptions) (Task, error)
@@ -138,7 +133,7 @@ func defaultOptions(o *ManagerOptions) *ManagerOptions {
o.ResendDelay = 1 * time.Minute
}
if o.Logger == nil {
o.Logger = zap.L()
o.Logger = slog.Default()
}
if o.PrepareTaskFunc == nil {
o.PrepareTaskFunc = defaultPrepareTaskFunc
@@ -169,7 +164,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
opts.Rule,
opts.Reader,
opts.Querier,
opts.SLogger,
opts.Logger,
WithEvalDelay(opts.ManagerOpts.EvalDelay),
WithSQLStore(opts.SQLStore),
WithQueryParser(opts.ManagerOpts.QueryParser),
@@ -192,7 +187,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
ruleId,
opts.OrgID,
opts.Rule,
opts.SLogger,
opts.Logger,
opts.Reader,
opts.ManagerOpts.Prometheus,
WithSQLStore(opts.SQLStore),
@@ -239,13 +234,13 @@ func NewManager(o *ManagerOptions) (*Manager, error) {
queryParser: o.QueryParser,
}
zap.L().Debug("Manager created successfully with NotificationGroup")
m.logger.Debug("manager created successfully with notification group")
return m, nil
}
func (m *Manager) Start(ctx context.Context) {
if err := m.initiate(ctx); err != nil {
zap.L().Error("failed to initialize alerting rules manager", zap.Error(err))
m.logger.ErrorContext(ctx, "failed to initialize alerting rules manager", "error", err)
}
m.run(ctx)
}
@@ -288,7 +283,7 @@ func (m *Manager) initiate(ctx context.Context) error {
err := json.Unmarshal([]byte(rec.Data), &parsedRule)
if err != nil {
zap.L().Info("failed to load rule in json format", zap.String("name", taskName))
m.logger.InfoContext(ctx, "failed to load rule in json format", "name", taskName)
loadErrors = append(loadErrors, err)
continue
}
@@ -297,13 +292,13 @@ func (m *Manager) initiate(ctx context.Context) error {
err = m.alertmanager.SetNotificationConfig(ctx, org.ID, rec.ID.StringValue(), &config)
if err != nil {
loadErrors = append(loadErrors, err)
zap.L().Info("failed to set rule notification config", zap.String("ruleId", rec.ID.StringValue()))
m.logger.InfoContext(ctx, "failed to set rule notification config", "rule_id", rec.ID.StringValue())
}
}
if !parsedRule.Disabled {
err := m.addTask(ctx, org.ID, &parsedRule, taskName)
if err != nil {
zap.L().Error("failed to load the rule definition", zap.String("name", taskName), zap.Error(err))
m.logger.ErrorContext(ctx, "failed to load the rule definition", "name", taskName, "error", err)
}
}
}
@@ -327,13 +322,13 @@ func (m *Manager) Stop(_ context.Context) {
m.mtx.Lock()
defer m.mtx.Unlock()
zap.L().Info("Stopping rule manager...")
m.logger.Info("stopping rule manager")
for _, t := range m.tasks {
t.Stop()
}
zap.L().Info("Rule manager stopped")
m.logger.Info("rule manager stopped")
}
// EditRule writes the rule definition to the
@@ -406,17 +401,16 @@ func (m *Manager) editTask(_ context.Context, orgID valuer.UUID, rule *ruletypes
m.mtx.Lock()
defer m.mtx.Unlock()
zap.L().Debug("editing a rule task", zap.String("name", taskName))
m.logger.Debug("editing a rule task", "name", taskName)
newTask, err := m.prepareTaskFunc(PrepareTaskOptions{
Rule: rule,
TaskName: taskName,
RuleStore: m.ruleStore,
MaintenanceStore: m.maintenanceStore,
Logger: m.logger,
Reader: m.reader,
Querier: m.opts.Querier,
SLogger: m.opts.SLogger,
Logger: m.opts.Logger,
Cache: m.cache,
ManagerOpts: m.opts,
NotifyFunc: m.prepareNotifyFunc(),
@@ -425,7 +419,7 @@ func (m *Manager) editTask(_ context.Context, orgID valuer.UUID, rule *ruletypes
})
if err != nil {
zap.L().Error("loading tasks failed", zap.Error(err))
m.logger.Error("loading tasks failed", "error", err)
return errors.NewInvalidInputf(errors.CodeInvalidInput, "error preparing rule with given parameters, previous rule set restored")
}
@@ -437,7 +431,7 @@ func (m *Manager) editTask(_ context.Context, orgID valuer.UUID, rule *ruletypes
// it to finish the current iteration. Then copy it into the new group.
oldTask, ok := m.tasks[taskName]
if !ok {
zap.L().Warn("rule task not found, a new task will be created", zap.String("name", taskName))
m.logger.Warn("rule task not found, a new task will be created", "name", taskName)
}
delete(m.tasks, taskName)
@@ -461,7 +455,7 @@ func (m *Manager) editTask(_ context.Context, orgID valuer.UUID, rule *ruletypes
func (m *Manager) DeleteRule(ctx context.Context, idStr string) error {
id, err := valuer.NewUUID(idStr)
if err != nil {
zap.L().Error("delete rule received an rule id in invalid format, must be a valid uuid-v7", zap.String("id", idStr), zap.Error(err))
m.logger.Error("delete rule received a rule id in invalid format, must be a valid uuid-v7", "id", idStr, "error", err)
return fmt.Errorf("delete rule received an rule id in invalid format, must be a valid uuid-v7")
}
@@ -521,16 +515,16 @@ func (m *Manager) DeleteRule(ctx context.Context, idStr string) error {
func (m *Manager) deleteTask(taskName string) {
m.mtx.Lock()
defer m.mtx.Unlock()
zap.L().Debug("deleting a rule task", zap.String("name", taskName))
m.logger.Debug("deleting a rule task", "name", taskName)
oldg, ok := m.tasks[taskName]
if ok {
oldg.Stop()
delete(m.tasks, taskName)
delete(m.rules, RuleIdFromTaskName(taskName))
zap.L().Debug("rule task deleted", zap.String("name", taskName))
m.logger.Debug("rule task deleted", "name", taskName)
} else {
zap.L().Info("rule not found for deletion", zap.String("name", taskName))
m.logger.Info("rule not found for deletion", "name", taskName)
}
}
@@ -617,16 +611,15 @@ func (m *Manager) addTask(_ context.Context, orgID valuer.UUID, rule *ruletypes.
m.mtx.Lock()
defer m.mtx.Unlock()
zap.L().Debug("adding a new rule task", zap.String("name", taskName))
m.logger.Debug("adding a new rule task", "name", taskName)
newTask, err := m.prepareTaskFunc(PrepareTaskOptions{
Rule: rule,
TaskName: taskName,
RuleStore: m.ruleStore,
MaintenanceStore: m.maintenanceStore,
Logger: m.logger,
Reader: m.reader,
Querier: m.opts.Querier,
SLogger: m.opts.SLogger,
Logger: m.opts.Logger,
Cache: m.cache,
ManagerOpts: m.opts,
NotifyFunc: m.prepareNotifyFunc(),
@@ -635,7 +628,7 @@ func (m *Manager) addTask(_ context.Context, orgID valuer.UUID, rule *ruletypes.
})
if err != nil {
zap.L().Error("creating rule task failed", zap.String("name", taskName), zap.Error(err))
m.logger.Error("creating rule task failed", "name", taskName, "error", err)
return errors.NewInvalidInputf(errors.CodeInvalidInput, "error loading rules, previous rule set restored")
}
@@ -791,7 +784,7 @@ func (m *Manager) prepareTestNotifyFunc() NotifyFunc {
}
err := m.alertmanager.TestAlert(ctx, orgID, ruleID, receiverMap)
if err != nil {
zap.L().Error("failed to send test notification", zap.Error(err))
m.logger.ErrorContext(ctx, "failed to send test notification", "error", err)
return
}
}
@@ -826,7 +819,7 @@ func (m *Manager) ListRuleStates(ctx context.Context) (*ruletypes.GettableRules,
ruleResponse := ruletypes.GettableRule{}
err = json.Unmarshal([]byte(s.Data), &ruleResponse)
if err != nil {
zap.L().Error("failed to unmarshal rule from db", zap.String("id", s.ID.StringValue()), zap.Error(err))
m.logger.ErrorContext(ctx, "failed to unmarshal rule from db", "id", s.ID.StringValue(), "error", err)
continue
}
@@ -857,7 +850,7 @@ func (m *Manager) GetRule(ctx context.Context, id valuer.UUID) (*ruletypes.Getta
r := ruletypes.GettableRule{}
err = json.Unmarshal([]byte(s.Data), &r)
if err != nil {
zap.L().Error("failed to unmarshal rule from db", zap.String("id", s.ID.StringValue()), zap.Error(err))
m.logger.ErrorContext(ctx, "failed to unmarshal rule from db", "id", s.ID.StringValue(), "error", err)
return nil, err
}
r.Id = id.StringValue()
@@ -926,30 +919,30 @@ func (m *Manager) PatchRule(ctx context.Context, ruleStr string, id valuer.UUID)
// retrieve rule from DB
storedJSON, err := m.ruleStore.GetStoredRule(ctx, id)
if err != nil {
zap.L().Error("failed to get stored rule with given id", zap.String("id", id.StringValue()), zap.Error(err))
m.logger.ErrorContext(ctx, "failed to get stored rule with given id", "id", id.StringValue(), "error", err)
return nil, err
}
storedRule := ruletypes.PostableRule{}
if err := json.Unmarshal([]byte(storedJSON.Data), &storedRule); err != nil {
zap.L().Error("failed to unmarshal rule from db", zap.String("id", id.StringValue()), zap.Error(err))
m.logger.ErrorContext(ctx, "failed to unmarshal rule from db", "id", id.StringValue(), "error", err)
return nil, err
}
if err := json.Unmarshal([]byte(ruleStr), &storedRule); err != nil {
zap.L().Error("failed to unmarshal patched rule with given id", zap.String("id", id.StringValue()), zap.Error(err))
m.logger.ErrorContext(ctx, "failed to unmarshal patched rule with given id", "id", id.StringValue(), "error", err)
return nil, err
}
// deploy or un-deploy task according to patched (new) rule state
if err := m.syncRuleStateWithTask(ctx, orgID, taskName, &storedRule); err != nil {
zap.L().Error("failed to sync stored rule state with the task", zap.String("taskName", taskName), zap.Error(err))
m.logger.ErrorContext(ctx, "failed to sync stored rule state with the task", "task_name", taskName, "error", err)
return nil, err
}
newStoredJson, err := json.Marshal(&storedRule)
if err != nil {
zap.L().Error("failed to marshal new stored rule with given id", zap.String("id", id.StringValue()), zap.Error(err))
m.logger.ErrorContext(ctx, "failed to marshal new stored rule with given id", "id", id.StringValue(), "error", err)
return nil, err
}
@@ -961,7 +954,7 @@ func (m *Manager) PatchRule(ctx context.Context, ruleStr string, id valuer.UUID)
err = m.ruleStore.EditRule(ctx, storedJSON, func(ctx context.Context) error { return nil })
if err != nil {
if err := m.syncRuleStateWithTask(ctx, orgID, taskName, &storedRule); err != nil {
zap.L().Error("failed to restore rule after patch failure", zap.String("taskName", taskName), zap.Error(err))
m.logger.ErrorContext(ctx, "failed to restore rule after patch failure", "task_name", taskName, "error", err)
}
return nil, err
}
@@ -1007,10 +1000,9 @@ func (m *Manager) TestNotification(ctx context.Context, orgID valuer.UUID, ruleS
Rule: &parsedRule,
RuleStore: m.ruleStore,
MaintenanceStore: m.maintenanceStore,
Logger: m.logger,
SLogger: m.opts.SLogger,
Reader: m.reader,
Querier: m.opts.Querier,
Logger: m.opts.Logger,
Cache: m.cache,
ManagerOpts: m.opts,
NotifyFunc: m.prepareTestNotifyFunc(),
@@ -1030,7 +1022,7 @@ func (m *Manager) GetAlertDetailsForMetricNames(ctx context.Context, metricNames
result := make(map[string][]ruletypes.GettableRule)
rules, err := m.ruleStore.GetStoredRules(ctx, claims.OrgID)
if err != nil {
zap.L().Error("Error getting stored rules", zap.Error(err))
m.logger.ErrorContext(ctx, "error getting stored rules", "error", err)
return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
}
@@ -1040,7 +1032,7 @@ func (m *Manager) GetAlertDetailsForMetricNames(ctx context.Context, metricNames
var rule ruletypes.GettableRule
err = json.Unmarshal([]byte(storedRule.Data), &rule)
if err != nil {
zap.L().Error("failed to unmarshal rule from db", zap.String("id", storedRule.ID.StringValue()), zap.Error(err))
m.logger.ErrorContext(ctx, "failed to unmarshal rule from db", "id", storedRule.ID.StringValue(), "error", err)
continue
}

View File

@@ -22,7 +22,6 @@ import (
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/telemetrystore/telemetrystoretest"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
type queryMatcherAny struct {
@@ -102,6 +101,7 @@ func NewTestManager(t *testing.T, testOpts *TestManagerOptions) *Manager {
providerSettings := instrumentationtest.New().ToProviderSettings()
prometheus := prometheustest.New(context.Background(), providerSettings, prometheus.Config{}, telemetryStore)
reader := clickhouseReader.NewReader(
instrumentationtest.New().Logger(),
nil,
telemetryStore,
prometheus,
@@ -123,8 +123,7 @@ func NewTestManager(t *testing.T, testOpts *TestManagerOptions) *Manager {
require.NoError(t, err)
mgrOpts := &ManagerOptions{
Logger: zap.NewNop(),
SLogger: instrumentationtest.New().Logger(),
Logger: instrumentationtest.New().Logger(),
Cache: cacheObj,
Alertmanager: fAlert,
Querier: mockQuerier,

View File

@@ -12,7 +12,7 @@ import (
"github.com/SigNoz/signoz/pkg/valuer"
opentracing "github.com/opentracing/opentracing-go"
plabels "github.com/prometheus/prometheus/model/labels"
"go.uber.org/zap"
"log/slog"
)
// PromRuleTask is a promql rule executor
@@ -34,7 +34,7 @@ type PromRuleTask struct {
terminated chan struct{}
pause bool
logger *zap.Logger
logger *slog.Logger
notify NotifyFunc
maintenanceStore ruletypes.MaintenanceStore
@@ -44,7 +44,7 @@ type PromRuleTask struct {
// NewPromRuleTask holds rules that have promql condition
// and evaluates the rule at a given frequency
func NewPromRuleTask(name, file string, frequency time.Duration, rules []Rule, opts *ManagerOptions, notify NotifyFunc, maintenanceStore ruletypes.MaintenanceStore, orgID valuer.UUID) *PromRuleTask {
zap.L().Info("Initiating a new rule group", zap.String("name", name), zap.Duration("frequency", frequency))
opts.Logger.Info("initiating a new rule group", "name", name, "frequency", frequency)
if frequency == 0 {
frequency = DefaultFrequency
@@ -322,14 +322,14 @@ func (g *PromRuleTask) Eval(ctx context.Context, ts time.Time) {
defer func() {
if r := recover(); r != nil {
zap.L().Error("panic during promql rule evaluation", zap.Any("panic", r))
g.logger.ErrorContext(ctx, "panic during promql rule evaluation", "panic", r)
}
}()
zap.L().Info("promql rule task", zap.String("name", g.name), zap.Time("eval started at", ts))
g.logger.InfoContext(ctx, "promql rule task", "name", g.name, "eval_started_at", ts)
maintenance, err := g.maintenanceStore.GetAllPlannedMaintenance(ctx, g.orgID.StringValue())
if err != nil {
zap.L().Error("Error in processing sql query", zap.Error(err))
g.logger.ErrorContext(ctx, "error in processing sql query", "error", err)
}
for i, rule := range g.rules {
@@ -339,7 +339,7 @@ func (g *PromRuleTask) Eval(ctx context.Context, ts time.Time) {
shouldSkip := false
for _, m := range maintenance {
zap.L().Info("checking if rule should be skipped", zap.String("rule", rule.ID()), zap.Any("maintenance", m))
g.logger.InfoContext(ctx, "checking if rule should be skipped", "rule", rule.ID(), "maintenance", m)
if m.ShouldSkip(rule.ID(), ts) {
shouldSkip = true
break
@@ -347,7 +347,7 @@ func (g *PromRuleTask) Eval(ctx context.Context, ts time.Time) {
}
if shouldSkip {
zap.L().Info("rule should be skipped", zap.String("rule", rule.ID()))
g.logger.InfoContext(ctx, "rule should be skipped", "rule", rule.ID())
continue
}
@@ -379,7 +379,7 @@ func (g *PromRuleTask) Eval(ctx context.Context, ts time.Time) {
rule.SetHealth(ruletypes.HealthBad)
rule.SetLastError(err)
zap.L().Warn("Evaluating rule failed", zap.String("ruleid", rule.ID()), zap.Error(err))
g.logger.WarnContext(ctx, "evaluating rule failed", "rule_id", rule.ID(), "error", err)
// Canceled queries are intentional termination of queries. This normally
// happens on shutdown and thus we skip logging of any errors here.

View File

@@ -3,6 +3,7 @@ package rules
import (
"context"
"strings"
"log/slog"
"testing"
"time"
@@ -964,7 +965,7 @@ func TestPromRuleUnitCombinations(t *testing.T) {
}
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
reader := clickhouseReader.NewReader(nil, telemetryStore, promProvider, "", time.Second, nil, nil, options)
reader := clickhouseReader.NewReader(slog.Default(), nil, telemetryStore, promProvider, "", time.Second, nil, nil, options)
rule, err := NewPromRule("69", valuer.GenerateUUID(), &postableRule, logger, reader, promProvider)
if err != nil {
assert.NoError(t, err)
@@ -1080,7 +1081,7 @@ func _Enable_this_after_9146_issue_fix_is_merged_TestPromRuleNoData(t *testing.T
}
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
reader := clickhouseReader.NewReader(nil, telemetryStore, promProvider, "", time.Second, nil, nil, options)
reader := clickhouseReader.NewReader(slog.Default(), nil, telemetryStore, promProvider, "", time.Second, nil, nil, options)
rule, err := NewPromRule("69", valuer.GenerateUUID(), &postableRule, logger, reader, promProvider)
if err != nil {
assert.NoError(t, err)
@@ -1312,7 +1313,7 @@ func TestMultipleThresholdPromRule(t *testing.T) {
}
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
reader := clickhouseReader.NewReader(nil, telemetryStore, promProvider, "", time.Second, nil, nil, options)
reader := clickhouseReader.NewReader(slog.Default(), nil, telemetryStore, promProvider, "", time.Second, nil, nil, options)
rule, err := NewPromRule("69", valuer.GenerateUUID(), &postableRule, logger, reader, promProvider)
if err != nil {
assert.NoError(t, err)
@@ -1448,7 +1449,7 @@ func TestPromRule_NoData(t *testing.T) {
}()
options := clickhouseReader.NewOptions("primaryNamespace")
reader := clickhouseReader.NewReader(nil, telemetryStore, promProvider, "", time.Second, nil, nil, options)
reader := clickhouseReader.NewReader(slog.Default(), nil, telemetryStore, promProvider, "", time.Second, nil, nil, options)
rule, err := NewPromRule("some-id", valuer.GenerateUUID(), &postableRule, logger, reader, promProvider)
require.NoError(t, err)
@@ -1597,7 +1598,7 @@ func TestPromRule_NoData_AbsentFor(t *testing.T) {
}()
options := clickhouseReader.NewOptions("primaryNamespace")
reader := clickhouseReader.NewReader(nil, telemetryStore, promProvider, "", time.Second, nil, nil, options)
reader := clickhouseReader.NewReader(slog.Default(), nil, telemetryStore, promProvider, "", time.Second, nil, nil, options)
rule, err := NewPromRule("some-id", valuer.GenerateUUID(), &postableRule, logger, reader, promProvider)
require.NoError(t, err)
@@ -1755,7 +1756,7 @@ func TestPromRuleEval_RequireMinPoints(t *testing.T) {
}()
options := clickhouseReader.NewOptions("primaryNamespace")
reader := clickhouseReader.NewReader(nil, telemetryStore, promProvider, "", time.Second, nil, nil, options)
reader := clickhouseReader.NewReader(slog.Default(), nil, telemetryStore, promProvider, "", time.Second, nil, nil, options)
rule, err := NewPromRule("some-id", valuer.GenerateUUID(), &postableRule, logger, reader, promProvider)
require.NoError(t, err)

View File

@@ -7,12 +7,13 @@ import (
"sync"
"time"
"log/slog"
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
"github.com/SigNoz/signoz/pkg/types/ctxtypes"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/SigNoz/signoz/pkg/valuer"
opentracing "github.com/opentracing/opentracing-go"
"go.uber.org/zap"
)
// RuleTask holds a rule (with composite queries)
@@ -23,6 +24,7 @@ type RuleTask struct {
frequency time.Duration
rules []Rule
opts *ManagerOptions
logger *slog.Logger
mtx sync.Mutex
evaluationDuration time.Duration
evaluationTime time.Duration
@@ -46,7 +48,7 @@ func NewRuleTask(name, file string, frequency time.Duration, rules []Rule, opts
if frequency == 0 {
frequency = DefaultFrequency
}
zap.L().Info("initiating a new rule task", zap.String("name", name), zap.Duration("frequency", frequency))
opts.Logger.Info("initiating a new rule task", "name", name, "frequency", frequency)
return &RuleTask{
name: name,
@@ -55,6 +57,7 @@ func NewRuleTask(name, file string, frequency time.Duration, rules []Rule, opts
frequency: frequency,
rules: rules,
opts: opts,
logger: opts.Logger,
done: make(chan struct{}),
terminated: make(chan struct{}),
notify: notify,
@@ -98,7 +101,7 @@ func (g *RuleTask) Run(ctx context.Context) {
// Wait an initial amount to have consistently slotted intervals.
evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.frequency)
zap.L().Debug("group run to begin at", zap.Time("evalTimestamp", evalTimestamp))
g.logger.DebugContext(ctx, "group run to begin at", "eval_timestamp", evalTimestamp)
select {
case <-time.After(time.Until(evalTimestamp)):
case <-g.done:
@@ -304,16 +307,16 @@ func (g *RuleTask) Eval(ctx context.Context, ts time.Time) {
defer func() {
if r := recover(); r != nil {
zap.L().Error("panic during threshold rule evaluation", zap.Any("panic", r))
g.logger.ErrorContext(ctx, "panic during threshold rule evaluation", "panic", r)
}
}()
zap.L().Debug("rule task eval started", zap.String("name", g.name), zap.Time("start time", ts))
g.logger.DebugContext(ctx, "rule task eval started", "name", g.name, "start_time", ts)
maintenance, err := g.maintenanceStore.GetAllPlannedMaintenance(ctx, g.orgID.StringValue())
if err != nil {
zap.L().Error("Error in processing sql query", zap.Error(err))
g.logger.ErrorContext(ctx, "error in processing sql query", "error", err)
}
for i, rule := range g.rules {
@@ -323,7 +326,7 @@ func (g *RuleTask) Eval(ctx context.Context, ts time.Time) {
shouldSkip := false
for _, m := range maintenance {
zap.L().Info("checking if rule should be skipped", zap.String("rule", rule.ID()), zap.Any("maintenance", m))
g.logger.InfoContext(ctx, "checking if rule should be skipped", "rule", rule.ID(), "maintenance", m)
if m.ShouldSkip(rule.ID(), ts) {
shouldSkip = true
break
@@ -331,7 +334,7 @@ func (g *RuleTask) Eval(ctx context.Context, ts time.Time) {
}
if shouldSkip {
zap.L().Info("rule should be skipped", zap.String("rule", rule.ID()))
g.logger.InfoContext(ctx, "rule should be skipped", "rule", rule.ID())
continue
}
@@ -363,7 +366,7 @@ func (g *RuleTask) Eval(ctx context.Context, ts time.Time) {
rule.SetHealth(ruletypes.HealthBad)
rule.SetLastError(err)
zap.L().Warn("Evaluating rule failed", zap.String("ruleid", rule.ID()), zap.Error(err))
g.logger.WarnContext(ctx, "evaluating rule failed", "rule_id", rule.ID(), "error", err)
// Canceled queries are intentional termination of queries. This normally
// happens on shutdown and thus we skip logging of any errors here.

View File

@@ -9,7 +9,7 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/utils/labels"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
"github.com/google/uuid"
"go.uber.org/zap"
"log/slog"
)
// TestNotification prepares a dummy rule for given rule parameters and
@@ -48,7 +48,7 @@ func defaultTestNotification(opts PrepareTestRuleOptions) (int, *model.ApiError)
parsedRule,
opts.Reader,
opts.Querier,
opts.SLogger,
opts.Logger,
WithSendAlways(),
WithSendUnmatched(),
WithSQLStore(opts.SQLStore),
@@ -57,7 +57,7 @@ func defaultTestNotification(opts PrepareTestRuleOptions) (int, *model.ApiError)
)
if err != nil {
zap.L().Error("failed to prepare a new threshold rule for test", zap.Error(err))
slog.Error("failed to prepare a new threshold rule for test", "error", err)
return 0, model.BadRequest(err)
}
@@ -68,7 +68,7 @@ func defaultTestNotification(opts PrepareTestRuleOptions) (int, *model.ApiError)
alertname,
opts.OrgID,
parsedRule,
opts.SLogger,
opts.Logger,
opts.Reader,
opts.ManagerOpts.Prometheus,
WithSendAlways(),
@@ -79,7 +79,7 @@ func defaultTestNotification(opts PrepareTestRuleOptions) (int, *model.ApiError)
)
if err != nil {
zap.L().Error("failed to prepare a new promql rule for test", zap.Error(err))
slog.Error("failed to prepare a new promql rule for test", "error", err)
return 0, model.BadRequest(err)
}
} else {
@@ -91,7 +91,7 @@ func defaultTestNotification(opts PrepareTestRuleOptions) (int, *model.ApiError)
alertsFound, err := rule.Eval(ctx, ts)
if err != nil {
zap.L().Error("evaluating rule failed", zap.String("rule", rule.Name()), zap.Error(err))
slog.Error("evaluating rule failed", "rule", rule.Name(), "error", err)
return 0, model.InternalError(fmt.Errorf("rule evaluation failed"))
}
rule.SendAlerts(ctx, ts, 0, time.Duration(1*time.Minute), opts.NotifyFunc)

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"math"
"strings"
"log/slog"
"testing"
"time"
@@ -778,7 +779,7 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
},
)
require.NoError(t, err)
reader := clickhouseReader.NewReader(nil, telemetryStore, prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore), "", time.Duration(time.Second), nil, readerCache, options)
reader := clickhouseReader.NewReader(slog.Default(), nil, telemetryStore, prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore), "", time.Duration(time.Second), nil, readerCache, options)
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, reader, nil, logger)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
"signoz_calls_total": {
@@ -893,7 +894,7 @@ func TestThresholdRuleNoData(t *testing.T) {
)
assert.NoError(t, err)
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
reader := clickhouseReader.NewReader(nil, telemetryStore, prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore), "", time.Duration(time.Second), nil, readerCache, options)
reader := clickhouseReader.NewReader(slog.Default(), nil, telemetryStore, prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore), "", time.Duration(time.Second), nil, readerCache, options)
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, reader, nil, logger)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
@@ -1013,7 +1014,7 @@ func TestThresholdRuleTracesLink(t *testing.T) {
}
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
reader := clickhouseReader.NewReader(nil, telemetryStore, prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore), "", time.Duration(time.Second), nil, nil, options)
reader := clickhouseReader.NewReader(slog.Default(), nil, telemetryStore, prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore), "", time.Duration(time.Second), nil, nil, options)
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, reader, nil, logger)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
@@ -1150,7 +1151,7 @@ func TestThresholdRuleLogsLink(t *testing.T) {
}
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
reader := clickhouseReader.NewReader(nil, telemetryStore, prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore), "", time.Duration(time.Second), nil, nil, options)
reader := clickhouseReader.NewReader(slog.Default(), nil, telemetryStore, prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore), "", time.Duration(time.Second), nil, nil, options)
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, reader, nil, logger)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
@@ -1417,7 +1418,7 @@ func TestMultipleThresholdRule(t *testing.T) {
},
)
require.NoError(t, err)
reader := clickhouseReader.NewReader(nil, telemetryStore, prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore), "", time.Second, nil, readerCache, options)
reader := clickhouseReader.NewReader(slog.Default(), nil, telemetryStore, prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore), "", time.Second, nil, readerCache, options)
rule, err := NewThresholdRule("69", valuer.GenerateUUID(), &postableRule, reader, nil, logger)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
"signoz_calls_total": {
@@ -2220,7 +2221,7 @@ func TestThresholdEval_RequireMinPoints(t *testing.T) {
require.NoError(t, err)
prometheusProvider := prometheustest.New(context.Background(), instrumentationtest.New().ToProviderSettings(), prometheus.Config{}, telemetryStore)
reader := clickhouseReader.NewReader(nil, telemetryStore, prometheusProvider, "", time.Second, nil, readerCache, options)
reader := clickhouseReader.NewReader(slog.Default(), nil, telemetryStore, prometheusProvider, "", time.Second, nil, readerCache, options)
rule, err := NewThresholdRule("some-id", valuer.GenerateUUID(), &postableRule, reader, nil, logger)
t.Run(fmt.Sprintf("%d Version=%s, %s", idx, version, c.description), func(t *testing.T) {

View File

@@ -7,10 +7,11 @@ import (
"strconv"
"strings"
"log/slog"
"github.com/SigNoz/signoz/pkg/query-service/constants"
"github.com/SigNoz/signoz/pkg/query-service/metrics"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"go.uber.org/zap"
)
// ValidateAndCastValue validates and casts the value of a key to the corresponding data type of the key
@@ -209,7 +210,7 @@ func ClickHouseFormattedValue(v interface{}) string {
case uint8, uint16, uint32, uint64, int, int8, int16, int32, int64, float32, float64, bool:
return strings.Join(strings.Fields(fmt.Sprint(x)), ",")
default:
zap.L().Error("invalid type for formatted value", zap.Any("type", reflect.TypeOf(x[0])))
slog.Error("invalid type for formatted value", "type", reflect.TypeOf(x[0]))
return "[]"
}
case []string:
@@ -226,7 +227,7 @@ func ClickHouseFormattedValue(v interface{}) string {
str += "]"
return str
default:
zap.L().Error("invalid type for formatted value", zap.Any("type", reflect.TypeOf(x)))
slog.Error("invalid type for formatted value", "type", reflect.TypeOf(x))
return ""
}
}

View File

@@ -1,19 +0,0 @@
package utils
import (
"time"
"go.uber.org/zap"
)
func Elapsed(funcName string, args map[string]interface{}) func() {
start := time.Now()
return func() {
var zapFields []zap.Field
zapFields = append(zapFields, zap.String("func_name", funcName), zap.Duration("duration", time.Since(start)))
for k, v := range args {
zapFields = append(zapFields, zap.Any(k, v))
}
zap.L().Info("Elapsed time", zapFields...)
}
}