mirror of
https://github.com/SigNoz/signoz.git
synced 2026-05-08 19:40:30 +01:00
Compare commits
1 Commits
issue_4522
...
make-sa-ro
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b6c725d903 |
@@ -15,8 +15,8 @@ interface OverviewTabProps {
|
||||
account: ServiceAccountRow;
|
||||
localName: string;
|
||||
onNameChange: (v: string) => void;
|
||||
localRole: string;
|
||||
onRoleChange: (v: string | undefined) => void;
|
||||
localRoles: string[];
|
||||
onRolesChange: (v: string[]) => void;
|
||||
isDisabled: boolean;
|
||||
availableRoles: AuthtypesRoleDTO[];
|
||||
rolesLoading?: boolean;
|
||||
@@ -30,8 +30,8 @@ function OverviewTab({
|
||||
account,
|
||||
localName,
|
||||
onNameChange,
|
||||
localRole,
|
||||
onRoleChange,
|
||||
localRoles,
|
||||
onRolesChange,
|
||||
isDisabled,
|
||||
availableRoles,
|
||||
rolesLoading,
|
||||
@@ -94,10 +94,15 @@ function OverviewTab({
|
||||
{isDisabled ? (
|
||||
<div className="sa-drawer__input-wrapper sa-drawer__input-wrapper--disabled">
|
||||
<div className="sa-drawer__disabled-roles">
|
||||
{localRole ? (
|
||||
<Badge color="vanilla">
|
||||
{availableRoles.find((r) => r.id === localRole)?.name ?? localRole}
|
||||
</Badge>
|
||||
{localRoles.length > 0 ? (
|
||||
localRoles.map((roleId) => {
|
||||
const role = availableRoles.find((r) => r.id === roleId);
|
||||
return (
|
||||
<Badge key={roleId} color="vanilla">
|
||||
{role?.name ?? roleId}
|
||||
</Badge>
|
||||
);
|
||||
})
|
||||
) : (
|
||||
<span className="sa-drawer__input-text">—</span>
|
||||
)}
|
||||
@@ -107,14 +112,15 @@ function OverviewTab({
|
||||
) : (
|
||||
<RolesSelect
|
||||
id="sa-roles"
|
||||
mode="multiple"
|
||||
roles={availableRoles}
|
||||
loading={rolesLoading}
|
||||
isError={rolesError}
|
||||
error={rolesErrorObj}
|
||||
onRefetch={onRefetchRoles}
|
||||
value={localRole}
|
||||
onChange={onRoleChange}
|
||||
placeholder="Select role"
|
||||
value={localRoles}
|
||||
onChange={onRolesChange}
|
||||
placeholder="Select roles"
|
||||
/>
|
||||
)}
|
||||
</div>
|
||||
|
||||
@@ -11,9 +11,7 @@ import {
|
||||
import { Pagination, Skeleton } from 'antd';
|
||||
import { convertToApiError } from 'api/ErrorResponseHandlerForGeneratedAPIs';
|
||||
import {
|
||||
getGetServiceAccountRolesQueryKey,
|
||||
getListServiceAccountsQueryKey,
|
||||
useDeleteServiceAccountRole,
|
||||
useGetServiceAccount,
|
||||
useListServiceAccountKeys,
|
||||
useUpdateServiceAccount,
|
||||
@@ -40,7 +38,7 @@ import {
|
||||
useQueryState,
|
||||
} from 'nuqs';
|
||||
import APIError from 'types/api/error';
|
||||
import { retryOn429, toAPIError } from 'utils/errorUtils';
|
||||
import { toAPIError } from 'utils/errorUtils';
|
||||
|
||||
import AddKeyModal from './AddKeyModal';
|
||||
import DeleteAccountModal from './DeleteAccountModal';
|
||||
@@ -95,7 +93,7 @@ function ServiceAccountDrawer({
|
||||
parseAsBoolean.withDefault(false),
|
||||
);
|
||||
const [localName, setLocalName] = useState('');
|
||||
const [localRole, setLocalRole] = useState('');
|
||||
const [localRoles, setLocalRoles] = useState<string[]>([]);
|
||||
const [isSaving, setIsSaving] = useState(false);
|
||||
const [saveErrors, setSaveErrors] = useState<SaveError[]>([]);
|
||||
|
||||
@@ -143,7 +141,7 @@ function ServiceAccountDrawer({
|
||||
if (!account?.id) {
|
||||
roleSessionRef.current = null;
|
||||
} else if (account.id !== roleSessionRef.current && !isRolesLoading) {
|
||||
setLocalRole(currentRoles[0]?.id ?? '');
|
||||
setLocalRoles(currentRoles.map((r) => r.id).filter(Boolean) as string[]);
|
||||
roleSessionRef.current = account.id;
|
||||
}
|
||||
}, [account?.id, currentRoles, isRolesLoading]);
|
||||
@@ -154,7 +152,13 @@ function ServiceAccountDrawer({
|
||||
const isDirty =
|
||||
account !== null &&
|
||||
(localName !== (account.name ?? '') ||
|
||||
localRole !== (currentRoles[0]?.id ?? ''));
|
||||
JSON.stringify([...localRoles].sort()) !==
|
||||
JSON.stringify(
|
||||
currentRoles
|
||||
.map((r) => r.id)
|
||||
.filter(Boolean)
|
||||
.sort(),
|
||||
));
|
||||
|
||||
const {
|
||||
roles: availableRoles,
|
||||
@@ -182,27 +186,6 @@ function ServiceAccountDrawer({
|
||||
|
||||
// the retry for this mutation is safe due to the api being idempotent on backend
|
||||
const { mutateAsync: updateMutateAsync } = useUpdateServiceAccount();
|
||||
const { mutateAsync: deleteRole } = useDeleteServiceAccountRole({
|
||||
mutation: {
|
||||
retry: retryOn429,
|
||||
},
|
||||
});
|
||||
|
||||
const executeRolesOperation = useCallback(
|
||||
async (accountId: string): Promise<RoleUpdateFailure[]> => {
|
||||
if (localRole === '' && currentRoles[0]?.id) {
|
||||
await deleteRole({
|
||||
pathParams: { id: accountId, rid: currentRoles[0].id },
|
||||
});
|
||||
await queryClient.invalidateQueries(
|
||||
getGetServiceAccountRolesQueryKey({ id: accountId }),
|
||||
);
|
||||
return [];
|
||||
}
|
||||
return applyDiff([localRole].filter(Boolean), availableRoles);
|
||||
},
|
||||
[localRole, currentRoles, availableRoles, applyDiff, deleteRole, queryClient],
|
||||
);
|
||||
|
||||
const retryNameUpdate = useCallback(async (): Promise<void> => {
|
||||
if (!account) {
|
||||
@@ -270,7 +253,7 @@ function ServiceAccountDrawer({
|
||||
|
||||
const retryRolesUpdate = useCallback(async (): Promise<void> => {
|
||||
try {
|
||||
const failures = await executeRolesOperation(selectedAccountId ?? '');
|
||||
const failures = await applyDiff([...localRoles], availableRoles);
|
||||
if (failures.length === 0) {
|
||||
setSaveErrors((prev) => prev.filter((e) => e.context !== 'Roles update'));
|
||||
} else {
|
||||
@@ -286,7 +269,7 @@ function ServiceAccountDrawer({
|
||||
),
|
||||
);
|
||||
}
|
||||
}, [selectedAccountId, executeRolesOperation, failuresToSaveErrors]);
|
||||
}, [localRoles, availableRoles, applyDiff, failuresToSaveErrors]);
|
||||
|
||||
const handleSave = useCallback(async (): Promise<void> => {
|
||||
if (!account || !isDirty) {
|
||||
@@ -305,7 +288,7 @@ function ServiceAccountDrawer({
|
||||
|
||||
const [nameResult, rolesResult] = await Promise.allSettled([
|
||||
namePromise,
|
||||
executeRolesOperation(account.id),
|
||||
applyDiff([...localRoles], availableRoles),
|
||||
]);
|
||||
|
||||
const errors: SaveError[] = [];
|
||||
@@ -346,8 +329,10 @@ function ServiceAccountDrawer({
|
||||
account,
|
||||
isDirty,
|
||||
localName,
|
||||
localRoles,
|
||||
availableRoles,
|
||||
updateMutateAsync,
|
||||
executeRolesOperation,
|
||||
applyDiff,
|
||||
refetchAccount,
|
||||
onSuccess,
|
||||
queryClient,
|
||||
@@ -446,9 +431,9 @@ function ServiceAccountDrawer({
|
||||
account={account}
|
||||
localName={localName}
|
||||
onNameChange={handleNameChange}
|
||||
localRole={localRole}
|
||||
onRoleChange={(role): void => {
|
||||
setLocalRole(role ?? '');
|
||||
localRoles={localRoles}
|
||||
onRolesChange={(roles): void => {
|
||||
setLocalRoles(roles);
|
||||
clearRoleErrors();
|
||||
}}
|
||||
isDisabled={isDeleted}
|
||||
|
||||
@@ -147,7 +147,7 @@ describe('ServiceAccountDrawer', () => {
|
||||
});
|
||||
});
|
||||
|
||||
it('changing roles enables Save; clicking Save sends role add request without delete', async () => {
|
||||
it('adding a role fires POST for the new role and no DELETE for existing roles', async () => {
|
||||
const roleSpy = jest.fn();
|
||||
const deleteSpy = jest.fn();
|
||||
const user = userEvent.setup({ pointerEventsCheck: 0 });
|
||||
@@ -167,6 +167,7 @@ describe('ServiceAccountDrawer', () => {
|
||||
|
||||
await screen.findByDisplayValue('CI Bot');
|
||||
|
||||
// Add signoz-viewer while keeping signoz-admin selected
|
||||
await user.click(screen.getByLabelText('Roles'));
|
||||
await user.click(await screen.findByTitle('signoz-viewer'));
|
||||
|
||||
@@ -184,6 +185,43 @@ describe('ServiceAccountDrawer', () => {
|
||||
});
|
||||
});
|
||||
|
||||
it('removing a role fires DELETE for the removed role and no POST', async () => {
|
||||
const roleSpy = jest.fn();
|
||||
const deleteSpy = jest.fn();
|
||||
const user = userEvent.setup({ pointerEventsCheck: 0 });
|
||||
|
||||
server.use(
|
||||
rest.post(SA_ROLES_ENDPOINT, async (req, res, ctx) => {
|
||||
roleSpy(await req.json());
|
||||
return res(ctx.status(200), ctx.json({ status: 'success', data: {} }));
|
||||
}),
|
||||
rest.delete(SA_ROLE_DELETE_ENDPOINT, (_, res, ctx) => {
|
||||
deleteSpy();
|
||||
return res(ctx.status(200), ctx.json({ status: 'success', data: {} }));
|
||||
}),
|
||||
);
|
||||
|
||||
renderDrawer();
|
||||
|
||||
await screen.findByDisplayValue('CI Bot');
|
||||
|
||||
// Remove the signoz-admin tag from the multi-select
|
||||
const adminTag = await screen.findByTitle('signoz-admin');
|
||||
const removeBtn = adminTag.querySelector(
|
||||
'.ant-select-selection-item-remove',
|
||||
) as Element;
|
||||
await user.click(removeBtn);
|
||||
|
||||
const saveBtn = screen.getByRole('button', { name: /Save Changes/i });
|
||||
await waitFor(() => expect(saveBtn).not.toBeDisabled());
|
||||
await user.click(saveBtn);
|
||||
|
||||
await waitFor(() => {
|
||||
expect(deleteSpy).toHaveBeenCalled();
|
||||
expect(roleSpy).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
it('"Delete Service Account" opens confirm dialog; confirming sends delete request', async () => {
|
||||
const deleteSpy = jest.fn();
|
||||
const user = userEvent.setup({ pointerEventsCheck: 0 });
|
||||
|
||||
@@ -3,6 +3,7 @@ import { useQueryClient } from 'react-query';
|
||||
import {
|
||||
getGetServiceAccountRolesQueryKey,
|
||||
useCreateServiceAccountRole,
|
||||
useDeleteServiceAccountRole,
|
||||
useGetServiceAccountRoles,
|
||||
} from 'api/generated/services/serviceaccount';
|
||||
import type { AuthtypesRoleDTO } from 'api/generated/services/sigNoz.schemas';
|
||||
@@ -44,6 +45,9 @@ export function useServiceAccountRoleManager(
|
||||
const { mutateAsync: createRole } = useCreateServiceAccountRole({
|
||||
mutation: { retry: retryOn429 },
|
||||
});
|
||||
const { mutateAsync: deleteRole } = useDeleteServiceAccountRole({
|
||||
mutation: { retry: retryOn429 },
|
||||
});
|
||||
|
||||
const invalidateRoles = useCallback(
|
||||
() =>
|
||||
@@ -68,14 +72,21 @@ export function useServiceAccountRoleManager(
|
||||
const addedRoles = availableRoles.filter(
|
||||
(r) => r.id && desiredRoleIds.has(r.id) && !currentRoleIds.has(r.id),
|
||||
);
|
||||
const removedRoles = currentRoles.filter(
|
||||
(r) => r.id && !desiredRoleIds.has(r.id),
|
||||
);
|
||||
|
||||
// TODO: re-enable deletes once BE for this is streamlined
|
||||
const allOperations = [
|
||||
...addedRoles.map((role) => ({
|
||||
role,
|
||||
run: (): ReturnType<typeof createRole> =>
|
||||
createRole({ pathParams: { id: accountId }, data: { id: role.id } }),
|
||||
})),
|
||||
...removedRoles.map((role) => ({
|
||||
role,
|
||||
run: (): ReturnType<typeof deleteRole> =>
|
||||
deleteRole({ pathParams: { id: accountId, rid: role.id ?? '' } }),
|
||||
})),
|
||||
];
|
||||
|
||||
const results = await Promise.allSettled(
|
||||
@@ -106,7 +117,7 @@ export function useServiceAccountRoleManager(
|
||||
|
||||
return failures;
|
||||
},
|
||||
[accountId, currentRoles, createRole, invalidateRoles],
|
||||
[accountId, currentRoles, createRole, deleteRole, invalidateRoles],
|
||||
);
|
||||
|
||||
return {
|
||||
|
||||
@@ -186,7 +186,7 @@ func (c *conditionBuilder) conditionFor(
|
||||
column := columns[0]
|
||||
if len(key.Evolutions) > 0 {
|
||||
// we will use the corresponding column and its evolution entry for the query
|
||||
newColumns, _, err := qbtypes.SelectEvolutionsForColumns(columns, key.Evolutions, startNs, endNs)
|
||||
newColumns, _, err := selectEvolutionsForColumns(columns, key.Evolutions, startNs, endNs)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
@@ -3,7 +3,11 @@ package telemetrylogs
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"slices"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
|
||||
"github.com/SigNoz/signoz-otel-collector/utils"
|
||||
@@ -133,6 +137,113 @@ func (m *fieldMapper) getColumn(ctx context.Context, key *telemetrytypes.Telemet
|
||||
return nil, qbtypes.ErrColumnNotFound
|
||||
}
|
||||
|
||||
// selectEvolutionsForColumns selects the appropriate evolution entries for each column based on the time range.
|
||||
// Logic:
|
||||
// - Finds the latest base evolution (<= tsStartTime) across ALL columns
|
||||
// - Rejects all evolutions before this latest base evolution
|
||||
// - For duplicate evolutions it considers the oldest one (first in ReleaseTime)
|
||||
// - For each column, includes its evolution if it's >= latest base evolution and <= tsEndTime
|
||||
// - Results are sorted by ReleaseTime descending (newest first)
|
||||
func selectEvolutionsForColumns(columns []*schema.Column, evolutions []*telemetrytypes.EvolutionEntry, tsStart, tsEnd uint64) ([]*schema.Column, []*telemetrytypes.EvolutionEntry, error) {
|
||||
|
||||
sortedEvolutions := make([]*telemetrytypes.EvolutionEntry, len(evolutions))
|
||||
copy(sortedEvolutions, evolutions)
|
||||
|
||||
// sort the evolutions by ReleaseTime ascending
|
||||
sort.Slice(sortedEvolutions, func(i, j int) bool {
|
||||
return sortedEvolutions[i].ReleaseTime.Before(sortedEvolutions[j].ReleaseTime)
|
||||
})
|
||||
|
||||
tsStartTime := time.Unix(0, int64(tsStart))
|
||||
tsEndTime := time.Unix(0, int64(tsEnd))
|
||||
|
||||
// Build evolution map: column name -> evolution
|
||||
evolutionMap := make(map[string]*telemetrytypes.EvolutionEntry)
|
||||
for _, evolution := range sortedEvolutions {
|
||||
if _, exists := evolutionMap[evolution.ColumnName+":"+evolution.FieldName+":"+strconv.Itoa(int(evolution.Version))]; exists {
|
||||
// since if there is duplicate we would just use the oldest one.
|
||||
continue
|
||||
}
|
||||
evolutionMap[evolution.ColumnName+":"+evolution.FieldName+":"+strconv.Itoa(int(evolution.Version))] = evolution
|
||||
}
|
||||
|
||||
// Find the latest base evolution (<= tsStartTime) across ALL columns
|
||||
// Evolutions are sorted, so we can break early
|
||||
var latestBaseEvolutionAcrossAll *telemetrytypes.EvolutionEntry
|
||||
for _, evolution := range sortedEvolutions {
|
||||
if evolution.ReleaseTime.After(tsStartTime) {
|
||||
break
|
||||
}
|
||||
latestBaseEvolutionAcrossAll = evolution
|
||||
}
|
||||
|
||||
// We shouldn't reach this, it basically means there is something wrong with the evolutions data
|
||||
if latestBaseEvolutionAcrossAll == nil {
|
||||
return nil, nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "no base evolution found for columns %v", columns)
|
||||
}
|
||||
|
||||
columnLookUpMap := make(map[string]*schema.Column)
|
||||
for _, column := range columns {
|
||||
columnLookUpMap[column.Name] = column
|
||||
}
|
||||
|
||||
// Collect column-evolution pairs
|
||||
type colEvoPair struct {
|
||||
column *schema.Column
|
||||
evolution *telemetrytypes.EvolutionEntry
|
||||
}
|
||||
pairs := []colEvoPair{}
|
||||
|
||||
for _, evolution := range evolutionMap {
|
||||
// Reject evolutions before the latest base evolution
|
||||
if evolution.ReleaseTime.Before(latestBaseEvolutionAcrossAll.ReleaseTime) {
|
||||
continue
|
||||
}
|
||||
// skip evolutions after tsEndTime
|
||||
if evolution.ReleaseTime.After(tsEndTime) || evolution.ReleaseTime.Equal(tsEndTime) {
|
||||
continue
|
||||
}
|
||||
|
||||
if _, exists := columnLookUpMap[evolution.ColumnName]; !exists {
|
||||
return nil, nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "evolution column %s not found in columns %v", evolution.ColumnName, columns)
|
||||
}
|
||||
|
||||
pairs = append(pairs, colEvoPair{columnLookUpMap[evolution.ColumnName], evolution})
|
||||
}
|
||||
|
||||
// If no pairs found, fall back to latestBaseEvolutionAcrossAll for matching columns
|
||||
if len(pairs) == 0 {
|
||||
for _, column := range columns {
|
||||
// Use latestBaseEvolutionAcrossAll if this column name matches its column name
|
||||
if column.Name == latestBaseEvolutionAcrossAll.ColumnName {
|
||||
pairs = append(pairs, colEvoPair{column, latestBaseEvolutionAcrossAll})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Sort by ReleaseTime descending (newest first)
|
||||
slices.SortFunc(pairs, func(a, b colEvoPair) int {
|
||||
// Sort by ReleaseTime descending (newest first)
|
||||
if a.evolution.ReleaseTime.After(b.evolution.ReleaseTime) {
|
||||
return -1
|
||||
}
|
||||
if a.evolution.ReleaseTime.Before(b.evolution.ReleaseTime) {
|
||||
return 1
|
||||
}
|
||||
return 0
|
||||
})
|
||||
|
||||
// Extract results
|
||||
newColumns := make([]*schema.Column, len(pairs))
|
||||
evolutionsEntries := make([]*telemetrytypes.EvolutionEntry, len(pairs))
|
||||
for i, pair := range pairs {
|
||||
newColumns[i] = pair.column
|
||||
evolutionsEntries[i] = pair.evolution
|
||||
}
|
||||
|
||||
return newColumns, evolutionsEntries, nil
|
||||
}
|
||||
|
||||
func (m *fieldMapper) FieldFor(ctx context.Context, tsStart, tsEnd uint64, key *telemetrytypes.TelemetryFieldKey) (string, error) {
|
||||
columns, err := m.getColumn(ctx, key)
|
||||
if err != nil {
|
||||
@@ -143,7 +254,7 @@ func (m *fieldMapper) FieldFor(ctx context.Context, tsStart, tsEnd uint64, key *
|
||||
var evolutionsEntries []*telemetrytypes.EvolutionEntry
|
||||
if len(key.Evolutions) > 0 {
|
||||
// we will use the corresponding column and its evolution entry for the query
|
||||
newColumns, evolutionsEntries, err = qbtypes.SelectEvolutionsForColumns(columns, key.Evolutions, tsStart, tsEnd)
|
||||
newColumns, evolutionsEntries, err = selectEvolutionsForColumns(columns, key.Evolutions, tsStart, tsEnd)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
@@ -886,7 +886,7 @@ func TestSelectEvolutionsForColumns(t *testing.T) {
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
resultColumns, resultEvols, err := qbtypes.SelectEvolutionsForColumns(tc.columns, tc.evolutions, tc.tsStart, tc.tsEnd)
|
||||
resultColumns, resultEvols, err := selectEvolutionsForColumns(tc.columns, tc.evolutions, tc.tsStart, tc.tsEnd)
|
||||
|
||||
if tc.expectedError {
|
||||
assert.Contains(t, err.Error(), tc.errorStr)
|
||||
|
||||
@@ -344,11 +344,6 @@ func (t *telemetryMetaStore) getTracesKeys(ctx context.Context, fieldKeySelector
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := t.updateColumnEvolutionMetadataForKeys(ctx, keys); err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
return keys, complete, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -89,20 +89,6 @@ func TestGetKeys(t *testing.T) {
|
||||
{Name: "tag_data_type", Type: "String"},
|
||||
{Name: "priority", Type: "UInt8"},
|
||||
}, [][]any{{"http.method", "tag", "String", 1}, {"http.method", "tag", "String", 1}}))
|
||||
|
||||
// Two rows above produce two evolution selectors (each contributing 4 bound args).
|
||||
mock.ExpectQuery(`FROM signoz_metadata\.distributed_column_evolution_metadata`).
|
||||
WithArgs(nil, nil, nil, nil, nil, nil, nil, nil).
|
||||
WillReturnRows(cmock.NewRows([]cmock.ColumnType{
|
||||
{Name: "signal", Type: "String"},
|
||||
{Name: "column_name", Type: "String"},
|
||||
{Name: "column_type", Type: "String"},
|
||||
{Name: "field_context", Type: "String"},
|
||||
{Name: "field_name", Type: "String"},
|
||||
{Name: "version", Type: "UInt32"},
|
||||
{Name: "release_time", Type: "Float64"},
|
||||
}, [][]any{}))
|
||||
|
||||
keys, _, err := metadata.GetKeys(context.Background(), &telemetrytypes.FieldKeySelector{
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
FieldContext: telemetrytypes.FieldContextSpan,
|
||||
@@ -261,27 +247,6 @@ func TestApplyBackwardCompatibleKeys(t *testing.T) {
|
||||
}, rows))
|
||||
}
|
||||
|
||||
// getTracesKeys / getLogsKeys both fetch evolution metadata; return an empty
|
||||
// result so the existing test data flows through unchanged. Each input key
|
||||
// becomes one selector contributing four bound args.
|
||||
if hasTraces || hasLogs {
|
||||
evoArgs := make([]any, 0, len(tt.inputKeys)*4)
|
||||
for range tt.inputKeys {
|
||||
evoArgs = append(evoArgs, nil, nil, nil, nil)
|
||||
}
|
||||
mock.ExpectQuery(`FROM signoz_metadata\.distributed_column_evolution_metadata`).
|
||||
WithArgs(evoArgs...).
|
||||
WillReturnRows(cmock.NewRows([]cmock.ColumnType{
|
||||
{Name: "signal", Type: "String"},
|
||||
{Name: "column_name", Type: "String"},
|
||||
{Name: "column_type", Type: "String"},
|
||||
{Name: "field_context", Type: "String"},
|
||||
{Name: "field_name", Type: "String"},
|
||||
{Name: "version", Type: "UInt32"},
|
||||
{Name: "release_time", Type: "Float64"},
|
||||
}, [][]any{}))
|
||||
}
|
||||
|
||||
selectors := []*telemetrytypes.FieldKeySelector{}
|
||||
for _, key := range tt.inputKeys {
|
||||
selectors = append(selectors, &telemetrytypes.FieldKeySelector{
|
||||
|
||||
@@ -161,41 +161,7 @@ func (c *conditionBuilder) conditionFor(
|
||||
case qbtypes.FilterOperatorExists, qbtypes.FilterOperatorNotExists:
|
||||
|
||||
var value any
|
||||
column := columns[0]
|
||||
if len(key.Evolutions) > 0 {
|
||||
// we will use the corresponding column and its evolution entry for the query
|
||||
newColumns, _, err := qbtypes.SelectEvolutionsForColumns(columns, key.Evolutions, startNs, endNs)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if len(newColumns) == 0 {
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "no valid evolution found for field %s in the given time range", key.Name)
|
||||
}
|
||||
|
||||
// Multiple columns means fieldExpression is a multiIf returning NULL when none match,
|
||||
// so a simple null check is sufficient.
|
||||
if len(newColumns) > 1 {
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.IsNotNull(fieldExpression), nil
|
||||
} else {
|
||||
return sb.IsNull(fieldExpression), nil
|
||||
}
|
||||
}
|
||||
|
||||
// otherwise we have to find the correct exist operator based on the column type
|
||||
column = newColumns[0]
|
||||
} else if len(columns) > 1 {
|
||||
// Resource fields without evolution data still produce a multiIf in FieldFor;
|
||||
// fall back to a null check on the multiIf result.
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.IsNotNull(fieldExpression), nil
|
||||
} else {
|
||||
return sb.IsNull(fieldExpression), nil
|
||||
}
|
||||
}
|
||||
|
||||
switch column.Type.GetType() {
|
||||
switch columns[0].Type.GetType() {
|
||||
case schema.ColumnTypeEnumJSON:
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.IsNotNull(fieldExpression), nil
|
||||
@@ -212,7 +178,7 @@ func (c *conditionBuilder) conditionFor(
|
||||
return sb.E(fieldExpression, value), nil
|
||||
}
|
||||
case schema.ColumnTypeEnumLowCardinality:
|
||||
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
|
||||
switch elementType := columns[0].Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
|
||||
case schema.ColumnTypeEnumString:
|
||||
value = ""
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
@@ -236,14 +202,14 @@ func (c *conditionBuilder) conditionFor(
|
||||
return sb.E(fieldExpression, value), nil
|
||||
}
|
||||
case schema.ColumnTypeEnumMap:
|
||||
keyType := column.Type.(schema.MapColumnType).KeyType
|
||||
keyType := columns[0].Type.(schema.MapColumnType).KeyType
|
||||
if _, ok := keyType.(schema.LowCardinalityColumnType); !ok {
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, column.Type)
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, columns[0].Type)
|
||||
}
|
||||
|
||||
switch valueType := column.Type.(schema.MapColumnType).ValueType; valueType.GetType() {
|
||||
switch valueType := columns[0].Type.(schema.MapColumnType).ValueType; valueType.GetType() {
|
||||
case schema.ColumnTypeEnumString, schema.ColumnTypeEnumBool, schema.ColumnTypeEnumFloat64:
|
||||
leftOperand := fmt.Sprintf("mapContains(%s, '%s')", column.Name, key.Name)
|
||||
leftOperand := fmt.Sprintf("mapContains(%s, '%s')", columns[0].Name, key.Name)
|
||||
if key.Materialized {
|
||||
leftOperand = telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)
|
||||
}
|
||||
@@ -256,7 +222,7 @@ func (c *conditionBuilder) conditionFor(
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for map column type %s", valueType)
|
||||
}
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for column type %s", column.Type)
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for column type %s", columns[0].Type)
|
||||
}
|
||||
}
|
||||
return "", nil
|
||||
|
||||
@@ -3,7 +3,6 @@ package telemetrytraces
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
@@ -217,7 +216,7 @@ func TestConditionFor(t *testing.T) {
|
||||
},
|
||||
operator: qbtypes.FilterOperatorExists,
|
||||
value: nil,
|
||||
expectedSQL: "WHERE multiIf(mapContains(resources_string, 'service.name'), resources_string['service.name'], resource.`service.name` IS NOT NULL, resource.`service.name`::String, NULL) IS NOT NULL",
|
||||
expectedSQL: "WHERE multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
@@ -229,7 +228,7 @@ func TestConditionFor(t *testing.T) {
|
||||
},
|
||||
operator: qbtypes.FilterOperatorNotExists,
|
||||
value: nil,
|
||||
expectedSQL: "WHERE multiIf(mapContains(resources_string, 'service.name'), resources_string['service.name'], resource.`service.name` IS NOT NULL, resource.`service.name`::String, NULL) IS NULL",
|
||||
expectedSQL: "WHERE multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NULL",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
@@ -303,85 +302,3 @@ func TestConditionFor(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestConditionForResourceWithEvolution(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
releaseTime := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
evolutions := mockEvolutionData(releaseTime)
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
key telemetrytypes.TelemetryFieldKey
|
||||
operator qbtypes.FilterOperator
|
||||
tsStart uint64
|
||||
tsEnd uint64
|
||||
expectedSQL string
|
||||
}{
|
||||
{
|
||||
name: "Exists - window after release - JSON only",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
Evolutions: evolutions,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorExists,
|
||||
tsStart: uint64(time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
tsEnd: uint64(time.Date(2025, 7, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
expectedSQL: "WHERE resource.`service.name`::String IS NOT NULL",
|
||||
},
|
||||
{
|
||||
name: "NotExists - window after release - JSON only",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
Evolutions: evolutions,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorNotExists,
|
||||
tsStart: uint64(time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
tsEnd: uint64(time.Date(2025, 7, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
expectedSQL: "WHERE resource.`service.name`::String IS NULL",
|
||||
},
|
||||
{
|
||||
name: "Exists - window before release - map only",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
Evolutions: evolutions,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorExists,
|
||||
tsStart: uint64(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
tsEnd: uint64(time.Date(2024, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
expectedSQL: "WHERE mapContains(resources_string, 'service.name') = ?",
|
||||
},
|
||||
{
|
||||
name: "Exists - window straddles release - multiIf null check",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
Evolutions: evolutions,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorExists,
|
||||
tsStart: uint64(time.Date(2024, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
tsEnd: uint64(time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
expectedSQL: "WHERE multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) IS NOT NULL",
|
||||
},
|
||||
}
|
||||
|
||||
fm := NewFieldMapper()
|
||||
conditionBuilder := NewConditionBuilder(fm)
|
||||
|
||||
for _, tc := range testCases {
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
cond, err := conditionBuilder.ConditionFor(ctx, tc.tsStart, tc.tsEnd, &tc.key, tc.operator, nil, sb)
|
||||
require.NoError(t, err)
|
||||
sb.Where(cond)
|
||||
sql, _ := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
assert.Contains(t, sql, tc.expectedSQL)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -174,7 +174,7 @@ func (m *defaultFieldMapper) getColumn(
|
||||
) ([]*schema.Column, error) {
|
||||
switch key.FieldContext {
|
||||
case telemetrytypes.FieldContextResource:
|
||||
return []*schema.Column{indexV3Columns["resources_string"], indexV3Columns["resource"]}, nil
|
||||
return []*schema.Column{indexV3Columns["resource"]}, nil
|
||||
case telemetrytypes.FieldContextScope:
|
||||
return []*schema.Column{}, qbtypes.ErrColumnNotFound
|
||||
case telemetrytypes.FieldContextAttribute:
|
||||
@@ -254,92 +254,63 @@ func (m *defaultFieldMapper) FieldFor(
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
var newColumns []*schema.Column
|
||||
var evolutionsEntries []*telemetrytypes.EvolutionEntry
|
||||
if len(key.Evolutions) > 0 {
|
||||
// we will use the corresponding column and its evolution entry for the query
|
||||
newColumns, evolutionsEntries, err = qbtypes.SelectEvolutionsForColumns(columns, key.Evolutions, startNs, endNs)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
} else {
|
||||
newColumns = columns
|
||||
if len(columns) != 1 {
|
||||
return "", errors.Newf(errors.TypeInternal, errors.CodeInternal, "expected exactly 1 column, got %d", len(columns))
|
||||
}
|
||||
column := columns[0]
|
||||
|
||||
exprs := []string{}
|
||||
existExpr := []string{}
|
||||
for i, column := range newColumns {
|
||||
// Use evolution column name if available, otherwise use the column name
|
||||
columnName := column.Name
|
||||
if evolutionsEntries != nil && evolutionsEntries[i] != nil {
|
||||
columnName = evolutionsEntries[i].ColumnName
|
||||
switch column.Type.GetType() {
|
||||
case schema.ColumnTypeEnumJSON:
|
||||
// json is only supported for resource context as of now
|
||||
if key.FieldContext != telemetrytypes.FieldContextResource {
|
||||
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource context fields are supported for json columns, got %s", key.FieldContext.String)
|
||||
}
|
||||
oldColumn := indexV3Columns["resources_string"]
|
||||
oldKeyName := fmt.Sprintf("%s['%s']", oldColumn.Name, key.Name)
|
||||
// have to add ::string as clickHouse throws an error :- data types Variant/Dynamic are not allowed in GROUP BY
|
||||
// once clickHouse dependency is updated, we need to check if we can remove it.
|
||||
if key.Materialized {
|
||||
oldKeyName = telemetrytypes.FieldKeyToMaterializedColumnName(key)
|
||||
oldKeyNameExists := telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)
|
||||
return fmt.Sprintf("multiIf(%s.`%s` IS NOT NULL, %s.`%s`::String, %s==true, %s, NULL)", column.Name, key.Name, column.Name, key.Name, oldKeyNameExists, oldKeyName), nil
|
||||
} else {
|
||||
return fmt.Sprintf("multiIf(%s.`%s` IS NOT NULL, %s.`%s`::String, mapContains(%s, '%s'), %s, NULL)", column.Name, key.Name, column.Name, key.Name, oldColumn.Name, key.Name, oldKeyName), nil
|
||||
}
|
||||
case schema.ColumnTypeEnumString,
|
||||
schema.ColumnTypeEnumUInt64,
|
||||
schema.ColumnTypeEnumUInt32,
|
||||
schema.ColumnTypeEnumInt8,
|
||||
schema.ColumnTypeEnumInt16,
|
||||
schema.ColumnTypeEnumBool,
|
||||
schema.ColumnTypeEnumDateTime64,
|
||||
schema.ColumnTypeEnumFixedString:
|
||||
return column.Name, nil
|
||||
case schema.ColumnTypeEnumLowCardinality:
|
||||
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
|
||||
case schema.ColumnTypeEnumString:
|
||||
return column.Name, nil
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "value type %s is not supported for low cardinality column type %s", elementType, column.Type)
|
||||
}
|
||||
case schema.ColumnTypeEnumMap:
|
||||
keyType := column.Type.(schema.MapColumnType).KeyType
|
||||
if _, ok := keyType.(schema.LowCardinalityColumnType); !ok {
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, column.Type)
|
||||
}
|
||||
|
||||
switch column.Type.GetType() {
|
||||
case schema.ColumnTypeEnumJSON:
|
||||
// json is only supported for resource context as of now
|
||||
if key.FieldContext != telemetrytypes.FieldContextResource {
|
||||
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource context fields are supported for json columns, got %s", key.FieldContext.String)
|
||||
}
|
||||
// have to add ::string as clickHouse throws an error :- data types Variant/Dynamic are not allowed in GROUP BY
|
||||
// once clickHouse dependency is updated, we need to check if we can remove it.
|
||||
exprs = append(exprs, fmt.Sprintf("%s.`%s`::String", columnName, key.Name))
|
||||
existExpr = append(existExpr, fmt.Sprintf("%s.`%s` IS NOT NULL", columnName, key.Name))
|
||||
case schema.ColumnTypeEnumString,
|
||||
schema.ColumnTypeEnumUInt64,
|
||||
schema.ColumnTypeEnumUInt32,
|
||||
schema.ColumnTypeEnumInt8,
|
||||
schema.ColumnTypeEnumInt16,
|
||||
schema.ColumnTypeEnumBool,
|
||||
schema.ColumnTypeEnumDateTime64,
|
||||
schema.ColumnTypeEnumFixedString:
|
||||
exprs = append(exprs, column.Name)
|
||||
case schema.ColumnTypeEnumLowCardinality:
|
||||
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
|
||||
case schema.ColumnTypeEnumString:
|
||||
exprs = append(exprs, column.Name)
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "value type %s is not supported for low cardinality column type %s", elementType, column.Type)
|
||||
}
|
||||
case schema.ColumnTypeEnumMap:
|
||||
keyType := column.Type.(schema.MapColumnType).KeyType
|
||||
if _, ok := keyType.(schema.LowCardinalityColumnType); !ok {
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, column.Type)
|
||||
}
|
||||
|
||||
switch valueType := column.Type.(schema.MapColumnType).ValueType; valueType.GetType() {
|
||||
case schema.ColumnTypeEnumString, schema.ColumnTypeEnumFloat64, schema.ColumnTypeEnumBool:
|
||||
// a key could have been materialized, if so return the materialized column name
|
||||
if key.Materialized {
|
||||
exprs = append(exprs, telemetrytypes.FieldKeyToMaterializedColumnName(key))
|
||||
existExpr = append(existExpr, fmt.Sprintf("%s==true", telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)))
|
||||
} else {
|
||||
exprs = append(exprs, fmt.Sprintf("%s['%s']", columnName, key.Name))
|
||||
existExpr = append(existExpr, fmt.Sprintf("mapContains(%s, '%s')", columnName, key.Name))
|
||||
}
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "value type %s is not supported for map column type %s", valueType, column.Type)
|
||||
switch valueType := column.Type.(schema.MapColumnType).ValueType; valueType.GetType() {
|
||||
case schema.ColumnTypeEnumString, schema.ColumnTypeEnumFloat64, schema.ColumnTypeEnumBool:
|
||||
// a key could have been materialized, if so return the materialized column name
|
||||
if key.Materialized {
|
||||
return telemetrytypes.FieldKeyToMaterializedColumnName(key), nil
|
||||
}
|
||||
return fmt.Sprintf("%s['%s']", column.Name, key.Name), nil
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "value type %s is not supported for map column type %s", valueType, column.Type)
|
||||
}
|
||||
}
|
||||
|
||||
if len(exprs) == 1 {
|
||||
return exprs[0], nil
|
||||
} else if len(exprs) > 1 {
|
||||
// Ensure existExpr has the same length as exprs
|
||||
if len(existExpr) != len(exprs) {
|
||||
return "", errors.New(errors.TypeInternal, errors.CodeInternal, "length of exist exprs doesn't match to that of exprs")
|
||||
}
|
||||
finalExprs := []string{}
|
||||
for i, expr := range exprs {
|
||||
finalExprs = append(finalExprs, fmt.Sprintf("%s, %s", existExpr[i], expr))
|
||||
}
|
||||
return "multiIf(" + strings.Join(finalExprs, ", ") + ", NULL)", nil
|
||||
}
|
||||
|
||||
// should not reach here
|
||||
return columns[0].Name, nil
|
||||
return column.Name, nil
|
||||
}
|
||||
|
||||
// ColumnExpressionFor returns the column expression for the given field
|
||||
|
||||
@@ -3,7 +3,6 @@ package telemetrytraces
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
@@ -65,7 +64,7 @@ func TestGetFieldKeyName(t *testing.T) {
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
},
|
||||
expectedResult: "multiIf(mapContains(resources_string, 'service.name'), resources_string['service.name'], resource.`service.name` IS NOT NULL, resource.`service.name`::String, NULL)",
|
||||
expectedResult: "multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL)",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
@@ -76,7 +75,7 @@ func TestGetFieldKeyName(t *testing.T) {
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
Materialized: true,
|
||||
},
|
||||
expectedResult: "multiIf(`resource_string_deployment$$environment_exists`==true, `resource_string_deployment$$environment`, resource.`deployment.environment` IS NOT NULL, resource.`deployment.environment`::String, NULL)",
|
||||
expectedResult: "multiIf(resource.`deployment.environment` IS NOT NULL, resource.`deployment.environment`::String, `resource_string_deployment$$environment_exists`==true, `resource_string_deployment$$environment`, NULL)",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
@@ -104,86 +103,3 @@ func TestGetFieldKeyName(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestFieldForResourceWithEvolution(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
releaseTime := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
evolutions := mockEvolutionData(releaseTime)
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
key telemetrytypes.TelemetryFieldKey
|
||||
tsStart uint64
|
||||
tsEnd uint64
|
||||
expectedResult string
|
||||
}{
|
||||
{
|
||||
name: "Window straddles release - both columns",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
Evolutions: evolutions,
|
||||
},
|
||||
tsStart: uint64(time.Date(2024, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
tsEnd: uint64(time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
expectedResult: "multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL)",
|
||||
},
|
||||
{
|
||||
name: "Window fully after release - JSON column only",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
Evolutions: evolutions,
|
||||
},
|
||||
tsStart: uint64(time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
tsEnd: uint64(time.Date(2025, 7, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
expectedResult: "resource.`service.name`::String",
|
||||
},
|
||||
{
|
||||
name: "Window fully before release - map column only",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
Evolutions: evolutions,
|
||||
},
|
||||
tsStart: uint64(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
tsEnd: uint64(time.Date(2024, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
expectedResult: "resources_string['service.name']",
|
||||
},
|
||||
{
|
||||
name: "Window fully after release - materialized resource",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "deployment.environment",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
Materialized: true,
|
||||
Evolutions: evolutions,
|
||||
},
|
||||
tsStart: uint64(time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
tsEnd: uint64(time.Date(2025, 7, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
expectedResult: "resource.`deployment.environment`::String",
|
||||
},
|
||||
{
|
||||
name: "Window straddles release - materialized resource",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "deployment.environment",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
Materialized: true,
|
||||
Evolutions: evolutions,
|
||||
},
|
||||
tsStart: uint64(time.Date(2024, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
tsEnd: uint64(time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC).UnixNano()),
|
||||
expectedResult: "multiIf(resource.`deployment.environment` IS NOT NULL, resource.`deployment.environment`::String, `resource_string_deployment$$environment_exists`==true, `resource_string_deployment$$environment`, NULL)",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
fm := NewFieldMapper()
|
||||
result, err := fm.FieldFor(ctx, tc.tsStart, tc.tsEnd, &tc.key)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, tc.expectedResult, result)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,9 +16,6 @@ import (
|
||||
)
|
||||
|
||||
func TestStatementBuilder(t *testing.T) {
|
||||
// releaseTime is chosen so it lands inside the standard [1747947419000, 1747983448000]ms
|
||||
// test window, keeping the multiIf SQL form for resource fields.
|
||||
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
|
||||
cases := []struct {
|
||||
name string
|
||||
requestType qbtypes.RequestType
|
||||
@@ -358,7 +355,7 @@ func TestStatementBuilder(t *testing.T) {
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap(releaseTime)
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
fl := flaggertest.New(t)
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
|
||||
|
||||
@@ -397,7 +394,6 @@ func TestStatementBuilder(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStatementBuilderListQuery(t *testing.T) {
|
||||
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
|
||||
cases := []struct {
|
||||
name string
|
||||
requestType qbtypes.RequestType
|
||||
@@ -654,7 +650,7 @@ func TestStatementBuilderListQuery(t *testing.T) {
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap(releaseTime)
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
fl := flaggertest.New(t)
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
|
||||
|
||||
@@ -687,7 +683,6 @@ func TestStatementBuilderListQuery(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStatementBuilderListQueryWithCorruptData(t *testing.T) {
|
||||
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
|
||||
cases := []struct {
|
||||
name string
|
||||
requestType qbtypes.RequestType
|
||||
@@ -716,7 +711,7 @@ func TestStatementBuilderListQueryWithCorruptData(t *testing.T) {
|
||||
Limit: 10,
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "SELECT duration_nano AS `duration_nano`, name AS `name`, response_status_code AS `response_status_code`, multiIf(mapContains(resources_string, 'service.name'), resources_string['service.name'], resource.`service.name` IS NOT NULL, resource.`service.name`::String, NULL) AS `service.name`, span_id AS `span_id`, timestamp AS `timestamp`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Query: "SELECT duration_nano AS `duration_nano`, name AS `name`, response_status_code AS `response_status_code`, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) AS `service.name`, span_id AS `span_id`, timestamp AS `timestamp`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? LIMIT ?",
|
||||
Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
@@ -749,7 +744,7 @@ func TestStatementBuilderListQueryWithCorruptData(t *testing.T) {
|
||||
}},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "SELECT duration_nano AS `duration_nano`, name AS `name`, response_status_code AS `response_status_code`, multiIf(mapContains(resources_string, 'service.name'), resources_string['service.name'], resource.`service.name` IS NOT NULL, resource.`service.name`::String, NULL) AS `service.name`, span_id AS `span_id`, timestamp AS `timestamp`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? ORDER BY timestamp AS `timestamp` asc LIMIT ?",
|
||||
Query: "SELECT duration_nano AS `duration_nano`, name AS `name`, response_status_code AS `response_status_code`, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) AS `service.name`, span_id AS `span_id`, timestamp AS `timestamp`, trace_id AS `trace_id` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? ORDER BY timestamp AS `timestamp` asc LIMIT ?",
|
||||
Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
@@ -763,7 +758,7 @@ func TestStatementBuilderListQueryWithCorruptData(t *testing.T) {
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore.KeysMap = c.keysMap
|
||||
if mockMetadataStore.KeysMap == nil {
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap(releaseTime)
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
}
|
||||
fl := flaggertest.New(t)
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
|
||||
@@ -794,7 +789,6 @@ func TestStatementBuilderListQueryWithCorruptData(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStatementBuilderTraceQuery(t *testing.T) {
|
||||
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
|
||||
cases := []struct {
|
||||
name string
|
||||
requestType qbtypes.RequestType
|
||||
@@ -917,7 +911,7 @@ func TestStatementBuilderTraceQuery(t *testing.T) {
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap(releaseTime)
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
fl := flaggertest.New(t)
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
|
||||
|
||||
@@ -950,7 +944,6 @@ func TestStatementBuilderTraceQuery(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestAdjustKey(t *testing.T) {
|
||||
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
|
||||
cases := []struct {
|
||||
name string
|
||||
inputKey telemetrytypes.TelemetryFieldKey
|
||||
@@ -964,7 +957,7 @@ func TestAdjustKey(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextUnspecified,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
|
||||
},
|
||||
keysMap: buildCompleteFieldKeyMap(releaseTime),
|
||||
keysMap: buildCompleteFieldKeyMap(),
|
||||
expectedKey: IntrinsicFields["trace_id"],
|
||||
},
|
||||
{
|
||||
@@ -974,7 +967,7 @@ func TestAdjustKey(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextBody, // incorrect context
|
||||
FieldDataType: telemetrytypes.FieldDataTypeInt64,
|
||||
},
|
||||
keysMap: buildCompleteFieldKeyMap(releaseTime),
|
||||
keysMap: buildCompleteFieldKeyMap(),
|
||||
expectedKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "duration_nano",
|
||||
FieldContext: telemetrytypes.FieldContextSpan, // should be corrected
|
||||
@@ -988,7 +981,7 @@ func TestAdjustKey(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextSpan, // correct context
|
||||
FieldDataType: telemetrytypes.FieldDataTypeInt64,
|
||||
},
|
||||
keysMap: buildCompleteFieldKeyMap(releaseTime),
|
||||
keysMap: buildCompleteFieldKeyMap(),
|
||||
expectedKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "duration_nano",
|
||||
FieldContext: telemetrytypes.FieldContextSpan, // should be corrected
|
||||
@@ -1002,8 +995,8 @@ func TestAdjustKey(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextUnspecified,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
|
||||
},
|
||||
keysMap: buildCompleteFieldKeyMap(releaseTime),
|
||||
expectedKey: *buildCompleteFieldKeyMap(releaseTime)["service.name"][0],
|
||||
keysMap: buildCompleteFieldKeyMap(),
|
||||
expectedKey: *buildCompleteFieldKeyMap()["service.name"][0],
|
||||
},
|
||||
{
|
||||
name: "single matching key with context specified - override",
|
||||
@@ -1012,8 +1005,8 @@ func TestAdjustKey(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
|
||||
},
|
||||
keysMap: buildCompleteFieldKeyMap(releaseTime),
|
||||
expectedKey: *buildCompleteFieldKeyMap(releaseTime)["cart.items_count"][0],
|
||||
keysMap: buildCompleteFieldKeyMap(),
|
||||
expectedKey: *buildCompleteFieldKeyMap()["cart.items_count"][0],
|
||||
},
|
||||
{
|
||||
name: "multiple matching keys - all materialized",
|
||||
@@ -1050,7 +1043,7 @@ func TestAdjustKey(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextUnspecified,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
|
||||
},
|
||||
keysMap: buildCompleteFieldKeyMap(releaseTime),
|
||||
keysMap: buildCompleteFieldKeyMap(),
|
||||
expectedKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "mixed.materialization.key",
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
@@ -1064,7 +1057,7 @@ func TestAdjustKey(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
|
||||
},
|
||||
keysMap: buildCompleteFieldKeyMap(releaseTime),
|
||||
keysMap: buildCompleteFieldKeyMap(),
|
||||
expectedKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "mixed.materialization.key",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
@@ -1079,7 +1072,7 @@ func TestAdjustKey(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextUnspecified,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
|
||||
},
|
||||
keysMap: buildCompleteFieldKeyMap(releaseTime),
|
||||
keysMap: buildCompleteFieldKeyMap(),
|
||||
expectedKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "unknown.field",
|
||||
Materialized: false,
|
||||
@@ -1092,7 +1085,7 @@ func TestAdjustKey(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
|
||||
},
|
||||
keysMap: buildCompleteFieldKeyMap(releaseTime),
|
||||
keysMap: buildCompleteFieldKeyMap(),
|
||||
expectedKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
@@ -1107,7 +1100,7 @@ func TestAdjustKey(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextUnspecified,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
|
||||
},
|
||||
keysMap: buildCompleteFieldKeyMap(releaseTime),
|
||||
keysMap: buildCompleteFieldKeyMap(),
|
||||
expectedKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "cart.items_count",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
@@ -1122,7 +1115,7 @@ func TestAdjustKey(t *testing.T) {
|
||||
FieldContext: telemetrytypes.FieldContextUnspecified,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
|
||||
},
|
||||
keysMap: buildCompleteFieldKeyMap(releaseTime),
|
||||
keysMap: buildCompleteFieldKeyMap(),
|
||||
expectedKey: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "user.id",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
@@ -1165,7 +1158,6 @@ func TestAdjustKey(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestAdjustKeys(t *testing.T) {
|
||||
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
|
||||
cases := []struct {
|
||||
name string
|
||||
query qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]
|
||||
@@ -1191,7 +1183,7 @@ func TestAdjustKeys(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
keysMap: buildCompleteFieldKeyMap(releaseTime),
|
||||
keysMap: buildCompleteFieldKeyMap(),
|
||||
expectedSelectFields: []telemetrytypes.TelemetryFieldKey{
|
||||
{
|
||||
Name: "service.name",
|
||||
@@ -1228,7 +1220,7 @@ func TestAdjustKeys(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
keysMap: buildCompleteFieldKeyMap(releaseTime),
|
||||
keysMap: buildCompleteFieldKeyMap(),
|
||||
expectedGroupBy: []qbtypes.GroupByKey{
|
||||
{
|
||||
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
|
||||
@@ -1275,7 +1267,7 @@ func TestAdjustKeys(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
keysMap: buildCompleteFieldKeyMap(releaseTime),
|
||||
keysMap: buildCompleteFieldKeyMap(),
|
||||
expectedOrder: []qbtypes.OrderBy{
|
||||
{
|
||||
Key: qbtypes.OrderByKey{
|
||||
@@ -1334,7 +1326,7 @@ func TestAdjustKeys(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
keysMap: buildCompleteFieldKeyMap(releaseTime),
|
||||
keysMap: buildCompleteFieldKeyMap(),
|
||||
expectedSelectFields: []telemetrytypes.TelemetryFieldKey{
|
||||
{
|
||||
Name: "trace_id",
|
||||
@@ -1389,7 +1381,7 @@ func TestAdjustKeys(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
keysMap: buildCompleteFieldKeyMap(releaseTime),
|
||||
keysMap: buildCompleteFieldKeyMap(),
|
||||
// After alias adjustment, name becomes "span.duration" with FieldContextUnspecified
|
||||
// "span.duration" is not in keysMap, so context stays unspecified
|
||||
expectedOrder: []qbtypes.OrderBy{
|
||||
|
||||
@@ -1,12 +1,10 @@
|
||||
package telemetrytraces
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
)
|
||||
|
||||
func buildCompleteFieldKeyMap(releaseTime time.Time) map[string][]*telemetrytypes.TelemetryFieldKey {
|
||||
func buildCompleteFieldKeyMap() map[string][]*telemetrytypes.TelemetryFieldKey {
|
||||
keysMap := map[string][]*telemetrytypes.TelemetryFieldKey{
|
||||
"service.name": {
|
||||
{
|
||||
@@ -117,33 +115,7 @@ func buildCompleteFieldKeyMap(releaseTime time.Time) map[string][]*telemetrytype
|
||||
for _, keys := range keysMap {
|
||||
for _, key := range keys {
|
||||
key.Signal = telemetrytypes.SignalTraces
|
||||
if key.FieldContext == telemetrytypes.FieldContextResource {
|
||||
key.Evolutions = mockEvolutionData(releaseTime)
|
||||
}
|
||||
}
|
||||
}
|
||||
return keysMap
|
||||
}
|
||||
|
||||
// mockEvolutionData returns the canonical resource-column evolution timeline used in tests:
|
||||
// the legacy resources_string map at epoch 0 and the JSON resource column released at releaseTime.
|
||||
func mockEvolutionData(releaseTime time.Time) []*telemetrytypes.EvolutionEntry {
|
||||
return []*telemetrytypes.EvolutionEntry{
|
||||
{
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
ColumnName: "resources_string",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
ColumnType: "Map(LowCardinality(String), String)",
|
||||
FieldName: "__all__",
|
||||
ReleaseTime: time.Unix(0, 0),
|
||||
},
|
||||
{
|
||||
Signal: telemetrytypes.SignalTraces,
|
||||
ColumnName: "resource",
|
||||
ColumnType: "JSON()",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldName: "__all__",
|
||||
ReleaseTime: releaseTime,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,7 +15,6 @@ import (
|
||||
)
|
||||
|
||||
func TestTraceOperatorStatementBuilder(t *testing.T) {
|
||||
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
|
||||
cases := []struct {
|
||||
name string
|
||||
requestType qbtypes.RequestType
|
||||
@@ -68,7 +67,7 @@ func TestTraceOperatorStatementBuilder(t *testing.T) {
|
||||
},
|
||||
},
|
||||
expected: qbtypes.Statement{
|
||||
Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_DIR_DESC_B AS (SELECT p.* FROM A AS p INNER JOIN B AS c ON p.trace_id = c.trace_id AND p.span_id = c.parent_span_id) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id, multiIf(mapContains(resources_string, 'service.name'), resources_string['service.name'], resource.`service.name` IS NOT NULL, resource.`service.name`::String, NULL) AS `service.name` FROM A_DIR_DESC_B ORDER BY timestamp DESC LIMIT ? SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000",
|
||||
Query: "WITH toDateTime64(1747947419000000000, 9) AS t_from, toDateTime64(1747983448000000000, 9) AS t_to, 1747945619 AS bucket_from, 1747983448 AS bucket_to, all_spans AS (SELECT *, resource_string_service$$name AS `service.name` FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_A AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), A AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_A) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), __resource_filter_B AS (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), B AS (SELECT * FROM signoz_traces.distributed_signoz_index_v3 WHERE resource_fingerprint GLOBAL IN (SELECT fingerprint FROM __resource_filter_B) AND timestamp >= ? AND timestamp < ? AND ts_bucket_start >= ? AND ts_bucket_start <= ?), A_DIR_DESC_B AS (SELECT p.* FROM A AS p INNER JOIN B AS c ON p.trace_id = c.trace_id AND p.span_id = c.parent_span_id) SELECT timestamp, trace_id, span_id, name, duration_nano, parent_span_id, multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL) AS `service.name` FROM A_DIR_DESC_B ORDER BY timestamp DESC LIMIT ? SETTINGS distributed_product_mode='allow', max_memory_usage=10000000000",
|
||||
Args: []any{"1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "frontend", "%service.name%", "%service.name\":\"frontend%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), "backend", "%service.name%", "%service.name\":\"backend%", uint64(1747945619), uint64(1747983448), "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10},
|
||||
},
|
||||
expectedErr: nil,
|
||||
@@ -391,7 +390,7 @@ func TestTraceOperatorStatementBuilder(t *testing.T) {
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap(releaseTime)
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
fl := flaggertest.New(t)
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
|
||||
|
||||
@@ -444,7 +443,6 @@ func TestTraceOperatorStatementBuilder(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestTraceOperatorStatementBuilderErrors(t *testing.T) {
|
||||
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
|
||||
cases := []struct {
|
||||
name string
|
||||
operator qbtypes.QueryBuilderTraceOperator
|
||||
@@ -508,7 +506,7 @@ func TestTraceOperatorStatementBuilderErrors(t *testing.T) {
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap(releaseTime)
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
fl := flaggertest.New(t)
|
||||
aggExprRewriter := querybuilder.NewAggExprRewriter(instrumentationtest.New().ToProviderSettings(), nil, fm, cb, nil, fl)
|
||||
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder"
|
||||
@@ -17,13 +16,12 @@ import (
|
||||
)
|
||||
|
||||
func TestTraceTimeRangeOptimization(t *testing.T) {
|
||||
releaseTime := time.Date(2025, 5, 22, 22, 0, 0, 0, time.UTC)
|
||||
|
||||
fm := NewFieldMapper()
|
||||
cb := NewConditionBuilder(fm)
|
||||
mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
|
||||
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap(releaseTime)
|
||||
mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
|
||||
mockMetadataStore.KeysMap["trace_id"] = []*telemetrytypes.TelemetryFieldKey{{
|
||||
Name: "trace_id",
|
||||
FieldContext: telemetrytypes.FieldContextSpan,
|
||||
|
||||
@@ -1,119 +0,0 @@
|
||||
package querybuildertypesv5
|
||||
|
||||
import (
|
||||
"slices"
|
||||
"sort"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
)
|
||||
|
||||
// SelectEvolutionsForColumns selects the appropriate evolution entries for each column based on the time range.
|
||||
// Logic:
|
||||
// - Finds the latest base evolution (<= tsStartTime) across ALL columns
|
||||
// - Rejects all evolutions before this latest base evolution
|
||||
// - For duplicate evolutions it considers the oldest one (first in ReleaseTime)
|
||||
// - For each column, includes its evolution if it's >= latest base evolution and <= tsEndTime
|
||||
// - Results are sorted by ReleaseTime descending (newest first)
|
||||
func SelectEvolutionsForColumns(columns []*schema.Column, evolutions []*telemetrytypes.EvolutionEntry, tsStart, tsEnd uint64) ([]*schema.Column, []*telemetrytypes.EvolutionEntry, error) {
|
||||
|
||||
sortedEvolutions := make([]*telemetrytypes.EvolutionEntry, len(evolutions))
|
||||
copy(sortedEvolutions, evolutions)
|
||||
|
||||
// sort the evolutions by ReleaseTime ascending
|
||||
sort.Slice(sortedEvolutions, func(i, j int) bool {
|
||||
return sortedEvolutions[i].ReleaseTime.Before(sortedEvolutions[j].ReleaseTime)
|
||||
})
|
||||
|
||||
tsStartTime := time.Unix(0, int64(tsStart))
|
||||
tsEndTime := time.Unix(0, int64(tsEnd))
|
||||
|
||||
// Build evolution map: column name -> evolution
|
||||
evolutionMap := make(map[string]*telemetrytypes.EvolutionEntry)
|
||||
for _, evolution := range sortedEvolutions {
|
||||
if _, exists := evolutionMap[evolution.ColumnName+":"+evolution.FieldName+":"+strconv.Itoa(int(evolution.Version))]; exists {
|
||||
// since if there is duplicate we would just use the oldest one.
|
||||
continue
|
||||
}
|
||||
evolutionMap[evolution.ColumnName+":"+evolution.FieldName+":"+strconv.Itoa(int(evolution.Version))] = evolution
|
||||
}
|
||||
|
||||
// Find the latest base evolution (<= tsStartTime) across ALL columns
|
||||
// Evolutions are sorted, so we can break early
|
||||
var latestBaseEvolutionAcrossAll *telemetrytypes.EvolutionEntry
|
||||
for _, evolution := range sortedEvolutions {
|
||||
if evolution.ReleaseTime.After(tsStartTime) {
|
||||
break
|
||||
}
|
||||
latestBaseEvolutionAcrossAll = evolution
|
||||
}
|
||||
|
||||
// We shouldn't reach this, it basically means there is something wrong with the evolutions data
|
||||
if latestBaseEvolutionAcrossAll == nil {
|
||||
return nil, nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "no base evolution found for columns %v", columns)
|
||||
}
|
||||
|
||||
columnLookUpMap := make(map[string]*schema.Column)
|
||||
for _, column := range columns {
|
||||
columnLookUpMap[column.Name] = column
|
||||
}
|
||||
|
||||
// Collect column-evolution pairs
|
||||
type colEvoPair struct {
|
||||
column *schema.Column
|
||||
evolution *telemetrytypes.EvolutionEntry
|
||||
}
|
||||
pairs := []colEvoPair{}
|
||||
|
||||
for _, evolution := range evolutionMap {
|
||||
// Reject evolutions before the latest base evolution
|
||||
if evolution.ReleaseTime.Before(latestBaseEvolutionAcrossAll.ReleaseTime) {
|
||||
continue
|
||||
}
|
||||
// skip evolutions after tsEndTime
|
||||
if evolution.ReleaseTime.After(tsEndTime) || evolution.ReleaseTime.Equal(tsEndTime) {
|
||||
continue
|
||||
}
|
||||
|
||||
if _, exists := columnLookUpMap[evolution.ColumnName]; !exists {
|
||||
return nil, nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "evolution column %s not found in columns %v", evolution.ColumnName, columns)
|
||||
}
|
||||
|
||||
pairs = append(pairs, colEvoPair{columnLookUpMap[evolution.ColumnName], evolution})
|
||||
}
|
||||
|
||||
// If no pairs found, fall back to latestBaseEvolutionAcrossAll for matching columns
|
||||
if len(pairs) == 0 {
|
||||
for _, column := range columns {
|
||||
// Use latestBaseEvolutionAcrossAll if this column name matches its column name
|
||||
if column.Name == latestBaseEvolutionAcrossAll.ColumnName {
|
||||
pairs = append(pairs, colEvoPair{column, latestBaseEvolutionAcrossAll})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Sort by ReleaseTime descending (newest first)
|
||||
slices.SortFunc(pairs, func(a, b colEvoPair) int {
|
||||
// Sort by ReleaseTime descending (newest first)
|
||||
if a.evolution.ReleaseTime.After(b.evolution.ReleaseTime) {
|
||||
return -1
|
||||
}
|
||||
if a.evolution.ReleaseTime.Before(b.evolution.ReleaseTime) {
|
||||
return 1
|
||||
}
|
||||
return 0
|
||||
})
|
||||
|
||||
// Extract results
|
||||
newColumns := make([]*schema.Column, len(pairs))
|
||||
evolutionsEntries := make([]*telemetrytypes.EvolutionEntry, len(pairs))
|
||||
for i, pair := range pairs {
|
||||
newColumns[i] = pair.column
|
||||
evolutionsEntries[i] = pair.evolution
|
||||
}
|
||||
|
||||
return newColumns, evolutionsEntries, nil
|
||||
}
|
||||
11
tests/fixtures/traces.py
vendored
11
tests/fixtures/traces.py
vendored
@@ -6,7 +6,7 @@ import uuid
|
||||
from abc import ABC
|
||||
from collections.abc import Callable, Generator
|
||||
from enum import Enum
|
||||
from typing import Any, Literal
|
||||
from typing import Any
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import numpy as np
|
||||
@@ -236,7 +236,6 @@ class Traces(ABC):
|
||||
attributes_number: dict[str, np.float64]
|
||||
attributes_bool: dict[str, bool]
|
||||
resources_string: dict[str, str]
|
||||
resource_json: dict[str, str]
|
||||
events: list[str]
|
||||
links: str
|
||||
response_status_code: str
|
||||
@@ -274,7 +273,6 @@ class Traces(ABC):
|
||||
links: list[TracesLink] = [],
|
||||
trace_state: str = "",
|
||||
flags: np.uint32 = 0,
|
||||
resource_write_mode: Literal["legacy_only", "dual_write"] = "dual_write",
|
||||
) -> None:
|
||||
if timestamp is None:
|
||||
timestamp = datetime.datetime.now()
|
||||
@@ -324,11 +322,8 @@ class Traces(ABC):
|
||||
self.db_name = ""
|
||||
self.db_operation = ""
|
||||
|
||||
# Process resources and derive service_name. Spans written before the
|
||||
# JSON-resource evolution time only populate resources_string (legacy_only);
|
||||
# spans at or after the evolution time dual-write to both columns.
|
||||
# Process resources and derive service_name
|
||||
self.resources_string = {k: str(v) for k, v in resources.items()}
|
||||
self.resource_json = {} if resource_write_mode == "legacy_only" else dict(self.resources_string)
|
||||
self.service_name = self.resources_string.get("service.name", "default-service")
|
||||
|
||||
for k, v in self.resources_string.items():
|
||||
@@ -580,7 +575,7 @@ class Traces(ABC):
|
||||
self.db_operation,
|
||||
self.has_error,
|
||||
self.is_remote,
|
||||
self.resource_json,
|
||||
self.resources_string,
|
||||
],
|
||||
dtype=object,
|
||||
)
|
||||
|
||||
@@ -1,240 +0,0 @@
|
||||
from collections.abc import Callable
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from http import HTTPStatus
|
||||
|
||||
from fixtures import types
|
||||
from fixtures.auth import USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD
|
||||
from fixtures.querier import (
|
||||
build_group_by_field,
|
||||
build_logs_aggregation,
|
||||
index_series_by_label,
|
||||
make_query_request,
|
||||
)
|
||||
from fixtures.traces import TraceIdGenerator, Traces
|
||||
|
||||
|
||||
# we already create the evolution for resource during schema migration
|
||||
# since we have to create test data around it, we need to get the evolution time
|
||||
def _get_traces_resource_evolution_time_json(signoz: types.SigNoz) -> datetime:
|
||||
result = signoz.telemetrystore.conn.query(
|
||||
"""
|
||||
SELECT release_time
|
||||
FROM signoz_metadata.distributed_column_evolution_metadata
|
||||
WHERE signal = 'traces'
|
||||
AND field_context = 'resource'
|
||||
AND field_name = '__all__'
|
||||
AND column_name = 'resource'
|
||||
LIMIT 1
|
||||
"""
|
||||
).result_rows
|
||||
|
||||
assert result, "Expected traces resource evolution metadata to exist"
|
||||
|
||||
release_time_ns = int(result[0][0])
|
||||
return datetime.fromtimestamp(release_time_ns / 1e9, tz=UTC)
|
||||
|
||||
|
||||
# Spans with timestamps before the evolution time will have resources written only to resources_string.
|
||||
# Spans with timestamps at or after the evolution time will have resources written to both resources_string and resource (JSON).
|
||||
def _build_evolved_span(
|
||||
timestamp: datetime,
|
||||
evolution_time: datetime,
|
||||
service_name: str,
|
||||
name: str,
|
||||
) -> Traces:
|
||||
resource_write_mode = "legacy_only" if timestamp < evolution_time else "dual_write"
|
||||
return Traces(
|
||||
timestamp=timestamp,
|
||||
trace_id=TraceIdGenerator.trace_id(),
|
||||
span_id=TraceIdGenerator.span_id(),
|
||||
name=name,
|
||||
resources={
|
||||
"service.name": service_name,
|
||||
"deployment.environment": "integration",
|
||||
},
|
||||
resource_write_mode=resource_write_mode,
|
||||
)
|
||||
|
||||
|
||||
def _query_grouped_trace_series(
|
||||
signoz: types.SigNoz,
|
||||
token: str,
|
||||
start: datetime,
|
||||
end: datetime,
|
||||
group_by: str = "service.name",
|
||||
aggregation: str = "count()",
|
||||
) -> dict[str, list[dict]]:
|
||||
response = make_query_request(
|
||||
signoz,
|
||||
token,
|
||||
start_ms=int(start.timestamp() * 1000),
|
||||
end_ms=int(end.timestamp() * 1000),
|
||||
request_type="time_series",
|
||||
queries=[
|
||||
{
|
||||
"type": "builder_query",
|
||||
"spec": {
|
||||
"name": "A",
|
||||
"signal": "traces",
|
||||
"stepInterval": 60,
|
||||
"disabled": False,
|
||||
"groupBy": [build_group_by_field(group_by)],
|
||||
"having": {"expression": ""},
|
||||
"aggregations": [build_logs_aggregation(aggregation)],
|
||||
},
|
||||
}
|
||||
],
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
assert response.json()["status"] == "success"
|
||||
|
||||
results = response.json()["data"]["data"]["results"]
|
||||
assert len(results) == 1
|
||||
|
||||
aggregations = results[0]["aggregations"]
|
||||
assert len(aggregations) == 1
|
||||
|
||||
return index_series_by_label(aggregations[0]["series"], group_by)
|
||||
|
||||
|
||||
def _assert_grouped_series(
|
||||
series_by_group: dict[str, dict],
|
||||
expected_values_by_group: dict[str, dict[int, int]],
|
||||
) -> None:
|
||||
assert set(series_by_group.keys()) == set(expected_values_by_group.keys())
|
||||
|
||||
for group_name, expected_by_ts in expected_values_by_group.items():
|
||||
actual_values = sorted(
|
||||
series_by_group[group_name]["values"],
|
||||
key=lambda value: value["timestamp"],
|
||||
)
|
||||
expected_values = [{"timestamp": timestamp, "value": value} for timestamp, value in sorted(expected_by_ts.items())]
|
||||
assert actual_values == expected_values
|
||||
|
||||
|
||||
def _test_traces_resource_evolution(
|
||||
signoz: types.SigNoz,
|
||||
token: str,
|
||||
insert_traces: Callable[[list[Traces]], None],
|
||||
) -> None:
|
||||
"""
|
||||
# 1. Get the evolution time.
|
||||
# 2. Ingest spans before the evolution time.
|
||||
# 3. Ingest spans after the evolution time.
|
||||
# 4. Query the spans before the evolution time.
|
||||
# 5. Query the spans after the evolution time.
|
||||
# Both aggregation and group by should be checked.
|
||||
"""
|
||||
evolution_time = _get_traces_resource_evolution_time_json(signoz)
|
||||
evolution_time = evolution_time.replace(second=0, microsecond=0)
|
||||
|
||||
before_2 = evolution_time - timedelta(minutes=10)
|
||||
before_1 = evolution_time - timedelta(minutes=5)
|
||||
after_1 = evolution_time + timedelta(minutes=5)
|
||||
after_2 = evolution_time + timedelta(minutes=10)
|
||||
|
||||
insert_traces(
|
||||
[
|
||||
_build_evolved_span(
|
||||
timestamp=before_2,
|
||||
evolution_time=evolution_time,
|
||||
service_name="svc-before-2",
|
||||
name="span before evolution 2",
|
||||
),
|
||||
_build_evolved_span(
|
||||
timestamp=before_1,
|
||||
evolution_time=evolution_time,
|
||||
service_name="svc-before-1",
|
||||
name="span before evolution 1",
|
||||
),
|
||||
_build_evolved_span(
|
||||
timestamp=after_1,
|
||||
evolution_time=evolution_time,
|
||||
service_name="svc-after-1",
|
||||
name="span after evolution 1",
|
||||
),
|
||||
_build_evolved_span(
|
||||
timestamp=after_2,
|
||||
evolution_time=evolution_time,
|
||||
service_name="svc-after-2",
|
||||
name="span after evolution 2",
|
||||
),
|
||||
]
|
||||
)
|
||||
|
||||
before_series = _query_grouped_trace_series(signoz, token, before_2 - timedelta(minutes=1), before_1 + timedelta(minutes=1))
|
||||
_assert_grouped_series(
|
||||
before_series,
|
||||
expected_values_by_group={
|
||||
"svc-before-2": {
|
||||
int(before_2.timestamp() * 1000): 1,
|
||||
},
|
||||
"svc-before-1": {
|
||||
int(before_1.timestamp() * 1000): 1,
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
after_series = _query_grouped_trace_series(signoz, token, after_1 - timedelta(minutes=1), after_2 + timedelta(minutes=1))
|
||||
_assert_grouped_series(
|
||||
after_series,
|
||||
expected_values_by_group={
|
||||
"svc-after-1": {
|
||||
int(after_1.timestamp() * 1000): 1,
|
||||
},
|
||||
"svc-after-2": {
|
||||
int(after_2.timestamp() * 1000): 1,
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
spanning_series = _query_grouped_trace_series(signoz, token, before_2, after_2 + timedelta(minutes=1))
|
||||
_assert_grouped_series(
|
||||
spanning_series,
|
||||
expected_values_by_group={
|
||||
"svc-before-2": {
|
||||
int(before_2.timestamp() * 1000): 1,
|
||||
},
|
||||
"svc-before-1": {
|
||||
int(before_1.timestamp() * 1000): 1,
|
||||
},
|
||||
"svc-after-1": {
|
||||
int(after_1.timestamp() * 1000): 1,
|
||||
},
|
||||
"svc-after-2": {
|
||||
int(after_2.timestamp() * 1000): 1,
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
# query to check aggregation on the resource field like count_distinct(service.name)
|
||||
aggregation_series = _query_grouped_trace_series(
|
||||
signoz,
|
||||
token,
|
||||
before_2,
|
||||
after_2 + timedelta(minutes=1),
|
||||
group_by="deployment.environment",
|
||||
aggregation="count_distinct(service.name)",
|
||||
)
|
||||
_assert_grouped_series(
|
||||
aggregation_series,
|
||||
expected_values_by_group={
|
||||
"integration": {
|
||||
int(before_2.timestamp() * 1000): 1,
|
||||
int(before_1.timestamp() * 1000): 1,
|
||||
int(after_1.timestamp() * 1000): 1,
|
||||
int(after_2.timestamp() * 1000): 1,
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
def test_traces_resource_evolution(
|
||||
signoz: types.SigNoz,
|
||||
create_user_admin: None, # pylint: disable=unused-argument
|
||||
get_token: Callable[[str, str], str],
|
||||
insert_traces: Callable[[list[Traces]], None],
|
||||
) -> None:
|
||||
token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
_test_traces_resource_evolution(signoz, token, insert_traces)
|
||||
Reference in New Issue
Block a user