Compare commits

..

3 Commits

Author SHA1 Message Date
Yunus M
8c7dc942d0 fix: handling of input changes in CustomTimePicker to ensure value is applied on popover close (#10484)
* fix: enhance input handling to prevent popover closure on input click

* fix: handling of input changes in CustomTimePicker to ensure value is applied on popover close

* chore: add test cases
2026-03-05 14:19:14 +05:30
Naman Verma
0e1bb5fd91 fix: exclude internal attributes from promQL results (#10465)
* fix: exclude internal attributes from promQL results

* fix: __name__ can stay
2026-03-05 13:07:06 +05:30
primus-bot[bot]
d7a743cea9 chore(release): bump to v0.114.0 (#10496)
Co-authored-by: primus-bot[bot] <171087277+primus-bot[bot]@users.noreply.github.com>
2026-03-05 12:43:16 +05:30
16 changed files with 398 additions and 211 deletions

View File

@@ -190,7 +190,7 @@ services:
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
signoz:
!!merge <<: *db-depend
image: signoz/signoz:v0.113.0
image: signoz/signoz:v0.114.0
ports:
- "8080:8080" # signoz port
# - "6060:6060" # pprof port
@@ -213,7 +213,7 @@ services:
retries: 3
otel-collector:
!!merge <<: *db-depend
image: signoz/signoz-otel-collector:v0.144.1
image: signoz/signoz-otel-collector:v0.144.2
entrypoint:
- /bin/sh
command:
@@ -241,7 +241,7 @@ services:
replicas: 3
signoz-telemetrystore-migrator:
!!merge <<: *db-depend
image: signoz/signoz-otel-collector:v0.144.1
image: signoz/signoz-otel-collector:v0.144.2
environment:
- SIGNOZ_OTEL_COLLECTOR_CLICKHOUSE_DSN=tcp://clickhouse:9000
- SIGNOZ_OTEL_COLLECTOR_CLICKHOUSE_CLUSTER=cluster

View File

@@ -117,7 +117,7 @@ services:
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
signoz:
!!merge <<: *db-depend
image: signoz/signoz:v0.113.0
image: signoz/signoz:v0.114.0
ports:
- "8080:8080" # signoz port
volumes:
@@ -139,7 +139,7 @@ services:
retries: 3
otel-collector:
!!merge <<: *db-depend
image: signoz/signoz-otel-collector:v0.144.1
image: signoz/signoz-otel-collector:v0.144.2
entrypoint:
- /bin/sh
command:
@@ -167,7 +167,7 @@ services:
replicas: 3
signoz-telemetrystore-migrator:
!!merge <<: *db-depend
image: signoz/signoz-otel-collector:v0.144.1
image: signoz/signoz-otel-collector:v0.144.2
environment:
- SIGNOZ_OTEL_COLLECTOR_CLICKHOUSE_DSN=tcp://clickhouse:9000
- SIGNOZ_OTEL_COLLECTOR_CLICKHOUSE_CLUSTER=cluster

View File

@@ -181,7 +181,7 @@ services:
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
signoz:
!!merge <<: *db-depend
image: signoz/signoz:${VERSION:-v0.113.0}
image: signoz/signoz:${VERSION:-v0.114.0}
container_name: signoz
ports:
- "8080:8080" # signoz port
@@ -204,7 +204,7 @@ services:
retries: 3
otel-collector:
!!merge <<: *db-depend
image: signoz/signoz-otel-collector:${OTELCOL_TAG:-v0.144.1}
image: signoz/signoz-otel-collector:${OTELCOL_TAG:-v0.144.2}
container_name: signoz-otel-collector
entrypoint:
- /bin/sh
@@ -229,7 +229,7 @@ services:
- "4318:4318" # OTLP HTTP receiver
signoz-telemetrystore-migrator:
!!merge <<: *db-depend
image: signoz/signoz-otel-collector:${OTELCOL_TAG:-v0.144.1}
image: signoz/signoz-otel-collector:${OTELCOL_TAG:-v0.144.2}
container_name: signoz-telemetrystore-migrator
environment:
- SIGNOZ_OTEL_COLLECTOR_CLICKHOUSE_DSN=tcp://clickhouse:9000

View File

@@ -109,7 +109,7 @@ services:
# - ../common/clickhouse/storage.xml:/etc/clickhouse-server/config.d/storage.xml
signoz:
!!merge <<: *db-depend
image: signoz/signoz:${VERSION:-v0.113.0}
image: signoz/signoz:${VERSION:-v0.114.0}
container_name: signoz
ports:
- "8080:8080" # signoz port
@@ -132,7 +132,7 @@ services:
retries: 3
otel-collector:
!!merge <<: *db-depend
image: signoz/signoz-otel-collector:${OTELCOL_TAG:-v0.144.1}
image: signoz/signoz-otel-collector:${OTELCOL_TAG:-v0.144.2}
container_name: signoz-otel-collector
entrypoint:
- /bin/sh
@@ -157,7 +157,7 @@ services:
- "4318:4318" # OTLP HTTP receiver
signoz-telemetrystore-migrator:
!!merge <<: *db-depend
image: signoz/signoz-otel-collector:${OTELCOL_TAG:-v0.144.1}
image: signoz/signoz-otel-collector:${OTELCOL_TAG:-v0.144.2}
container_name: signoz-telemetrystore-migrator
environment:
- SIGNOZ_OTEL_COLLECTOR_CLICKHOUSE_DSN=tcp://clickhouse:9000

View File

@@ -0,0 +1,279 @@
import { useState } from 'react';
import { fireEvent, render, screen } from '@testing-library/react';
import dayjs from 'dayjs';
import * as timeUtils from 'utils/timeUtils';
import CustomTimePicker from './CustomTimePicker';
jest.mock('react-router-dom', () => {
const actual = jest.requireActual('react-router-dom');
return {
...actual,
useLocation: jest.fn().mockReturnValue({
pathname: '/test-path',
}),
};
});
jest.mock('providers/Timezone', () => {
const actual = jest.requireActual('providers/Timezone');
return {
...actual,
useTimezone: jest.fn().mockReturnValue({
timezone: {
value: 'UTC',
offset: '+00:00',
name: 'UTC',
},
browserTimezone: {
value: 'UTC',
offset: '+00:00',
name: 'UTC',
},
}),
};
});
interface WrapperProps {
initialValue?: string;
showLiveLogs?: boolean;
onValidCustomDateChange?: () => void;
onError?: () => void;
onSelect?: (value: string) => void;
onCustomDateHandler?: () => void;
onCustomTimeStatusUpdate?: () => void;
}
function Wrapper({
initialValue = '2024-01-01 00:00:00 - 2024-01-01 01:00:00',
showLiveLogs = false,
onValidCustomDateChange = (): void => {},
onError = (): void => {},
onSelect = (): void => {},
onCustomDateHandler = (): void => {},
onCustomTimeStatusUpdate = (): void => {},
}: WrapperProps): JSX.Element {
const [open, setOpen] = useState(false);
const [selectedTime, setSelectedTime] = useState('custom');
const [selectedValue, setSelectedValue] = useState(initialValue);
const handleSelect = (value: string): void => {
setSelectedTime(value);
onSelect(value);
};
return (
<CustomTimePicker
open={open}
setOpen={setOpen}
onSelect={handleSelect}
onError={onError}
selectedTime={selectedTime}
selectedValue={selectedValue}
onValidCustomDateChange={({ timeStr }): void => {
setSelectedValue(timeStr);
onValidCustomDateChange();
}}
onCustomDateHandler={(): void => {
onCustomDateHandler();
}}
onCustomTimeStatusUpdate={(): void => {
onCustomTimeStatusUpdate();
}}
items={[
{ label: 'Last 5 minutes', value: '5m' },
{ label: 'Custom', value: 'custom' },
]}
minTime={dayjs('2024-01-01 00:00:00').valueOf() * 1000_000}
maxTime={dayjs('2024-01-01 01:00:00').valueOf() * 1000_000}
showLiveLogs={showLiveLogs}
/>
);
}
describe('CustomTimePicker', () => {
it('does not close or reset when clicking input while open', () => {
render(<Wrapper />);
const input = screen.getByRole('textbox');
// Open popover
fireEvent.focus(input);
// Type some text
fireEvent.change(input, { target: { value: '5m' } });
// Click the input again while open
fireEvent.mouseDown(input);
fireEvent.click(input);
// Value should remain as typed
expect((input as HTMLInputElement).value).toBe('5m');
});
it('applies valid shorthand on Enter', () => {
const onValid = jest.fn();
const onError = jest.fn();
render(<Wrapper onValidCustomDateChange={onValid} onError={onError} />);
const input = screen.getByRole('textbox');
fireEvent.focus(input);
fireEvent.change(input, { target: { value: '5m' } });
fireEvent.keyDown(input, { key: 'Enter', code: 'Enter' });
expect(onValid).toHaveBeenCalledTimes(1);
// onError(false) may be called by internal reset logic; we only assert that
// it was never called with a truthy error state
expect(onError).not.toHaveBeenCalledWith(true);
});
it('sets error and updates custom time status for invalid shorthand exceeding max allowed window', () => {
const onValid = jest.fn();
const onError = jest.fn();
const onCustomTimeStatusUpdate = jest.fn();
render(
<Wrapper
onValidCustomDateChange={onValid}
onError={onError}
onCustomTimeStatusUpdate={onCustomTimeStatusUpdate}
/>,
);
const input = screen.getByRole('textbox');
fireEvent.focus(input);
// large number of days to ensure it exceeds the 15 months allowed window
fireEvent.change(input, { target: { value: '9999d' } });
fireEvent.keyDown(input, { key: 'Enter', code: 'Enter' });
expect(onError).toHaveBeenCalledWith(true);
expect(onCustomTimeStatusUpdate).toHaveBeenCalledWith();
expect(onValid).not.toHaveBeenCalled();
});
it('treats close after change like pressing Enter (blur + chevron)', () => {
const onValid = jest.fn();
const onError = jest.fn();
render(<Wrapper onValidCustomDateChange={onValid} onError={onError} />);
const input = screen.getByRole('textbox');
// Open and change value so "changed since open" is true
fireEvent.focus(input);
fireEvent.change(input, { target: { value: '5m' } });
fireEvent.blur(input);
// Click the chevron (which triggers handleClose)
const chevron = document.querySelector(
'.time-input-suffix-icon-badge',
) as HTMLElement;
fireEvent.click(chevron);
// Should have applied the value (same as Enter)
expect(onValid).toHaveBeenCalledTimes(1);
expect(onError).not.toHaveBeenCalledWith(true);
});
it('applies epoch start/end range on Enter via onCustomDateHandler', () => {
const onCustomDateHandler = jest.fn();
const onError = jest.fn();
render(
<Wrapper onCustomDateHandler={onCustomDateHandler} onError={onError} />,
);
const now = dayjs().valueOf();
const later = dayjs().add(1, 'hour').valueOf();
const input = screen.getByRole('textbox');
fireEvent.focus(input);
fireEvent.change(input, {
target: { value: `${now} - ${later}` },
});
fireEvent.keyDown(input, { key: 'Enter', code: 'Enter' });
expect(onCustomDateHandler).toHaveBeenCalledTimes(1);
expect(onError).not.toHaveBeenCalledWith(true);
});
it('uses validateTimeRange result for generic formatted ranges (valid case)', () => {
const validateTimeRangeSpy = jest.spyOn(timeUtils, 'validateTimeRange');
const onCustomDateHandler = jest.fn();
const onError = jest.fn();
validateTimeRangeSpy.mockReturnValue({
isValid: true,
errorDetails: undefined,
startTimeMs: dayjs('2024-01-01 00:00:00').valueOf(),
endTimeMs: dayjs('2024-01-01 01:00:00').valueOf(),
});
render(
<Wrapper onCustomDateHandler={onCustomDateHandler} onError={onError} />,
);
const input = screen.getByRole('textbox');
fireEvent.focus(input);
fireEvent.change(input, {
target: { value: 'foo - bar' },
});
fireEvent.keyDown(input, { key: 'Enter', code: 'Enter' });
expect(validateTimeRangeSpy).toHaveBeenCalled();
expect(onCustomDateHandler).toHaveBeenCalledTimes(1);
expect(onError).not.toHaveBeenCalledWith(true);
validateTimeRangeSpy.mockRestore();
});
it('uses validateTimeRange result for generic formatted ranges (invalid case)', () => {
const validateTimeRangeSpy = jest.spyOn(timeUtils, 'validateTimeRange');
const onValid = jest.fn();
const onError = jest.fn();
validateTimeRangeSpy.mockReturnValue({
isValid: false,
errorDetails: {
message: 'Invalid range',
code: 'INVALID_RANGE',
description: 'Start must be before end',
},
startTimeMs: 0,
endTimeMs: 0,
});
render(<Wrapper onValidCustomDateChange={onValid} onError={onError} />);
const input = screen.getByRole('textbox');
fireEvent.focus(input);
fireEvent.change(input, {
target: { value: 'foo - bar' },
});
fireEvent.keyDown(input, { key: 'Enter', code: 'Enter' });
expect(validateTimeRangeSpy).toHaveBeenCalled();
expect(onError).toHaveBeenCalledWith(true);
expect(onValid).not.toHaveBeenCalled();
validateTimeRangeSpy.mockRestore();
});
it('opens live mode with correct label', () => {
render(<Wrapper showLiveLogs />);
const input = screen.getByRole('textbox');
fireEvent.focus(input);
expect((input as HTMLInputElement).value).toBe('Live');
});
});

View File

@@ -104,6 +104,10 @@ function CustomTimePicker({
const location = useLocation();
const inputRef = useRef<InputRef>(null);
const initialInputValueOnOpenRef = useRef<string>('');
const hasChangedSinceOpenRef = useRef<boolean>(false);
// Tracks if the last pointer down was on the input so we don't close the popover when user clicks the input again
const isClickFromInputRef = useRef(false);
const [activeView, setActiveView] = useState<ViewType>(DEFAULT_VIEW);
@@ -238,6 +242,21 @@ function CustomTimePicker({
};
const handleOpenChange = (newOpen: boolean): void => {
// Don't close when the user clicked the input (trigger); Ant Design treats trigger as "outside" overlay
if (!newOpen && isClickFromInputRef.current) {
isClickFromInputRef.current = false;
return;
}
isClickFromInputRef.current = false;
// If the popover is trying to close and the value changed since opening,
// treat it as if the user pressed Enter (attempt to apply the value)
if (!newOpen && hasChangedSinceOpenRef.current) {
hasChangedSinceOpenRef.current = false;
handleInputPressEnter();
return;
}
setOpen(newOpen);
if (!newOpen) {
@@ -406,10 +425,18 @@ function CustomTimePicker({
const handleOpen = (e?: React.SyntheticEvent): void => {
e?.stopPropagation?.();
// If the popover is already open, avoid resetting the input value
// so that any in-progress edits are preserved.
if (open) {
return;
}
if (showLiveLogs) {
setOpen(true);
setSelectedTimePlaceholderValue('Live');
setInputValue('Live');
initialInputValueOnOpenRef.current = 'Live';
hasChangedSinceOpenRef.current = false;
return;
}
@@ -424,11 +451,21 @@ function CustomTimePicker({
.tz(timezone.value)
.format(DATE_TIME_FORMATS.UK_DATETIME_SECONDS);
setInputValue(`${startTime} - ${endTime}`);
const nextValue = `${startTime} - ${endTime}`;
setInputValue(nextValue);
initialInputValueOnOpenRef.current = nextValue;
hasChangedSinceOpenRef.current = false;
};
const handleClose = (e: React.MouseEvent): void => {
e.stopPropagation();
// If the value changed since opening, treat this like pressing Enter
if (hasChangedSinceOpenRef.current) {
hasChangedSinceOpenRef.current = false;
handleInputPressEnter();
return;
}
setOpen(false);
setCustomDTPickerVisible?.(false);
@@ -450,6 +487,9 @@ function CustomTimePicker({
}, [location.pathname]);
const handleInputBlur = (): void => {
// Track whether the value was changed since the input was opened for editing
hasChangedSinceOpenRef.current =
inputValue !== initialInputValueOnOpenRef.current;
resetErrorStatus();
};
@@ -552,6 +592,12 @@ function CustomTimePicker({
readOnly={!open || showLiveLogs}
placeholder={selectedTimePlaceholderValue}
value={inputValue}
onMouseDown={(e): void => {
// Only treat as "click from input" when the actual input element is clicked (not suffix/chevron)
if (e.target === inputRef.current?.input) {
isClickFromInputRef.current = true;
}
}}
onFocus={handleOpen}
onClick={handleOpen}
onChange={handleInputChange}

View File

@@ -237,11 +237,21 @@ func (q *promqlQuery) Execute(ctx context.Context) (*qbv5.Result, error) {
return nil, errors.WrapInternalf(promErr, errors.CodeInternal, "error getting matrix from promql query %q", query)
}
excludeLabel := func(labelName string) bool {
if labelName == "__name__" {
return false
}
return strings.HasPrefix(labelName, "__") || labelName == "fingerprint"
}
var series []*qbv5.TimeSeries
for _, v := range matrix {
var s qbv5.TimeSeries
lbls := make([]*qbv5.Label, 0, len(v.Metric))
for name, value := range v.Metric.Copy().Map() {
if excludeLabel(name) {
continue
}
lbls = append(lbls, &qbv5.Label{
Key: telemetrytypes.TelemetryFieldKey{Name: name},
Value: value,

View File

@@ -114,6 +114,7 @@ func (r *Repo) GetLatestVersion(
func (r *Repo) insertConfig(
ctx context.Context, orgId valuer.UUID, userId valuer.UUID, c *opamptypes.AgentConfigVersion, elements []string,
) error {
if c.ElementType.StringValue() == "" {
return errors.NewInvalidInputf(CodeElementTypeRequired, "element type is required for creating agent config version")
}
@@ -227,55 +228,6 @@ func (r *Repo) updateDeployStatus(ctx context.Context,
return nil
}
// GetDeployStatusByHash returns the DeployStatus for the given config hash
// (stored with orgId prefix). Returns DeployStatusUnknown when no matching row exists.
func (r *Repo) GetDeployStatusByHash(ctx context.Context, orgId valuer.UUID, configHash string) (opamptypes.DeployStatus, error) {
var version opamptypes.AgentConfigVersion
err := r.store.BunDB().NewSelect().
Model(&version).
ColumnExpr("deploy_status").
Where("hash = ?", configHash).
Where("org_id = ?", orgId).
Scan(ctx)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return opamptypes.DeployStatusUnknown, nil
}
return opamptypes.DeployStatusUnknown, errors.WrapInternalf(err, errors.CodeInternal, "failed to query deploy status by hash")
}
return version.DeployStatus, nil
}
// GetPendingDeployments returns all config versions with in_progress deploy status.
// Used on server startup to rehydrate coordinator subscribers that were lost on crash/restart.
func (r *Repo) GetPendingDeployments(ctx context.Context) ([]opamptypes.AgentConfigVersion, error) {
var versions []opamptypes.AgentConfigVersion
err := r.store.BunDB().NewSelect().
Model(&versions).
ColumnExpr("org_id, hash").
// Only consider non-terminal deployment states (i.e. anything except failed or deployed).
Where("deploy_status NOT IN (?, ?)",
opamptypes.DeployFailed.StringValue(),
opamptypes.Deployed.StringValue(),
).
// For each org, keep only the latest pending version.
Where("version = (SELECT MAX(version) FROM agent_config_version WHERE org_id = acv.org_id AND deploy_status NOT IN (?, ?))",
opamptypes.DeployFailed.StringValue(),
opamptypes.Deployed.StringValue(),
).
// Exclude any pending version that is before a terminal (deployed/failed) version for the same org.
Where("NOT EXISTS (SELECT 1 FROM agent_config_version acv2 WHERE acv2.org_id = acv.org_id AND acv2.version > acv.version AND acv2.deploy_status IN (?, ?))",
opamptypes.Deployed.StringValue(),
opamptypes.DeployFailed.StringValue(),
).
Where("hash IS NOT NULL AND hash != ''").
Scan(ctx)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to query pending deployments")
}
return versions, nil
}
func (r *Repo) updateDeployStatusByHash(
ctx context.Context, orgId valuer.UUID, confighash string, status string, result string,
) error {

View File

@@ -178,31 +178,6 @@ func (m *Manager) ReportConfigDeploymentStatus(
}
}
// Implements model.AgentConfigProvider
func (m *Manager) GetDeployStatusByHash(ctx context.Context, orgId valuer.UUID, configHash string) (opamptypes.DeployStatus, error) {
return m.Repo.GetDeployStatusByHash(ctx, orgId, configHash)
}
// Implements opamp.AgentConfigProvider
func (m *Manager) GetPendingDeployments(ctx context.Context) ([]opamp.PendingDeployment, error) {
versions, err := m.Repo.GetPendingDeployments(ctx)
if err != nil {
return nil, err
}
result := make([]opamp.PendingDeployment, 0, len(versions))
for _, v := range versions {
rawHash := strings.TrimPrefix(v.Hash, v.OrgID.String())
if rawHash == "" {
continue
}
result = append(result, opamp.PendingDeployment{
OrgID: v.OrgID,
RawConfigHash: rawHash,
})
}
return result, nil
}
func GetLatestVersion(
ctx context.Context, orgId valuer.UUID, elementType opamptypes.ElementType,
) (*opamptypes.AgentConfigVersion, error) {

View File

@@ -1,21 +1,6 @@
package opamp
import (
"context"
"github.com/SigNoz/signoz/pkg/query-service/app/opamp/model"
"github.com/SigNoz/signoz/pkg/valuer"
)
// PendingDeployment is an agent config deployment still in the in_progress state.
// These are re-registered in the coordinator on server startup so that
// notifySubscribers can find them when the agent reconnects after a crash.
type PendingDeployment struct {
OrgID valuer.UUID
// RawConfigHash is the hash without the orgId prefix, matching the
// ConfigHash sent in AgentRemoteConfig and reported back by the agent.
RawConfigHash string
}
import "github.com/SigNoz/signoz/pkg/query-service/app/opamp/model"
// Interface for a source of otel collector config recommendations.
type AgentConfigProvider interface {
@@ -24,9 +9,4 @@ type AgentConfigProvider interface {
// Subscribe to be notified on changes in config provided by this source.
// Used for rolling out latest config recommendation to all connected agents when settings change
SubscribeToConfigUpdates(callback func()) (unsubscribe func())
// GetPendingDeployments returns all config deployments currently in_progress.
// Called on server startup to re-register coordinator subscribers that were
// lost when the server previously crashed or restarted.
GetPendingDeployments(ctx context.Context) ([]PendingDeployment, error)
}

View File

@@ -5,7 +5,6 @@ import (
"log"
"net"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/google/uuid"
"github.com/knadh/koanf"
@@ -128,16 +127,6 @@ func (ta *MockAgentConfigProvider) HasReportedDeploymentStatus(orgID valuer.UUID
return exists
}
// AgentConfigProvider interface
func (ta *MockAgentConfigProvider) GetDeployStatusByHash(_ context.Context, _ valuer.UUID, _ string) (opamptypes.DeployStatus, error) {
return opamptypes.DeployStatusUnknown, nil
}
// AgentConfigProvider interface
func (ta *MockAgentConfigProvider) GetPendingDeployments(_ context.Context) ([]PendingDeployment, error) {
return nil, nil
}
// AgentConfigProvider interface
func (ta *MockAgentConfigProvider) SubscribeToConfigUpdates(callback func()) func() {
subscriberId := uuid.NewString()

View File

@@ -112,69 +112,51 @@ func ExtractLbFlag(agentDescr *protobufs.AgentDescription) bool {
return false
}
func (agent *Agent) updateAgentDescription(newStatus *protobufs.AgentToServer, configProvider AgentConfigProvider) (agentDescrChanged bool) {
func (agent *Agent) updateAgentDescription(newStatus *protobufs.AgentToServer) (agentDescrChanged bool) {
prevStatus := agent.Status
if agent.Status == nil {
// initialize the remote config status to unset
agent.Status = &protobufs.AgentToServer{
RemoteConfigStatus: &protobufs.RemoteConfigStatus{
Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET,
},
}
// First time this Agent reports a status, remember it.
agent.Status = newStatus
agentDescrChanged = true
} else {
// Not a new Agent. Update the Status.
agent.Status.SequenceNum = newStatus.SequenceNum
rawHash := string(newStatus.RemoteConfigStatus.LastRemoteConfigHash)
dbHash := agent.OrgID.String() + rawHash
deployStatus, err := configProvider.GetDeployStatusByHash(context.Background(), agent.OrgID, dbHash)
if err == nil {
// Set the agent config status to the status from the database
agent.Status.RemoteConfigStatus.Status = opamptypes.DeployStatusToProtoStatus[deployStatus]
}
// First message from this agent instance (new connect or server restart).
// If the agent brings a RemoteConfigStatus, consult the DB to decide
// whether this resolves a pending deployment. This is the authoritative
// answer: if DB says in_progress and agent says APPLIED/FAILED, we notify.
// No mock status, no guessing — the DB IS the source of truth.
if newStatus.RemoteConfigStatus != nil {
// Agent just started, i.e. it doesn't have a remote config status yet.
if newStatus.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET {
agentDescrChanged = true
agent.Status.RemoteConfigStatus.Status = protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET
// Check what's changed in the AgentDescription.
if newStatus.AgentDescription != nil {
// If the AgentDescription field is set it means the Agent tells us
// something is changed in the field since the last status report
// (or this is the first report).
// Make full comparison of previous and new descriptions to see if it
// really is different.
if prevStatus != nil && proto.Equal(prevStatus.AgentDescription, newStatus.AgentDescription) {
// Agent description didn't change.
agentDescrChanged = false
} else {
// else Agent was already running, Server just reconnected.
// Yes, the description is different, update it.
agent.Status.AgentDescription = newStatus.AgentDescription
// database has already recorded the final status of the deployment, So here we don't need to prepare status for the agent
// Instead we directly Copy it from newStatus
switch agent.Status.RemoteConfigStatus.Status {
case protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED,
protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED:
agent.Status.RemoteConfigStatus.Status = newStatus.RemoteConfigStatus.Status
}
agentDescrChanged = true
}
}
}
// Subsequent message — update sequence number and diff fields.
agent.Status.SequenceNum = newStatus.SequenceNum
if newStatus.AgentDescription != nil {
if proto.Equal(agent.Status.AgentDescription, newStatus.AgentDescription) {
agentDescrChanged = false
} else {
agent.Status.AgentDescription = newStatus.AgentDescription
agentDescrChanged = true
// AgentDescription field is not set, which means description didn't change.
agentDescrChanged = false
}
}
// Notify subscribers when RemoteConfigStatus changes.
if newStatus.RemoteConfigStatus != nil &&
!proto.Equal(agent.Status.RemoteConfigStatus, newStatus.RemoteConfigStatus) {
agent.Status.RemoteConfigStatus = newStatus.RemoteConfigStatus
hash := string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash)
switch agent.Status.RemoteConfigStatus.Status {
case protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED:
onConfigSuccess(agent.OrgID, agent.AgentID, hash)
case protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED:
onConfigFailure(agent.OrgID, agent.AgentID, hash, agent.Status.RemoteConfigStatus.ErrorMessage)
// Update remote config status if it is included and is different from what we have.
if newStatus.RemoteConfigStatus != nil &&
!proto.Equal(agent.Status.RemoteConfigStatus, newStatus.RemoteConfigStatus) {
agent.Status.RemoteConfigStatus = newStatus.RemoteConfigStatus
// todo: need to address multiple agent scenario here
// for now, the first response will be sent back to the UI
if agent.Status.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED {
onConfigSuccess(agent.OrgID, agent.AgentID, string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash))
}
if agent.Status.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED {
onConfigFailure(agent.OrgID, agent.AgentID, string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash), agent.Status.RemoteConfigStatus.ErrorMessage)
}
}
}
@@ -204,8 +186,14 @@ func (agent *Agent) updateRemoteConfigStatus(newStatus *protobufs.AgentToServer)
}
}
func (agent *Agent) updateStatusField(newStatus *protobufs.AgentToServer, configProvider AgentConfigProvider) (agentDescrChanged bool) {
agentDescrChanged = agent.updateAgentDescription(newStatus, configProvider)
func (agent *Agent) updateStatusField(newStatus *protobufs.AgentToServer) (agentDescrChanged bool) {
if agent.Status == nil {
// First time this Agent reports a status, remember it.
agent.Status = newStatus
agentDescrChanged = true
}
agentDescrChanged = agent.updateAgentDescription(newStatus) || agentDescrChanged
agent.updateRemoteConfigStatus(newStatus)
agent.updateHealth(newStatus)
return agentDescrChanged
@@ -250,7 +238,7 @@ func (agent *Agent) processStatusUpdate(
// current status is not up-to-date.
lostPreviousUpdate := (agent.Status == nil) || (agent.Status != nil && agent.Status.SequenceNum+1 != newStatus.SequenceNum)
agentDescrChanged := agent.updateStatusField(newStatus, configProvider)
agentDescrChanged := agent.updateStatusField(newStatus)
// Check if any fields were omitted in the status report.
effectiveConfigOmitted := newStatus.EffectiveConfig == nil &&

View File

@@ -1,11 +1,6 @@
package model
import (
"context"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
import "github.com/SigNoz/signoz/pkg/valuer"
// Interface for source of otel collector config recommendations.
type AgentConfigProvider interface {
@@ -25,10 +20,4 @@ type AgentConfigProvider interface {
configId string,
err error,
)
// GetDeployStatusByHash returns the DeployStatus for the given config hash
// (with orgId prefix as stored in the DB). Returns DeployStatusUnknown when
// no matching row exists. Used by the agent's first-connect handler to
// determine whether the reported RemoteConfigStatus resolves a pending deployment.
GetDeployStatusByHash(ctx context.Context, orgId valuer.UUID, configHash string) (opamptypes.DeployStatus, error)
}

View File

@@ -41,6 +41,7 @@ func onConfigFailure(orgId valuer.UUID, agentId string, hash string, errorMessag
notifySubscribers(orgId, agentId, key, errors.New(errorMessage))
}
// OnSuccess listens to config changes and notifies subscribers
func notifySubscribers(orgId valuer.UUID, agentId string, key string, err error) {
// this method currently does not handle multi-agent scenario.
// as soon as a message is delivered, we release all the subscribers
@@ -65,7 +66,6 @@ func ListenToConfigUpdate(orgId valuer.UUID, agentId string, hash string, ss OnC
defer coordinator.mutex.Unlock()
key := getSubscriberKey(orgId, hash)
if subs, ok := coordinator.subscribers[key]; ok {
subs = append(subs, ss)
coordinator.subscribers[key] = subs

View File

@@ -66,17 +66,6 @@ func (srv *Server) Start(listener string) error {
ListenEndpoint: listener,
}
// Re-register coordinator subscribers for any deployments that were in_progress
// when the server last shut down or crashed. Without this, notifySubscribers
// would find an empty map and the deployment status would stay stuck in_progress.
if pending, err := srv.agentConfigProvider.GetPendingDeployments(context.Background()); err != nil {
return err
} else {
for _, dep := range pending {
model.ListenToConfigUpdate(dep.OrgID, "", dep.RawConfigHash, srv.agentConfigProvider.ReportConfigDeploymentStatus)
}
}
// This will have to send request to all the agents of all tenants
unsubscribe := srv.agentConfigProvider.SubscribeToConfigUpdates(func() {
err := srv.agents.RecommendLatestConfigToAll(srv.agentConfigProvider)

View File

@@ -6,7 +6,6 @@ import (
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/uptrace/bun"
)
@@ -79,15 +78,6 @@ var (
DeployStatusUnknown = DeployStatus{valuer.NewString("unknown")}
)
var DeployStatusToProtoStatus = map[DeployStatus]protobufs.RemoteConfigStatuses{
PendingDeploy: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET,
Deploying: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING,
Deployed: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED,
DeployInitiated: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING,
DeployFailed: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED,
DeployStatusUnknown: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_UNSET,
}
type AgentConfigVersion struct {
bun.BaseModel `bun:"table:agent_config_version,alias:acv"`