Compare commits

..

1 Commits

Author SHA1 Message Date
Shivanshu Raj Shrivastava
fc1483c56a chore: testing
Signed-off-by: Shivanshu Raj Shrivastava <shivanshu1333@gmail.com>
2025-04-29 00:02:02 +05:30
54 changed files with 4618 additions and 192 deletions

View File

@@ -66,6 +66,8 @@ func NewAPIHandler(opts APIHandlerOptions, signoz *signoz.SigNoz) (*APIHandler,
LogsParsingPipelineController: opts.LogsParsingPipelineController,
Cache: opts.Cache,
FluxInterval: opts.FluxInterval,
UseLogsNewSchema: opts.UseLogsNewSchema,
UseTraceNewSchema: opts.UseTraceNewSchema,
AlertmanagerAPI: alertmanager.NewAPI(signoz.Alertmanager),
FieldsAPI: fields.NewAPI(signoz.TelemetryStore),
Signoz: signoz,

View File

@@ -23,10 +23,12 @@ func NewDataConnector(
telemetryStore telemetrystore.TelemetryStore,
prometheus prometheus.Prometheus,
cluster string,
useLogsNewSchema bool,
useTraceNewSchema bool,
fluxIntervalForTraceDetail time.Duration,
cache cache.Cache,
) *ClickhouseReader {
chReader := basechr.NewReader(sqlDB, telemetryStore, prometheus, cluster, fluxIntervalForTraceDetail, cache)
chReader := basechr.NewReader(sqlDB, telemetryStore, prometheus, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
return &ClickhouseReader{
conn: telemetryStore.ClickhouseDB(),
appdb: sqlDB,

View File

@@ -62,6 +62,8 @@ type ServerOptions struct {
FluxIntervalForTraceDetail string
Cluster string
GatewayUrl string
UseLogsNewSchema bool
UseTraceNewSchema bool
Jwt *authtypes.JWT
}
@@ -130,6 +132,8 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
serverOptions.SigNoz.TelemetryStore,
serverOptions.SigNoz.Prometheus,
serverOptions.Cluster,
serverOptions.UseLogsNewSchema,
serverOptions.UseTraceNewSchema,
fluxIntervalForTraceDetail,
serverOptions.SigNoz.Cache,
)
@@ -147,6 +151,8 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
serverOptions.SigNoz.SQLStore.SQLxDB(),
reader,
c,
serverOptions.UseLogsNewSchema,
serverOptions.UseTraceNewSchema,
serverOptions.SigNoz.Alertmanager,
serverOptions.SigNoz.SQLStore,
serverOptions.SigNoz.TelemetryStore,
@@ -227,6 +233,8 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
FluxInterval: fluxInterval,
Gateway: gatewayProxy,
GatewayUrl: serverOptions.GatewayUrl,
UseLogsNewSchema: serverOptions.UseLogsNewSchema,
UseTraceNewSchema: serverOptions.UseTraceNewSchema,
JWT: serverOptions.Jwt,
}
@@ -236,6 +244,8 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
}
s := &Server{
// logger: logger,
// tracer: tracer,
ruleManager: rm,
serverOptions: serverOptions,
unavailableChannel: make(chan healthcheck.Status),
@@ -327,6 +337,7 @@ func (s *Server) createPublicServer(apiHandler *api.APIHandler, web web.Web) (*h
apiHandler.RegisterMessagingQueuesRoutes(r, am)
apiHandler.RegisterThirdPartyApiRoutes(r, am)
apiHandler.MetricExplorerRoutes(r, am)
apiHandler.RegisterTraceFunnelsRoutes(r, am)
c := cors.New(cors.Options{
AllowedOrigins: []string{"*"},
@@ -476,6 +487,8 @@ func makeRulesManager(
db *sqlx.DB,
ch baseint.Reader,
cache cache.Cache,
useLogsNewSchema bool,
useTraceNewSchema bool,
alertmanager alertmanager.Alertmanager,
sqlstore sqlstore.SQLStore,
telemetryStore telemetrystore.TelemetryStore,
@@ -492,6 +505,8 @@ func makeRulesManager(
Cache: cache,
EvalDelay: baseconst.GetEvalDelay(),
PrepareTaskFunc: rules.PrepareTaskFunc,
UseLogsNewSchema: useLogsNewSchema,
UseTraceNewSchema: useTraceNewSchema,
PrepareTestRuleFunc: rules.TestNotification,
Alertmanager: alertmanager,
SQLStore: sqlstore,

View File

@@ -21,7 +21,6 @@ import (
"go.uber.org/zap/zapcore"
)
// Deprecated: Please use the logger from pkg/instrumentation.
func initZapLog() *zap.Logger {
config := zap.NewProductionConfig()
config.EncoderConfig.TimeKey = "timestamp"
@@ -51,9 +50,7 @@ func main() {
var gatewayUrl string
var useLicensesV3 bool
// Deprecated
flag.BoolVar(&useLogsNewSchema, "use-logs-new-schema", false, "use logs_v2 schema for logs")
// Deprecated
flag.BoolVar(&useTraceNewSchema, "use-trace-new-schema", false, "use new schema for traces")
// Deprecated
flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)")
@@ -139,6 +136,8 @@ func main() {
FluxIntervalForTraceDetail: fluxIntervalForTraceDetail,
Cluster: cluster,
GatewayUrl: gatewayUrl,
UseLogsNewSchema: useLogsNewSchema,
UseTraceNewSchema: useTraceNewSchema,
Jwt: jwt,
}

View File

@@ -25,6 +25,8 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error)
ruleId,
opts.Rule,
opts.Reader,
opts.UseLogsNewSchema,
opts.UseTraceNewSchema,
baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay),
baserules.WithSQLStore(opts.SQLStore),
)
@@ -121,6 +123,8 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap
alertname,
parsedRule,
opts.Reader,
opts.UseLogsNewSchema,
opts.UseTraceNewSchema,
baserules.WithSendAlways(),
baserules.WithSendUnmatched(),
baserules.WithSQLStore(opts.SQLStore),

View File

@@ -64,6 +64,10 @@ export const TraceDetail = Loadable(
),
);
export const UsageExplorerPage = Loadable(
() => import(/* webpackChunkName: "UsageExplorerPage" */ 'modules/Usage'),
);
export const SignupPage = Loadable(
() => import(/* webpackChunkName: "SignupPage" */ 'pages/SignUp'),
);

View File

@@ -57,6 +57,7 @@ import {
TracesFunnels,
TracesSaveViews,
UnAuthorized,
UsageExplorerPage,
WorkspaceAccessRestricted,
WorkspaceBlocked,
WorkspaceSuspended,
@@ -154,6 +155,13 @@ const routes: AppRoutes[] = [
isPrivate: true,
key: 'SETTINGS',
},
{
path: ROUTES.USAGE_EXPLORER,
exact: true,
component: UsageExplorerPage,
isPrivate: true,
key: 'USAGE_EXPLORER',
},
{
path: ROUTES.ALL_DASHBOARD,
exact: true,

View File

@@ -0,0 +1,26 @@
import axios from 'api';
import { ErrorResponseHandler } from 'api/ErrorResponseHandler';
import { AxiosError } from 'axios';
import { ErrorResponse, SuccessResponse } from 'types/api';
import { PayloadProps, Props } from 'types/api/logs/getLogs';
const GetLogs = async (
props: Props,
): Promise<SuccessResponse<PayloadProps> | ErrorResponse> => {
try {
const data = await axios.get(`/logs`, {
params: props,
});
return {
statusCode: 200,
error: null,
message: '',
payload: data.data.results,
};
} catch (error) {
return ErrorResponseHandler(error as AxiosError);
}
};
export default GetLogs;

View File

@@ -0,0 +1,19 @@
import apiV1 from 'api/apiV1';
import getLocalStorageKey from 'api/browser/localstorage/get';
import { ENVIRONMENT } from 'constants/env';
import { LOCALSTORAGE } from 'constants/localStorage';
import { EventSourcePolyfill } from 'event-source-polyfill';
// 10 min in ms
const TIMEOUT_IN_MS = 10 * 60 * 1000;
export const LiveTail = (queryParams: string): EventSourcePolyfill =>
new EventSourcePolyfill(
`${ENVIRONMENT.baseURL}${apiV1}logs/tail?${queryParams}`,
{
headers: {
Authorization: `Bearer ${getLocalStorageKey(LOCALSTORAGE.AUTH_TOKEN)}`,
},
heartbeatTimeout: TIMEOUT_IN_MS,
},
);

View File

@@ -17,6 +17,7 @@ const ROUTES = {
'/get-started/infrastructure-monitoring',
GET_STARTED_AWS_MONITORING: '/get-started/aws-monitoring',
GET_STARTED_AZURE_MONITORING: '/get-started/azure-monitoring',
USAGE_EXPLORER: '/usage-explorer',
APPLICATION: '/services',
ALL_DASHBOARD: '/dashboard',
DASHBOARD: '/dashboard/:dashboardId',

View File

@@ -133,3 +133,231 @@ const ServicesListTable = memo(
),
);
ServicesListTable.displayName = 'ServicesListTable';
function ServiceMetrics({
onUpdateChecklistDoneItem,
loadingUserPreferences,
}: {
onUpdateChecklistDoneItem: (itemKey: string) => void;
loadingUserPreferences: boolean;
}): JSX.Element {
const { selectedTime: globalSelectedInterval } = useSelector<
AppState,
GlobalReducer
>((state) => state.globalTime);
const { user, activeLicenseV3 } = useAppContext();
const [timeRange, setTimeRange] = useState(() => {
const now = new Date().getTime();
return {
startTime: now - homeInterval,
endTime: now,
selectedInterval: homeInterval,
};
});
const { queries } = useResourceAttribute();
const { safeNavigate } = useSafeNavigate();
const selectedTags = useMemo(
() => (convertRawQueriesToTraceSelectedTags(queries) as Tags[]) || [],
[queries],
);
const [isError, setIsError] = useState(false);
const queryKey: QueryKey = useMemo(
() => [
timeRange.startTime,
timeRange.endTime,
selectedTags,
globalSelectedInterval,
],
[
timeRange.startTime,
timeRange.endTime,
selectedTags,
globalSelectedInterval,
],
);
const {
data,
isLoading: isLoadingTopLevelOperations,
isError: isErrorTopLevelOperations,
} = useGetTopLevelOperations(queryKey, {
start: timeRange.startTime * 1e6,
end: timeRange.endTime * 1e6,
});
const handleTimeIntervalChange = useCallback((value: number): void => {
const timeInterval = TIME_PICKER_OPTIONS.find(
(option) => option.value === value,
);
logEvent('Homepage: Services time interval updated', {
updatedTimeInterval: timeInterval?.label,
});
const now = new Date();
setTimeRange({
startTime: now.getTime() - value,
endTime: now.getTime(),
selectedInterval: value,
});
}, []);
const topLevelOperations = useMemo(() => Object.entries(data || {}), [data]);
const queryRangeRequestData = useMemo(
() =>
getQueryRangeRequestData({
topLevelOperations,
minTime: timeRange.startTime * 1e6,
maxTime: timeRange.endTime * 1e6,
globalSelectedInterval,
}),
[
globalSelectedInterval,
timeRange.endTime,
timeRange.startTime,
topLevelOperations,
],
);
const dataQueries = useGetQueriesRange(
queryRangeRequestData,
ENTITY_VERSION_V4,
{
queryKey: useMemo(
() => [
`GetMetricsQueryRange-home-${globalSelectedInterval}`,
timeRange.endTime,
timeRange.startTime,
globalSelectedInterval,
],
[globalSelectedInterval, timeRange.endTime, timeRange.startTime],
),
keepPreviousData: true,
enabled: true,
refetchOnMount: false,
onError: () => {
setIsError(true);
},
},
);
const isLoading = useMemo(() => dataQueries.some((query) => query.isLoading), [
dataQueries,
]);
const services: ServicesList[] = useMemo(
() =>
getServiceListFromQuery({
queries: dataQueries,
topLevelOperations,
isLoading,
}),
[dataQueries, topLevelOperations, isLoading],
);
const sortedServices = useMemo(
() =>
services?.sort((a, b) => {
const aUpdateAt = new Date(a.p99).getTime();
const bUpdateAt = new Date(b.p99).getTime();
return bUpdateAt - aUpdateAt;
}) || [],
[services],
);
const servicesExist = sortedServices.length > 0;
const top5Services = useMemo(() => sortedServices.slice(0, 5), [
sortedServices,
]);
useEffect(() => {
if (!loadingUserPreferences && servicesExist) {
onUpdateChecklistDoneItem('SETUP_SERVICES');
}
}, [onUpdateChecklistDoneItem, loadingUserPreferences, servicesExist]);
const handleRowClick = useCallback(
(record: ServicesList) => {
logEvent('Homepage: Service clicked', {
serviceName: record.serviceName,
});
safeNavigate(`${ROUTES.APPLICATION}/${record.serviceName}`);
},
[safeNavigate],
);
if (isLoadingTopLevelOperations || isLoading) {
return (
<Card className="services-list-card home-data-card loading-card">
<Card.Content>
<Skeleton active />
</Card.Content>
</Card>
);
}
if (isErrorTopLevelOperations || isError) {
return (
<Card className="services-list-card home-data-card error-card">
<Card.Content>
<Skeleton active />
</Card.Content>
</Card>
);
}
return (
<Card className="services-list-card home-data-card">
{servicesExist && (
<Card.Header>
<div className="services-header home-data-card-header">
{' '}
Services
<div className="services-header-actions">
<Select
value={timeRange.selectedInterval}
onChange={handleTimeIntervalChange}
options={TIME_PICKER_OPTIONS}
className="services-header-select"
/>
</div>
</div>
</Card.Header>
)}
<Card.Content>
{servicesExist ? (
<ServicesListTable services={top5Services} onRowClick={handleRowClick} />
) : (
<EmptyState user={user} activeLicenseV3={activeLicenseV3} />
)}
</Card.Content>
{servicesExist && (
<Card.Footer>
<div className="services-footer home-data-card-footer">
<Link to="/services">
<Button
type="link"
className="periscope-btn link learn-more-link"
onClick={(): void => {
logEvent('Homepage: All Services clicked', {});
}}
>
All Services <ArrowRight size={12} />
</Button>
</Link>
</div>
</Card.Footer>
)}
</Card>
);
}
export default memo(ServiceMetrics);

View File

@@ -21,10 +21,17 @@ function Services({
return (
<Sentry.ErrorBoundary fallback={<ErrorBoundaryFallback />}>
<div className="home-services-container">
<ServiceTraces
{isSpanMetricEnabled ? (
<ServiceMetrics
onUpdateChecklistDoneItem={onUpdateChecklistDoneItem}
loadingUserPreferences={loadingUserPreferences}
/>
) : (
<ServiceTraces
onUpdateChecklistDoneItem={onUpdateChecklistDoneItem}
loadingUserPreferences={loadingUserPreferences}
/>
)}
</div>
</Sentry.ErrorBoundary>
);

View File

@@ -481,6 +481,7 @@ export const apDexMetricsQueryBuilderQueries = ({
export const operationPerSec = ({
servicename,
tagFilterItems,
topLevelOperations,
}: OperationPerSecProps): QueryBuilderData => {
const autocompleteData: BaseAutocompleteData[] = [
{

View File

@@ -1,4 +1,7 @@
import logEvent from 'api/common/logEvent';
import getTopLevelOperations, {
ServiceDataProps,
} from 'api/metrics/getTopLevelOperations';
import { FeatureKeys } from 'constants/features';
import { QueryParams } from 'constants/query';
import { PANEL_TYPES } from 'constants/queryBuilder';
@@ -107,6 +110,21 @@ function Application(): JSX.Element {
// eslint-disable-next-line react-hooks/exhaustive-deps
}, []);
const {
data: topLevelOperations,
error: topLevelOperationsError,
isLoading: topLevelOperationsIsLoading,
isError: topLevelOperationsIsError,
} = useQuery<ServiceDataProps>({
queryKey: [servicename, minTime, maxTime],
queryFn: (): Promise<ServiceDataProps> =>
getTopLevelOperations({
service: servicename || '',
start: minTime,
end: maxTime,
}),
});
const selectedTraceTags: string = JSON.stringify(
convertRawQueriesToTraceSelectedTags(queries) || [],
);
@@ -119,6 +137,14 @@ function Application(): JSX.Element {
[queries],
);
const topLevelOperationsRoute = useMemo(
() =>
topLevelOperations
? defaultTo(topLevelOperations[servicename || ''], [])
: [],
[servicename, topLevelOperations],
);
const operationPerSecWidget = useMemo(
() =>
getWidgetQueryBuilder({

View File

@@ -0,0 +1,224 @@
/* eslint-disable */
//@ts-nocheck
import { Select, Space, Typography } from 'antd';
import Graph from 'components/Graph';
import { useEffect, useState } from 'react';
import { connect, useSelector } from 'react-redux';
import { withRouter } from 'react-router-dom';
import { GetService, getUsageData, UsageDataItem } from 'store/actions';
import { AppState } from 'store/reducers';
import { GlobalTime } from 'types/actions/globalTime';
import { GlobalReducer } from 'types/reducer/globalTime';
import MetricReducer from 'types/reducer/metrics';
import { isOnboardingSkipped } from 'utils/app';
import { Card } from './styles';
const { Option } = Select;
interface UsageExplorerProps {
usageData: UsageDataItem[];
getUsageData: (
minTime: number,
maxTime: number,
selectedInterval: number,
selectedService: string,
) => void;
getServicesList: ({
selectedTimeInterval,
}: {
selectedTimeInterval: GlobalReducer['selectedTime'];
}) => void;
globalTime: GlobalTime;
servicesList: servicesListItem[];
totalCount: number;
}
const timeDaysOptions = [
{ value: 30, label: 'Last 30 Days' },
{ value: 7, label: 'Last week' },
{ value: 1, label: 'Last day' },
];
const interval = [
{
value: 604800,
chartDivideMultiplier: 1,
label: 'Weekly',
applicableOn: [timeDaysOptions[0]],
},
{
value: 86400,
chartDivideMultiplier: 30,
label: 'Daily',
applicableOn: [timeDaysOptions[0], timeDaysOptions[1]],
},
{
value: 3600,
chartDivideMultiplier: 10,
label: 'Hours',
applicableOn: [timeDaysOptions[2], timeDaysOptions[1]],
},
];
function _UsageExplorer(props: UsageExplorerProps): JSX.Element {
const [selectedTime, setSelectedTime] = useState(timeDaysOptions[1]);
const [selectedInterval, setSelectedInterval] = useState(interval[2]);
const [selectedService, setSelectedService] = useState<string>('');
const { selectedTime: globalSelectedTime } = useSelector<
AppState,
GlobalReducer
>((state) => state.globalTime);
const {
getServicesList,
getUsageData,
globalTime,
totalCount,
usageData,
} = props;
const { services } = useSelector<AppState, MetricReducer>(
(state) => state.metrics,
);
useEffect(() => {
if (selectedTime && selectedInterval) {
const maxTime = new Date().getTime() * 1000000;
const minTime = maxTime - selectedTime.value * 24 * 3600000 * 1000000;
getUsageData(minTime, maxTime, selectedInterval.value, selectedService);
}
}, [selectedTime, selectedInterval, selectedService, getUsageData]);
useEffect(() => {
getServicesList({
selectedTimeInterval: globalSelectedTime,
});
}, [globalTime, getServicesList, globalSelectedTime]);
const data = {
labels: usageData.map((s) => new Date(s.timestamp / 1000000)),
datasets: [
{
label: 'Span Count',
data: usageData.map((s) => s.count),
backgroundColor: 'rgba(255, 99, 132, 0.2)',
borderColor: 'rgba(255, 99, 132, 1)',
borderWidth: 2,
},
],
};
return (
<>
<Space style={{ marginTop: 40, marginLeft: 20 }}>
<Space>
<Select
onSelect={(value): void => {
setSelectedTime(
timeDaysOptions.filter((item) => item.value == parseInt(value))[0],
);
}}
value={selectedTime.label}
>
{timeDaysOptions.map(({ value, label }) => (
<Option key={value} value={value}>
{label}
</Option>
))}
</Select>
</Space>
<Space>
<Select
onSelect={(value): void => {
setSelectedInterval(
interval.filter((item) => item.value === parseInt(value))[0],
);
}}
value={selectedInterval.label}
>
{interval
.filter((interval) => interval.applicableOn.includes(selectedTime))
.map((item) => (
<Option key={item.label} value={item.value}>
{item.label}
</Option>
))}
</Select>
</Space>
<Space>
<Select
onSelect={(value): void => {
setSelectedService(value);
}}
value={selectedService || 'All Services'}
>
<Option value="">All Services</Option>
{services?.map((service) => (
<Option key={service.serviceName} value={service.serviceName}>
{service.serviceName}
</Option>
))}
</Select>
</Space>
{isOnboardingSkipped() && totalCount === 0 ? (
<Space
style={{
width: '100%',
margin: '40px 0',
marginLeft: 20,
justifyContent: 'center',
}}
>
<Typography>
No spans found. Please add instrumentation (follow this
<a
href="https://signoz.io/docs/instrumentation/overview"
target="_blank"
style={{ marginLeft: 3 }}
rel="noreferrer"
>
guide
</a>
)
</Typography>
</Space>
) : (
<Space style={{ display: 'block', marginLeft: 20, width: 200 }}>
<Typography>{`Total count is ${totalCount}`}</Typography>
</Space>
)}
</Space>
<Card>
<Graph name="usage" data={data} type="bar" />
</Card>
</>
);
}
const mapStateToProps = (
state: AppState,
): {
totalCount: number;
globalTime: GlobalTime;
usageData: UsageDataItem[];
} => {
let totalCount = 0;
for (const item of state.usageDate) {
totalCount += item.count;
}
return {
totalCount,
usageData: state.usageDate,
globalTime: state.globalTime,
};
};
export const UsageExplorer = withRouter(
connect(mapStateToProps, {
getUsageData,
getServicesList: GetService,
})(_UsageExplorer),
);

View File

@@ -0,0 +1,7 @@
import { UsageExplorer } from './UsageExplorer';
function UsageExplorerContainer(): JSX.Element {
return <UsageExplorer />;
}
export default UsageExplorerContainer;

View File

@@ -0,0 +1,13 @@
import { Card as CardComponent } from 'antd';
import styled from 'styled-components';
export const Card = styled(CardComponent)`
&&& {
width: 90%;
margin-top: 2rem;
}
.ant-card-body {
height: 70vh;
}
`;

View File

@@ -2,3 +2,4 @@ export * from './global';
export * from './metrics';
export * from './serviceMap';
export * from './types';
export * from './usage';

View File

@@ -0,0 +1,34 @@
import GetLogs from 'api/logs/GetLogs';
import { Dispatch } from 'redux';
import AppActions from 'types/actions';
import { SET_LOADING, SET_LOGS } from 'types/actions/logs';
import { Props } from 'types/api/logs/getLogs';
export const getLogs = (
props: Props,
): ((dispatch: Dispatch<AppActions>) => void) => async (
dispatch,
): Promise<void> => {
dispatch({
type: SET_LOADING,
payload: true,
});
const response = await GetLogs(props);
if (response.payload)
dispatch({
type: SET_LOGS,
payload: response.payload,
});
else
dispatch({
type: SET_LOGS,
payload: [],
});
dispatch({
type: SET_LOADING,
payload: false,
});
};

View File

@@ -1,14 +1,17 @@
import { ServiceMapItemAction, ServiceMapLoading } from './serviceMap';
import { GetUsageDataAction } from './usage';
export enum ActionTypes {
updateTimeInterval = 'UPDATE_TIME_INTERVAL',
getServiceMapItems = 'GET_SERVICE_MAP_ITEMS',
getServices = 'GET_SERVICES',
getUsageData = 'GET_USAGE_DATE',
fetchTraces = 'FETCH_TRACES',
fetchTraceItem = 'FETCH_TRACE_ITEM',
serviceMapLoading = 'UPDATE_SERVICE_MAP_LOADING',
}
export type Action =
| GetUsageDataAction
| ServiceMapItemAction
| ServiceMapLoading;

View File

@@ -0,0 +1,34 @@
import api from 'api';
import { Dispatch } from 'redux';
import { toUTCEpoch } from 'utils/timeUtils';
import { ActionTypes } from './types';
export interface UsageDataItem {
timestamp: number;
count: number;
}
export interface GetUsageDataAction {
type: ActionTypes.getUsageData;
payload: UsageDataItem[];
}
export const getUsageData = (
minTime: number,
maxTime: number,
step: number,
service: string,
) => async (dispatch: Dispatch): Promise<void> => {
const requesString = `/usage?start=${toUTCEpoch(minTime)}&end=${toUTCEpoch(
maxTime,
)}&step=${step}&service=${service || ''}`;
// Step can only be multiple of 3600
const response = await api.get<UsageDataItem[]>(requesString);
dispatch<GetUsageDataAction>({
type: ActionTypes.getUsageData,
payload: response.data,
// PNOTE - response.data in the axios response has the actual API response
});
};

View File

@@ -6,9 +6,11 @@ import { LogsReducer } from './logs';
import metricsReducers from './metric';
import { ServiceMapReducer } from './serviceMap';
import traceReducer from './trace';
import { usageDataReducer } from './usage';
const reducers = combineReducers({
traces: traceReducer,
usageDate: usageDataReducer,
globalTime: globalTimeReducer,
serviceMap: ServiceMapReducer,
app: appReducer,

View File

@@ -0,0 +1,14 @@
/* eslint-disable sonarjs/no-small-switch */
import { Action, ActionTypes, UsageDataItem } from 'store/actions';
export const usageDataReducer = (
state: UsageDataItem[] = [{ timestamp: 0, count: 0 }],
action: Action,
): UsageDataItem[] => {
switch (action.type) {
case ActionTypes.getUsageData:
return action.payload;
default:
return state;
}
};

View File

@@ -0,0 +1,450 @@
package impltracefunnel
import (
"encoding/json"
"net/http"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/modules/tracefunnel"
"github.com/SigNoz/signoz/pkg/types/authtypes"
tf "github.com/SigNoz/signoz/pkg/types/tracefunnel"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/gorilla/mux"
)
type handler struct {
module tracefunnel.Module
}
func NewHandler(module tracefunnel.Module) tracefunnel.Handler {
return &handler{module: module}
}
func (handler *handler) New(rw http.ResponseWriter, r *http.Request) {
var req tf.FunnelRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
render.Error(rw, err)
return
}
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(rw, err)
return
}
userID := claims.UserID
orgID := claims.OrgID
funnels, err := handler.module.List(r.Context(), orgID)
if err != nil {
render.Error(rw, err)
return
}
for _, f := range funnels {
if f.Name == req.Name {
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "a funnel with name '%s' already exists in this organization", req.Name))
return
}
}
funnel, err := handler.module.Create(r.Context(), req.Timestamp, req.Name, userID, orgID)
if err != nil {
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "failed to create funnel"))
return
}
response := tf.FunnelResponse{
FunnelID: funnel.ID.String(),
FunnelName: funnel.Name,
CreatedAt: req.Timestamp,
UserEmail: claims.Email,
OrgID: orgID,
}
render.Success(rw, http.StatusOK, response)
}
func (handler *handler) Update(rw http.ResponseWriter, r *http.Request) {
var req tf.FunnelRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
render.Error(rw, err)
return
}
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(rw, err)
return
}
userID := claims.UserID
orgID := claims.OrgID
if err := tracefunnel.ValidateTimestamp(req.Timestamp, "timestamp"); err != nil {
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "timestamp is invalid: %v", err))
return
}
funnel, err := handler.module.Get(r.Context(), req.FunnelID.String())
if err != nil {
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "funnel not found: %v", err))
return
}
// Check if name is being updated and if it already exists
if req.Name != "" && req.Name != funnel.Name {
funnels, err := handler.module.List(r.Context(), orgID)
if err != nil {
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "failed to list funnels: %v", err))
return
}
for _, f := range funnels {
if f.Name == req.Name {
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "a funnel with name '%s' already exists in this organization", req.Name))
return
}
}
}
// Process each step in the request
for i := range req.Steps {
if req.Steps[i].Order < 1 {
req.Steps[i].Order = int64(i + 1) // Default to sequential ordering if not specified
}
// Generate a new UUID for the step if it doesn't have one
if req.Steps[i].Id.IsZero() {
newUUID := valuer.GenerateUUID()
req.Steps[i].Id = newUUID
}
}
if err := tracefunnel.ValidateFunnelSteps(req.Steps); err != nil {
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid funnel steps: %v", err))
return
}
// Normalize step orders
req.Steps = tracefunnel.NormalizeFunnelSteps(req.Steps)
// Update the funnel with new steps
funnel.Steps = req.Steps
funnel.UpdatedAt = time.Unix(0, req.Timestamp*1000000) // Convert to nanoseconds
funnel.UpdatedBy = userID
if req.Name != "" {
funnel.Name = req.Name
}
if req.Description != "" {
funnel.Description = req.Description
}
// Update funnel in database
err = handler.module.Update(r.Context(), funnel, userID)
if err != nil {
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "failed to update funnel in database: %v", err))
return
}
//// Update name and description if provided
//if req.Name != "" || req.Description != "" {
// name := req.Name
//
// description := req.Description
//
// err = handler.module.UpdateMetadata(r.Context(), funnel.ID, name, description, userID)
// if err != nil {
// render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "failed to update funnel metadata: %v", err))
// return
// }
//}
// Get the updated funnel to return in response
updatedFunnel, err := handler.module.Get(r.Context(), funnel.ID.String())
if err != nil {
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "failed to get updated funnel: %v", err))
return
}
response := tf.FunnelResponse{
FunnelName: updatedFunnel.Name,
FunnelID: updatedFunnel.ID.String(),
Steps: updatedFunnel.Steps,
CreatedAt: updatedFunnel.CreatedAt.UnixNano() / 1000000,
CreatedBy: updatedFunnel.CreatedBy,
OrgID: updatedFunnel.OrgID.String(),
UpdatedBy: userID,
UpdatedAt: updatedFunnel.UpdatedAt.UnixNano() / 1000000,
Description: updatedFunnel.Description,
}
render.Success(rw, http.StatusOK, response)
}
func (handler *handler) List(rw http.ResponseWriter, r *http.Request) {
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "unauthenticated"))
return
}
orgID := claims.OrgID
funnels, err := handler.module.List(r.Context(), orgID)
if err != nil {
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "failed to list funnels: %v", err))
return
}
var response []tf.FunnelResponse
for _, f := range funnels {
funnelResp := tf.FunnelResponse{
FunnelName: f.Name,
FunnelID: f.ID.String(),
CreatedAt: f.CreatedAt.UnixNano() / 1000000,
CreatedBy: f.CreatedBy,
OrgID: f.OrgID.String(),
UpdatedAt: f.UpdatedAt.UnixNano() / 1000000,
UpdatedBy: f.UpdatedBy,
Description: f.Description,
}
// Get user email if available
if f.CreatedByUser != nil {
funnelResp.UserEmail = f.CreatedByUser.Email
}
response = append(response, funnelResp)
}
render.Success(rw, http.StatusOK, response)
}
func (handler *handler) Get(rw http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
funnelID := vars["funnel_id"]
funnel, err := handler.module.Get(r.Context(), funnelID)
if err != nil {
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "funnel not found: %v", err))
return
}
// Create a response with all funnel details including step IDs
response := tf.FunnelResponse{
FunnelID: funnel.ID.String(),
FunnelName: funnel.Name,
Description: funnel.Description,
CreatedAt: funnel.CreatedAt.UnixNano() / 1000000,
UpdatedAt: funnel.UpdatedAt.UnixNano() / 1000000,
CreatedBy: funnel.CreatedBy,
UpdatedBy: funnel.UpdatedBy,
OrgID: funnel.OrgID.String(),
Steps: funnel.Steps,
}
// Add user email if available
if funnel.CreatedByUser != nil {
response.UserEmail = funnel.CreatedByUser.Email
}
render.Success(rw, http.StatusOK, response)
}
func (handler *handler) Delete(rw http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
funnelID := vars["funnel_id"]
err := handler.module.Delete(r.Context(), funnelID)
if err != nil {
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "failed to delete funnel: %v", err))
return
}
render.Success(rw, http.StatusOK, nil)
}
func (handler *handler) Save(rw http.ResponseWriter, r *http.Request) {
var req tf.FunnelRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid request: %v", err))
return
}
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "unauthenticated"))
return
}
orgID := claims.OrgID
usrID := claims.UserID
funnel, err := handler.module.Get(r.Context(), req.FunnelID.String())
if err != nil {
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "funnel not found: %v", err))
return
}
updateTimestamp := req.Timestamp
if updateTimestamp == 0 {
updateTimestamp = time.Now().UnixMilli()
} else if !tracefunnel.ValidateTimestampIsMilliseconds(updateTimestamp) {
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "timestamp must be in milliseconds format (13 digits)"))
return
}
funnel.UpdatedAt = time.Unix(0, updateTimestamp*1000000) // Convert to nanoseconds
if req.UserID != "" {
funnel.UpdatedBy = usrID
}
funnel.Description = req.Description
if err := handler.module.Save(r.Context(), funnel, funnel.UpdatedBy, orgID); err != nil {
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "failed to save funnel: %v", err))
return
}
// Try to fetch metadata from DB
createdAt, updatedAt, extraDataFromDB, err := handler.module.GetFunnelMetadata(r.Context(), funnel.ID.String())
if err != nil {
render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "failed to get funnel metadata: %v", err))
return
}
resp := tf.FunnelResponse{
FunnelName: funnel.Name,
CreatedAt: createdAt,
UpdatedAt: updatedAt,
CreatedBy: funnel.CreatedBy,
UpdatedBy: funnel.UpdatedBy,
OrgID: funnel.OrgID.String(),
Description: extraDataFromDB,
}
render.Success(rw, http.StatusOK, resp)
}
//func (handler *handler) ValidateTraces(rw http.ResponseWriter, r *http.Request) {
// vars := mux.Vars(r)
// funnelID := vars["funnel_id"]
//
// funnel, err := handler.module.Get(r.Context(), funnelID)
// if err != nil {
// render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "funnel not found: %v", err))
// return
// }
//
// var timeRange tf.TimeRange
// if err := json.NewDecoder(r.Body).Decode(&timeRange); err != nil {
// render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "error decoding time range: %v", err))
// return
// }
//
// response, err := handler.module.ValidateTraces(r.Context(), funnel, timeRange)
// if err != nil {
// render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "error validating traces: %v", err))
// return
// }
//
// render.Success(rw, http.StatusOK, response)
//}
//
//func (handler *handler) FunnelAnalytics(rw http.ResponseWriter, r *http.Request) {
// vars := mux.Vars(r)
// funnelID := vars["funnel_id"]
//
// funnel, err := handler.module.Get(r.Context(), funnelID)
// if err != nil {
// render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "funnel not found: %v", err))
// return
// }
//
// var timeRange tf.TimeRange
// if err := json.NewDecoder(r.Body).Decode(&timeRange); err != nil {
// render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "error decoding time range: %v", err))
// return
// }
//
// response, err := handler.module.GetFunnelAnalytics(r.Context(), funnel, timeRange)
// if err != nil {
// render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "error getting funnel analytics: %v", err))
// return
// }
//
// render.Success(rw, http.StatusOK, response)
//}
//
//func (handler *handler) StepAnalytics(rw http.ResponseWriter, r *http.Request) {
// vars := mux.Vars(r)
// funnelID := vars["funnel_id"]
//
// funnel, err := handler.module.Get(r.Context(), funnelID)
// if err != nil {
// render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "funnel not found: %v", err))
// return
// }
//
// var timeRange tf.TimeRange
// if err := json.NewDecoder(r.Body).Decode(&timeRange); err != nil {
// render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "error decoding time range: %v", err))
// return
// }
//
// response, err := handler.module.GetStepAnalytics(r.Context(), funnel, timeRange)
// if err != nil {
// render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "error getting step analytics: %v", err))
// return
// }
//
// render.Success(rw, http.StatusOK, response)
//}
//
//func (handler *handler) SlowestTraces(rw http.ResponseWriter, r *http.Request) {
// handler.handleTracesWithLatency(rw, r, false)
//}
//
//func (handler *handler) ErrorTraces(rw http.ResponseWriter, r *http.Request) {
// handler.handleTracesWithLatency(rw, r, true)
//}
//
//// handleTracesWithLatency handles both slow and error traces with common logic
//func (handler *handler) handleTracesWithLatency(rw http.ResponseWriter, r *http.Request, isError bool) {
// funnel, req, err := handler.validateTracesRequest(r)
// if err != nil {
// render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "%v", err))
// return
// }
//
// if err := tracefunnel.ValidateSteps(funnel, req.StepAOrder, req.StepBOrder); err != nil {
// render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "%v", err))
// return
// }
//
// response, err := handler.module.GetSlowestTraces(r.Context(), funnel, req.StepAOrder, req.StepBOrder, req.TimeRange, isError)
// if err != nil {
// render.Error(rw, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "error getting traces: %v", err))
// return
// }
//
// render.Success(rw, http.StatusOK, response)
//}
//
//// validateTracesRequest validates and extracts the request parameters
//func (handler *handler) validateTracesRequest(r *http.Request) (*tf.Funnel, *tf.StepTransitionRequest, error) {
// vars := mux.Vars(r)
// funnelID := vars["funnel_id"]
//
// funnel, err := handler.module.Get(r.Context(), funnelID)
// if err != nil {
// return nil, nil, fmt.Errorf("funnel not found: %v", err)
// }
//
// var req tf.StepTransitionRequest
// if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
// return nil, nil, fmt.Errorf("invalid request body: %v", err)
// }
//
// return funnel, &req, nil
//}

View File

@@ -0,0 +1,220 @@
package impltracefunnel
import (
"context"
"fmt"
"time"
"github.com/SigNoz/signoz/pkg/modules/tracefunnel"
"github.com/SigNoz/signoz/pkg/types"
traceFunnels "github.com/SigNoz/signoz/pkg/types/tracefunnel"
"github.com/SigNoz/signoz/pkg/valuer"
)
type module struct {
store traceFunnels.TraceFunnelStore
}
func NewModule(store traceFunnels.TraceFunnelStore) tracefunnel.Module {
return &module{
store: store,
}
}
func (module *module) Create(ctx context.Context, timestamp int64, name string, userID string, orgID string) (*traceFunnels.Funnel, error) {
orgUUID, err := valuer.NewUUID(orgID)
if err != nil {
return nil, fmt.Errorf("invalid org ID: %v", err)
}
funnel := &traceFunnels.Funnel{
BaseMetadata: traceFunnels.BaseMetadata{
Name: name,
OrgID: orgUUID,
},
}
funnel.CreatedAt = time.Unix(0, timestamp*1000000) // Convert to nanoseconds
funnel.CreatedBy = userID
// Set up the user relationship
funnel.CreatedByUser = &types.User{
ID: userID,
}
if err := module.store.Create(ctx, funnel); err != nil {
return nil, fmt.Errorf("failed to create funnel: %v", err)
}
return funnel, nil
}
// Get gets a funnel by ID
func (module *module) Get(ctx context.Context, funnelID string) (*traceFunnels.Funnel, error) {
uuid, err := valuer.NewUUID(funnelID)
if err != nil {
return nil, fmt.Errorf("invalid funnel ID: %v", err)
}
return module.store.Get(ctx, uuid)
}
// Update updates a funnel
func (module *module) Update(ctx context.Context, funnel *traceFunnels.Funnel, userID string) error {
funnel.UpdatedBy = userID
return module.store.Update(ctx, funnel)
}
// List lists all funnels for an organization
func (module *module) List(ctx context.Context, orgID string) ([]*traceFunnels.Funnel, error) {
orgUUID, err := valuer.NewUUID(orgID)
if err != nil {
return nil, fmt.Errorf("invalid org ID: %v", err)
}
funnels, err := module.store.List(ctx)
if err != nil {
return nil, err
}
// Filter by orgID
var orgFunnels []*traceFunnels.Funnel
for _, f := range funnels {
if f.OrgID == orgUUID {
orgFunnels = append(orgFunnels, f)
}
}
return orgFunnels, nil
}
// Delete deletes a funnel
func (module *module) Delete(ctx context.Context, funnelID string) error {
uuid, err := valuer.NewUUID(funnelID)
if err != nil {
return fmt.Errorf("invalid funnel ID: %v", err)
}
return module.store.Delete(ctx, uuid)
}
// Save saves a funnel
func (module *module) Save(ctx context.Context, funnel *traceFunnels.Funnel, userID string, orgID string) error {
orgUUID, err := valuer.NewUUID(orgID)
if err != nil {
return fmt.Errorf("invalid org ID: %v", err)
}
funnel.UpdatedBy = userID
funnel.OrgID = orgUUID
return module.store.Update(ctx, funnel)
}
// GetFunnelMetadata gets metadata for a funnel
func (module *module) GetFunnelMetadata(ctx context.Context, funnelID string) (int64, int64, string, error) {
uuid, err := valuer.NewUUID(funnelID)
if err != nil {
return 0, 0, "", fmt.Errorf("invalid funnel ID: %v", err)
}
funnel, err := module.store.Get(ctx, uuid)
if err != nil {
return 0, 0, "", err
}
return funnel.CreatedAt.UnixNano() / 1000000, funnel.UpdatedAt.UnixNano() / 1000000, funnel.Description, nil
}
// ValidateTraces validates traces in a funnel
//func (module *module) ValidateTraces(ctx context.Context, funnel *traceFunnels.Funnel, timeRange traceFunnels.TimeRange) ([]*v3.Row, error) {
// chq, err := tracefunnel.ValidateTraces(funnel, timeRange)
// if err != nil {
// RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil)
// return
// }
//
// results, err := aH.reader. GetListResultV3(r.Context(), chq.Query)
// if err != nil {
// RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error converting clickhouse results to list: %v", err)}, nil)
// return
// }
//
//}
// GetFunnelAnalytics gets analytics for a funnel
//func (module *module) GetFunnelAnalytics(ctx context.Context, funnel *traceFunnels.Funnel, timeRange traceFunnels.TimeRange) (*traceFunnels.FunnelAnalytics, error) {
// if err := tracefunnel.ValidateFunnel(funnel); err != nil {
// return nil, fmt.Errorf("invalid funnel: %v", err)
// }
//
// if err := tracefunnel.ValidateTimeRange(timeRange); err != nil {
// return nil, fmt.Errorf("invalid time range: %v", err)
// }
//
// _, err := tracefunnel.ValidateTracesWithLatency(funnel, timeRange)
// if err != nil {
// return nil, fmt.Errorf("error building clickhouse query: %v", err)
// }
//
// // TODO: Execute query and return results
// // For now, return empty analytics
// return &traceFunnels.FunnelAnalytics{
// TotalStart: 0,
// TotalComplete: 0,
// ErrorCount: 0,
// AvgDurationMs: 0,
// P99LatencyMs: 0,
// ConversionRate: 0,
// }, nil
//}
// GetStepAnalytics gets analytics for each step
//func (module *module) GetStepAnalytics(ctx context.Context, funnel *traceFunnels.Funnel, timeRange traceFunnels.TimeRange) (*traceFunnels.FunnelAnalytics, error) {
// if err := tracefunnel.ValidateFunnel(funnel); err != nil {
// return nil, fmt.Errorf("invalid funnel: %v", err)
// }
//
// if err := tracefunnel.ValidateTimeRange(timeRange); err != nil {
// return nil, fmt.Errorf("invalid time range: %v", err)
// }
//
// _, err := tracefunnel.GetStepAnalytics(funnel, timeRange)
// if err != nil {
// return nil, fmt.Errorf("error building clickhouse query: %v", err)
// }
//
// // TODO: Execute query and return results
// // For now, return empty analytics
// return &traceFunnels.FunnelAnalytics{
// TotalStart: 0,
// TotalComplete: 0,
// ErrorCount: 0,
// AvgDurationMs: 0,
// P99LatencyMs: 0,
// ConversionRate: 0,
// }, nil
//}
// GetSlowestTraces gets the slowest traces between two steps
//func (module *module) GetSlowestTraces(ctx context.Context, funnel *traceFunnels.Funnel, stepAOrder, stepBOrder int64, timeRange traceFunnels.TimeRange, isError bool) (*traceFunnels.ValidTracesResponse, error) {
// if err := tracefunnel.ValidateFunnel(funnel); err != nil {
// return nil, fmt.Errorf("invalid funnel: %v", err)
// }
//
// if err := tracefunnel.ValidateTimeRange(timeRange); err != nil {
// return nil, fmt.Errorf("invalid time range: %v", err)
// }
//
// _, err := tracefunnel.GetSlowestTraces(funnel, stepAOrder, stepBOrder, timeRange, isError)
// if err != nil {
// return nil, fmt.Errorf("error building clickhouse query: %v", err)
// }
//
// // TODO: Execute query and return results
// // For now, return empty response
// return &traceFunnels.ValidTracesResponse{
// TraceIDs: []string{},
// }, nil
//}
//UpdateMetadata updates the metadata of a funnel
//func (module *module) UpdateMetadata(ctx context.Context, funnelID valuer.UUID, name, description string, userID string) error {
// return module.store.UpdateMetadata(ctx, funnelID, name, description, userID)
//}

View File

@@ -0,0 +1,220 @@
package impltracefunnel
import (
"context"
"fmt"
"time"
"github.com/SigNoz/signoz/pkg/sqlstore"
traceFunnels "github.com/SigNoz/signoz/pkg/types/tracefunnel"
"github.com/SigNoz/signoz/pkg/valuer"
)
type store struct {
sqlstore sqlstore.SQLStore
}
func NewStore(sqlstore sqlstore.SQLStore) traceFunnels.TraceFunnelStore {
return &store{sqlstore: sqlstore}
}
func (store *store) Create(ctx context.Context, funnel *traceFunnels.Funnel) error {
if funnel.ID.IsZero() {
funnel.ID = valuer.GenerateUUID()
}
if funnel.CreatedAt.IsZero() {
funnel.CreatedAt = time.Now()
}
if funnel.UpdatedAt.IsZero() {
funnel.UpdatedAt = time.Now()
}
_, err := store.
sqlstore.
BunDB().
NewInsert().
Model(funnel).
Exec(ctx)
if err != nil {
return fmt.Errorf("failed to create funnel: %v", err)
}
if funnel.CreatedByUser != nil {
_, err = store.sqlstore.BunDB().NewUpdate().
Model(funnel).
Set("created_by = ?", funnel.CreatedByUser.ID).
Where("id = ?", funnel.ID).
Exec(ctx)
if err != nil {
return fmt.Errorf("failed to update funnel user relationship: %v", err)
}
}
return nil
}
// Get retrieves a funnel by ID
func (store *store) Get(ctx context.Context, uuid valuer.UUID) (*traceFunnels.Funnel, error) {
funnel := &traceFunnels.Funnel{}
err := store.
sqlstore.
BunDB().
NewSelect().
Model(funnel).
Relation("CreatedByUser").
Where("?TableAlias.id = ?", uuid).
Scan(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get funnel: %v", err)
}
return funnel, nil
}
// Update updates an existing funnel
func (store *store) Update(ctx context.Context, funnel *traceFunnels.Funnel) error {
// Update the updated_at timestamp
funnel.UpdatedAt = time.Now()
_, err := store.
sqlstore.
BunDB().
NewUpdate().
Model(funnel).
WherePK().
Exec(ctx)
if err != nil {
return fmt.Errorf("failed to update funnel: %v", err)
}
return nil
}
// List retrieves all funnels
func (store *store) List(ctx context.Context) ([]*traceFunnels.Funnel, error) {
var funnels []*traceFunnels.Funnel
err := store.
sqlstore.
BunDB().
NewSelect().
Model(&funnels).
Relation("CreatedByUser").
Scan(ctx)
if err != nil {
return nil, fmt.Errorf("failed to list funnels: %v", err)
}
return funnels, nil
}
// Delete removes a funnel by ID
func (store *store) Delete(ctx context.Context, uuid valuer.UUID) error {
_, err := store.
sqlstore.
BunDB().
NewDelete().
Model((*traceFunnels.Funnel)(nil)).
Where("id = ?", uuid).Exec(ctx)
if err != nil {
return fmt.Errorf("failed to delete funnel: %v", err)
}
return nil
}
// ListByOrg retrieves all funnels for a specific organization
//func (store *store) ListByOrg(ctx context.Context, orgID valuer.UUID) ([]*traceFunnels.Funnel, error) {
// var funnels []*traceFunnels.Funnel
// err := store.
// sqlstore.
// BunDB().
// NewSelect().
// Model(&funnels).
// Relation("CreatedByUser").
// Where("org_id = ?", orgID).
// Scan(ctx)
// if err != nil {
// return nil, fmt.Errorf("failed to list funnels by org: %v", err)
// }
// return funnels, nil
//}
// GetByIDAndOrg retrieves a funnel by ID and organization ID
//func (store *store) GetByIDAndOrg(ctx context.Context, id, orgID valuer.UUID) (*traceFunnels.Funnel, error) {
// funnel := &traceFunnels.Funnel{}
// err := store.
// sqlstore.
// BunDB().
// NewSelect().
// Model(funnel).
// Relation("CreatedByUser").
// Where("?TableAlias.id = ? AND ?TableAlias.org_id = ?", id, orgID).
// Scan(ctx)
// if err != nil {
// return nil, fmt.Errorf("failed to get funnel by ID and org: %v", err)
// }
// return funnel, nil
//}
// UpdateSteps updates the steps of a funnel
//func (store *store) UpdateSteps(ctx context.Context, funnelID valuer.UUID, steps []traceFunnels.FunnelStep) error {
// _, err := store.
// sqlstore.
// BunDB().
// NewUpdate().
// Model((*traceFunnels.Funnel)(nil)).
// Set("steps = ?", steps).
// Where("id = ?", funnelID).
// Exec(ctx)
// if err != nil {
// return fmt.Errorf("failed to update funnel steps: %v", err)
// }
// return nil
//}
// UpdateMetadata updates the metadata of a funnel
//func (store *store) UpdateMetadata(ctx context.Context, funnelID valuer.UUID, name, description string, userID string) error {
//
// // First get the current funnel to preserve other fields
// funnel := &traceFunnels.Funnel{}
// err := store.
// sqlstore.
// BunDB().
// NewSelect().
// Model(funnel).
// Where("id = ?", funnelID).
// Scan(ctx)
// if err != nil {
// return fmt.Errorf("failed to get funnel: %v", err)
// }
//
// // Update the fields
// funnel.Name = name
// funnel.Description = description
// funnel.UpdatedAt = time.Now()
// funnel.UpdatedBy = userID
//
// // Save the updated funnel
// _, err = store.
// sqlstore.
// BunDB().
// NewUpdate().
// Model(funnel).
// WherePK().
// Exec(ctx)
// if err != nil {
// return fmt.Errorf("failed to update funnel metadata: %v", err)
// }
//
// // Verify the update
// updatedFunnel := &traceFunnels.Funnel{}
// err = store.
// sqlstore.
// BunDB().
// NewSelect().
// Model(updatedFunnel).
// Where("id = ?", funnelID).
// Scan(ctx)
// if err != nil {
// return fmt.Errorf("failed to verify update: %v", err)
// }
//
// return nil
//}

View File

@@ -0,0 +1,442 @@
package tracefunnel
import (
"fmt"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
tracefunnel "github.com/SigNoz/signoz/pkg/types/tracefunnel"
"strings"
)
// GetSlowestTraces builds a ClickHouse query to get the slowest traces between two steps
func GetSlowestTraces(funnel *tracefunnel.Funnel, stepAOrder, stepBOrder int64, timeRange tracefunnel.TimeRange, withErrors bool) (*v3.ClickHouseQuery, error) {
// Find steps by order
var stepA, stepB *tracefunnel.FunnelStep
for i := range funnel.Steps {
if funnel.Steps[i].Order == stepAOrder {
stepA = &funnel.Steps[i]
}
if funnel.Steps[i].Order == stepBOrder {
stepB = &funnel.Steps[i]
}
}
if stepA == nil || stepB == nil {
return nil, fmt.Errorf("step not found")
}
// Build having clause based on withErrors flag
havingClause := ""
if withErrors {
havingClause = "HAVING has_error = 1"
}
// Build filter strings for each step
stepAFilters := ""
if stepA.Filters != nil && len(stepA.Filters.Items) > 0 {
// ToDO: need to implement where clause filtering with minimal code duplication
stepAFilters = "/* Custom filters for step A would be applied here */"
}
stepBFilters := ""
if stepB.Filters != nil && len(stepB.Filters.Items) > 0 {
// ToDO: need to implement where clause filtering with minimal code duplication
stepBFilters = "/* Custom filters for step B would be applied here */"
}
query := fmt.Sprintf(`
WITH
toUInt64(%d) AS start_time,
toUInt64(%d) AS end_time,
toString(intDiv(start_time, 1000000000) - 1800) AS tsBucketStart,
toString(intDiv(end_time, 1000000000)) AS tsBucketEnd
SELECT
trace_id,
concat(toString((max_end_time_ns - min_start_time_ns) / 1e6), ' ms') AS duration_ms,
COUNT(*) AS span_count
FROM (
SELECT
s1.trace_id,
MIN(toUnixTimestamp64Nano(s1.timestamp)) AS min_start_time_ns,
MAX(toUnixTimestamp64Nano(s2.timestamp) + s2.duration_nano) AS max_end_time_ns,
MAX(s1.has_error OR s2.has_error) AS has_error
FROM %s AS s1
JOIN %s AS s2
ON s1.trace_id = s2.trace_id
WHERE s1.resource_string_service$$name = '%s'
AND s1.name = '%s'
AND s2.resource_string_service$$name = '%s'
AND s2.name = '%s'
AND s1.timestamp BETWEEN toString(start_time) AND toString(end_time)
AND s1.ts_bucket_start BETWEEN tsBucketStart AND tsBucketEnd
AND s2.timestamp BETWEEN toString(start_time) AND toString(end_time)
AND s2.ts_bucket_start BETWEEN tsBucketStart AND tsBucketEnd
%s
%s
GROUP BY s1.trace_id
%s
) AS trace_durations
JOIN %s AS spans
ON spans.trace_id = trace_durations.trace_id
WHERE spans.timestamp BETWEEN toString(start_time) AND toString(end_time)
AND spans.ts_bucket_start BETWEEN tsBucketStart AND tsBucketEnd
GROUP BY trace_id, duration_ms
ORDER BY CAST(replaceRegexpAll(duration_ms, ' ms$', '') AS Float64) DESC
LIMIT 5`,
timeRange.StartTime,
timeRange.EndTime,
TracesTable,
TracesTable,
escapeString(stepA.ServiceName),
escapeString(stepA.SpanName),
escapeString(stepB.ServiceName),
escapeString(stepB.SpanName),
stepAFilters,
stepBFilters,
havingClause,
TracesTable,
)
return &v3.ClickHouseQuery{
Query: query,
}, nil
}
// GetStepAnalytics builds a ClickHouse query to get analytics for each step
func GetStepAnalytics(funnel *tracefunnel.Funnel, timeRange tracefunnel.TimeRange) (*v3.ClickHouseQuery, error) {
if len(funnel.Steps) == 0 {
return nil, fmt.Errorf("funnel has no steps")
}
// Build funnel steps array
var steps []string
for _, step := range funnel.Steps {
steps = append(steps, fmt.Sprintf("('%s', '%s')",
escapeString(step.ServiceName), escapeString(step.SpanName)))
}
stepsArray := fmt.Sprintf("array(%s)", strings.Join(steps, ","))
// Build step CTEs
var stepCTEs []string
for i, step := range funnel.Steps {
filterStr := ""
if step.Filters != nil && len(step.Filters.Items) > 0 {
// ToDO: need to implement where clause filtering with minimal code duplication
filterStr = "/* Custom filters would be applied here */"
}
cte := fmt.Sprintf(`
step%d_traces AS (
SELECT DISTINCT trace_id
FROM %s
WHERE resource_string_service$$name = '%s'
AND name = '%s'
AND timestamp BETWEEN toString(start_time) AND toString(end_time)
AND ts_bucket_start BETWEEN tsBucketStart AND tsBucketEnd
%s
)`,
i+1,
TracesTable,
escapeString(step.ServiceName),
escapeString(step.SpanName),
filterStr,
)
stepCTEs = append(stepCTEs, cte)
}
// Build intersecting traces CTE
var intersections []string
for i := 1; i <= len(funnel.Steps); i++ {
intersections = append(intersections, fmt.Sprintf("SELECT trace_id FROM step%d_traces", i))
}
intersectingTracesCTE := fmt.Sprintf(`
intersecting_traces AS (
%s
)`,
strings.Join(intersections, "\nINTERSECT\n"),
)
// Build CASE expressions for each step
var caseExpressions []string
for i, step := range funnel.Steps {
totalSpansExpr := fmt.Sprintf(`
COUNT(CASE WHEN resource_string_service$$name = '%s'
AND name = '%s'
THEN trace_id END) AS total_s%d_spans`,
escapeString(step.ServiceName), escapeString(step.SpanName), i+1)
erroredSpansExpr := fmt.Sprintf(`
COUNT(CASE WHEN resource_string_service$$name = '%s'
AND name = '%s'
AND has_error = true
THEN trace_id END) AS total_s%d_errored_spans`,
escapeString(step.ServiceName), escapeString(step.SpanName), i+1)
caseExpressions = append(caseExpressions, totalSpansExpr, erroredSpansExpr)
}
query := fmt.Sprintf(`
WITH
toUInt64(%d) AS start_time,
toUInt64(%d) AS end_time,
toString(intDiv(start_time, 1000000000) - 1800) AS tsBucketStart,
toString(intDiv(end_time, 1000000000)) AS tsBucketEnd,
%s AS funnel_steps,
%s,
%s
SELECT
%s
FROM %s
WHERE trace_id IN (SELECT trace_id FROM intersecting_traces)
AND timestamp BETWEEN toString(start_time) AND toString(end_time)
AND ts_bucket_start BETWEEN tsBucketStart AND tsBucketEnd`,
timeRange.StartTime,
timeRange.EndTime,
stepsArray,
strings.Join(stepCTEs, ",\n"),
intersectingTracesCTE,
strings.Join(caseExpressions, ",\n "),
TracesTable,
)
return &v3.ClickHouseQuery{
Query: query,
}, nil
}
// ValidateTracesWithLatency builds a ClickHouse query to validate traces with latency information
func ValidateTracesWithLatency(funnel *tracefunnel.Funnel, timeRange tracefunnel.TimeRange) (*v3.ClickHouseQuery, error) {
filters, err := buildFunnelFiltersWithLatency(funnel)
if err != nil {
return nil, fmt.Errorf("error building funnel filters with latency: %w", err)
}
query := generateFunnelSQLWithLatency(timeRange.StartTime, timeRange.EndTime, filters)
return &v3.ClickHouseQuery{
Query: query,
}, nil
}
func generateFunnelSQLWithLatency(start, end int64, filters []tracefunnel.FunnelStepFilter) string {
var expressions []string
// Convert timestamps to nanoseconds
startTime := fmt.Sprintf("toUInt64(%d)", start)
endTime := fmt.Sprintf("toUInt64(%d)", end)
expressions = append(expressions, fmt.Sprintf("%s AS start_time", startTime))
expressions = append(expressions, fmt.Sprintf("%s AS end_time", endTime))
expressions = append(expressions, "toString(intDiv(start_time, 1000000000) - 1800) AS tsBucketStart")
expressions = append(expressions, "toString(intDiv(end_time, 1000000000)) AS tsBucketEnd")
expressions = append(expressions, "(end_time - start_time) / 1e9 AS total_time_seconds")
// Define step configurations dynamically
for _, f := range filters {
expressions = append(expressions, fmt.Sprintf("('%s', '%s') AS s%d_config",
escapeString(f.ServiceName),
escapeString(f.SpanName),
f.StepNumber))
}
withClause := "WITH \n" + strings.Join(expressions, ",\n") + "\n"
// Build step raw expressions and cumulative logic
var stepRaws []string
var cumulativeLogic []string
var filterConditions []string
stepCount := len(filters)
// Build raw step detection
for i := 1; i <= stepCount; i++ {
stepRaws = append(stepRaws, fmt.Sprintf(
"MAX(CASE WHEN (resource_string_service$$name, name) = s%d_config THEN 1 ELSE 0 END) AS has_s%d_raw", i, i))
filterConditions = append(filterConditions, fmt.Sprintf("s%d_config", i))
}
// Build cumulative IF logic
for i := 1; i <= stepCount; i++ {
if i == 1 {
cumulativeLogic = append(cumulativeLogic, fmt.Sprintf(`
IF(MAX(CASE WHEN (resource_string_service$$name, name) = s1_config THEN 1 ELSE 0 END) = 1, 1, 0) AS has_s1`))
} else {
innerIf := "IF(MAX(CASE WHEN (resource_string_service$$name, name) = s1_config THEN 1 ELSE 0 END) = 1, 1, 0)"
for j := 2; j < i; j++ {
innerIf = fmt.Sprintf(`IF(%s = 1 AND MAX(CASE WHEN (resource_string_service$$name, name) = s%d_config THEN 1 ELSE 0 END) = 1, 1, 0)`, innerIf, j)
}
cumulativeLogic = append(cumulativeLogic, fmt.Sprintf(`
IF(
%s = 1 AND MAX(CASE WHEN (resource_string_service$$name, name) = s%d_config THEN 1 ELSE 0 END) = 1,
1, 0
) AS has_s%d`, innerIf, i, i))
}
}
// Final SELECT counts using FILTER clauses
var stepCounts []string
for i := 1; i <= stepCount; i++ {
stepCounts = append(stepCounts, fmt.Sprintf("COUNT(DISTINCT trace_id) FILTER (WHERE has_s%d = 1) AS step%d_count", i, i))
}
// Final query assembly
lastStep := fmt.Sprint(stepCount)
query := withClause + `
SELECT
` + strings.Join(stepCounts, ",\n ") + `,
IF(total_time_seconds = 0 OR COUNT(DISTINCT trace_id) FILTER (WHERE has_s` + lastStep + ` = 1) = 0, 0,
COUNT(DISTINCT trace_id) FILTER (WHERE has_s` + lastStep + ` = 1) / total_time_seconds
) AS avg_rate,
COUNT(DISTINCT trace_id) FILTER (WHERE has_s` + lastStep + ` = 1 AND has_error = true) AS errors,
IF(COUNT(*) = 0, 0, avg(trace_duration)) AS avg_duration,
IF(COUNT(*) = 0, 0, quantile(0.99)(trace_duration)) AS p99_latency,
IF(COUNT(DISTINCT trace_id) FILTER (WHERE has_s1 = 1) = 0, 0,
100.0 * COUNT(DISTINCT trace_id) FILTER (WHERE has_s` + lastStep + ` = 1) /
COUNT(DISTINCT trace_id) FILTER (WHERE has_s1 = 1)
) AS conversion_rate
FROM (
SELECT
trace_id,
MAX(has_error) AS has_error,
` + strings.Join(stepRaws, ",\n ") + `,
MAX(toUnixTimestamp64Nano(timestamp) + duration_nano) - MIN(toUnixTimestamp64Nano(timestamp)) AS trace_duration,
` + strings.Join(cumulativeLogic, ",\n ") + `
FROM ` + TracesTable + `
WHERE
timestamp BETWEEN toString(start_time) AND toString(end_time)
AND ts_bucket_start BETWEEN tsBucketStart AND tsBucketEnd
AND (resource_string_service$$name, name) IN (` + strings.Join(filterConditions, ", ") + `)
GROUP BY trace_id
) AS funnel_data;`
return query
}
func buildFunnelFiltersWithLatency(funnel *tracefunnel.Funnel) ([]tracefunnel.FunnelStepFilter, error) {
if funnel == nil {
return nil, fmt.Errorf("funnel cannot be nil")
}
if len(funnel.Steps) == 0 {
return nil, fmt.Errorf("funnel must have at least one step")
}
filters := make([]tracefunnel.FunnelStepFilter, len(funnel.Steps))
for i, step := range funnel.Steps {
latencyPointer := "start" // Default value
if step.LatencyPointer != "" {
latencyPointer = step.LatencyPointer
}
filters[i] = tracefunnel.FunnelStepFilter{
StepNumber: i + 1,
ServiceName: step.ServiceName,
SpanName: step.SpanName,
LatencyPointer: latencyPointer,
CustomFilters: step.Filters,
}
}
return filters, nil
}
func buildFunnelFilters(funnel *tracefunnel.Funnel) ([]tracefunnel.FunnelStepFilter, error) {
if funnel == nil {
return nil, fmt.Errorf("funnel cannot be nil")
}
if len(funnel.Steps) == 0 {
return nil, fmt.Errorf("funnel must have at least one step")
}
filters := make([]tracefunnel.FunnelStepFilter, len(funnel.Steps))
for i, step := range funnel.Steps {
filters[i] = tracefunnel.FunnelStepFilter{
StepNumber: i + 1,
ServiceName: step.ServiceName,
SpanName: step.SpanName,
CustomFilters: step.Filters,
}
}
return filters, nil
}
func escapeString(s string) string {
// Replace single quotes with double single quotes to escape them in SQL
return strings.ReplaceAll(s, "'", "''")
}
const TracesTable = "signoz_traces.signoz_index_v3"
func generateFunnelSQL(start, end int64, filters []tracefunnel.FunnelStepFilter) string {
var expressions []string
// Basic time expressions.
expressions = append(expressions, fmt.Sprintf("toUInt64(%d) AS start_time", start))
expressions = append(expressions, fmt.Sprintf("toUInt64(%d) AS end_time", end))
expressions = append(expressions, "toString(intDiv(start_time, 1000000000) - 1800) AS tsBucketStart")
expressions = append(expressions, "toString(intDiv(end_time, 1000000000)) AS tsBucketEnd")
// Add service and span alias definitions from each filter.
for _, f := range filters {
expressions = append(expressions, fmt.Sprintf("'%s' AS service_%d", escapeString(f.ServiceName), f.StepNumber))
expressions = append(expressions, fmt.Sprintf("'%s' AS span_%d", escapeString(f.SpanName), f.StepNumber))
}
// Add the CTE for each step.
for _, f := range filters {
cte := fmt.Sprintf(`step%d_traces AS (
SELECT DISTINCT trace_id
FROM %s
WHERE serviceName = service_%d
AND name = span_%d
AND timestamp BETWEEN toString(start_time) AND toString(end_time)
AND ts_bucket_start BETWEEN tsBucketStart AND tsBucketEnd
)`, f.StepNumber, TracesTable, f.StepNumber, f.StepNumber)
expressions = append(expressions, cte)
}
withClause := "WITH \n" + strings.Join(expressions, ",\n") + "\n"
// Build the intersect clause for each step.
var intersectQueries []string
for _, f := range filters {
intersectQueries = append(intersectQueries, fmt.Sprintf("SELECT trace_id FROM step%d_traces", f.StepNumber))
}
intersectClause := strings.Join(intersectQueries, "\nINTERSECT\n")
query := withClause + `
SELECT trace_id
FROM ` + TracesTable + `
WHERE trace_id IN (
` + intersectClause + `
)
AND timestamp BETWEEN toString(start_time) AND toString(end_time)
AND ts_bucket_start BETWEEN tsBucketStart AND tsBucketEnd
GROUP BY trace_id
LIMIT 5
`
return query
}
// ValidateTraces builds a ClickHouse query to validate traces in a funnel
func ValidateTraces(funnel *tracefunnel.Funnel, timeRange tracefunnel.TimeRange) (*v3.ClickHouseQuery, error) {
filters, err := buildFunnelFilters(funnel)
if err != nil {
return nil, fmt.Errorf("error building funnel filters: %w", err)
}
query := generateFunnelSQL(timeRange.StartTime, timeRange.EndTime, filters)
return &v3.ClickHouseQuery{
Query: query,
}, nil
}

View File

@@ -0,0 +1,65 @@
package tracefunnel
import (
"context"
"net/http"
traceFunnels "github.com/SigNoz/signoz/pkg/types/tracefunnel"
)
// Module defines the interface for trace funnel operations
type Module interface {
// operations on funnel
Create(ctx context.Context, timestamp int64, name string, userID string, orgID string) (*traceFunnels.Funnel, error)
Get(ctx context.Context, funnelID string) (*traceFunnels.Funnel, error)
Update(ctx context.Context, funnel *traceFunnels.Funnel, userID string) error
List(ctx context.Context, orgID string) ([]*traceFunnels.Funnel, error)
Delete(ctx context.Context, funnelID string) error
Save(ctx context.Context, funnel *traceFunnels.Funnel, userID string, orgID string) error
GetFunnelMetadata(ctx context.Context, funnelID string) (int64, int64, string, error)
//
//GetFunnelAnalytics(ctx context.Context, funnel *traceFunnels.Funnel, timeRange traceFunnels.TimeRange) (*traceFunnels.FunnelAnalytics, error)
//
//GetStepAnalytics(ctx context.Context, funnel *traceFunnels.Funnel, timeRange traceFunnels.TimeRange) (*traceFunnels.FunnelAnalytics, error)
//
//GetSlowestTraces(ctx context.Context, funnel *traceFunnels.Funnel, stepAOrder, stepBOrder int64, timeRange traceFunnels.TimeRange, isError bool) (*traceFunnels.ValidTracesResponse, error)
// updates funnel metadata
//UpdateMetadata(ctx context.Context, funnelID valuer.UUID, name, description string, userID string) error
// validates funnel
//ValidateTraces(ctx context.Context, funnel *traceFunnels.Funnel, timeRange traceFunnels.TimeRange) ([]*v3.Row, error)
}
type Handler interface {
// CRUD on funnel
New(http.ResponseWriter, *http.Request)
Update(http.ResponseWriter, *http.Request)
List(http.ResponseWriter, *http.Request)
Get(http.ResponseWriter, *http.Request)
Delete(http.ResponseWriter, *http.Request)
Save(http.ResponseWriter, *http.Request)
// validator handlers
//ValidateTraces(http.ResponseWriter, *http.Request)
//
//// Analytics handlers
//FunnelAnalytics(http.ResponseWriter, *http.Request)
//
//StepAnalytics(http.ResponseWriter, *http.Request)
//
//SlowestTraces(http.ResponseWriter, *http.Request)
//
//ErrorTraces(http.ResponseWriter, *http.Request)
}

View File

@@ -0,0 +1,171 @@
package tracefunnel
import (
"fmt"
tracefunnel "github.com/SigNoz/signoz/pkg/types/tracefunnel"
"sort"
)
// ValidateTimestamp validates a timestamp
func ValidateTimestamp(timestamp int64, fieldName string) error {
if timestamp == 0 {
return fmt.Errorf("%s is required", fieldName)
}
if timestamp < 0 {
return fmt.Errorf("%s must be positive", fieldName)
}
return nil
}
// ValidateTimestampIsMilliseconds validates that a timestamp is in milliseconds
func ValidateTimestampIsMilliseconds(timestamp int64) bool {
// Check if timestamp is in milliseconds (13 digits)
return timestamp >= 1000000000000 && timestamp <= 9999999999999
}
// ValidateFunnelSteps validates funnel steps
func ValidateFunnelSteps(steps []tracefunnel.FunnelStep) error {
if len(steps) < 2 {
return fmt.Errorf("funnel must have at least 2 steps")
}
for i, step := range steps {
if step.ServiceName == "" {
return fmt.Errorf("step %d: service name is required", i+1)
}
if step.SpanName == "" {
return fmt.Errorf("step %d: span name is required", i+1)
}
if step.Order < 0 {
return fmt.Errorf("step %d: order must be non-negative", i+1)
}
}
return nil
}
// NormalizeFunnelSteps normalizes step orders to be sequential
func NormalizeFunnelSteps(steps []tracefunnel.FunnelStep) []tracefunnel.FunnelStep {
// Sort steps by order
sort.Slice(steps, func(i, j int) bool {
return steps[i].Order < steps[j].Order
})
// Normalize orders to be sequential
for i := range steps {
steps[i].Order = int64(i + 1)
}
return steps
}
//// ValidateSteps checks if the requested steps exist in the funnel
//func ValidateSteps(funnel *tracefunnel.Funnel, stepAOrder, stepBOrder int64) error {
// stepAExists, stepBExists := false, false
// for _, step := range funnel.Steps {
// if step.Order == stepAOrder {
// stepAExists = true
// }
// if step.Order == stepBOrder {
// stepBExists = true
// }
// }
//
// if !stepAExists || !stepBExists {
// return fmt.Errorf("one or both steps not found. Step A Order: %d, Step B Order: %d", stepAOrder, stepBOrder)
// }
//
// return nil
//}
//// ValidateFunnel validates a funnel's data
//func ValidateFunnel(funnel *tracefunnel.Funnel) error {
// if funnel == nil {
// return fmt.Errorf("funnel cannot be nil")
// }
//
// if len(funnel.Steps) < 2 {
// return fmt.Errorf("funnel must have at least 2 steps")
// }
//
// // Validate each step
// for i, step := range funnel.Steps {
// if err := ValidateStep(step, i+1); err != nil {
// return err
// }
// }
//
// return nil
//}
// ValidateStep validates a single funnel step
//func ValidateStep(step tracefunnel.FunnelStep, stepNum int) error {
// if step.ServiceName == "" {
// return fmt.Errorf("step %d: service name is required", stepNum)
// }
//
// if step.SpanName == "" {
// return fmt.Errorf("step %d: span name is required", stepNum)
// }
//
// if step.Order < 0 {
// return fmt.Errorf("step %d: order must be non-negative", stepNum)
// }
//
// return nil
//}
//
//// ValidateTimeRange validates a time range
//func ValidateTimeRange(timeRange tracefunnel.TimeRange) error {
// if timeRange.StartTime <= 0 {
// return fmt.Errorf("start time must be positive")
// }
//
// if timeRange.EndTime <= 0 {
// return fmt.Errorf("end time must be positive")
// }
//
// if timeRange.EndTime < timeRange.StartTime {
// return fmt.Errorf("end time must be after start time")
// }
//
// // Check if the time range is not too far in the future
// now := time.Now().UnixNano() / 1000000 // Convert to milliseconds
// if timeRange.EndTime > now {
// return fmt.Errorf("end time cannot be in the future")
// }
//
// // Check if the time range is not too old (e.g., more than 30 days)
// maxAge := int64(30 * 24 * 60 * 60 * 1000) // 30 days in milliseconds
// if now-timeRange.StartTime > maxAge {
// return fmt.Errorf("time range cannot be older than 30 days")
// }
//
// return nil
//}
//
//// ValidateStepOrder validates that step orders are sequential
//func ValidateStepOrder(steps []tracefunnel.FunnelStep) error {
// if len(steps) < 2 {
// return nil
// }
//
// // Create a map to track used orders
// usedOrders := make(map[int64]bool)
//
// for i, step := range steps {
// if usedOrders[step.Order] {
// return fmt.Errorf("duplicate step order %d at step %d", step.Order, i+1)
// }
// usedOrders[step.Order] = true
// }
//
// // Check if orders are sequential
// for i := 0; i < len(steps)-1; i++ {
// if steps[i+1].Order != steps[i].Order+1 {
// return fmt.Errorf("step orders must be sequential")
// }
// }
//
// return nil
//}

View File

@@ -17,6 +17,8 @@ const (
const (
defaultTraceDB string = "signoz_traces"
defaultOperationsTable string = "distributed_signoz_operations"
defaultIndexTable string = "distributed_signoz_index_v2"
defaultLocalIndexTable string = "signoz_index_v2"
defaultErrorTable string = "distributed_signoz_error_index_v2"
defaultDurationTable string = "distributed_durationSort"
@@ -57,10 +59,19 @@ type namespaceConfig struct {
Enabled bool
Datasource string
TraceDB string
ErrorTable string
OperationsTable string
IndexTable string
LocalIndexTable string
DurationTable string
UsageExplorerTable string
SpansTable string
ErrorTable string
SpanAttributeTableV2 string
SpanAttributeKeysTable string
DependencyGraphTable string
TopLevelOperationsTable string
LogsDB string
LogsTable string
LogsLocalTable string
LogsAttributeKeysTable string
LogsResourceKeysTable string
@@ -71,7 +82,6 @@ type namespaceConfig struct {
Encoding Encoding
Connector Connector
LogsDB string
LogsLocalTableV2 string
LogsTableV2 string
LogsResourceLocalTableV2 string

File diff suppressed because it is too large Load Diff

View File

@@ -23,9 +23,11 @@ import (
errorsV2 "github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/http/middleware"
"github.com/SigNoz/signoz/pkg/http/render"
tracefunnels "github.com/SigNoz/signoz/pkg/modules/tracefunnel"
"github.com/SigNoz/signoz/pkg/query-service/app/integrations"
"github.com/SigNoz/signoz/pkg/query-service/app/metricsexplorer"
"github.com/SigNoz/signoz/pkg/signoz"
traceFunnels "github.com/SigNoz/signoz/pkg/types/tracefunnel"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/prometheus/prometheus/promql"
@@ -49,6 +51,7 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/app/querier"
querierV2 "github.com/SigNoz/signoz/pkg/query-service/app/querier/v2"
"github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v3"
tracesV4 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v4"
"github.com/SigNoz/signoz/pkg/query-service/auth"
"github.com/SigNoz/signoz/pkg/query-service/cache"
@@ -117,6 +120,9 @@ type APIHandler struct {
// Websocket connection upgrader
Upgrader *websocket.Upgrader
UseLogsNewSchema bool
UseTraceNewSchema bool
hostsRepo *inframetrics.HostsRepo
processesRepo *inframetrics.ProcessesRepo
podsRepo *inframetrics.PodsRepo
@@ -173,6 +179,11 @@ type APIHandlerOpts struct {
// Querier Influx Interval
FluxInterval time.Duration
// Use Logs New schema
UseLogsNewSchema bool
UseTraceNewSchema bool
JWT *authtypes.JWT
AlertmanagerAPI *alertmanager.API
@@ -185,17 +196,21 @@ type APIHandlerOpts struct {
// NewAPIHandler returns an APIHandler
func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
querierOpts := querier.QuerierOptions{
Reader: opts.Reader,
Cache: opts.Cache,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FluxInterval: opts.FluxInterval,
Reader: opts.Reader,
Cache: opts.Cache,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FluxInterval: opts.FluxInterval,
UseLogsNewSchema: opts.UseLogsNewSchema,
UseTraceNewSchema: opts.UseTraceNewSchema,
}
querierOptsV2 := querierV2.QuerierOptions{
Reader: opts.Reader,
Cache: opts.Cache,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FluxInterval: opts.FluxInterval,
Reader: opts.Reader,
Cache: opts.Cache,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FluxInterval: opts.FluxInterval,
UseLogsNewSchema: opts.UseLogsNewSchema,
UseTraceNewSchema: opts.UseTraceNewSchema,
}
querier := querier.NewQuerier(querierOpts)
@@ -226,6 +241,8 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
LogsParsingPipelineController: opts.LogsParsingPipelineController,
querier: querier,
querierV2: querierv2,
UseLogsNewSchema: opts.UseLogsNewSchema,
UseTraceNewSchema: opts.UseTraceNewSchema,
hostsRepo: hostsRepo,
processesRepo: processesRepo,
podsRepo: podsRepo,
@@ -244,8 +261,15 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
FieldsAPI: opts.FieldsAPI,
}
logsQueryBuilder := logsv4.PrepareLogsQuery
tracesQueryBuilder := tracesV4.PrepareTracesQuery
logsQueryBuilder := logsv3.PrepareLogsQuery
if opts.UseLogsNewSchema {
logsQueryBuilder = logsv4.PrepareLogsQuery
}
tracesQueryBuilder := tracesV3.PrepareTracesQuery
if opts.UseTraceNewSchema {
tracesQueryBuilder = tracesV4.PrepareTracesQuery
}
builderOpts := queryBuilder.QueryBuilderOptions{
BuildMetricQuery: metricsv3.PrepareMetricQuery,
@@ -528,8 +552,12 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *middleware.AuthZ) {
// router.HandleFunc("/api/v1/get_percentiles", aH.getApplicationPercentiles).Methods(http.MethodGet)
router.HandleFunc("/api/v1/services", am.ViewAccess(aH.getServices)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/services/list", am.ViewAccess(aH.getServicesList)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/service/top_operations", am.ViewAccess(aH.getTopOperations)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/service/top_level_operations", am.ViewAccess(aH.getServicesTopLevelOps)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/traces/{traceId}", am.ViewAccess(aH.SearchTraces)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/usage", am.ViewAccess(aH.getUsage)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/dependency_graph", am.ViewAccess(aH.dependencyGraph)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/settings/ttl", am.AdminAccess(aH.setTTL)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/settings/ttl", am.ViewAccess(aH.getTTL)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/settings/apdex", am.AdminAccess(aH.setApdexSettings)).Methods(http.MethodPost)
@@ -1598,13 +1626,122 @@ func (aH *APIHandler) getTopOperations(w http.ResponseWriter, r *http.Request) {
}
func (aH *APIHandler) getUsage(w http.ResponseWriter, r *http.Request) {
query, err := parseGetUsageRequest(r)
if aH.HandleError(w, err, http.StatusBadRequest) {
return
}
result, err := aH.reader.GetUsage(r.Context(), query)
if aH.HandleError(w, err, http.StatusBadRequest) {
return
}
aH.WriteJSON(w, r, result)
}
func (aH *APIHandler) getServicesTopLevelOps(w http.ResponseWriter, r *http.Request) {
var start, end time.Time
var services []string
type topLevelOpsParams struct {
Service string `json:"service"`
Start string `json:"start"`
End string `json:"end"`
}
var params topLevelOpsParams
err := json.NewDecoder(r.Body).Decode(&params)
if err != nil {
zap.L().Error("Error in getting req body for get top operations API", zap.Error(err))
}
if params.Service != "" {
services = []string{params.Service}
}
startEpoch := params.Start
if startEpoch != "" {
startEpochInt, err := strconv.ParseInt(startEpoch, 10, 64)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, "Error reading start time")
return
}
start = time.Unix(0, startEpochInt)
}
endEpoch := params.End
if endEpoch != "" {
endEpochInt, err := strconv.ParseInt(endEpoch, 10, 64)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, "Error reading end time")
return
}
end = time.Unix(0, endEpochInt)
}
result, apiErr := aH.reader.GetTopLevelOperations(r.Context(), start, end, services)
if apiErr != nil {
RespondError(w, apiErr, nil)
return
}
aH.WriteJSON(w, r, result)
}
func (aH *APIHandler) getServices(w http.ResponseWriter, r *http.Request) {
query, err := parseGetServicesRequest(r)
if aH.HandleError(w, err, http.StatusBadRequest) {
return
}
result, apiErr := aH.reader.GetServices(r.Context(), query)
if apiErr != nil && aH.HandleError(w, apiErr.Err, http.StatusInternalServerError) {
return
}
data := map[string]interface{}{
"number": len(*result),
}
claims, errv2 := authtypes.ClaimsFromContext(r.Context())
if errv2 != nil {
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_NUMBER_OF_SERVICES, data, claims.Email, true, false)
}
if (data["number"] != 0) && (data["number"] != telemetry.DEFAULT_NUMBER_OF_SERVICES) {
telemetry.GetInstance().AddActiveTracesUser()
}
aH.WriteJSON(w, r, result)
}
func (aH *APIHandler) dependencyGraph(w http.ResponseWriter, r *http.Request) {
query, err := parseGetServicesRequest(r)
if aH.HandleError(w, err, http.StatusBadRequest) {
return
}
result, err := aH.reader.GetDependencyGraph(r.Context(), query)
if aH.HandleError(w, err, http.StatusBadRequest) {
return
}
aH.WriteJSON(w, r, result)
}
func (aH *APIHandler) getServicesList(w http.ResponseWriter, r *http.Request) {
result, err := aH.reader.GetServicesList(r.Context())
if aH.HandleError(w, err, http.StatusBadRequest) {
return
}
aH.WriteJSON(w, r, result)
}
func (aH *APIHandler) SearchTraces(w http.ResponseWriter, r *http.Request) {
@@ -4104,8 +4241,11 @@ func (aH *APIHandler) CloudIntegrationsUpdateServiceConfig(
// logs
func (aH *APIHandler) RegisterLogsRoutes(router *mux.Router, am *middleware.AuthZ) {
subRouter := router.PathPrefix("/api/v1/logs").Subrouter()
subRouter.HandleFunc("", am.ViewAccess(aH.getLogs)).Methods(http.MethodGet)
subRouter.HandleFunc("/tail", am.ViewAccess(aH.tailLogs)).Methods(http.MethodGet)
subRouter.HandleFunc("/fields", am.ViewAccess(aH.logFields)).Methods(http.MethodGet)
subRouter.HandleFunc("/fields", am.EditAccess(aH.logFieldUpdate)).Methods(http.MethodPost)
subRouter.HandleFunc("/aggregate", am.ViewAccess(aH.logAggregate)).Methods(http.MethodGet)
// log pipelines
subRouter.HandleFunc("/pipelines/preview", am.ViewAccess(aH.PreviewLogsPipelinesHandler)).Methods(http.MethodPost)
@@ -4145,6 +4285,81 @@ func (aH *APIHandler) logFieldUpdate(w http.ResponseWriter, r *http.Request) {
aH.WriteJSON(w, r, field)
}
func (aH *APIHandler) getLogs(w http.ResponseWriter, r *http.Request) {
params, err := logs.ParseLogFilterParams(r)
if err != nil {
apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err}
RespondError(w, apiErr, "Incorrect params")
return
}
res, apiErr := aH.reader.GetLogs(r.Context(), params)
if apiErr != nil {
RespondError(w, apiErr, "Failed to fetch logs from the DB")
return
}
aH.WriteJSON(w, r, map[string]interface{}{"results": res})
}
func (aH *APIHandler) tailLogs(w http.ResponseWriter, r *http.Request) {
params, err := logs.ParseLogFilterParams(r)
if err != nil {
apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err}
RespondError(w, apiErr, "Incorrect params")
return
}
// create the client
client := &model.LogsTailClient{Name: r.RemoteAddr, Logs: make(chan *model.SignozLog, 1000), Done: make(chan *bool), Error: make(chan error), Filter: *params}
go aH.reader.TailLogs(r.Context(), client)
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Access-Control-Allow-Origin", "*")
w.WriteHeader(200)
flusher, ok := w.(http.Flusher)
if !ok {
err := model.ApiError{Typ: model.ErrorStreamingNotSupported, Err: nil}
RespondError(w, &err, "streaming is not supported")
return
}
// flush the headers
flusher.Flush()
for {
select {
case log := <-client.Logs:
var buf bytes.Buffer
enc := json.NewEncoder(&buf)
enc.Encode(log)
fmt.Fprintf(w, "data: %v\n\n", buf.String())
flusher.Flush()
case <-client.Done:
zap.L().Debug("done!")
return
case err := <-client.Error:
zap.L().Error("error occured", zap.Error(err))
return
}
}
}
func (aH *APIHandler) logAggregate(w http.ResponseWriter, r *http.Request) {
params, err := logs.ParseLogAggregateParams(r)
if err != nil {
apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err}
RespondError(w, apiErr, "Incorrect params")
return
}
res, apiErr := aH.reader.AggregateLogs(r.Context(), params)
if apiErr != nil {
RespondError(w, apiErr, "Failed to fetch logs aggregate from the DB")
return
}
aH.WriteJSON(w, r, res)
}
const logPipelines = "log_pipelines"
func parseAgentConfigVersion(r *http.Request) (int, *model.ApiError) {
@@ -4626,10 +4841,30 @@ func (aH *APIHandler) queryRangeV3(ctx context.Context, queryRangeParams *v3.Que
RespondError(w, apiErrObj, errQuriesByName)
return
}
tracesV4.Enrich(queryRangeParams, spanKeys)
if aH.UseTraceNewSchema {
tracesV4.Enrich(queryRangeParams, spanKeys)
} else {
tracesV3.Enrich(queryRangeParams, spanKeys)
}
}
// WARN: Only works for AND operator in traces query
if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder {
// check if traceID is used as filter (with equal/similar operator) in traces query if yes add timestamp filter to queryRange params
isUsed, traceIDs := tracesV3.TraceIdFilterUsedWithEqual(queryRangeParams)
if isUsed && len(traceIDs) > 0 {
zap.L().Debug("traceID used as filter in traces query")
// query signoz_spans table with traceID to get min and max timestamp
min, max, err := aH.reader.GetMinAndMaxTimestampForTraceID(ctx, traceIDs)
if err == nil {
// add timestamp filter to queryRange params
tracesV3.AddTimestampFilters(min, max, queryRangeParams)
zap.L().Debug("post adding timestamp filter in traces query", zap.Any("queryRangeParams", queryRangeParams))
}
}
}
// Hook up query progress tracking if requested
queryIdHeader := r.Header.Get("X-SIGNOZ-QUERY-ID")
if len(queryIdHeader) > 0 {
@@ -4969,7 +5204,88 @@ func (aH *APIHandler) liveTailLogsV2(w http.ResponseWriter, r *http.Request) {
}
func (aH *APIHandler) liveTailLogs(w http.ResponseWriter, r *http.Request) {
aH.liveTailLogsV2(w, r)
if aH.UseLogsNewSchema {
aH.liveTailLogsV2(w, r)
return
}
// get the param from url and add it to body
stringReader := strings.NewReader(r.URL.Query().Get("q"))
r.Body = io.NopCloser(stringReader)
queryRangeParams, apiErrorObj := ParseQueryRangeParams(r)
if apiErrorObj != nil {
zap.L().Error(apiErrorObj.Err.Error())
RespondError(w, apiErrorObj, nil)
return
}
var err error
var queryString string
switch queryRangeParams.CompositeQuery.QueryType {
case v3.QueryTypeBuilder:
// check if any enrichment is required for logs if yes then enrich them
if logsv3.EnrichmentRequired(queryRangeParams) {
logsFields, err := aH.reader.GetLogFields(r.Context())
if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorInternal, Err: err}
RespondError(w, apiErrObj, nil)
return
}
// get the fields if any logs query is present
fields := model.GetLogFieldsV3(r.Context(), queryRangeParams, logsFields)
logsv3.Enrich(queryRangeParams, fields)
}
queryString, err = aH.queryBuilder.PrepareLiveTailQuery(queryRangeParams)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
default:
err = fmt.Errorf("invalid query type")
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
// create the client
client := &model.LogsLiveTailClient{Name: r.RemoteAddr, Logs: make(chan *model.SignozLog, 1000), Done: make(chan *bool), Error: make(chan error)}
go aH.reader.LiveTailLogsV3(r.Context(), queryString, uint64(queryRangeParams.Start), "", client)
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Access-Control-Allow-Origin", "*")
w.WriteHeader(200)
flusher, ok := w.(http.Flusher)
if !ok {
err := model.ApiError{Typ: model.ErrorStreamingNotSupported, Err: nil}
RespondError(w, &err, "streaming is not supported")
return
}
// flush the headers
flusher.Flush()
for {
select {
case log := <-client.Logs:
var buf bytes.Buffer
enc := json.NewEncoder(&buf)
enc.Encode(log)
fmt.Fprintf(w, "data: %v\n\n", buf.String())
flusher.Flush()
case <-client.Done:
zap.L().Debug("done!")
return
case err := <-client.Error:
zap.L().Error("error occurred", zap.Error(err))
fmt.Fprintf(w, "event: error\ndata: %v\n\n", err.Error())
flusher.Flush()
return
}
}
}
func (aH *APIHandler) getMetricMetadata(w http.ResponseWriter, r *http.Request) {
@@ -5010,7 +5326,27 @@ func (aH *APIHandler) queryRangeV4(ctx context.Context, queryRangeParams *v3.Que
RespondError(w, apiErrObj, errQuriesByName)
return
}
tracesV4.Enrich(queryRangeParams, spanKeys)
if aH.UseTraceNewSchema {
tracesV4.Enrich(queryRangeParams, spanKeys)
} else {
tracesV3.Enrich(queryRangeParams, spanKeys)
}
}
// WARN: Only works for AND operator in traces query
if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder {
// check if traceID is used as filter (with equal/similar operator) in traces query if yes add timestamp filter to queryRange params
isUsed, traceIDs := tracesV3.TraceIdFilterUsedWithEqual(queryRangeParams)
if isUsed && len(traceIDs) > 0 {
zap.L().Debug("traceID used as filter in traces query")
// query signoz_spans table with traceID to get min and max timestamp
min, max, err := aH.reader.GetMinAndMaxTimestampForTraceID(ctx, traceIDs)
if err == nil {
// add timestamp filter to queryRange params
tracesV3.AddTimestampFilters(min, max, queryRangeParams)
zap.L().Debug("post adding timestamp filter in traces query", zap.Any("queryRangeParams", queryRangeParams))
}
}
}
result, errQuriesByName, err = aH.querierV2.QueryRange(ctx, queryRangeParams)
@@ -5201,3 +5537,207 @@ func (aH *APIHandler) getDomainInfo(w http.ResponseWriter, r *http.Request) {
}
aH.Respond(w, resp)
}
// RegisterTraceFunnelsRoutes adds trace funnels routes
func (aH *APIHandler) RegisterTraceFunnelsRoutes(router *mux.Router, am *middleware.AuthZ) {
// Main trace funnels router
traceFunnelsRouter := router.PathPrefix("/api/v1/trace-funnels").Subrouter()
// API endpoints
traceFunnelsRouter.HandleFunc("/new",
am.ViewAccess(aH.Signoz.Handlers.TraceFunnel.New)).
Methods(http.MethodPost)
traceFunnelsRouter.HandleFunc("/list",
am.ViewAccess(aH.Signoz.Handlers.TraceFunnel.List)).
Methods(http.MethodGet)
traceFunnelsRouter.HandleFunc("/steps/update",
am.ViewAccess(aH.Signoz.Handlers.TraceFunnel.Update)).
Methods(http.MethodPut)
traceFunnelsRouter.HandleFunc("/{funnel_id}",
am.ViewAccess(aH.Signoz.Handlers.TraceFunnel.Get)).
Methods(http.MethodGet)
traceFunnelsRouter.HandleFunc("/{funnel_id}",
am.ViewAccess(aH.Signoz.Handlers.TraceFunnel.Delete)).
Methods(http.MethodDelete)
traceFunnelsRouter.HandleFunc("/save",
am.ViewAccess(aH.Signoz.Handlers.TraceFunnel.Save)).
Methods(http.MethodPost)
// Analytics endpoints
traceFunnelsRouter.HandleFunc("/{funnel_id}/analytics/validate", aH.handleValidateTraces).Methods("POST")
traceFunnelsRouter.HandleFunc("/{funnel_id}/analytics/overview", aH.handleFunnelAnalytics).Methods("POST")
traceFunnelsRouter.HandleFunc("/{funnel_id}/analytics/steps", aH.handleStepAnalytics).Methods("POST")
traceFunnelsRouter.HandleFunc("/{funnel_id}/analytics/slow-traces", aH.handleFunnelSlowTraces).Methods("POST")
traceFunnelsRouter.HandleFunc("/{funnel_id}/analytics/error-traces", aH.handleFunnelErrorTraces).Methods("POST")
}
func (aH *APIHandler) handleValidateTraces(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
funnelID := vars["funnel_id"]
funnel, err := aH.Signoz.Modules.TraceFunnel.Get(r.Context(), funnelID)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("funnel not found: %v", err)}, nil)
return
}
var timeRange traceFunnels.TimeRange
if err := json.NewDecoder(r.Body).Decode(&timeRange); err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("error decoding time range: %v", err)}, nil)
return
}
if len(funnel.Steps) < 2 {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("funnel must have at least 2 steps")}, nil)
return
}
chq, err := tracefunnels.ValidateTraces(funnel, timeRange)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil)
return
}
results, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error converting clickhouse results to list: %v", err)}, nil)
return
}
aH.Respond(w, results)
}
func (aH *APIHandler) handleFunnelAnalytics(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
funnelID := vars["funnel_id"]
funnel, err := aH.Signoz.Modules.TraceFunnel.Get(r.Context(), funnelID)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("funnel not found: %v", err)}, nil)
return
}
var timeRange traceFunnels.TimeRange
if err := json.NewDecoder(r.Body).Decode(&timeRange); err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("error decoding time range: %v", err)}, nil)
return
}
chq, err := tracefunnels.ValidateTracesWithLatency(funnel, timeRange)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil)
return
}
results, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error converting clickhouse results to list: %v", err)}, nil)
return
}
aH.Respond(w, results)
}
func (aH *APIHandler) handleStepAnalytics(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
funnelID := vars["funnel_id"]
funnel, err := aH.Signoz.Modules.TraceFunnel.Get(r.Context(), funnelID)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("funnel not found: %v", err)}, nil)
return
}
var timeRange traceFunnels.TimeRange
if err := json.NewDecoder(r.Body).Decode(&timeRange); err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("error decoding time range: %v", err)}, nil)
return
}
chq, err := tracefunnels.GetStepAnalytics(funnel, timeRange)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil)
return
}
results, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error converting clickhouse results to list: %v", err)}, nil)
return
}
aH.Respond(w, results)
}
// handleFunnelSlowTraces handles requests for slow traces in a funnel
func (aH *APIHandler) handleFunnelSlowTraces(w http.ResponseWriter, r *http.Request) {
aH.handleTracesWithLatency(w, r, false)
}
// handleFunnelErrorTraces handles requests for error traces in a funnel
func (aH *APIHandler) handleFunnelErrorTraces(w http.ResponseWriter, r *http.Request) {
aH.handleTracesWithLatency(w, r, true)
}
// handleTracesWithLatency handles both slow and error traces with common logic
func (aH *APIHandler) handleTracesWithLatency(w http.ResponseWriter, r *http.Request, isError bool) {
funnel, req, err := aH.validateTracesRequest(r)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
if err := aH.validateSteps(funnel, req.StepAOrder, req.StepBOrder); err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
chq, err := tracefunnels.GetSlowestTraces(funnel, req.StepAOrder, req.StepBOrder, req.TimeRange, isError)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error building clickhouse query: %v", err)}, nil)
return
}
results, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error converting clickhouse results to list: %v", err)}, nil)
return
}
aH.Respond(w, results)
}
// validateTracesRequest validates and extracts the request parameters
func (aH *APIHandler) validateTracesRequest(r *http.Request) (*traceFunnels.Funnel, *traceFunnels.StepTransitionRequest, error) {
vars := mux.Vars(r)
funnelID := vars["funnel_id"]
funnel, err := aH.Signoz.Modules.TraceFunnel.Get(r.Context(), funnelID)
if err != nil {
return nil, nil, fmt.Errorf("funnel not found: %v", err)
}
var req traceFunnels.StepTransitionRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
return nil, nil, fmt.Errorf("invalid request body: %v", err)
}
return funnel, &req, nil
}
// validateSteps checks if the requested steps exist in the funnel
func (aH *APIHandler) validateSteps(funnel *traceFunnels.Funnel, stepAOrder, stepBOrder int64) error {
stepAExists, stepBExists := false, false
for _, step := range funnel.Steps {
if step.Order == stepAOrder {
stepAExists = true
}
if step.Order == stepBOrder {
stepBExists = true
}
}
if !stepAExists || !stepBExists {
return fmt.Errorf("one or both steps not found. Step A Order: %d, Step B Order: %d", stepAOrder, stepBOrder)
}
return nil
}

View File

@@ -171,6 +171,42 @@ func parseQueryRangeRequest(r *http.Request) (*model.QueryRangeParams, *model.Ap
return &queryRangeParams, nil
}
func parseGetUsageRequest(r *http.Request) (*model.GetUsageParams, error) {
startTime, err := parseTime("start", r)
if err != nil {
return nil, err
}
endTime, err := parseTime("end", r)
if err != nil {
return nil, err
}
stepStr := r.URL.Query().Get("step")
if len(stepStr) == 0 {
return nil, errors.New("step param missing in query")
}
stepInt, err := strconv.Atoi(stepStr)
if err != nil {
return nil, errors.New("step param is not in correct format")
}
serviceName := r.URL.Query().Get("service")
stepHour := stepInt / 3600
getUsageParams := model.GetUsageParams{
StartTime: startTime.Format(time.RFC3339Nano),
EndTime: endTime.Format(time.RFC3339Nano),
Start: startTime,
End: endTime,
ServiceName: serviceName,
Period: fmt.Sprintf("PT%dH", stepHour),
StepHour: stepHour,
}
return &getUsageParams, nil
}
func parseGetServicesRequest(r *http.Request) (*model.GetServicesParams, error) {
var postData *model.GetServicesParams

View File

@@ -6,8 +6,10 @@ import (
"strings"
"sync"
logsV3 "github.com/SigNoz/signoz/pkg/query-service/app/logs/v3"
logsV4 "github.com/SigNoz/signoz/pkg/query-service/app/logs/v4"
metricsV3 "github.com/SigNoz/signoz/pkg/query-service/app/metrics/v3"
tracesV3 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v3"
tracesV4 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v4"
"github.com/SigNoz/signoz/pkg/query-service/common"
"github.com/SigNoz/signoz/pkg/query-service/constants"
@@ -17,15 +19,19 @@ import (
"go.uber.org/zap"
)
func prepareLogsQuery(
_ context.Context,
func prepareLogsQuery(_ context.Context,
useLogsNewSchema bool,
start,
end int64,
builderQuery *v3.BuilderQuery,
params *v3.QueryRangeParamsV3,
) (string, error) {
query := ""
logsQueryBuilder := logsV4.PrepareLogsQuery
logsQueryBuilder := logsV3.PrepareLogsQuery
if useLogsNewSchema {
logsQueryBuilder = logsV4.PrepareLogsQuery
}
if params == nil || builderQuery == nil {
return query, fmt.Errorf("params and builderQuery cannot be nil")
@@ -96,7 +102,7 @@ func (q *querier) runBuilderQuery(
var err error
if _, ok := cacheKeys[queryName]; !ok || params.NoCache {
zap.L().Info("skipping cache for logs query", zap.String("queryName", queryName), zap.Int64("start", start), zap.Int64("end", end), zap.Int64("step", builderQuery.StepInterval), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName]))
query, err = prepareLogsQuery(ctx, start, end, builderQuery, params)
query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, start, end, builderQuery, params)
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
return
@@ -111,7 +117,7 @@ func (q *querier) runBuilderQuery(
missedSeries := make([]querycache.CachedSeriesData, 0)
filteredMissedSeries := make([]querycache.CachedSeriesData, 0)
for _, miss := range misses {
query, err = prepareLogsQuery(ctx, miss.Start, miss.End, builderQuery, params)
query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, miss.Start, miss.End, builderQuery, params)
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
return
@@ -163,7 +169,11 @@ func (q *querier) runBuilderQuery(
}
if builderQuery.DataSource == v3.DataSourceTraces {
tracesQueryBuilder := tracesV4.PrepareTracesQuery
tracesQueryBuilder := tracesV3.PrepareTracesQuery
if q.UseTraceNewSchema {
tracesQueryBuilder = tracesV4.PrepareTracesQuery
}
var query string
var err error

View File

@@ -6,9 +6,11 @@ import (
"sync"
"time"
logsV3 "github.com/SigNoz/signoz/pkg/query-service/app/logs/v3"
logsV4 "github.com/SigNoz/signoz/pkg/query-service/app/logs/v4"
metricsV3 "github.com/SigNoz/signoz/pkg/query-service/app/metrics/v3"
"github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v3"
tracesV4 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v4"
"github.com/SigNoz/signoz/pkg/query-service/common"
"github.com/SigNoz/signoz/pkg/query-service/constants"
@@ -50,6 +52,9 @@ type querier struct {
timeRanges [][]int
returnedSeries []*v3.Series
returnedErr error
UseLogsNewSchema bool
UseTraceNewSchema bool
}
type QuerierOptions struct {
@@ -59,14 +64,22 @@ type QuerierOptions struct {
FluxInterval time.Duration
// used for testing
TestingMode bool
ReturnedSeries []*v3.Series
ReturnedErr error
TestingMode bool
ReturnedSeries []*v3.Series
ReturnedErr error
UseLogsNewSchema bool
UseTraceNewSchema bool
}
func NewQuerier(opts QuerierOptions) interfaces.Querier {
logsQueryBuilder := logsV4.PrepareLogsQuery
tracesQueryBuilder := tracesV4.PrepareTracesQuery
logsQueryBuilder := logsV3.PrepareLogsQuery
if opts.UseLogsNewSchema {
logsQueryBuilder = logsV4.PrepareLogsQuery
}
tracesQueryBuilder := tracesV3.PrepareTracesQuery
if opts.UseTraceNewSchema {
tracesQueryBuilder = tracesV4.PrepareTracesQuery
}
qc := querycache.NewQueryCache(querycache.WithCache(opts.Cache), querycache.WithFluxInterval(opts.FluxInterval))
@@ -83,9 +96,11 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier {
BuildMetricQuery: metricsV3.PrepareMetricQuery,
}),
testingMode: opts.TestingMode,
returnedSeries: opts.ReturnedSeries,
returnedErr: opts.ReturnedErr,
testingMode: opts.TestingMode,
returnedSeries: opts.ReturnedSeries,
returnedErr: opts.ReturnedErr,
UseLogsNewSchema: opts.UseLogsNewSchema,
UseTraceNewSchema: opts.UseTraceNewSchema,
}
}
@@ -430,6 +445,11 @@ func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRan
len(params.CompositeQuery.BuilderQueries) == 1 &&
params.CompositeQuery.PanelType != v3.PanelTypeTrace {
for _, v := range params.CompositeQuery.BuilderQueries {
if (v.DataSource == v3.DataSourceLogs && !q.UseLogsNewSchema) ||
(v.DataSource == v3.DataSourceTraces && !q.UseTraceNewSchema) {
break
}
// only allow of logs queries with timestamp ordering desc
// TODO(nitya): allow for timestamp asc
if (v.DataSource == v3.DataSourceLogs || v.DataSource == v3.DataSourceTraces) &&

View File

@@ -1370,6 +1370,8 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
telemetryStore,
prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}),
"",
true,
true,
time.Duration(time.Second),
nil,
)

View File

@@ -6,9 +6,11 @@ import (
"strings"
"sync"
logsV3 "github.com/SigNoz/signoz/pkg/query-service/app/logs/v3"
logsV4 "github.com/SigNoz/signoz/pkg/query-service/app/logs/v4"
metricsV3 "github.com/SigNoz/signoz/pkg/query-service/app/metrics/v3"
metricsV4 "github.com/SigNoz/signoz/pkg/query-service/app/metrics/v4"
tracesV3 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v3"
tracesV4 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v4"
"github.com/SigNoz/signoz/pkg/query-service/common"
"github.com/SigNoz/signoz/pkg/query-service/constants"
@@ -17,14 +19,17 @@ import (
"go.uber.org/zap"
)
func prepareLogsQuery(
_ context.Context,
func prepareLogsQuery(_ context.Context,
useLogsNewSchema bool,
start,
end int64,
builderQuery *v3.BuilderQuery,
params *v3.QueryRangeParamsV3,
) (string, error) {
logsQueryBuilder := logsV4.PrepareLogsQuery
logsQueryBuilder := logsV3.PrepareLogsQuery
if useLogsNewSchema {
logsQueryBuilder = logsV4.PrepareLogsQuery
}
query := ""
if params == nil || builderQuery == nil {
@@ -97,7 +102,7 @@ func (q *querier) runBuilderQuery(
var err error
if _, ok := cacheKeys[queryName]; !ok || params.NoCache {
zap.L().Info("skipping cache for logs query", zap.String("queryName", queryName), zap.Int64("start", params.Start), zap.Int64("end", params.End), zap.Int64("step", params.Step), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName]))
query, err = prepareLogsQuery(ctx, start, end, builderQuery, params)
query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, start, end, builderQuery, params)
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
return
@@ -111,7 +116,7 @@ func (q *querier) runBuilderQuery(
missedSeries := make([]querycache.CachedSeriesData, 0)
filteredMissedSeries := make([]querycache.CachedSeriesData, 0)
for _, miss := range misses {
query, err = prepareLogsQuery(ctx, miss.Start, miss.End, builderQuery, params)
query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, miss.Start, miss.End, builderQuery, params)
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
return
@@ -164,7 +169,11 @@ func (q *querier) runBuilderQuery(
}
if builderQuery.DataSource == v3.DataSourceTraces {
tracesQueryBuilder := tracesV4.PrepareTracesQuery
tracesQueryBuilder := tracesV3.PrepareTracesQuery
if q.UseTraceNewSchema {
tracesQueryBuilder = tracesV4.PrepareTracesQuery
}
var query string
var err error

View File

@@ -6,9 +6,11 @@ import (
"sync"
"time"
logsV3 "github.com/SigNoz/signoz/pkg/query-service/app/logs/v3"
logsV4 "github.com/SigNoz/signoz/pkg/query-service/app/logs/v4"
metricsV4 "github.com/SigNoz/signoz/pkg/query-service/app/metrics/v4"
"github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v3"
tracesV4 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v4"
"github.com/SigNoz/signoz/pkg/query-service/common"
"github.com/SigNoz/signoz/pkg/query-service/constants"
@@ -47,9 +49,11 @@ type querier struct {
testingMode bool
queriesExecuted []string
// tuple of start and end time in milliseconds
timeRanges [][]int
returnedSeries []*v3.Series
returnedErr error
timeRanges [][]int
returnedSeries []*v3.Series
returnedErr error
UseLogsNewSchema bool
UseTraceNewSchema bool
}
type QuerierOptions struct {
@@ -59,14 +63,23 @@ type QuerierOptions struct {
FluxInterval time.Duration
// used for testing
TestingMode bool
ReturnedSeries []*v3.Series
ReturnedErr error
TestingMode bool
ReturnedSeries []*v3.Series
ReturnedErr error
UseLogsNewSchema bool
UseTraceNewSchema bool
}
func NewQuerier(opts QuerierOptions) interfaces.Querier {
logsQueryBuilder := logsV4.PrepareLogsQuery
tracesQueryBuilder := tracesV4.PrepareTracesQuery
logsQueryBuilder := logsV3.PrepareLogsQuery
if opts.UseLogsNewSchema {
logsQueryBuilder = logsV4.PrepareLogsQuery
}
tracesQueryBuilder := tracesV3.PrepareTracesQuery
if opts.UseTraceNewSchema {
tracesQueryBuilder = tracesV4.PrepareTracesQuery
}
qc := querycache.NewQueryCache(querycache.WithCache(opts.Cache), querycache.WithFluxInterval(opts.FluxInterval))
@@ -83,9 +96,11 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier {
BuildMetricQuery: metricsV4.PrepareMetricQuery,
}),
testingMode: opts.TestingMode,
returnedSeries: opts.ReturnedSeries,
returnedErr: opts.ReturnedErr,
testingMode: opts.TestingMode,
returnedSeries: opts.ReturnedSeries,
returnedErr: opts.ReturnedErr,
UseLogsNewSchema: opts.UseLogsNewSchema,
UseTraceNewSchema: opts.UseTraceNewSchema,
}
}
@@ -431,6 +446,11 @@ func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRan
len(params.CompositeQuery.BuilderQueries) == 1 &&
params.CompositeQuery.PanelType != v3.PanelTypeTrace {
for _, v := range params.CompositeQuery.BuilderQueries {
if (v.DataSource == v3.DataSourceLogs && !q.UseLogsNewSchema) ||
(v.DataSource == v3.DataSourceTraces && !q.UseTraceNewSchema) {
break
}
// only allow of logs queries with timestamp ordering desc
// TODO(nitya): allow for timestamp asc
if (v.DataSource == v3.DataSourceLogs || v.DataSource == v3.DataSourceTraces) &&

View File

@@ -1424,6 +1424,8 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) {
telemetryStore,
prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}),
"",
true,
true,
time.Duration(time.Second),
nil,
)

View File

@@ -53,6 +53,8 @@ type ServerOptions struct {
FluxInterval string
FluxIntervalForTraceDetail string
Cluster string
UseLogsNewSchema bool
UseTraceNewSchema bool
SigNoz *signoz.SigNoz
Jwt *authtypes.JWT
}
@@ -108,6 +110,8 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
serverOptions.SigNoz.TelemetryStore,
serverOptions.SigNoz.Prometheus,
serverOptions.Cluster,
serverOptions.UseLogsNewSchema,
serverOptions.UseTraceNewSchema,
fluxIntervalForTraceDetail,
serverOptions.SigNoz.Cache,
)
@@ -125,6 +129,8 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
serverOptions.SigNoz.SQLStore.SQLxDB(),
reader,
c,
serverOptions.UseLogsNewSchema,
serverOptions.UseTraceNewSchema,
serverOptions.SigNoz.SQLStore,
serverOptions.SigNoz.TelemetryStore,
serverOptions.SigNoz.Prometheus,
@@ -167,6 +173,8 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
LogsParsingPipelineController: logParsingPipelineController,
Cache: c,
FluxInterval: fluxInterval,
UseLogsNewSchema: serverOptions.UseLogsNewSchema,
UseTraceNewSchema: serverOptions.UseTraceNewSchema,
JWT: serverOptions.Jwt,
AlertmanagerAPI: alertmanager.NewAPI(serverOptions.SigNoz.Alertmanager),
FieldsAPI: fields.NewAPI(serverOptions.SigNoz.TelemetryStore),
@@ -281,6 +289,7 @@ func (s *Server) createPublicServer(api *APIHandler, web web.Web) (*http.Server,
api.RegisterMessagingQueuesRoutes(r, am)
api.RegisterThirdPartyApiRoutes(r, am)
api.MetricExplorerRoutes(r, am)
api.RegisterTraceFunnelsRoutes(r, am)
c := cors.New(cors.Options{
AllowedOrigins: []string{"*"},
@@ -427,21 +436,25 @@ func makeRulesManager(
db *sqlx.DB,
ch interfaces.Reader,
cache cache.Cache,
useLogsNewSchema bool,
useTraceNewSchema bool,
sqlstore sqlstore.SQLStore,
telemetryStore telemetrystore.TelemetryStore,
prometheus prometheus.Prometheus,
) (*rules.Manager, error) {
// create manager opts
managerOpts := &rules.ManagerOptions{
TelemetryStore: telemetryStore,
Prometheus: prometheus,
DBConn: db,
Context: context.Background(),
Logger: zap.L(),
Reader: ch,
Cache: cache,
EvalDelay: constants.GetEvalDelay(),
SQLStore: sqlstore,
TelemetryStore: telemetryStore,
Prometheus: prometheus,
DBConn: db,
Context: context.Background(),
Logger: zap.L(),
Reader: ch,
Cache: cache,
EvalDelay: constants.GetEvalDelay(),
UseLogsNewSchema: useLogsNewSchema,
UseTraceNewSchema: useTraceNewSchema,
SQLStore: sqlstore,
}
// create Manager

View File

@@ -15,8 +15,12 @@ import (
type Reader interface {
GetInstantQueryMetricsResult(ctx context.Context, query *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError)
GetQueryRangeResult(ctx context.Context, query *model.QueryRangeParams) (*promql.Result, *stats.QueryStats, *model.ApiError)
GetTopLevelOperations(ctx context.Context, start, end time.Time, services []string) (*map[string][]string, *model.ApiError)
GetServices(ctx context.Context, query *model.GetServicesParams) (*[]model.ServiceItem, *model.ApiError)
GetTopOperations(ctx context.Context, query *model.GetTopOperationsParams) (*[]model.TopOperationsItem, *model.ApiError)
GetUsage(ctx context.Context, query *model.GetUsageParams) (*[]model.UsageItem, error)
GetServicesList(ctx context.Context) (*[]string, error)
GetDependencyGraph(ctx context.Context, query *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error)
GetTTL(ctx context.Context, orgID string, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError)
@@ -70,6 +74,9 @@ type Reader interface {
// Logs
GetLogFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError)
UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError
GetLogs(ctx context.Context, params *model.LogsFilterParams) (*[]model.SignozLog, *model.ApiError)
TailLogs(ctx context.Context, client *model.LogsTailClient)
AggregateLogs(ctx context.Context, params *model.LogsAggregateParams) (*model.GetLogsAggregatesResponse, *model.ApiError)
GetLogAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error)
GetLogAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error)
GetLogAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error)
@@ -93,6 +100,8 @@ type Reader interface {
ReadRuleStateHistoryTopContributorsByRuleID(ctx context.Context, ruleID string, params *model.QueryRuleStateHistory) ([]model.RuleStateHistoryContributor, error)
GetLastSavedRuleStateHistory(ctx context.Context, ruleID string) ([]model.RuleStateHistory, error)
GetMinAndMaxTimestampForTraceID(ctx context.Context, traceID []string) (int64, int64, error)
// Query Progress tracking helpers.
ReportQueryStartForProgressTracking(queryId string) (reportQueryFinished func(), err *model.ApiError)
SubscribeToQueryProgress(queryId string) (<-chan model.QueryProgress, func(), *model.ApiError)

View File

@@ -45,9 +45,7 @@ func main() {
var maxOpenConns int
var dialTimeout time.Duration
// Deprecated
flag.BoolVar(&useLogsNewSchema, "use-logs-new-schema", false, "use logs_v2 schema for logs")
// Deprecated
flag.BoolVar(&useTraceNewSchema, "use-trace-new-schema", false, "use new schema for traces")
// Deprecated
flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)")
@@ -128,6 +126,8 @@ func main() {
FluxInterval: fluxInterval,
FluxIntervalForTraceDetail: fluxIntervalForTraceDetail,
Cluster: cluster,
UseLogsNewSchema: useLogsNewSchema,
UseTraceNewSchema: useTraceNewSchema,
SigNoz: signoz,
Jwt: jwt,
}

View File

@@ -70,6 +70,16 @@ type RegisterEventParams struct {
RateLimited bool `json:"rateLimited"`
}
type GetUsageParams struct {
StartTime string
EndTime string
ServiceName string
Period string
StepHour int
Start *time.Time
End *time.Time
}
type GetServicesParams struct {
StartTime string `json:"start"`
EndTime string `json:"end"`

View File

@@ -34,29 +34,33 @@ import (
)
type PrepareTaskOptions struct {
Rule *ruletypes.PostableRule
TaskName string
RuleStore ruletypes.RuleStore
MaintenanceStore ruletypes.MaintenanceStore
Logger *zap.Logger
Reader interfaces.Reader
Cache cache.Cache
ManagerOpts *ManagerOptions
NotifyFunc NotifyFunc
SQLStore sqlstore.SQLStore
OrgID string
Rule *ruletypes.PostableRule
TaskName string
RuleStore ruletypes.RuleStore
MaintenanceStore ruletypes.MaintenanceStore
Logger *zap.Logger
Reader interfaces.Reader
Cache cache.Cache
ManagerOpts *ManagerOptions
NotifyFunc NotifyFunc
SQLStore sqlstore.SQLStore
UseLogsNewSchema bool
UseTraceNewSchema bool
OrgID string
}
type PrepareTestRuleOptions struct {
Rule *ruletypes.PostableRule
RuleStore ruletypes.RuleStore
MaintenanceStore ruletypes.MaintenanceStore
Logger *zap.Logger
Reader interfaces.Reader
Cache cache.Cache
ManagerOpts *ManagerOptions
NotifyFunc NotifyFunc
SQLStore sqlstore.SQLStore
Rule *ruletypes.PostableRule
RuleStore ruletypes.RuleStore
MaintenanceStore ruletypes.MaintenanceStore
Logger *zap.Logger
Reader interfaces.Reader
Cache cache.Cache
ManagerOpts *ManagerOptions
NotifyFunc NotifyFunc
SQLStore sqlstore.SQLStore
UseLogsNewSchema bool
UseTraceNewSchema bool
}
const taskNamesuffix = "webAppEditor"
@@ -91,7 +95,10 @@ type ManagerOptions struct {
EvalDelay time.Duration
PrepareTaskFunc func(opts PrepareTaskOptions) (Task, error)
PrepareTaskFunc func(opts PrepareTaskOptions) (Task, error)
UseLogsNewSchema bool
UseTraceNewSchema bool
PrepareTestRuleFunc func(opts PrepareTestRuleOptions) (int, *model.ApiError)
Alertmanager alertmanager.Alertmanager
SQLStore sqlstore.SQLStore
@@ -114,6 +121,9 @@ type Manager struct {
prepareTaskFunc func(opts PrepareTaskOptions) (Task, error)
prepareTestRuleFunc func(opts PrepareTestRuleOptions) (int, *model.ApiError)
UseLogsNewSchema bool
UseTraceNewSchema bool
alertmanager alertmanager.Alertmanager
sqlstore sqlstore.SQLStore
}
@@ -146,6 +156,8 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {
ruleId,
opts.Rule,
opts.Reader,
opts.UseLogsNewSchema,
opts.UseTraceNewSchema,
WithEvalDelay(opts.ManagerOpts.EvalDelay),
WithSQLStore(opts.SQLStore),
)
@@ -395,17 +407,19 @@ func (m *Manager) editTask(_ context.Context, orgID string, rule *ruletypes.Post
zap.L().Debug("editing a rule task", zap.String("name", taskName))
newTask, err := m.prepareTaskFunc(PrepareTaskOptions{
Rule: rule,
TaskName: taskName,
RuleStore: m.ruleStore,
MaintenanceStore: m.maintenanceStore,
Logger: m.logger,
Reader: m.reader,
Cache: m.cache,
ManagerOpts: m.opts,
NotifyFunc: m.prepareNotifyFunc(),
SQLStore: m.sqlstore,
OrgID: orgID,
Rule: rule,
TaskName: taskName,
RuleStore: m.ruleStore,
MaintenanceStore: m.maintenanceStore,
Logger: m.logger,
Reader: m.reader,
Cache: m.cache,
ManagerOpts: m.opts,
NotifyFunc: m.prepareNotifyFunc(),
SQLStore: m.sqlstore,
UseLogsNewSchema: m.opts.UseLogsNewSchema,
UseTraceNewSchema: m.opts.UseTraceNewSchema,
OrgID: orgID,
})
if err != nil {
@@ -581,17 +595,19 @@ func (m *Manager) addTask(_ context.Context, orgID string, rule *ruletypes.Posta
zap.L().Debug("adding a new rule task", zap.String("name", taskName))
newTask, err := m.prepareTaskFunc(PrepareTaskOptions{
Rule: rule,
TaskName: taskName,
RuleStore: m.ruleStore,
MaintenanceStore: m.maintenanceStore,
Logger: m.logger,
Reader: m.reader,
Cache: m.cache,
ManagerOpts: m.opts,
NotifyFunc: m.prepareNotifyFunc(),
SQLStore: m.sqlstore,
OrgID: orgID,
Rule: rule,
TaskName: taskName,
RuleStore: m.ruleStore,
MaintenanceStore: m.maintenanceStore,
Logger: m.logger,
Reader: m.reader,
Cache: m.cache,
ManagerOpts: m.opts,
NotifyFunc: m.prepareNotifyFunc(),
SQLStore: m.sqlstore,
UseLogsNewSchema: m.opts.UseLogsNewSchema,
UseTraceNewSchema: m.opts.UseTraceNewSchema,
OrgID: orgID,
})
if err != nil {
@@ -971,15 +987,17 @@ func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *m
}
alertCount, apiErr := m.prepareTestRuleFunc(PrepareTestRuleOptions{
Rule: parsedRule,
RuleStore: m.ruleStore,
MaintenanceStore: m.maintenanceStore,
Logger: m.logger,
Reader: m.reader,
Cache: m.cache,
ManagerOpts: m.opts,
NotifyFunc: m.prepareTestNotifyFunc(),
SQLStore: m.sqlstore,
Rule: parsedRule,
RuleStore: m.ruleStore,
MaintenanceStore: m.maintenanceStore,
Logger: m.logger,
Reader: m.reader,
Cache: m.cache,
ManagerOpts: m.opts,
NotifyFunc: m.prepareTestNotifyFunc(),
SQLStore: m.sqlstore,
UseLogsNewSchema: m.opts.UseLogsNewSchema,
UseTraceNewSchema: m.opts.UseTraceNewSchema,
})
return alertCount, apiErr

View File

@@ -15,6 +15,7 @@ import (
// TestNotification prepares a dummy rule for given rule parameters and
// sends a test notification. returns alert count and error (if any)
func defaultTestNotification(opts PrepareTestRuleOptions) (int, *model.ApiError) {
ctx := context.Background()
if opts.Rule == nil {
@@ -47,6 +48,8 @@ func defaultTestNotification(opts PrepareTestRuleOptions) (int, *model.ApiError)
alertname,
parsedRule,
opts.Reader,
opts.UseLogsNewSchema,
opts.UseTraceNewSchema,
WithSendAlways(),
WithSendUnmatched(),
WithSQLStore(opts.SQLStore),

View File

@@ -29,6 +29,7 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/utils/timestamp"
logsv3 "github.com/SigNoz/signoz/pkg/query-service/app/logs/v3"
tracesV3 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v3"
tracesV4 "github.com/SigNoz/signoz/pkg/query-service/app/traces/v4"
"github.com/SigNoz/signoz/pkg/query-service/formatter"
@@ -51,12 +52,16 @@ type ThresholdRule struct {
// used for attribute metadata enrichment for logs and traces
logsKeys map[string]v3.AttributeKey
spansKeys map[string]v3.AttributeKey
useTraceNewSchema bool
}
func NewThresholdRule(
id string,
p *ruletypes.PostableRule,
reader interfaces.Reader,
useLogsNewSchema bool,
useTraceNewSchema bool,
opts ...RuleOption,
) (*ThresholdRule, error) {
@@ -68,20 +73,25 @@ func NewThresholdRule(
}
t := ThresholdRule{
BaseRule: baseRule,
version: p.Version,
BaseRule: baseRule,
version: p.Version,
useTraceNewSchema: useTraceNewSchema,
}
querierOption := querier.QuerierOptions{
Reader: reader,
Cache: nil,
KeyGenerator: queryBuilder.NewKeyGenerator(),
Reader: reader,
Cache: nil,
KeyGenerator: queryBuilder.NewKeyGenerator(),
UseLogsNewSchema: useLogsNewSchema,
UseTraceNewSchema: useTraceNewSchema,
}
querierOptsV2 := querierV2.QuerierOptions{
Reader: reader,
Cache: nil,
KeyGenerator: queryBuilder.NewKeyGenerator(),
Reader: reader,
Cache: nil,
KeyGenerator: queryBuilder.NewKeyGenerator(),
UseLogsNewSchema: useLogsNewSchema,
UseTraceNewSchema: useTraceNewSchema,
}
t.querier = querier.NewQuerier(querierOption)
@@ -291,7 +301,11 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, ts time.Time) (rul
return nil, err
}
r.spansKeys = spanKeys
tracesV4.Enrich(params, spanKeys)
if r.useTraceNewSchema {
tracesV4.Enrich(params, spanKeys)
} else {
tracesV3.Enrich(params, spanKeys)
}
}
}

View File

@@ -801,7 +801,7 @@ func TestThresholdRuleShouldAlert(t *testing.T) {
postableRule.RuleCondition.MatchType = ruletypes.MatchType(c.matchType)
postableRule.RuleCondition.Target = &c.target
rule, err := NewThresholdRule("69", &postableRule, nil, WithEvalDelay(2*time.Minute))
rule, err := NewThresholdRule("69", &postableRule, nil, true, true, WithEvalDelay(2*time.Minute))
if err != nil {
assert.NoError(t, err)
}
@@ -889,7 +889,7 @@ func TestPrepareLinksToLogs(t *testing.T) {
},
}
rule, err := NewThresholdRule("69", &postableRule, nil, WithEvalDelay(2*time.Minute))
rule, err := NewThresholdRule("69", &postableRule, nil, true, true, WithEvalDelay(2*time.Minute))
if err != nil {
assert.NoError(t, err)
}
@@ -930,7 +930,7 @@ func TestPrepareLinksToTraces(t *testing.T) {
},
}
rule, err := NewThresholdRule("69", &postableRule, nil, WithEvalDelay(2*time.Minute))
rule, err := NewThresholdRule("69", &postableRule, nil, true, true, WithEvalDelay(2*time.Minute))
if err != nil {
assert.NoError(t, err)
}
@@ -1005,7 +1005,7 @@ func TestThresholdRuleLabelNormalization(t *testing.T) {
postableRule.RuleCondition.MatchType = ruletypes.MatchType(c.matchType)
postableRule.RuleCondition.Target = &c.target
rule, err := NewThresholdRule("69", &postableRule, nil, WithEvalDelay(2*time.Minute))
rule, err := NewThresholdRule("69", &postableRule, nil, true, true, WithEvalDelay(2*time.Minute))
if err != nil {
assert.NoError(t, err)
}
@@ -1057,7 +1057,7 @@ func TestThresholdRuleEvalDelay(t *testing.T) {
}
for idx, c := range cases {
rule, err := NewThresholdRule("69", &postableRule, nil) // no eval delay
rule, err := NewThresholdRule("69", &postableRule, nil, true, true) // no eval delay
if err != nil {
assert.NoError(t, err)
}
@@ -1105,7 +1105,7 @@ func TestThresholdRuleClickHouseTmpl(t *testing.T) {
}
for idx, c := range cases {
rule, err := NewThresholdRule("69", &postableRule, nil, WithEvalDelay(2*time.Minute))
rule, err := NewThresholdRule("69", &postableRule, nil, true, true, WithEvalDelay(2*time.Minute))
if err != nil {
assert.NoError(t, err)
}
@@ -1244,8 +1244,8 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
readerCache, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cache.Memory{TTL: DefaultFrequency}})
require.NoError(t, err)
reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", time.Duration(time.Second), readerCache)
rule, err := NewThresholdRule("69", &postableRule, reader)
reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", true, true, time.Duration(time.Second), readerCache)
rule, err := NewThresholdRule("69", &postableRule, reader, true, true)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
"signoz_calls_total": {
v3.Delta: true,
@@ -1340,9 +1340,9 @@ func TestThresholdRuleNoData(t *testing.T) {
}
readerCache, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cache.Memory{TTL: DefaultFrequency}})
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", time.Duration(time.Second), readerCache)
reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", true, true, time.Duration(time.Second), readerCache)
rule, err := NewThresholdRule("69", &postableRule, reader)
rule, err := NewThresholdRule("69", &postableRule, reader, true, true)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
"signoz_calls_total": {
v3.Delta: true,
@@ -1444,9 +1444,9 @@ func TestThresholdRuleTracesLink(t *testing.T) {
}
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", time.Duration(time.Second), nil)
reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", true, true, time.Duration(time.Second), nil)
rule, err := NewThresholdRule("69", &postableRule, reader)
rule, err := NewThresholdRule("69", &postableRule, reader, true, true)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
"signoz_calls_total": {
v3.Delta: true,
@@ -1565,9 +1565,9 @@ func TestThresholdRuleLogsLink(t *testing.T) {
}
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", time.Duration(time.Second), nil)
reader := clickhouseReader.NewReaderFromClickhouseConnection(options, nil, telemetryStore, prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}), "", true, true, time.Duration(time.Second), nil)
rule, err := NewThresholdRule("69", &postableRule, reader)
rule, err := NewThresholdRule("69", &postableRule, reader, true, true)
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
"signoz_calls_total": {
v3.Delta: true,
@@ -1643,7 +1643,7 @@ func TestThresholdRuleShiftBy(t *testing.T) {
},
}
rule, err := NewThresholdRule("69", &postableRule, nil)
rule, err := NewThresholdRule("69", &postableRule, nil, true, true)
if err != nil {
assert.NoError(t, err)
}

View File

@@ -46,6 +46,8 @@ func NewMockClickhouseReader(t *testing.T, testDB sqlstore.SQLStore) (*clickhous
telemetryStore,
prometheustest.New(instrumentationtest.New().Logger(), prometheus.Config{}),
"",
true,
true,
time.Duration(time.Second),
nil,
)

View File

@@ -5,16 +5,20 @@ import (
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
"github.com/SigNoz/signoz/pkg/modules/preference"
"github.com/SigNoz/signoz/pkg/modules/preference/implpreference"
"github.com/SigNoz/signoz/pkg/modules/tracefunnel"
"github.com/SigNoz/signoz/pkg/modules/tracefunnel/impltracefunnel"
)
type Handlers struct {
Organization organization.Handler
Preference preference.Handler
TraceFunnel tracefunnel.Handler
}
func NewHandlers(modules Modules) Handlers {
return Handlers{
Organization: implorganization.NewHandler(modules.Organization),
Preference: implpreference.NewHandler(modules.Preference),
TraceFunnel: impltracefunnel.NewHandler(modules.TraceFunnel),
}
}

View File

@@ -5,6 +5,8 @@ import (
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
"github.com/SigNoz/signoz/pkg/modules/preference"
"github.com/SigNoz/signoz/pkg/modules/preference/implpreference"
"github.com/SigNoz/signoz/pkg/modules/tracefunnel"
"github.com/SigNoz/signoz/pkg/modules/tracefunnel/impltracefunnel"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types/preferencetypes"
)
@@ -12,11 +14,13 @@ import (
type Modules struct {
Organization organization.Module
Preference preference.Module
TraceFunnel tracefunnel.Module
}
func NewModules(sqlstore sqlstore.SQLStore) Modules {
return Modules{
Organization: implorganization.NewModule(implorganization.NewStore(sqlstore)),
Preference: implpreference.NewModule(implpreference.NewStore(sqlstore), preferencetypes.NewDefaultPreferenceMap()),
TraceFunnel: impltracefunnel.NewModule(impltracefunnel.NewStore(sqlstore)),
}
}

View File

@@ -74,6 +74,7 @@ func NewSQLMigrationProviderFactories(sqlstore sqlstore.SQLStore) factory.NamedM
sqlmigration.NewUpdateIntegrationsFactory(sqlstore),
sqlmigration.NewUpdateOrganizationsFactory(sqlstore),
sqlmigration.NewDropGroupsFactory(sqlstore),
sqlmigration.NewAddTraceFunnelsFactory(sqlstore),
)
}

View File

@@ -0,0 +1,96 @@
package sqlmigration
import (
"context"
"fmt"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/sqlstore"
traceFunnels "github.com/SigNoz/signoz/pkg/types/tracefunnel"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
)
type addTraceFunnels struct {
sqlstore sqlstore.SQLStore
}
func NewAddTraceFunnelsFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[SQLMigration, Config] {
return factory.NewProviderFactory(factory.MustNewName("add_trace_funnels"), func(ctx context.Context, providerSettings factory.ProviderSettings, config Config) (SQLMigration, error) {
return newAddTraceFunnels(ctx, providerSettings, config, sqlstore)
})
}
func newAddTraceFunnels(_ context.Context, _ factory.ProviderSettings, _ Config, sqlstore sqlstore.SQLStore) (SQLMigration, error) {
return &addTraceFunnels{sqlstore: sqlstore}, nil
}
func (migration *addTraceFunnels) Register(migrations *migrate.Migrations) error {
if err := migrations.Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
func (migration *addTraceFunnels) Up(ctx context.Context, db *bun.DB) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// Create trace_funnel table with foreign key constraint inline
_, err = tx.NewCreateTable().Model((*traceFunnels.Funnel)(nil)).
ForeignKey(`("org_id") REFERENCES "organizations" ("id") ON DELETE CASCADE`).
IfNotExists().
Exec(ctx)
if err != nil {
return fmt.Errorf("failed to create trace_funnel table: %v", err)
}
// Add unique constraint for org_id and name
_, err = tx.NewRaw(`
CREATE UNIQUE INDEX IF NOT EXISTS idx_trace_funnel_org_id_name
ON trace_funnel (org_id, name)
`).Exec(ctx)
if err != nil {
return fmt.Errorf("failed to create unique constraint: %v", err)
}
// Create indexes
_, err = tx.NewCreateIndex().Model((*traceFunnels.Funnel)(nil)).Index("idx_trace_funnel_org_id").Column("org_id").Exec(ctx)
if err != nil {
return fmt.Errorf("failed to create org_id index: %v", err)
}
_, err = tx.NewCreateIndex().Model((*traceFunnels.Funnel)(nil)).Index("idx_trace_funnel_created_at").Column("created_at").Exec(ctx)
if err != nil {
return fmt.Errorf("failed to create created_at index: %v", err)
}
if err := tx.Commit(); err != nil {
return err
}
return nil
}
func (migration *addTraceFunnels) Down(ctx context.Context, db *bun.DB) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// Drop trace_funnel table
_, err = tx.NewDropTable().Model((*traceFunnels.Funnel)(nil)).IfExists().Exec(ctx)
if err != nil {
return fmt.Errorf("failed to drop trace_funnel table: %v", err)
}
if err := tx.Commit(); err != nil {
return err
}
return nil
}

View File

@@ -0,0 +1,15 @@
package traceFunnels
import (
"context"
"github.com/SigNoz/signoz/pkg/valuer"
)
type TraceFunnelStore interface {
Create(context.Context, *Funnel) error
Get(context.Context, valuer.UUID) (*Funnel, error)
List(context.Context) ([]*Funnel, error)
Update(context.Context, *Funnel) error
Delete(context.Context, valuer.UUID) error
}

View File

@@ -0,0 +1,113 @@
package traceFunnels
import (
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
)
// metadata for funnels
type BaseMetadata struct {
types.Identifiable // funnel id
types.TimeAuditable
types.UserAuditable
Name string `json:"funnel_name" bun:"name,type:text,notnull"` // funnel name
Description string `json:"description" bun:"description,type:text"` // funnel description
OrgID valuer.UUID `json:"org_id" bun:"org_id,type:varchar,notnull"`
}
// Funnel Core Data Structure (Funnel and FunnelStep)
type Funnel struct {
bun.BaseModel `bun:"table:trace_funnel"`
BaseMetadata
Steps []FunnelStep `json:"steps" bun:"steps,type:text,notnull"`
Tags string `json:"tags" bun:"tags,type:text"`
CreatedByUser *types.User `json:"user" bun:"rel:belongs-to,join:created_by=id"`
}
type FunnelStep struct {
Id valuer.UUID `json:"id,omitempty"`
Name string `json:"name,omitempty"` // step name
Description string `json:"description,omitempty"` // step description
Order int64 `json:"step_order"`
ServiceName string `json:"service_name"`
SpanName string `json:"span_name"`
Filters *v3.FilterSet `json:"filters,omitempty"`
LatencyPointer string `json:"latency_pointer,omitempty"`
LatencyType string `json:"latency_type,omitempty"`
HasErrors bool `json:"has_errors"`
}
// FunnelRequest represents all possible funnel-related requests
type FunnelRequest struct {
FunnelID valuer.UUID `json:"funnel_id,omitempty"`
Name string `json:"funnel_name,omitempty"`
Timestamp int64 `json:"timestamp,omitempty"`
Description string `json:"description,omitempty"`
Steps []FunnelStep `json:"steps,omitempty"`
UserID string `json:"user_id,omitempty"`
// Analytics specific fields
StartTime int64 `json:"start_time,omitempty"`
EndTime int64 `json:"end_time,omitempty"`
StepAOrder int64 `json:"step_a_order,omitempty"`
StepBOrder int64 `json:"step_b_order,omitempty"`
}
// FunnelResponse represents all possible funnel-related responses
type FunnelResponse struct {
FunnelID string `json:"funnel_id,omitempty"`
FunnelName string `json:"funnel_name,omitempty"`
Description string `json:"description,omitempty"`
CreatedAt int64 `json:"created_at,omitempty"`
CreatedBy string `json:"created_by,omitempty"`
UpdatedAt int64 `json:"updated_at,omitempty"`
UpdatedBy string `json:"updated_by,omitempty"`
OrgID string `json:"org_id,omitempty"`
UserEmail string `json:"user_email,omitempty"`
Funnel *Funnel `json:"funnel,omitempty"`
Steps []FunnelStep `json:"steps,omitempty"`
}
// TimeRange represents a time range for analytics
type TimeRange struct {
StartTime int64 `json:"start_time"`
EndTime int64 `json:"end_time"`
}
// StepTransitionRequest represents a request for step transition analytics
type StepTransitionRequest struct {
TimeRange
StepAOrder int64 `json:"step_a_order"`
StepBOrder int64 `json:"step_b_order"`
}
// UserInfo represents basic user information
type UserInfo struct {
ID string `json:"id"`
Email string `json:"email"`
}
// Analytics on traces
//type FunnelAnalytics struct {
// TotalStart int64 `json:"total_start"`
// TotalComplete int64 `json:"total_complete"`
// ErrorCount int64 `json:"error_count"`
// AvgDurationMs float64 `json:"avg_duration_ms"`
// P99LatencyMs float64 `json:"p99_latency_ms"`
// ConversionRate float64 `json:"conversion_rate"`
//}
//type ValidTracesResponse struct {
// TraceIDs []string `json:"trace_ids"`
//}
type FunnelStepFilter struct {
StepNumber int
ServiceName string
SpanName string
LatencyPointer string // "start" or "end"
CustomFilters *v3.FilterSet
}