Compare commits

..

5 Commits

Author SHA1 Message Date
Tushar Vats
b5e030f513 fix: draft 2026-05-03 03:54:23 +05:30
Vinicius Lourenço
8b13f004ed Revert "feat(global-time-store): add support to context, url persistence, store persistence, drift handle (#11081)" (#11152)
This reverts commit cc3da72aa5.
2026-04-30 15:46:28 +00:00
Abhi kumar
8c1d13bb38 fix: added fix for groupby being undefined (#11151) 2026-04-30 15:46:05 +00:00
SagarRajput-7
ad8f3328e0 fix(mcp-page): added active host url instead of current url on mcp page (#11141)
Some checks failed
build-staging / prepare (push) Has been cancelled
build-staging / js-build (push) Has been cancelled
build-staging / go-build (push) Has been cancelled
build-staging / staging (push) Has been cancelled
Release Drafter / update_release_draft (push) Has been cancelled
* fix(mcp-page): added active host url instead of current url on mcp page

* fix(mcp-page): configure access and role control

* chore: move get hosts api access to viewers (#11145)

* chore: move get hosts api access to viewers

* chore: update openapi spec

---------

Co-authored-by: SagarRajput-7 <162284829+SagarRajput-7@users.noreply.github.com>

* fix: allowed hosts api to run on all the cloud users

* fix: updated test cases

---------

Co-authored-by: Karan Balani <29383381+balanikaran@users.noreply.github.com>
2026-04-30 13:31:43 +00:00
Vinicius Lourenço
cc3da72aa5 feat(global-time-store): add support to context, url persistence, store persistence, drift handle (#11081)
* feat(global-time-store): add support to context, url persistence, store persistence, drift handle

* chore(fmt): fix issue with format

* refactor(hooks): mark internal and public ones

* refactor(store): adapt to don't need round down

* refactor(global-time): scope queries via name for auto refresh to be isolated

* chore(use-query-cache): add little doc

* chore(global-time): update docs
2026-04-30 11:11:58 +00:00
37 changed files with 845 additions and 816 deletions

View File

@@ -17273,9 +17273,9 @@ paths:
description: Internal Server Error
security:
- api_key:
- ADMIN
- VIEWER
- tokenizer:
- ADMIN
- VIEWER
summary: Get host info from Zeus.
tags:
- zeus

View File

@@ -105,6 +105,10 @@ func (h *handler) QueryRawStream(rw http.ResponseWriter, req *http.Request) {
h.community.QueryRawStream(rw, req)
}
func (h *handler) QueryRangePreview(rw http.ResponseWriter, req *http.Request) {
h.community.QueryRangePreview(rw, req)
}
func (h *handler) ReplaceVariables(rw http.ResponseWriter, req *http.Request) {
h.community.ReplaceVariables(rw, req)
}

View File

@@ -114,7 +114,7 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
// initiate agent config handler
agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{
Store: signoz.SQLStore,
AgentFeatures: []agentConf.AgentFeature{logParsingPipelineController, signoz.Modules.LLMPricingRule},
AgentFeatures: []agentConf.AgentFeature{logParsingPipelineController},
})
if err != nil {
return nil, err

View File

@@ -14,6 +14,7 @@ import { usePanelContextMenu } from '../../hooks/usePanelContextMenu';
import { prepareBarPanelConfig, prepareBarPanelData } from './utils';
import '../Panel.styles.scss';
import get from 'lodash/get';
function BarPanel(props: PanelWrapperProps): JSX.Element {
const {
@@ -114,7 +115,7 @@ function BarPanel(props: PanelWrapperProps): JSX.Element {
}, []);
const groupBy = useMemo(() => {
return widget.query.builder.queryData[0].groupBy;
return get(widget, 'query.builder.queryData[0].groupBy', []);
}, [widget.query]);
return (

View File

@@ -10,6 +10,7 @@ import { ContextMenu } from 'periscope/components/ContextMenu';
import { useTimezone } from 'providers/Timezone';
import uPlot from 'uplot';
import { getTimeRange } from 'utils/getTimeRange';
import get from 'lodash/get';
import { prepareChartData, prepareUPlotConfig } from '../TimeSeriesPanel/utils';
@@ -105,7 +106,7 @@ function TimeSeriesPanel(props: PanelWrapperProps): JSX.Element {
]);
const groupBy = useMemo(() => {
return widget.query.builder.queryData[0].groupBy;
return get(widget, 'query.builder.queryData[0].groupBy', []);
}, [widget.query]);
return (

View File

@@ -67,4 +67,40 @@ describe('AuthCard', () => {
expect(mockOnCreateServiceAccount).toHaveBeenCalledTimes(1);
});
it('shows URL for non-admin (all roles can fetch instance URL)', () => {
render(<AuthCard {...defaultProps} isAdmin={false} />);
expect(screen.getByTestId('mcp-instance-url')).toHaveTextContent(
'http://localhost',
);
});
describe('isLoadingInstanceUrl', () => {
it('shows a skeleton and hides the URL while loading', () => {
render(<AuthCard {...defaultProps} isAdmin isLoadingInstanceUrl />);
expect(screen.queryByTestId('mcp-instance-url')).not.toBeInTheDocument();
expect(document.querySelector('.ant-skeleton-input')).toBeInTheDocument();
});
it('does not render the copy button while loading', () => {
render(<AuthCard {...defaultProps} isAdmin isLoadingInstanceUrl />);
expect(
screen.queryByRole('button', { name: 'Copy SigNoz instance URL' }),
).not.toBeInTheDocument();
});
it('shows the URL and copy button once loading is done', () => {
render(<AuthCard {...defaultProps} isAdmin isLoadingInstanceUrl={false} />);
expect(screen.getByTestId('mcp-instance-url')).toHaveTextContent(
'http://localhost',
);
expect(
screen.getByRole('button', { name: 'Copy SigNoz instance URL' }),
).toBeInTheDocument();
});
});
});

View File

@@ -1,3 +1,4 @@
import { Skeleton } from 'antd';
import { Badge, Button } from '@signozhq/ui';
import { Info, KeyRound } from '@signozhq/icons';
import CopyIconButton from '../CopyIconButton';
@@ -7,6 +8,7 @@ import './AuthCard.styles.scss';
interface AuthCardProps {
isAdmin: boolean;
instanceUrl: string;
isLoadingInstanceUrl?: boolean;
onCopyInstanceUrl: () => void;
onCreateServiceAccount: () => void;
}
@@ -14,6 +16,7 @@ interface AuthCardProps {
function AuthCard({
isAdmin,
instanceUrl,
isLoadingInstanceUrl = false,
onCopyInstanceUrl,
onCreateServiceAccount,
}: AuthCardProps): JSX.Element {
@@ -32,13 +35,18 @@ function AuthCard({
<div className="mcp-auth-card__field">
<span className="mcp-auth-card__field-label">SigNoz Instance URL</span>
<div className="mcp-auth-card__endpoint-value">
<span data-testid="mcp-instance-url">{instanceUrl}</span>
<CopyIconButton
ariaLabel="Copy SigNoz instance URL"
onCopy={onCopyInstanceUrl}
/>
</div>
{isLoadingInstanceUrl ? (
<Skeleton.Input active size="small" />
) : (
<div className="mcp-auth-card__endpoint-value">
<span data-testid="mcp-instance-url">{instanceUrl}</span>
<CopyIconButton
ariaLabel="Copy SigNoz instance URL"
onCopy={onCopyInstanceUrl}
disabled={isLoadingInstanceUrl}
/>
</div>
)}
</div>
<div className="mcp-auth-card__field">

View File

@@ -6,6 +6,8 @@ const mockLogEvent = jest.fn();
const mockCopyToClipboard = jest.fn();
const mockHistoryPush = jest.fn();
const mockUseGetGlobalConfig = jest.fn();
const mockUseGetHosts = jest.fn();
const mockUseGetTenantLicense = jest.fn();
const mockToastSuccess = jest.fn();
const mockToastWarning = jest.fn();
@@ -19,6 +21,14 @@ jest.mock('api/generated/services/global', () => ({
mockUseGetGlobalConfig(...args),
}));
jest.mock('api/generated/services/zeus', () => ({
useGetHosts: (...args: unknown[]): unknown => mockUseGetHosts(...args),
}));
jest.mock('hooks/useGetTenantLicense', () => ({
useGetTenantLicense: (): unknown => mockUseGetTenantLicense(),
}));
jest.mock('react-use', () => ({
__esModule: true,
useCopyToClipboard: (): [unknown, jest.Mock] => [null, mockCopyToClipboard],
@@ -47,6 +57,23 @@ jest.mock('utils/basePath', () => ({
}));
const MCP_URL = 'https://mcp.us.signoz.cloud/mcp';
const CUSTOM_HOST_URL = 'https://myteam.signoz.cloud';
const DEFAULT_HOST_URL = 'https://default.signoz.cloud';
function setupLicense({
isCloudUser = true,
isEnterpriseSelfHostedUser = false,
}: {
isCloudUser?: boolean;
isEnterpriseSelfHostedUser?: boolean;
} = {}): void {
mockUseGetTenantLicense.mockReturnValue({
isCloudUser,
isEnterpriseSelfHostedUser,
isCommunityUser: !isCloudUser && !isEnterpriseSelfHostedUser,
isCommunityEnterpriseUser: false,
});
}
function setupGlobalConfig({ mcpUrl }: { mcpUrl: string | null }): void {
mockUseGetGlobalConfig.mockReturnValue({
@@ -55,7 +82,29 @@ function setupGlobalConfig({ mcpUrl }: { mcpUrl: string | null }): void {
});
}
function setupHosts({
hosts = [],
isLoading = false,
isError = false,
}: {
hosts?: { name?: string; url?: string; is_default?: boolean }[];
isLoading?: boolean;
isError?: boolean;
} = {}): void {
mockUseGetHosts.mockReturnValue({
data: isLoading || isError ? undefined : { data: { hosts } },
isLoading,
isError,
});
}
describe('MCPServerSettings', () => {
beforeEach(() => {
// Default: cloud user, hosts loaded but empty → instanceUrl falls back to getBaseUrl()
setupLicense();
setupHosts();
});
afterEach(() => {
jest.clearAllMocks();
});
@@ -158,4 +207,145 @@ describe('MCPServerSettings', () => {
'Instance URL copied to clipboard',
);
});
describe('instance URL resolution', () => {
it('uses the active custom host URL when available', async () => {
setupGlobalConfig({ mcpUrl: MCP_URL });
setupHosts({
hosts: [
{ name: 'default', url: DEFAULT_HOST_URL, is_default: true },
{ name: 'myteam', url: CUSTOM_HOST_URL, is_default: false },
],
});
const user = userEvent.setup({ pointerEventsCheck: 0 });
render(<MCPServerSettings />);
expect(screen.getByTestId('mcp-instance-url')).toHaveTextContent(
CUSTOM_HOST_URL,
);
await user.click(
screen.getByRole('button', { name: 'Copy SigNoz instance URL' }),
);
expect(mockCopyToClipboard).toHaveBeenCalledWith(CUSTOM_HOST_URL);
});
it('falls back to the default host URL when no custom host exists', async () => {
setupGlobalConfig({ mcpUrl: MCP_URL });
setupHosts({
hosts: [{ name: 'default', url: DEFAULT_HOST_URL, is_default: true }],
});
const user = userEvent.setup({ pointerEventsCheck: 0 });
render(<MCPServerSettings />);
expect(screen.getByTestId('mcp-instance-url')).toHaveTextContent(
DEFAULT_HOST_URL,
);
await user.click(
screen.getByRole('button', { name: 'Copy SigNoz instance URL' }),
);
expect(mockCopyToClipboard).toHaveBeenCalledWith(DEFAULT_HOST_URL);
});
it('falls back to browser URL when hosts request errors', async () => {
setupGlobalConfig({ mcpUrl: MCP_URL });
setupHosts({ isError: true });
const user = userEvent.setup({ pointerEventsCheck: 0 });
render(<MCPServerSettings />);
await user.click(
screen.getByRole('button', { name: 'Copy SigNoz instance URL' }),
);
expect(mockCopyToClipboard).toHaveBeenCalledWith('http://localhost');
});
it('shows URL skeleton while hosts are loading', () => {
setupGlobalConfig({ mcpUrl: MCP_URL });
setupHosts({ isLoading: true });
render(<MCPServerSettings />);
expect(screen.queryByTestId('mcp-instance-url')).not.toBeInTheDocument();
expect(document.querySelector('.ant-skeleton-input')).toBeInTheDocument();
});
it('does not copy while hosts are still loading', async () => {
setupGlobalConfig({ mcpUrl: MCP_URL });
setupHosts({ isLoading: true });
userEvent.setup({ pointerEventsCheck: 0 });
render(<MCPServerSettings />);
expect(
screen.queryByRole('button', { name: 'Copy SigNoz instance URL' }),
).not.toBeInTheDocument();
expect(mockCopyToClipboard).not.toHaveBeenCalled();
});
it('disables the hosts query for non-cloud deployments', () => {
setupGlobalConfig({ mcpUrl: MCP_URL });
setupLicense({ isCloudUser: false, isEnterpriseSelfHostedUser: true });
render(<MCPServerSettings />, undefined, { role: 'ADMIN' });
const callOptions = mockUseGetHosts.mock.calls[0]?.[0];
expect(callOptions?.query?.enabled).toBe(false);
});
it('uses browser URL immediately for enterprise self-hosted (no skeleton)', async () => {
setupGlobalConfig({ mcpUrl: MCP_URL });
setupLicense({ isCloudUser: false, isEnterpriseSelfHostedUser: true });
setupHosts({ isLoading: false });
const user = userEvent.setup({ pointerEventsCheck: 0 });
render(<MCPServerSettings />, undefined, { role: 'ADMIN' });
expect(
document.querySelector('.ant-skeleton-input'),
).not.toBeInTheDocument();
expect(screen.getByTestId('mcp-instance-url')).toHaveTextContent(
'http://localhost',
);
await user.click(
screen.getByRole('button', { name: 'Copy SigNoz instance URL' }),
);
expect(mockCopyToClipboard).toHaveBeenCalledWith('http://localhost');
});
it('enables the hosts query for all cloud users including viewers', () => {
setupGlobalConfig({ mcpUrl: MCP_URL });
setupLicense({ isCloudUser: true });
render(<MCPServerSettings />, undefined, { role: 'VIEWER' });
const callOptions = mockUseGetHosts.mock.calls[0]?.[0];
expect(callOptions?.query?.enabled).toBe(true);
});
it('shows instance URL for cloud viewer', () => {
setupGlobalConfig({ mcpUrl: MCP_URL });
setupLicense({ isCloudUser: true });
setupHosts({
hosts: [{ name: 'default', url: DEFAULT_HOST_URL, is_default: true }],
});
render(<MCPServerSettings />, undefined, { role: 'VIEWER' });
expect(
document.querySelector('.ant-skeleton-input'),
).not.toBeInTheDocument();
expect(screen.getByTestId('mcp-instance-url')).toHaveTextContent(
DEFAULT_HOST_URL,
);
});
});
});

View File

@@ -1,11 +1,13 @@
import { useCallback, useEffect, useState } from 'react';
import { useCallback, useEffect, useMemo, useState } from 'react';
import { useCopyToClipboard } from 'react-use';
import logEvent from 'api/common/logEvent';
import ROUTES from 'constants/routes';
import { SA_QUERY_PARAMS } from 'container/ServiceAccountsSettings/constants';
import { useGetGlobalConfig } from 'api/generated/services/global';
import { useGetHosts } from 'api/generated/services/zeus';
import history from 'lib/history';
import { useAppContext } from 'providers/App/App';
import { useGetTenantLicense } from 'hooks/useGetTenantLicense';
import { USER_ROLES } from 'types/roles';
import { getBaseUrl } from 'utils/basePath';
@@ -34,7 +36,23 @@ function MCPServerSettings(): JSX.Element {
const [, copyToClipboard] = useCopyToClipboard();
const isAdmin = user.role === USER_ROLES.ADMIN;
const instanceUrl = getBaseUrl();
const { isCloudUser } = useGetTenantLicense();
const {
data: hostsData,
isLoading: isLoadingHosts,
isError: isHostsError,
} = useGetHosts({ query: { enabled: isCloudUser } });
const instanceUrl = useMemo(() => {
if (isLoadingHosts || isHostsError || !hostsData) {
return getBaseUrl();
}
const hosts = hostsData.data?.hosts ?? [];
const activeHost =
hosts.find((h) => !h.is_default) ?? hosts.find((h) => h.is_default);
return activeHost?.url ?? getBaseUrl();
}, [hostsData, isLoadingHosts, isHostsError]);
const { data: globalConfig, isLoading: isConfigLoading } =
useGetGlobalConfig();
@@ -70,10 +88,13 @@ function MCPServerSettings(): JSX.Element {
}, []);
const handleCopyInstanceUrl = useCallback(() => {
if (isLoadingHosts) {
return;
}
copyToClipboard(instanceUrl);
toast.success('Instance URL copied to clipboard');
void logEvent(ANALYTICS.INSTANCE_URL_COPIED, {});
}, [copyToClipboard, instanceUrl]);
}, [copyToClipboard, instanceUrl, isLoadingHosts]);
const handleDocsLinkClick = useCallback((target: string) => {
void logEvent(ANALYTICS.DOCS_LINK_CLICKED, { target });
@@ -132,6 +153,7 @@ function MCPServerSettings(): JSX.Element {
<AuthCard
isAdmin={isAdmin}
instanceUrl={instanceUrl}
isLoadingInstanceUrl={isLoadingHosts}
onCopyInstanceUrl={handleCopyInstanceUrl}
onCreateServiceAccount={handleCreateServiceAccount}
/>

View File

@@ -36,6 +36,7 @@ function SettingsPage(): JSX.Element {
const isAdmin = user.role === USER_ROLES.ADMIN;
const isEditor = user.role === USER_ROLES.EDITOR;
const isViewer = user.role === USER_ROLES.VIEWER;
const isWorkspaceBlocked = trialInfo?.workSpaceBlock || false;
@@ -102,6 +103,13 @@ function SettingsPage(): JSX.Element {
: item.isEnabled,
}));
}
if (isViewer) {
updatedItems = updatedItems.map((item) => ({
...item,
isEnabled: item.key === ROUTES.MCP_SERVER ? true : item.isEnabled,
}));
}
}
if (isEnterpriseSelfHostedUser) {
@@ -134,6 +142,13 @@ function SettingsPage(): JSX.Element {
: item.isEnabled,
}));
}
if (isViewer) {
updatedItems = updatedItems.map((item) => ({
...item,
isEnabled: item.key === ROUTES.MCP_SERVER ? true : item.isEnabled,
}));
}
}
if (!isCloudUser && !isEnterpriseSelfHostedUser) {
@@ -166,6 +181,7 @@ function SettingsPage(): JSX.Element {
}, [
isAdmin,
isEditor,
isViewer,
isCloudUser,
isEnterpriseSelfHostedUser,
isFetchingActiveLicense,

View File

@@ -82,12 +82,13 @@ describe('SettingsPage nav sections', () => {
expect(screen.getByTestId(id)).toBeInTheDocument();
});
it.each(['billing', 'roles', 'mcp-server'])(
'does not render "%s" element',
(id) => {
expect(screen.queryByTestId(id)).not.toBeInTheDocument();
},
);
it.each(['billing', 'roles'])('does not render "%s" element', (id) => {
expect(screen.queryByTestId(id)).not.toBeInTheDocument();
});
it('renders "mcp-server" element', () => {
expect(screen.getByTestId('mcp-server')).toBeInTheDocument();
});
});
describe('Self-hosted Admin', () => {

View File

@@ -451,6 +451,22 @@ func (provider *provider) addQuerierRoutes(router *mux.Router) error {
return err
}
if err := router.Handle("/api/v5/query_range/preview", handler.New(provider.authZ.ViewAccess(provider.querierHandler.QueryRangePreview), handler.OpenAPIDef{
ID: "QueryRangePreviewV5",
Tags: []string{"querier"},
Summary: "Query range (dry-run)",
Description: "Validates the request and renders the underlying SQL/PromQL for each query in the composite query without executing it. When the 'explain' query parameter is set to true, an EXPLAIN is run against ClickHouse for each rendered SQL statement.",
Request: new(qbtypes.QueryRangeRequest),
RequestContentType: "application/json",
Response: new(qbtypes.QueryRangePreviewResponse),
ResponseContentType: "application/json",
SuccessStatusCode: http.StatusOK,
ErrorStatusCodes: []int{http.StatusBadRequest},
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
})).Methods(http.MethodPost).GetError(); err != nil {
return err
}
if err := router.Handle("/api/v5/substitute_vars", handler.New(provider.authZ.ViewAccess(provider.querierHandler.ReplaceVariables), handler.OpenAPIDef{
ID: "ReplaceVariables",
Tags: []string{"querier"},

View File

@@ -27,7 +27,7 @@ func (provider *provider) addZeusRoutes(router *mux.Router) error {
return err
}
if err := router.Handle("/api/v2/zeus/hosts", handler.New(provider.authZ.AdminAccess(provider.zeusHandler.GetHosts), handler.OpenAPIDef{
if err := router.Handle("/api/v2/zeus/hosts", handler.New(provider.authZ.ViewAccess(provider.zeusHandler.GetHosts), handler.OpenAPIDef{
ID: "GetHosts",
Tags: []string{"zeus"},
Summary: "Get host info from Zeus.",
@@ -39,7 +39,7 @@ func (provider *provider) addZeusRoutes(router *mux.Router) error {
SuccessStatusCode: http.StatusOK,
ErrorStatusCodes: []int{http.StatusBadRequest, http.StatusUnauthorized, http.StatusForbidden, http.StatusNotFound},
Deprecated: false,
SecuritySchemes: newSecuritySchemes(types.RoleAdmin),
SecuritySchemes: newSecuritySchemes(types.RoleViewer),
})).Methods(http.MethodGet).GetError(); err != nil {
return err
}

View File

@@ -6,6 +6,7 @@ import (
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/http/binding"
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/modules/llmpricingrule"
@@ -18,11 +19,12 @@ import (
const maxLimit = 100
type handler struct {
module llmpricingrule.Module
module llmpricingrule.Module
providerSettings factory.ProviderSettings
}
func NewHandler(module llmpricingrule.Module) llmpricingrule.Handler {
return &handler{module: module}
func NewHandler(module llmpricingrule.Module, providerSettings factory.ProviderSettings) llmpricingrule.Handler {
return &handler{module: module, providerSettings: providerSettings}
}
// List handles GET /api/v1/llm_pricing_rules.

View File

@@ -1,221 +0,0 @@
package impllmpricingrule
import (
"bytes"
"context"
"encoding/json"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/modules/llmpricingrule"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/types/llmpricingruletypes"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/valuer"
"gopkg.in/yaml.v3"
)
type module struct {
store llmpricingruletypes.Store
}
func NewModule(store llmpricingruletypes.Store) llmpricingrule.Module {
return &module{store: store}
}
func (module *module) List(ctx context.Context, orgID valuer.UUID, offset, limit int) ([]*llmpricingruletypes.LLMPricingRule, int, error) {
return module.store.List(ctx, orgID, offset, limit)
}
func (module *module) Get(ctx context.Context, orgID valuer.UUID, id valuer.UUID) (*llmpricingruletypes.LLMPricingRule, error) {
return module.store.Get(ctx, orgID, id)
}
// CreateOrUpdate applies a batch of pricing rule changes:
// - ID set → match by id, overwrite fields.
// - SourceID set → match by source_id; if found overwrite, else insert.
// - neither set → insert a new user-created row (is_override = true).
//
// When UpdatableLLMPricingRule.IsOverride is nil AND the matched row has
// is_override = true, the row is fully preserved — only synced_at is stamped.
func (module *module) CreateOrUpdate(ctx context.Context, orgID valuer.UUID, userEmail string, rules []llmpricingruletypes.UpdatableLLMPricingRule) error {
now := time.Now()
upsert := func(ctx context.Context, u llmpricingruletypes.UpdatableLLMPricingRule) error {
existing, err := module.findExisting(ctx, orgID, u)
if err != nil && errors.Ast(err, errors.TypeNotFound) {
return module.store.Create(ctx, llmpricingruletypes.NewLLMPricingRuleFromUpdatable(u, orgID, userEmail, now))
}
if err != nil {
return err
}
existing.Update(u, userEmail, now)
return module.store.Update(ctx, existing)
}
err := module.store.RunInTx(ctx, func(ctx context.Context) error {
for _, u := range rules {
if err := upsert(ctx, u); err != nil {
return err
}
}
return nil
})
if err != nil {
return err
}
agentConf.NotifyConfigUpdate(ctx)
return nil
}
func (module *module) Delete(ctx context.Context, orgID, id valuer.UUID) error {
if err := module.store.Delete(ctx, orgID, id); err != nil {
return err
}
agentConf.NotifyConfigUpdate(ctx)
return nil
}
func (module *module) AgentFeatureType() agentConf.AgentFeatureType {
return llmpricingruletypes.LLMCostFeatureType
}
// RecommendAgentConfig reads pricing rules and generates the
// signozllmpricing processor config for deployment to OTel collectors via OpAMP.
func (module *module) RecommendAgentConfig(
orgID valuer.UUID,
currentConfYaml []byte,
configVersion *opamptypes.AgentConfigVersion,
) ([]byte, string, error) {
ctx := context.Background()
rules, err := module.getEnabledRules(ctx, orgID)
if err != nil {
return nil, "", err
}
updatedConf, err := generateCollectorConfigWithLLMPricingProcessor(currentConfYaml, rules)
if err != nil {
return nil, "", err
}
serialized, err := json.Marshal(rules)
if err != nil {
return nil, "", err
}
return updatedConf, string(serialized), nil
}
func (module *module) getEnabledRules(ctx context.Context, orgID valuer.UUID) ([]*llmpricingruletypes.LLMPricingRule, error) {
rules, _, err := module.List(ctx, orgID, 0, 10000)
if err != nil {
return nil, err
}
enabled := make([]*llmpricingruletypes.LLMPricingRule, 0, len(rules))
for _, r := range rules {
if r.Enabled {
enabled = append(enabled, r)
}
}
return enabled, nil
}
// findExisting returns the row matching the updatable's ID or SourceID.
// Returns a TypeNotFound error when neither matches; the caller treats that
// as "insert new".
func (module *module) findExisting(ctx context.Context, orgID valuer.UUID, u llmpricingruletypes.UpdatableLLMPricingRule) (*llmpricingruletypes.LLMPricingRule, error) {
switch {
case u.ID != nil:
return module.store.Get(ctx, orgID, *u.ID)
case u.SourceID != nil:
return module.store.GetBySourceID(ctx, orgID, *u.SourceID)
default:
return nil, errors.Newf(errors.TypeNotFound, llmpricingruletypes.ErrCodePricingRuleNotFound, "rule has neither id nor sourceId")
}
}
// buildProcessorConfig converts pricing rules into the signozllmpricing processor config.
func buildProcessorConfig(rules []*llmpricingruletypes.LLMPricingRule) *llmpricingruletypes.LLMPricingRuleProcessorConfig {
pricingRules := make([]llmpricingruletypes.LLMPricingRuleProcessor, 0, len(rules))
for _, r := range rules {
var cache llmpricingruletypes.LLMPricingRuleProcessorCache
if r.Pricing.Cache != nil {
cache = llmpricingruletypes.LLMPricingRuleProcessorCache{
Mode: r.Pricing.Cache.Mode.StringValue(),
Read: r.Pricing.Cache.Read,
Write: r.Pricing.Cache.Write,
}
}
pricingRules = append(pricingRules, llmpricingruletypes.LLMPricingRuleProcessor{
Name: r.Model,
Pattern: r.ModelPattern,
Cache: cache,
In: r.Pricing.Input,
Out: r.Pricing.Output,
})
}
return &llmpricingruletypes.LLMPricingRuleProcessorConfig{
Attrs: llmpricingruletypes.LLMPricingRuleProcessorAttrs{
Model: llmpricingruletypes.GenAIRequestModel,
In: llmpricingruletypes.GenAIUsageInputTokens,
Out: llmpricingruletypes.GenAIUsageOutputTokens,
CacheRead: llmpricingruletypes.GenAIUsageCacheReadInputTokens,
CacheWrite: llmpricingruletypes.GenAIUsageCacheCreationInputTokens,
},
DefaultPricing: llmpricingruletypes.LLMPricingRuleProcessorDefaultPricing{
Unit: llmpricingruletypes.UnitPerMillionTokens.StringValue(),
Rules: pricingRules,
},
OutputAttrs: llmpricingruletypes.LLMPricingRuleProcessorOutputAttrs{
In: llmpricingruletypes.SignozGenAICostInput,
Out: llmpricingruletypes.SignozGenAICostOutput,
CacheRead: llmpricingruletypes.SignozGenAICostCacheRead,
CacheWrite: llmpricingruletypes.SignozGenAICostCacheWrite,
Total: llmpricingruletypes.SignozGenAITotalCost,
},
}
}
// generateCollectorConfigWithLLMPricingProcessor injects (or replaces) the signozllmpricing
// processor block in the collector YAML with one built from the given rules.
func generateCollectorConfigWithLLMPricingProcessor(
currentConfYaml []byte,
rules []*llmpricingruletypes.LLMPricingRule,
) ([]byte, error) {
if len(bytes.TrimSpace(currentConfYaml)) == 0 {
return currentConfYaml, nil
}
var collectorConf map[string]any
if err := yaml.Unmarshal(currentConfYaml, &collectorConf); err != nil {
return nil, errors.Wrapf(err, errors.TypeInvalidInput, llmpricingruletypes.ErrCodeInvalidCollectorConfig, "failed to unmarshal collector config")
}
// rare but don't do anything in this case, also means it's just comments.
if collectorConf == nil {
return currentConfYaml, nil
}
processors := map[string]any{}
if existing, ok := collectorConf["processors"]; ok && existing != nil {
p, ok := existing.(map[string]any)
if !ok {
return nil, errors.Newf(errors.TypeInvalidInput, llmpricingruletypes.ErrCodeInvalidCollectorConfig, "collector config 'processors' must be a mapping, got %T", existing)
}
processors = p
}
processors[llmpricingruletypes.ProcessorName] = buildProcessorConfig(rules)
collectorConf["processors"] = processors
out, err := yaml.Marshal(collectorConf)
if err != nil {
return nil, errors.Wrapf(err, errors.TypeInternal, llmpricingruletypes.ErrCodeBuildPricingProcessorConf, "failed to marshal llm pricing processor config")
}
return out, nil
}

View File

@@ -1,92 +0,0 @@
package impllmpricingrule
import (
"os"
"path/filepath"
"testing"
"github.com/SigNoz/signoz/pkg/types/llmpricingruletypes"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"
)
// assertYAMLEqualToFile decodes both sides into any and compares structurally,
// so map key ordering is irrelevant.
func assertYAMLEqualToFile(t *testing.T, name string, actual []byte) {
t.Helper()
expected, err := os.ReadFile(filepath.Join("testdata", name))
require.NoError(t, err)
var e, a any
require.NoError(t, yaml.Unmarshal(expected, &e))
require.NoError(t, yaml.Unmarshal(actual, &a))
assert.Equal(t, e, a)
}
func makePricingRule(model string, patterns []string, cacheMode llmpricingruletypes.LLMPricingRuleCacheMode, costIn, costOut, cacheRead, cacheWrite float64) *llmpricingruletypes.LLMPricingRule {
return &llmpricingruletypes.LLMPricingRule{
Model: model,
ModelPattern: llmpricingruletypes.StringSlice(patterns),
Unit: llmpricingruletypes.UnitPerMillionTokens,
Pricing: llmpricingruletypes.LLMRulePricing{
Input: costIn,
Output: costOut,
Cache: &llmpricingruletypes.LLMPricingCacheCosts{
Mode: cacheMode,
Read: cacheRead,
Write: cacheWrite,
},
},
Enabled: true,
}
}
func TestGenerateCollectorConfigWithLLMPricingProcessor(t *testing.T) {
tests := []struct {
name string
rules []*llmpricingruletypes.LLMPricingRule
expectedFile string
}{
{
name: "with_rule",
rules: []*llmpricingruletypes.LLMPricingRule{
makePricingRule("gpt-4o", []string{"gpt-4o*"}, llmpricingruletypes.LLMPricingRuleCacheModeSubtract, 5.0, 15.0, 2.5, 0),
},
expectedFile: "collector_with_rule.yaml",
},
// We deploy the processor even with zero rules so rules can be added
// later (by a user or by Zeus) without any config-shape change.
// Pipeline wiring is handled by the collector's baseline config.
{
name: "no_rules",
rules: nil,
expectedFile: "collector_no_rules.yaml",
},
}
input, err := os.ReadFile(filepath.Join("testdata", "collector_baseline.yaml"))
require.NoError(t, err)
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
out, err := generateCollectorConfigWithLLMPricingProcessor(input, tc.rules)
require.NoError(t, err)
assertYAMLEqualToFile(t, tc.expectedFile, out)
})
}
}
func TestGenerateCollectorConfig_EmptyInputPassthrough(t *testing.T) {
// yaml.v3 errors on empty/whitespace input; the generator passes such
// input through unchanged instead.
rules := []*llmpricingruletypes.LLMPricingRule{
makePricingRule("gpt-4o", []string{"gpt-4o*"}, llmpricingruletypes.LLMPricingRuleCacheModeSubtract, 5.0, 15.0, 2.5, 0),
}
for _, in := range [][]byte{nil, []byte(" \n")} {
out, err := generateCollectorConfigWithLLMPricingProcessor(in, rules)
require.NoError(t, err)
assert.Equal(t, in, out)
}
}

View File

@@ -1,134 +0,0 @@
package impllmpricingrule
import (
"context"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types/llmpricingruletypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
type store struct {
sqlstore sqlstore.SQLStore
}
func NewStore(sqlstore sqlstore.SQLStore) llmpricingruletypes.Store {
return &store{sqlstore: sqlstore}
}
func (store *store) List(ctx context.Context, orgID valuer.UUID, offset, limit int) ([]*llmpricingruletypes.LLMPricingRule, int, error) {
rules := make([]*llmpricingruletypes.LLMPricingRule, 0)
count, err := store.sqlstore.
BunDBCtx(ctx).
NewSelect().
Model(&rules).
Where("org_id = ?", orgID).
Order("created_at DESC").
Offset(offset).
Limit(limit).
ScanAndCount(ctx)
if err != nil {
return nil, 0, err
}
return rules, count, nil
}
func (store *store) Get(ctx context.Context, orgID, id valuer.UUID) (*llmpricingruletypes.LLMPricingRule, error) {
rule := new(llmpricingruletypes.LLMPricingRule)
err := store.sqlstore.
BunDBCtx(ctx).
NewSelect().
Model(rule).
Where("org_id = ?", orgID).
Where("id = ?", id).
Scan(ctx)
if err != nil {
return nil, store.sqlstore.WrapNotFoundErrf(err, llmpricingruletypes.ErrCodePricingRuleNotFound, "pricing rule %s not found in the org", id)
}
return rule, nil
}
func (store *store) GetBySourceID(ctx context.Context, orgID, sourceID valuer.UUID) (*llmpricingruletypes.LLMPricingRule, error) {
rule := new(llmpricingruletypes.LLMPricingRule)
err := store.sqlstore.
BunDBCtx(ctx).
NewSelect().
Model(rule).
Where("org_id = ?", orgID).
Where("source_id = ?", sourceID).
Scan(ctx)
if err != nil {
return nil, store.sqlstore.WrapNotFoundErrf(err, llmpricingruletypes.ErrCodePricingRuleNotFound, "pricing rule with source_id %s not found in the org", sourceID)
}
return rule, nil
}
func (store *store) Create(ctx context.Context, rule *llmpricingruletypes.LLMPricingRule) error {
_, err := store.sqlstore.
BunDBCtx(ctx).
NewInsert().
Model(rule).
Exec(ctx)
if err != nil {
return store.sqlstore.WrapAlreadyExistsErrf(err, llmpricingruletypes.ErrCodePricingRuleAlreadyExists, "pricing rule with model %s already exists", rule.Model)
}
return nil
}
// Update persists the mutable fields of rule, matching on org_id and id.
// Immutable columns (id, org_id, source_id, created_at, created_by) are
// excluded. A not-found error is returned when no row was affected.
func (store *store) Update(ctx context.Context, rule *llmpricingruletypes.LLMPricingRule) error {
	res, err := store.sqlstore.
		BunDBCtx(ctx).
		NewUpdate().
		Model(rule).
		ExcludeColumn("id", "org_id", "source_id", "created_at", "created_by").
		Where("org_id = ?", rule.OrgID).
		Where("id = ?", rule.ID).
		Exec(ctx)
	if err != nil {
		return err
	}
	affected, err := res.RowsAffected()
	if err != nil {
		return err
	}
	if affected == 0 {
		return errors.Newf(errors.TypeNotFound, llmpricingruletypes.ErrCodePricingRuleNotFound, "pricing rule %s not found in the org", rule.ID)
	}
	return nil
}
// Delete removes the pricing rule with the given id scoped to the org.
// A not-found error is returned when no row was affected.
func (store *store) Delete(ctx context.Context, orgID, id valuer.UUID) error {
	res, err := store.sqlstore.
		BunDBCtx(ctx).
		NewDelete().
		Model((*llmpricingruletypes.LLMPricingRule)(nil)).
		Where("org_id = ?", orgID).
		Where("id = ?", id).
		Exec(ctx)
	if err != nil {
		return err
	}
	affected, err := res.RowsAffected()
	if err != nil {
		return err
	}
	if affected == 0 {
		return errors.Newf(errors.TypeNotFound, llmpricingruletypes.ErrCodePricingRuleNotFound, "pricing rule %s not found in the org", id)
	}
	return nil
}
// RunInTx executes cb inside a single database transaction, using the
// store's default transaction options (nil).
func (store *store) RunInTx(ctx context.Context, cb func(ctx context.Context) error) error {
	return store.sqlstore.RunInTxCtx(ctx, nil, cb)
}

View File

@@ -1,31 +0,0 @@
receivers:
otlp:
protocols:
grpc:
processors:
signozllmpricing:
attrs:
model: gen_ai.request.model
in: gen_ai.usage.input_tokens
out: gen_ai.usage.output_tokens
cache_read: gen_ai.usage.cache_read.input_tokens
cache_write: gen_ai.usage.cache_creation.input_tokens
default_pricing:
unit: per_million_tokens
rules: []
output_attrs:
in: _signoz.gen_ai.cost_input
out: _signoz.gen_ai.cost_output
cache_read: _signoz.gen_ai.cost_cache_read
cache_write: _signoz.gen_ai.cost_cache_write
total: _signoz.gen_ai.total_cost
batch: {}
exporters:
otlp:
endpoint: localhost:4317
service:
pipelines:
traces:
receivers: [otlp]
processors: [batch, signozllmpricing]
exporters: [otlp]

View File

@@ -1,35 +0,0 @@
exporters:
otlp:
endpoint: localhost:4317
processors:
batch: {}
signozllmpricing:
attrs:
model: gen_ai.request.model
in: gen_ai.usage.input_tokens
out: gen_ai.usage.output_tokens
cache_read: gen_ai.usage.cache_read.input_tokens
cache_write: gen_ai.usage.cache_creation.input_tokens
default_pricing:
unit: per_million_tokens
rules: []
output_attrs:
in: _signoz.gen_ai.cost_input
out: _signoz.gen_ai.cost_output
cache_read: _signoz.gen_ai.cost_cache_read
cache_write: _signoz.gen_ai.cost_cache_write
total: _signoz.gen_ai.total_cost
receivers:
otlp:
protocols:
grpc: null
service:
pipelines:
traces:
exporters:
- otlp
processors:
- batch
- signozllmpricing
receivers:
- otlp

View File

@@ -1,44 +0,0 @@
exporters:
otlp:
endpoint: localhost:4317
processors:
batch: {}
signozllmpricing:
attrs:
model: gen_ai.request.model
in: gen_ai.usage.input_tokens
out: gen_ai.usage.output_tokens
cache_read: gen_ai.usage.cache_read.input_tokens
cache_write: gen_ai.usage.cache_creation.input_tokens
default_pricing:
unit: per_million_tokens
rules:
- name: gpt-4o
pattern:
- gpt-4o*
cache:
mode: subtract
read: 2.5
write: 0
in: 5
out: 15
output_attrs:
in: _signoz.gen_ai.cost_input
out: _signoz.gen_ai.cost_output
cache_read: _signoz.gen_ai.cost_cache_read
cache_write: _signoz.gen_ai.cost_cache_write
total: _signoz.gen_ai.total_cost
receivers:
otlp:
protocols:
grpc: null
service:
pipelines:
traces:
exporters:
- otlp
processors:
- batch
- signozllmpricing
receivers:
- otlp

View File

@@ -4,15 +4,11 @@ import (
"context"
"net/http"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/types/llmpricingruletypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
type Module interface {
// Since this module interacts with OpAMP, it must implement the AgentFeature interface.
agentConf.AgentFeature
List(ctx context.Context, orgID valuer.UUID, offset, limit int) ([]*llmpricingruletypes.LLMPricingRule, int, error)
Get(ctx context.Context, orgID valuer.UUID, id valuer.UUID) (*llmpricingruletypes.LLMPricingRule, error)
CreateOrUpdate(ctx context.Context, orgID valuer.UUID, userEmail string, rules []llmpricingruletypes.UpdatableLLMPricingRule) (err error)

View File

@@ -72,6 +72,53 @@ func (handler *handler) QueryRange(rw http.ResponseWriter, req *http.Request) {
render.Success(rw, http.StatusOK, queryRangeResponse)
}
// QueryRangePreview serves the dry-run endpoint: it decodes and validates a
// query range request and returns the rendered statements (optionally with
// EXPLAIN output) without executing them.
func (handler *handler) QueryRangePreview(rw http.ResponseWriter, req *http.Request) {
	ctx := ctxtypes.NewContextWithCommentVals(req.Context(), map[string]string{
		instrumentationtypes.CodeNamespace:    "querier",
		instrumentationtypes.CodeFunctionName: "QueryRangePreview",
	})

	claims, err := authtypes.ClaimsFromContext(ctx)
	if err != nil {
		render.Error(rw, err)
		return
	}

	var queryRangeRequest qbtypes.QueryRangeRequest
	if decodeErr := json.NewDecoder(req.Body).Decode(&queryRangeRequest); decodeErr != nil {
		render.Error(rw, decodeErr)
		return
	}
	if validateErr := queryRangeRequest.Validate(); validateErr != nil {
		render.Error(rw, validateErr)
		return
	}

	orgID, err := valuer.NewUUID(claims.OrgID)
	if err != nil {
		render.Error(rw, err)
		return
	}

	explain, err := ParseExplainVariant(req.URL.Query().Get("explain"))
	if err != nil {
		render.Error(rw, err)
		return
	}

	previewResp, err := handler.querier.QueryRangePreview(ctx, orgID, &queryRangeRequest, qbtypes.QueryRangePreviewOptions{Explain: explain})
	if err != nil {
		render.Error(rw, err)
		return
	}
	render.Success(rw, http.StatusOK, previewResp)
}
func (handler *handler) QueryRawStream(rw http.ResponseWriter, req *http.Request) {
ctx := req.Context()

View File

@@ -192,6 +192,12 @@ func (q *builderQuery[T]) isWindowList() bool {
return true
}
// Statement renders the SQL statement for the builder query without
// executing it. It is used by the dry-run/preview path and delegates
// directly to the query's statement builder with the same arguments the
// execution path would use.
func (q *builderQuery[T]) Statement(ctx context.Context) (*qbtypes.Statement, error) {
	return q.stmtBuilder.Build(ctx, q.fromMS, q.toMS, q.kind, q.spec, q.variables)
}
func (q *builderQuery[T]) Execute(ctx context.Context) (*qbtypes.Result, error) {
// can we do window based pagination?

View File

@@ -99,6 +99,16 @@ func (q *chSQLQuery) renderVars(query string, vars map[string]qbtypes.VariableIt
return newQuery.String(), nil
}
// Statement renders the SQL statement for the ClickHouse SQL query without
// executing it. It is used by the dry-run/preview path: variables are
// substituted but the query is never sent to the database.
func (q *chSQLQuery) Statement(_ context.Context) (*qbtypes.Statement, error) {
	rendered, renderErr := q.renderVars(q.query.Query, q.vars, q.fromMS, q.toMS)
	if renderErr != nil {
		return nil, renderErr
	}
	stmt := &qbtypes.Statement{
		Query: rendered,
		Args:  q.args,
	}
	return stmt, nil
}
func (q *chSQLQuery) Execute(ctx context.Context) (*qbtypes.Result, error) {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{
instrumentationtypes.QueryDuration: instrumentationtypes.DurationBucket(q.fromMS, q.toMS),

View File

@@ -12,6 +12,11 @@ import (
type Querier interface {
QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtypes.QueryRangeRequest) (*qbtypes.QueryRangeResponse, error)
QueryRawStream(ctx context.Context, orgID valuer.UUID, req *qbtypes.QueryRangeRequest, client *qbtypes.RawStream)
// QueryRangePreview validates and renders the queries in req without
// executing them. opts controls dry-run behavior such as which
// EXPLAIN variant to attach to the response; the zero value performs
// a validation-only preview with no EXPLAIN.
QueryRangePreview(ctx context.Context, orgID valuer.UUID, req *qbtypes.QueryRangeRequest, opts qbtypes.QueryRangePreviewOptions) (*qbtypes.QueryRangePreviewResponse, error)
}
// BucketCache is the interface for bucket-based caching.
@@ -25,5 +30,6 @@ type BucketCache interface {
type Handler interface {
QueryRange(rw http.ResponseWriter, req *http.Request)
QueryRawStream(rw http.ResponseWriter, req *http.Request)
QueryRangePreview(rw http.ResponseWriter, req *http.Request)
ReplaceVariables(rw http.ResponseWriter, req *http.Request)
}

360
pkg/querier/preview.go Normal file
View File

@@ -0,0 +1,360 @@
package querier
import (
"context"
"log/slog"
"strings"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/querybuilder"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// statementProvider is implemented by query types that can render the
// underlying SQL/PromQL statement without executing it. The preview path
// type-asserts each constructed query against this interface; a query that
// does not implement it cannot be previewed.
type statementProvider interface {
	Statement(ctx context.Context) (*qbtypes.Statement, error)
}
// clickhouseExplainClause maps a variant to the EXPLAIN clause understood
// by ClickHouse (i.e. what comes between EXPLAIN and the SELECT). The
// second return value is false for unknown variants.
func clickhouseExplainClause(v qbtypes.ExplainVariant) (string, bool) {
	var clause string
	switch v {
	case qbtypes.ExplainVariantPlan:
		clause = "PLAN"
	case qbtypes.ExplainVariantAST:
		clause = "AST"
	case qbtypes.ExplainVariantSyntax:
		clause = "SYNTAX"
	case qbtypes.ExplainVariantPipeline:
		clause = "PIPELINE"
	case qbtypes.ExplainVariantEstimate:
		clause = "ESTIMATE"
	case qbtypes.ExplainVariantQueryTree:
		clause = "QUERY TREE"
	default:
		return "", false
	}
	return clause, true
}
// ParseExplainVariant parses the ?explain= query parameter. An empty value
// (or "false") returns ExplainVariantNone. The literal "true" maps to PLAN
// for back-compat with simple ?explain=true. Otherwise the value must
// match one of the named variants.
func ParseExplainVariant(value string) (qbtypes.ExplainVariant, error) {
	token := strings.ToLower(strings.TrimSpace(value))
	if token == "" || token == "false" {
		return qbtypes.ExplainVariantNone, nil
	}
	if token == "true" {
		return qbtypes.ExplainVariantPlan, nil
	}
	variant := qbtypes.ExplainVariant(token)
	if _, known := clickhouseExplainClause(variant); !known {
		return qbtypes.ExplainVariantNone, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported explain variant %q (allowed: plan, ast, syntax, pipeline, estimate, query_tree)", token)
	}
	return variant, nil
}
// QueryRangePreview validates the request and renders the underlying SQL
// (or PromQL) for each query in the composite query without executing it.
// When opts.Explain is non-empty, EXPLAIN <variant> is run against the
// telemetry store for each rendered SQL statement and attached to the
// response. The org ID parameter is currently unused: preview only renders
// statements and touches no org-scoped state.
func (q *querier) QueryRangePreview(
	ctx context.Context,
	_ valuer.UUID,
	req *qbtypes.QueryRangeRequest,
	opts qbtypes.QueryRangePreviewOptions,
) (*qbtypes.QueryRangePreviewResponse, error) {
	tmplVars := req.Variables
	if tmplVars == nil {
		tmplVars = make(map[string]qbtypes.VariableItem)
	}
	// Queries referenced by a trace-operator expression are rendered as part
	// of the operator itself, so record their names and skip them in the
	// standalone rendering pass below (the operator query itself is kept).
	dependencyQueries := make(map[string]bool)
	for _, query := range req.CompositeQuery.Queries {
		if query.Type == qbtypes.QueryTypeTraceOperator {
			if spec, ok := query.Spec.(qbtypes.QueryBuilderTraceOperator); ok {
				if err := spec.ParseExpression(); err != nil {
					return nil, err
				}
				for _, dep := range spec.CollectReferencedQueries(spec.ParsedExpression) {
					dependencyQueries[dep] = true
				}
			}
		}
	}
	// First pass: normalize step intervals and collect metric names that
	// need temporality/type lookup. Specs are value types, so each mutated
	// copy is written back into req.CompositeQuery.Queries[idx].
	metricNames := make([]string, 0)
	for idx, query := range req.CompositeQuery.Queries {
		switch query.Type {
		case qbtypes.QueryTypeBuilder:
			if spec, ok := query.Spec.(qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]); ok {
				for _, agg := range spec.Aggregations {
					if agg.MetricName != "" {
						metricNames = append(metricNames, agg.MetricName)
					}
				}
			}
			switch spec := query.Spec.(type) {
			case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
				// Zero step -> recommended; below minimum -> clamp to minimum.
				if spec.StepInterval.Seconds() == 0 {
					spec.StepInterval = qbtypes.Step{Duration: time.Second * time.Duration(querybuilder.RecommendedStepInterval(req.Start, req.End))}
				}
				if spec.StepInterval.Seconds() < float64(querybuilder.MinAllowedStepInterval(req.Start, req.End)) {
					spec.StepInterval = qbtypes.Step{Duration: time.Second * time.Duration(querybuilder.MinAllowedStepInterval(req.Start, req.End))}
				}
				req.CompositeQuery.Queries[idx].Spec = spec
			case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
				if spec.StepInterval.Seconds() == 0 {
					spec.StepInterval = qbtypes.Step{Duration: time.Second * time.Duration(querybuilder.RecommendedStepInterval(req.Start, req.End))}
				}
				if spec.StepInterval.Seconds() < float64(querybuilder.MinAllowedStepInterval(req.Start, req.End)) {
					spec.StepInterval = qbtypes.Step{Duration: time.Second * time.Duration(querybuilder.MinAllowedStepInterval(req.Start, req.End))}
				}
				req.CompositeQuery.Queries[idx].Spec = spec
			case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
				// Meter-sourced metrics use their own step recommendation/minimum.
				if spec.Source == telemetrytypes.SourceMeter {
					if spec.StepInterval.Seconds() == 0 {
						spec.StepInterval = qbtypes.Step{Duration: time.Second * time.Duration(querybuilder.RecommendedStepIntervalForMeter(req.Start, req.End))}
					}
					if spec.StepInterval.Seconds() < float64(querybuilder.MinAllowedStepIntervalForMeter(req.Start, req.End)) {
						spec.StepInterval = qbtypes.Step{Duration: time.Second * time.Duration(querybuilder.MinAllowedStepIntervalForMeter(req.Start, req.End))}
					}
				} else {
					if spec.StepInterval.Seconds() == 0 {
						spec.StepInterval = qbtypes.Step{Duration: time.Second * time.Duration(querybuilder.RecommendedStepIntervalForMetric(req.Start, req.End))}
					}
					if spec.StepInterval.Seconds() < float64(querybuilder.MinAllowedStepIntervalForMetric(req.Start, req.End)) {
						spec.StepInterval = qbtypes.Step{Duration: time.Second * time.Duration(querybuilder.MinAllowedStepIntervalForMetric(req.Start, req.End))}
					}
				}
				req.CompositeQuery.Queries[idx].Spec = spec
			}
		case qbtypes.QueryTypePromQL:
			// PromQL only gets a default step; no minimum clamp here.
			if spec, ok := query.Spec.(qbtypes.PromQuery); ok {
				if spec.Step.Seconds() == 0 {
					spec.Step = qbtypes.Step{Duration: time.Second * time.Duration(querybuilder.RecommendedStepIntervalForMetric(req.Start, req.End))}
				}
				req.CompositeQuery.Queries[idx].Spec = spec
			}
		case qbtypes.QueryTypeTraceOperator:
			if spec, ok := query.Spec.(qbtypes.QueryBuilderTraceOperator); ok {
				if spec.StepInterval.Seconds() == 0 {
					spec.StepInterval = qbtypes.Step{Duration: time.Second * time.Duration(querybuilder.RecommendedStepInterval(req.Start, req.End))}
				}
				if spec.StepInterval.Seconds() < float64(querybuilder.MinAllowedStepInterval(req.Start, req.End)) {
					spec.StepInterval = qbtypes.Step{Duration: time.Second * time.Duration(querybuilder.MinAllowedStepInterval(req.Start, req.End))}
				}
				req.CompositeQuery.Queries[idx].Spec = spec
			}
		}
	}
	// Fetch metric temporality/type for any builder metric queries.
	var metricTemporality map[string]metrictypes.Temporality
	var metricTypes map[string]metrictypes.Type
	if len(metricNames) > 0 {
		var err error
		metricTemporality, metricTypes, err = q.metadataStore.FetchTemporalityAndTypeMulti(ctx, req.Start, req.End, metricNames...)
		if err != nil {
			q.logger.WarnContext(ctx, "failed to fetch metric temporality during preview", errors.Attr(err), slog.Any("metrics", metricNames))
			return nil, errors.NewInternalf(errors.CodeInternal, "failed to fetch metrics temporality")
		}
	}
	// queryEntry pairs a constructed (but unexecuted) query with the name
	// and type reported back in the preview response.
	type queryEntry struct {
		name      string
		queryType qbtypes.QueryType
		query     qbtypes.Query
	}
	entries := make([]queryEntry, 0, len(req.CompositeQuery.Queries))
	for _, query := range req.CompositeQuery.Queries {
		// Resolve the query's name first so dependency queries (consumed by a
		// trace operator) can be skipped before any construction work.
		var queryName string
		isTraceOperator := query.Type == qbtypes.QueryTypeTraceOperator
		switch query.Type {
		case qbtypes.QueryTypeTraceOperator:
			if spec, ok := query.Spec.(qbtypes.QueryBuilderTraceOperator); ok {
				queryName = spec.Name
			}
		case qbtypes.QueryTypePromQL:
			if spec, ok := query.Spec.(qbtypes.PromQuery); ok {
				queryName = spec.Name
			}
		case qbtypes.QueryTypeClickHouseSQL:
			if spec, ok := query.Spec.(qbtypes.ClickHouseQuery); ok {
				queryName = spec.Name
			}
		case qbtypes.QueryTypeBuilder:
			switch spec := query.Spec.(type) {
			case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
				queryName = spec.Name
			case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
				queryName = spec.Name
			case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
				queryName = spec.Name
			}
		}
		if !isTraceOperator && dependencyQueries[queryName] {
			continue
		}
		switch query.Type {
		case qbtypes.QueryTypePromQL:
			promQuery, ok := query.Spec.(qbtypes.PromQuery)
			if !ok {
				return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid promql query spec %T", query.Spec)
			}
			pq := newPromqlQuery(q.logger, q.promEngine, promQuery, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType, tmplVars)
			entries = append(entries, queryEntry{name: promQuery.Name, queryType: query.Type, query: pq})
		case qbtypes.QueryTypeClickHouseSQL:
			chQuery, ok := query.Spec.(qbtypes.ClickHouseQuery)
			if !ok {
				return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid clickhouse query spec %T", query.Spec)
			}
			cq := newchSQLQuery(q.logger, q.telemetryStore, chQuery, nil, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType, tmplVars)
			entries = append(entries, queryEntry{name: chQuery.Name, queryType: query.Type, query: cq})
		case qbtypes.QueryTypeTraceOperator:
			traceOpQuery, ok := query.Spec.(qbtypes.QueryBuilderTraceOperator)
			if !ok {
				return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid trace operator query spec %T", query.Spec)
			}
			toq := &traceOperatorQuery{
				telemetryStore: q.telemetryStore,
				stmtBuilder:    q.traceOperatorStmtBuilder,
				spec:           traceOpQuery,
				compositeQuery: &req.CompositeQuery,
				fromMS:         uint64(req.Start),
				toMS:           uint64(req.End),
				kind:           req.RequestType,
			}
			entries = append(entries, queryEntry{name: traceOpQuery.Name, queryType: query.Type, query: toq})
		case qbtypes.QueryTypeBuilder:
			switch spec := query.Spec.(type) {
			case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
				spec.ShiftBy = extractShiftFromBuilderQuery(spec)
				timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
				bq := newBuilderQuery(q.logger, q.telemetryStore, q.traceStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
				entries = append(entries, queryEntry{name: spec.Name, queryType: query.Type, query: bq})
			case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
				spec.ShiftBy = extractShiftFromBuilderQuery(spec)
				timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
				// Audit-sourced logs use a dedicated statement builder.
				stmtBuilder := q.logStmtBuilder
				if spec.Source == telemetrytypes.SourceAudit {
					stmtBuilder = q.auditStmtBuilder
				}
				bq := newBuilderQuery(q.logger, q.telemetryStore, stmtBuilder, spec, timeRange, req.RequestType, tmplVars)
				entries = append(entries, queryEntry{name: spec.Name, queryType: query.Type, query: bq})
			case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
				// Backfill temporality/type from the metadata lookup, then drop
				// aggregations whose type is still unknown — they cannot be
				// rendered into SQL.
				presentAggregations := []qbtypes.MetricAggregation{}
				for i := range spec.Aggregations {
					if spec.Aggregations[i].MetricName != "" && spec.Aggregations[i].Temporality == metrictypes.Unknown {
						if temp, ok := metricTemporality[spec.Aggregations[i].MetricName]; ok && temp != metrictypes.Unknown {
							spec.Aggregations[i].Temporality = temp
						}
					}
					if spec.Aggregations[i].MetricName != "" && spec.Aggregations[i].Type == metrictypes.UnspecifiedType {
						if foundMetricType, ok := metricTypes[spec.Aggregations[i].MetricName]; ok && foundMetricType != metrictypes.UnspecifiedType {
							spec.Aggregations[i].Type = foundMetricType
						}
					}
					if spec.Aggregations[i].Type == metrictypes.UnspecifiedType {
						continue
					}
					presentAggregations = append(presentAggregations, spec.Aggregations[i])
				}
				if len(presentAggregations) == 0 {
					// nothing renderable for this query — skip
					continue
				}
				spec.Aggregations = presentAggregations
				spec.ShiftBy = extractShiftFromBuilderQuery(spec)
				timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
				var bq *builderQuery[qbtypes.MetricAggregation]
				if spec.Source == telemetrytypes.SourceMeter {
					bq = newBuilderQuery(q.logger, q.telemetryStore, q.meterStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
				} else {
					bq = newBuilderQuery(q.logger, q.telemetryStore, q.metricStmtBuilder, spec, timeRange, req.RequestType, tmplVars)
				}
				entries = append(entries, queryEntry{name: spec.Name, queryType: query.Type, query: bq})
			default:
				return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported builder spec type %T", query.Spec)
			}
		}
	}
	// Render each constructed query to a statement; optionally attach
	// EXPLAIN output (PromQL is skipped — EXPLAIN is a ClickHouse feature).
	statements := make([]*qbtypes.PreviewStatement, 0, len(entries))
	for _, entry := range entries {
		provider, ok := entry.query.(statementProvider)
		if !ok {
			return nil, errors.NewInternalf(errors.CodeInternal, "query %s does not support preview", entry.name)
		}
		stmt, err := provider.Statement(ctx)
		if err != nil {
			return nil, err
		}
		ps := &qbtypes.PreviewStatement{
			QueryName: entry.name,
			QueryType: entry.queryType.StringValue(),
			Query:     stmt.Query,
			Args:      stmt.Args,
			Warnings:  stmt.Warnings,
		}
		if opts.Explain != qbtypes.ExplainVariantNone && entry.queryType != qbtypes.QueryTypePromQL {
			out, err := q.runExplain(ctx, opts.Explain, stmt.Query, stmt.Args)
			if err != nil {
				return nil, err
			}
			ps.ExplainVariant = string(opts.Explain)
			ps.Explain = out
		}
		statements = append(statements, ps)
	}
	return &qbtypes.QueryRangePreviewResponse{
		Type:       req.RequestType,
		Statements: statements,
	}, nil
}
// runExplain runs `EXPLAIN <variant> <stmt>` against the telemetry store
// and returns the formatted output as a single string with one row per
// line.
func (q *querier) runExplain(ctx context.Context, variant qbtypes.ExplainVariant, stmt string, args []any) (string, error) {
	clause, ok := clickhouseExplainClause(variant)
	if !ok {
		return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "unsupported explain variant %q", string(variant))
	}

	rows, err := q.telemetryStore.ClickhouseDB().Query(ctx, "EXPLAIN "+clause+" "+stmt, args...)
	if err != nil {
		return "", errors.WrapInternalf(err, errors.CodeInternal, "failed to run EXPLAIN %s", clause)
	}
	defer rows.Close()

	// Accumulate the result rows directly into a builder, newline-separated.
	var out strings.Builder
	first := true
	for rows.Next() {
		var line string
		if scanErr := rows.Scan(&line); scanErr != nil {
			return "", errors.WrapInternalf(scanErr, errors.CodeInternal, "failed to scan EXPLAIN row")
		}
		if !first {
			out.WriteByte('\n')
		}
		out.WriteString(line)
		first = false
	}
	if err := rows.Err(); err != nil {
		return "", errors.WrapInternalf(err, errors.CodeInternal, "EXPLAIN row iteration failed")
	}
	return out.String(), nil
}

View File

@@ -211,6 +211,17 @@ func (q *promqlQuery) renderVars(query string, vars map[string]qbv5.VariableItem
return newQuery.String(), nil
}
// Statement renders the PromQL query string after variable substitution. It
// is used by the dry-run/preview path; PromQL queries do not have a
// SQL-style argument list, so only Query is populated.
func (q *promqlQuery) Statement(_ context.Context) (*qbv5.Statement, error) {
	rendered, renderErr := q.renderVars(q.query.Query, q.vars, q.tr.From, q.tr.To)
	if renderErr != nil {
		return nil, renderErr
	}
	stmt := &qbv5.Statement{Query: rendered}
	return stmt, nil
}
func (q *promqlQuery) Execute(ctx context.Context) (*qbv5.Result, error) {
ctx = ctxtypes.NewContextWithCommentVals(ctx, map[string]string{

View File

@@ -32,6 +32,12 @@ func (q *traceOperatorQuery) Window() (uint64, uint64) {
return q.fromMS, q.toMS
}
// Statement renders the SQL statement for the trace operator query without
// executing it. It is used by the dry-run/preview path and passes the full
// composite query so referenced sub-queries can be resolved by the builder.
func (q *traceOperatorQuery) Statement(ctx context.Context) (*qbtypes.Statement, error) {
	return q.stmtBuilder.Build(ctx, q.fromMS, q.toMS, q.kind, q.spec, q.compositeQuery)
}
func (q *traceOperatorQuery) Execute(ctx context.Context) (*qbtypes.Result, error) {
stmt, err := q.stmtBuilder.Build(
ctx,

View File

@@ -135,7 +135,6 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
Store: signoz.SQLStore,
AgentFeatures: []agentConf.AgentFeature{
logParsingPipelineController,
signoz.Modules.LLMPricingRule,
},
},
)

View File

@@ -124,6 +124,6 @@ func NewHandlers(
AlertmanagerHandler: signozalertmanager.NewHandler(alertmanagerService),
TraceDetail: impltracedetail.NewHandler(modules.TraceDetail),
RulerHandler: signozruler.NewHandler(rulerService),
LLMPricingRuleHandler: impllmpricingrule.NewHandler(modules.LLMPricingRule),
LLMPricingRuleHandler: impllmpricingrule.NewHandler(nil, providerSettings),
}
}

View File

@@ -17,8 +17,6 @@ import (
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/modules/inframonitoring"
"github.com/SigNoz/signoz/pkg/modules/inframonitoring/implinframonitoring"
"github.com/SigNoz/signoz/pkg/modules/llmpricingrule"
"github.com/SigNoz/signoz/pkg/modules/llmpricingrule/impllmpricingrule"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/metricsexplorer/implmetricsexplorer"
"github.com/SigNoz/signoz/pkg/modules/organization"
@@ -82,7 +80,6 @@ type Modules struct {
CloudIntegration cloudintegration.Module
RuleStateHistory rulestatehistory.Module
TraceDetail tracedetail.Module
LLMPricingRule llmpricingrule.Module
}
func NewModules(
@@ -136,6 +133,5 @@ func NewModules(
RuleStateHistory: implrulestatehistory.NewModule(implrulestatehistory.NewStore(telemetryStore, telemetryMetadataStore, providerSettings.Logger)),
CloudIntegration: cloudIntegrationModule,
TraceDetail: impltracedetail.NewModule(impltracedetail.NewTraceStore(telemetryStore), providerSettings, config.TraceDetail),
LLMPricingRule: impllmpricingrule.NewModule(impllmpricingrule.NewStore(sqlstore)),
}
}

View File

@@ -195,7 +195,6 @@ func NewSQLMigrationProviderFactories(
sqlmigration.NewServiceAccountAuthzactory(sqlstore),
sqlmigration.NewDropUserDeletedAtFactory(sqlstore, sqlschema),
sqlmigration.NewMigrateAWSAllRegionsFactory(sqlstore),
sqlmigration.NewAddLLMPricingRulesFactory(sqlstore, sqlschema),
)
}

View File

@@ -1,98 +0,0 @@
package sqlmigration
import (
"context"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/sqlschema"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
)
type addLLMPricingRules struct {
sqlschema sqlschema.SQLSchema
sqlstore sqlstore.SQLStore
}
func NewAddLLMPricingRulesFactory(sqlstore sqlstore.SQLStore, sqlschema sqlschema.SQLSchema) factory.ProviderFactory[SQLMigration, Config] {
return factory.NewProviderFactory(factory.MustNewName("add_llm_pricing_rule"), func(_ context.Context, _ factory.ProviderSettings, _ Config) (SQLMigration, error) {
return &addLLMPricingRules{
sqlschema: sqlschema,
sqlstore: sqlstore,
}, nil
})
}
func (migration *addLLMPricingRules) Register(migrations *migrate.Migrations) error {
if err := migrations.Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
func (migration *addLLMPricingRules) Up(ctx context.Context, db *bun.DB) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer func() {
_ = tx.Rollback()
}()
sqls := [][]byte{}
tableSQLs := migration.sqlschema.Operator().CreateTable(&sqlschema.Table{
Name: "llm_pricing_rule",
Columns: []*sqlschema.Column{
{Name: "id", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "created_at", DataType: sqlschema.DataTypeTimestamp, Nullable: false},
{Name: "updated_at", DataType: sqlschema.DataTypeTimestamp, Nullable: false},
{Name: "created_by", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "updated_by", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "org_id", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "source_id", DataType: sqlschema.DataTypeText, Nullable: true},
{Name: "model", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "provider", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "model_pattern", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "unit", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "pricing", DataType: sqlschema.DataTypeText, Nullable: false, Default: "'{}'"},
{Name: "is_override", DataType: sqlschema.DataTypeBoolean, Nullable: false, Default: "false"},
{Name: "synced_at", DataType: sqlschema.DataTypeTimestamp, Nullable: true},
{Name: "enabled", DataType: sqlschema.DataTypeBoolean, Nullable: false, Default: "true"},
},
PrimaryKeyConstraint: &sqlschema.PrimaryKeyConstraint{
ColumnNames: []sqlschema.ColumnName{"id"},
},
ForeignKeyConstraints: []*sqlschema.ForeignKeyConstraint{
{
ReferencingColumnName: sqlschema.ColumnName("org_id"),
ReferencedTableName: sqlschema.TableName("organizations"),
ReferencedColumnName: sqlschema.ColumnName("id"),
},
},
})
sqls = append(sqls, tableSQLs...)
indexSQLs := migration.sqlschema.Operator().CreateIndex(
&sqlschema.PartialUniqueIndex{
TableName: "llm_pricing_rule",
ColumnNames: []sqlschema.ColumnName{"org_id", "source_id"},
Where: "source_id IS NOT NULL",
})
sqls = append(sqls, indexSQLs...)
for _, sql := range sqls {
if _, err := tx.ExecContext(ctx, string(sql)); err != nil {
return err
}
}
return tx.Commit()
}
func (migration *addLLMPricingRules) Down(context.Context, *bun.DB) error {
return nil
}

View File

@@ -6,34 +6,14 @@ import (
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
)
const (
LLMCostFeatureType agentConf.AgentFeatureType = "llm_pricing"
GenAIRequestModel = "gen_ai.request.model"
GenAIUsageInputTokens = "gen_ai.usage.input_tokens"
GenAIUsageOutputTokens = "gen_ai.usage.output_tokens"
GenAIUsageCacheReadInputTokens = "gen_ai.usage.cache_read.input_tokens"
GenAIUsageCacheCreationInputTokens = "gen_ai.usage.cache_creation.input_tokens"
SignozGenAICostInput = "_signoz.gen_ai.cost_input"
SignozGenAICostOutput = "_signoz.gen_ai.cost_output"
SignozGenAICostCacheRead = "_signoz.gen_ai.cost_cache_read"
SignozGenAICostCacheWrite = "_signoz.gen_ai.cost_cache_write"
SignozGenAITotalCost = "_signoz.gen_ai.total_cost"
)
var (
ErrCodePricingRuleAlreadyExists = errors.MustNewCode("pricing_rule_already_exists")
ErrCodePricingRuleNotFound = errors.MustNewCode("pricing_rule_not_found")
ErrCodePricingRuleInvalidInput = errors.MustNewCode("pricing_rule_invalid_input")
ErrCodeInvalidCollectorConfig = errors.MustNewCode("invalid_collector_config")
ErrCodeBuildPricingProcessorConf = errors.MustNewCode("build_pricing_processor_config")
ErrCodePricingRuleNotFound = errors.MustNewCode("pricing_rule_not_found")
ErrCodePricingRuleInvalidInput = errors.MustNewCode("pricing_rule_invalid_input")
)
type LLMPricingRuleUnit struct {
@@ -203,48 +183,3 @@ func NewGettableLLMPricingRulesFromLLMPricingRules(items []*LLMPricingRule, tota
Limit: limit,
}
}
func NewLLMPricingRuleFromUpdatable(u UpdatableLLMPricingRule, orgID valuer.UUID, userEmail string, now time.Time) *LLMPricingRule {
isOverride := true
if u.IsOverride != nil {
isOverride = *u.IsOverride
} else if u.SourceID != nil {
isOverride = false
}
return &LLMPricingRule{
Identifiable: types.Identifiable{ID: valuer.GenerateUUID()},
TimeAuditable: types.TimeAuditable{CreatedAt: now, UpdatedAt: now},
UserAuditable: types.UserAuditable{CreatedBy: userEmail, UpdatedBy: userEmail},
OrgID: orgID,
SourceID: u.SourceID,
Model: u.Model,
Provider: u.Provider,
ModelPattern: StringSlice(u.ModelPattern),
Unit: u.Unit,
Pricing: u.Pricing,
IsOverride: isOverride,
SyncedAt: &now,
Enabled: u.Enabled,
}
}
func (r *LLMPricingRule) Update(u UpdatableLLMPricingRule, userEmail string, now time.Time) {
if u.IsOverride == nil && r.IsOverride {
r.SyncedAt = &now
return
}
r.Model = u.Model
r.Provider = u.Provider
r.ModelPattern = StringSlice(u.ModelPattern)
r.Unit = u.Unit
r.Pricing = u.Pricing
if u.IsOverride != nil {
r.IsOverride = *u.IsOverride
}
r.Enabled = u.Enabled
r.SyncedAt = &now
r.UpdatedAt = now
r.UpdatedBy = userEmail
}

View File

@@ -1,51 +0,0 @@
package llmpricingruletypes
const ProcessorName = "signozllmpricing"
// LLMPricingRuleProcessorConfig is the top-level config for the signozllmpricing
// OTel processor that gets deployed to collectors via OpAMP.
type LLMPricingRuleProcessorConfig struct {
Attrs LLMPricingRuleProcessorAttrs `yaml:"attrs" json:"attrs"`
DefaultPricing LLMPricingRuleProcessorDefaultPricing `yaml:"default_pricing" json:"default_pricing"`
OutputAttrs LLMPricingRuleProcessorOutputAttrs `yaml:"output_attrs" json:"output_attrs"`
}
// LLMPricingRuleProcessorAttrs maps span attribute names to the processor's input fields.
type LLMPricingRuleProcessorAttrs struct {
Model string `yaml:"model" json:"model"`
In string `yaml:"in" json:"in"`
Out string `yaml:"out" json:"out"`
CacheRead string `yaml:"cache_read" json:"cache_read"`
CacheWrite string `yaml:"cache_write" json:"cache_write"`
}
// LLMPricingRuleProcessorDefaultPricing holds the pricing unit and the list of model-specific rules.
type LLMPricingRuleProcessorDefaultPricing struct {
	// Unit is the pricing unit shared by all rules below.
	Unit string `yaml:"unit" json:"unit"`
	// Rules are the per-model pricing rules, matched by pattern.
	Rules []LLMPricingRuleProcessor `yaml:"rules" json:"rules"`
}
// LLMPricingRuleProcessor is a single pricing rule inside the processor config.
type LLMPricingRuleProcessor struct {
	Name string `yaml:"name" json:"name"`
	// Pattern holds the model patterns this rule matches against.
	Pattern []string `yaml:"pattern" json:"pattern"`
	// Cache describes how cached tokens are priced for this rule.
	Cache LLMPricingRuleProcessorCache `yaml:"cache" json:"cache"`
	// In and Out are the input/output prices for this rule.
	In  float64 `yaml:"in" json:"in"`
	Out float64 `yaml:"out" json:"out"`
}
// LLMPricingRuleProcessorCache describes how cached tokens are accounted for.
type LLMPricingRuleProcessorCache struct {
	// Mode selects the cache-accounting strategy.
	// NOTE(review): valid mode values are not visible here — confirm against
	// the processor implementation before documenting them.
	Mode  string  `yaml:"mode" json:"mode"`
	Read  float64 `yaml:"read" json:"read"`
	Write float64 `yaml:"write" json:"write"`
}
// LLMPricingRuleProcessorOutputAttrs maps the processor's computed cost fields to span attribute names.
// Each field holds the NAME of the span attribute to write, not a value.
type LLMPricingRuleProcessorOutputAttrs struct {
	In         string `yaml:"in" json:"in"`
	Out        string `yaml:"out" json:"out"`
	CacheRead  string `yaml:"cache_read" json:"cache_read"`
	CacheWrite string `yaml:"cache_write" json:"cache_write"`
	Total      string `yaml:"total" json:"total"`
}

View File

@@ -7,11 +7,10 @@ import (
)
// Store persists LLM pricing rules for an organization.
//
// Fix: the diff residue left both the pre-rename (*LLMPricingRule) and
// post-rename (*StorableLLMPricingRule) method declarations in the interface,
// which is invalid Go (duplicate method names). Only the post-rename
// Storable signatures are kept.
type Store interface {
	// List returns one page of rules for orgID plus the total rule count.
	List(ctx context.Context, orgID valuer.UUID, offset, limit int) ([]*StorableLLMPricingRule, int, error)
	Get(ctx context.Context, orgID, id valuer.UUID) (*StorableLLMPricingRule, error)
	GetBySourceID(ctx context.Context, orgID, sourceID valuer.UUID) (*StorableLLMPricingRule, error)
	Create(ctx context.Context, rule *StorableLLMPricingRule) error
	Update(ctx context.Context, rule *StorableLLMPricingRule) error
	Delete(ctx context.Context, orgID, id valuer.UUID) error
	// RunInTx runs cb inside a storage transaction carried via ctx.
	RunInTx(ctx context.Context, cb func(ctx context.Context) error) error
}

View File

@@ -64,6 +64,74 @@ type QueryRangeResponse struct {
QBEvent *QBEvent `json:"-"`
}
// QueryRangePreviewResponse describes the dry-run output of a query range
// request. Each entry corresponds to a single query in the composite query.
type QueryRangePreviewResponse struct {
	// Type is the request type the preview was produced for.
	Type RequestType `json:"type"`
	// Statements holds one rendered statement per query in the composite query.
	Statements []*PreviewStatement `json:"statements"`
}
// ExplainVariant identifies one of the ClickHouse EXPLAIN modes that the
// preview endpoint can run against a rendered SQL statement.
type ExplainVariant string

const (
	ExplainVariantNone      ExplainVariant = ""
	ExplainVariantPlan      ExplainVariant = "plan"
	ExplainVariantAST       ExplainVariant = "ast"
	ExplainVariantSyntax    ExplainVariant = "syntax"
	ExplainVariantPipeline  ExplainVariant = "pipeline"
	ExplainVariantEstimate  ExplainVariant = "estimate"
	ExplainVariantQueryTree ExplainVariant = "query_tree"
)

// IsValid reports whether the variant is one of the supported values
// (including the empty/no-explain sentinel).
func (v ExplainVariant) IsValid() bool {
	switch v {
	case ExplainVariantNone, ExplainVariantPlan, ExplainVariantAST,
		ExplainVariantSyntax, ExplainVariantPipeline,
		ExplainVariantEstimate, ExplainVariantQueryTree:
		return true
	default:
		return false
	}
}
// QueryRangePreviewOptions carries per-call options for the query range
// preview (dry-run) endpoint. The zero value is meaningful and produces a
// validation-only preview with no EXPLAIN.
type QueryRangePreviewOptions struct {
	// Explain selects which ClickHouse EXPLAIN variant to run for each
	// rendered SQL statement. Leave empty to skip EXPLAIN.
	Explain ExplainVariant
}
// PrepareJSONSchema adds description to the QueryRangePreviewResponse schema.
func (q *QueryRangePreviewResponse) PrepareJSONSchema(schema *jsonschema.Schema) error {
	const desc = "Response from the v5 query range preview (dry-run) endpoint. Returns the rendered SQL/PromQL for each query in the composite query along with optional EXPLAIN output when requested."
	schema.WithDescription(desc)
	return nil
}
// PreviewStatement is the rendered form of a single query.
type PreviewStatement struct {
	QueryName string `json:"queryName"`
	QueryType string `json:"queryType"`
	// Query is the rendered statement text.
	Query string `json:"query"`
	Args  []any  `json:"args,omitempty"`
	// NOTE(review): `error` values generally do not round-trip through
	// encoding/json (most concrete error types marshal as `{}` and the
	// interface cannot be unmarshaled) — confirm whether this should be a
	// string field for the wire format.
	Error    error    `json:"error"`
	Warnings []string `json:"warnings,omitempty"`
	// ExplainVariant is the EXPLAIN mode that produced Explain. Empty when
	// no EXPLAIN was requested.
	ExplainVariant string `json:"explainVariant,omitempty"`
	// Explain is the formatted output returned by ClickHouse for the
	// requested EXPLAIN variant. Empty when no EXPLAIN was requested.
	Explain string `json:"explain,omitempty"`
}
// Compile-time check that *QueryRangeResponse implements jsonschema.Preparer.
var _ jsonschema.Preparer = &QueryRangeResponse{}
// PrepareJSONSchema adds description to the QueryRangeResponse schema.