Compare commits

..

1 Commits

Author SHA1 Message Date
nityanandagohain
9499228398 feat: module and store for span mapper 2026-04-28 16:08:44 +05:30
17 changed files with 260 additions and 593 deletions

View File

@@ -1 +0,0 @@
1.25.7

View File

@@ -4500,6 +4500,7 @@ components:
enum:
- attribute
- resource
nullable: true
type: string
SpantypesGettableSpanMapperGroups:
properties:
@@ -4527,8 +4528,6 @@ components:
type: object
SpantypesPostableSpanMapperGroup:
properties:
category:
$ref: '#/components/schemas/SpantypesSpanMapperGroupCategory'
condition:
$ref: '#/components/schemas/SpantypesSpanMapperGroupCondition'
enabled:
@@ -4537,7 +4536,6 @@ components:
type: string
required:
- name
- category
- condition
type: object
SpantypesSpanMapper:
@@ -4573,6 +4571,7 @@ components:
- enabled
type: object
SpantypesSpanMapperConfig:
nullable: true
properties:
sources:
items:
@@ -4584,8 +4583,6 @@ components:
type: object
SpantypesSpanMapperGroup:
properties:
category:
$ref: '#/components/schemas/SpantypesSpanMapperGroupCategory'
condition:
$ref: '#/components/schemas/SpantypesSpanMapperGroupCondition'
createdAt:
@@ -4610,13 +4607,11 @@ components:
- id
- orgId
- name
- category
- condition
- enabled
type: object
SpantypesSpanMapperGroupCategory:
type: object
SpantypesSpanMapperGroupCondition:
nullable: true
properties:
attributes:
items:
@@ -9423,12 +9418,6 @@ paths:
org.
operationId: ListSpanMapperGroups
parameters:
- explode: true
in: query
name: category
schema:
$ref: '#/components/schemas/SpantypesSpanMapperGroupCategory'
style: deepObject
- in: query
name: enabled
schema:

View File

@@ -12,7 +12,6 @@ import (
"github.com/SigNoz/signoz/pkg/cache/memorycache"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/modules/spanmapper/implspanmapper"
"github.com/gorilla/handlers"
@@ -112,11 +111,9 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
}
// initiate agent config handler
spanAttrMappingFeature := implspanmapper.NewSpanAttrMappingFeature(signoz.Modules.SpanMapper)
agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{
Store: signoz.SQLStore,
AgentFeatures: []agentConf.AgentFeature{logParsingPipelineController, spanAttrMappingFeature},
AgentFeatures: []agentConf.AgentFeature{logParsingPipelineController},
})
if err != nil {
return nil, err

View File

@@ -5472,9 +5472,13 @@ export interface Sigv4SigV4ConfigDTO {
[key: string]: unknown;
}
/**
* @nullable
*/
export enum SpantypesFieldContextDTO {
attribute = 'attribute',
resource = 'resource',
null = null,
}
export interface SpantypesGettableSpanMapperGroupsDTO {
/**
@@ -5497,7 +5501,6 @@ export interface SpantypesPostableSpanMapperDTO {
}
export interface SpantypesPostableSpanMapperGroupDTO {
category: SpantypesSpanMapperGroupCategoryDTO;
condition: SpantypesSpanMapperGroupConditionDTO;
/**
* @type boolean
@@ -5548,16 +5551,18 @@ export interface SpantypesSpanMapperDTO {
updatedBy?: string;
}
export interface SpantypesSpanMapperConfigDTO {
/**
* @nullable
*/
export type SpantypesSpanMapperConfigDTO = {
/**
* @type array
* @nullable true
*/
sources: SpantypesSpanMapperSourceDTO[] | null;
}
} | null;
export interface SpantypesSpanMapperGroupDTO {
category: SpantypesSpanMapperGroupCategoryDTO;
condition: SpantypesSpanMapperGroupConditionDTO;
/**
* @type string
@@ -5595,11 +5600,10 @@ export interface SpantypesSpanMapperGroupDTO {
updatedBy?: string;
}
export interface SpantypesSpanMapperGroupCategoryDTO {
[key: string]: unknown;
}
export interface SpantypesSpanMapperGroupConditionDTO {
/**
* @nullable
*/
export type SpantypesSpanMapperGroupConditionDTO = {
/**
* @type array
* @nullable true
@@ -5610,7 +5614,7 @@ export interface SpantypesSpanMapperGroupConditionDTO {
* @nullable true
*/
resource: string[] | null;
}
} | null;
export enum SpantypesSpanMapperOperationDTO {
move = 'move',
@@ -7104,10 +7108,6 @@ export type GetMyServiceAccount200 = {
};
export type ListSpanMapperGroupsParams = {
/**
* @description undefined
*/
category?: SpantypesSpanMapperGroupCategoryDTO;
/**
* @type boolean
* @nullable true

View File

@@ -1,85 +0,0 @@
package implspanmapper
import (
"context"
"encoding/json"
"github.com/SigNoz/signoz/pkg/modules/spanmapper"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/types/spantypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
const SpanAttrMappingFeatureType agentConf.AgentFeatureType = "span_attr_mapping"
// SpanAttrMappingFeature implements agentConf.AgentFeature. It reads enabled
// mapping groups and mappers from the module and generates the
// signozspanmappingprocessor config for deployment via OpAMP.
type SpanAttrMappingFeature struct {
module spanmapper.Module
}
func NewSpanAttrMappingFeature(module spanmapper.Module) *SpanAttrMappingFeature {
return &SpanAttrMappingFeature{module: module}
}
func (f *SpanAttrMappingFeature) AgentFeatureType() agentConf.AgentFeatureType {
return SpanAttrMappingFeatureType
}
func (f *SpanAttrMappingFeature) RecommendAgentConfig(
orgId valuer.UUID,
currentConfYaml []byte,
configVersion *opamptypes.AgentConfigVersion,
) ([]byte, string, error) {
ctx := context.Background()
groups, err := f.getEnabled(ctx, orgId)
if err != nil {
return nil, "", err
}
updatedConf, err := generateCollectorConfigWithSpanMapping(currentConfYaml, groups)
if err != nil {
return nil, "", err
}
serialized, err := json.Marshal(groups)
if err != nil {
return nil, "", err
}
return updatedConf, string(serialized), nil
}
// getEnabled returns enabled groups alongside their enabled mappers. Groups
// with no enabled mappers are still included so the collector sees the
// exists_any condition, even if the attributes list is empty.
func (f *SpanAttrMappingFeature) getEnabled(ctx context.Context, orgId valuer.UUID) ([]enabledGroup, error) {
if f.module == nil {
return nil, nil
}
enabled := true
groups, err := f.module.ListGroups(ctx, orgId, &spantypes.ListSpanMapperGroupsQuery{Enabled: &enabled})
if err != nil {
return nil, err
}
out := make([]enabledGroup, 0, len(groups))
for _, g := range groups {
mappers, err := f.module.ListMappers(ctx, orgId, g.ID)
if err != nil {
return nil, err
}
enabledMappers := make([]*spantypes.SpanMapper, 0, len(mappers))
for _, m := range mappers {
if m.Enabled {
enabledMappers = append(enabledMappers, m)
}
}
out = append(out, enabledGroup{group: g, mappers: enabledMappers})
}
return out, nil
}

View File

@@ -1,137 +0,0 @@
package implspanmapper
import (
"bytes"
"fmt"
"sort"
"github.com/SigNoz/signoz/pkg/types/spantypes"
"gopkg.in/yaml.v3"
)
const processorName = "signozspanmappingprocessor"
const (
// Collector context values (see signozspanmappingprocessor.ContextAttributes/ContextResource).
ctxAttributes = "attributes"
ctxResource = "resource"
// Collector action values.
actionCopy = "copy"
actionMove = "move"
// Source key prefix the collector treats as "read from resource".
resourcePrefix = "resource."
)
// enabledGroup pairs an enabled group with its enabled mappers.
type enabledGroup struct {
group *spantypes.SpanMapperGroup
mappers []*spantypes.SpanMapper
}
// buildProcessorConfig converts enabled groups + mappers into the
// signozspanmappingprocessor config shape.
func buildProcessorConfig(groups []enabledGroup) *spantypes.SpanMappingProcessorConfig {
out := make([]spantypes.SpanMappingGroup, 0, len(groups))
for _, eg := range groups {
rules := make([]spantypes.SpanMappingAttribute, 0, len(eg.mappers))
for _, m := range eg.mappers {
rules = append(rules, buildAttributeRule(m))
}
out = append(out, spantypes.SpanMappingGroup{
ID: eg.group.Name,
ExistsAny: spantypes.SpanMappingExistsAny{
Attributes: eg.group.Condition.Attributes,
Resource: eg.group.Condition.Resource,
},
Attributes: rules,
})
}
return &spantypes.SpanMappingProcessorConfig{Groups: out}
}
// buildAttributeRule maps a single SpanMapper to a collector AttributeRule.
// Sources are sorted by Priority DESC (highest-priority first), and read-from-
// resource sources are encoded via the "resource." prefix. The rule-level
// action is derived from the sources' operations (all sources within one
// mapper are expected to share the same operation; the highest-priority
// source's operation is used).
func buildAttributeRule(m *spantypes.SpanMapper) spantypes.SpanMappingAttribute {
sources := make([]spantypes.SpanMapperSource, len(m.Config.Sources))
copy(sources, m.Config.Sources)
sort.SliceStable(sources, func(i, j int) bool { return sources[i].Priority > sources[j].Priority })
keys := make([]string, 0, len(sources))
for _, s := range sources {
if s.Context == spantypes.FieldContextResource {
keys = append(keys, resourcePrefix+s.Key)
} else {
keys = append(keys, s.Key)
}
}
action := actionCopy
if len(sources) > 0 && sources[0].Operation == spantypes.SpanMapperOperationMove {
action = actionMove
}
ctx := ctxAttributes
if m.FieldContext == spantypes.FieldContextResource {
ctx = ctxResource
}
return spantypes.SpanMappingAttribute{
Target: m.Name,
Context: ctx,
Action: action,
Sources: keys,
}
}
// generateCollectorConfigWithSpanMapping injects (or replaces) the
// signozspanmappingprocessor block in the collector YAML. Pipeline wiring is
// handled by the collector's baseline config, not here.
func generateCollectorConfigWithSpanMapping(
currentConfYaml []byte,
groups []enabledGroup,
) ([]byte, error) {
// Empty input: nothing to inject into. Pass through unchanged so we don't
// turn it into "null\n" or fail on yaml.v3's EOF.
if len(bytes.TrimSpace(currentConfYaml)) == 0 {
return currentConfYaml, nil
}
var collectorConf map[string]any
if err := yaml.Unmarshal(currentConfYaml, &collectorConf); err != nil {
return nil, fmt.Errorf("failed to unmarshal collector config: %w", err)
}
if collectorConf == nil {
collectorConf = map[string]any{}
}
processors := map[string]any{}
if collectorConf["processors"] != nil {
if p, ok := collectorConf["processors"].(map[string]any); ok {
processors = p
}
}
procConfig := buildProcessorConfig(groups)
configBytes, err := yaml.Marshal(procConfig)
if err != nil {
return nil, fmt.Errorf("failed to marshal span attr mapping processor config: %w", err)
}
var configMap any
if err := yaml.Unmarshal(configBytes, &configMap); err != nil {
return nil, fmt.Errorf("failed to re-unmarshal span attr mapping processor config: %w", err)
}
processors[processorName] = configMap
collectorConf["processors"] = processors
return yaml.Marshal(collectorConf)
}

View File

@@ -72,10 +72,9 @@ func (h *handler) CreateGroup(rw http.ResponseWriter, r *http.Request) {
return
}
group := spantypes.NewSpanMapperGroupFromPostable(req)
group := spantypes.NewSpanMapperGroup(orgID, claims.Email, req)
err = h.module.CreateGroup(ctx, orgID, claims.Email, group)
if err != nil {
if err := h.module.CreateGroup(ctx, orgID, group); err != nil {
render.Error(rw, err)
return
}
@@ -107,11 +106,17 @@ func (h *handler) UpdateGroup(rw http.ResponseWriter, r *http.Request) {
return
}
err = h.module.UpdateGroup(ctx, orgID, id, claims.Email, spantypes.NewSpanMapperGroupFromUpdatable(req))
group, err := h.module.GetGroup(ctx, orgID, id)
if err != nil {
render.Error(rw, err)
return
}
group.Update(req, claims.Email)
if err := h.module.UpdateGroup(ctx, orgID, id, group); err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusNoContent, nil)
}
@@ -195,10 +200,9 @@ func (h *handler) CreateMapper(rw http.ResponseWriter, r *http.Request) {
render.Error(rw, err)
return
}
mapper := spantypes.NewSpanMapperFromPostable(req)
mapper := spantypes.NewSpanMapper(groupID, claims.Email, req)
err = h.module.CreateMapper(ctx, orgID, groupID, claims.Email, mapper)
if err != nil {
if err := h.module.CreateMapper(ctx, orgID, groupID, mapper); err != nil {
render.Error(rw, err)
return
}
@@ -237,11 +241,17 @@ func (h *handler) UpdateMapper(rw http.ResponseWriter, r *http.Request) {
return
}
err = h.module.UpdateMapper(ctx, orgID, groupID, mapperID, claims.Email, spantypes.NewSpanMapperFromUpdatable(req))
mapper, err := h.module.GetMapper(ctx, orgID, groupID, mapperID)
if err != nil {
render.Error(rw, err)
return
}
mapper.Update(req, claims.Email)
if err := h.module.UpdateMapper(ctx, orgID, groupID, mapperID, mapper); err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusNoContent, nil)
}

View File

@@ -2,11 +2,8 @@ package implspanmapper
import (
"context"
"time"
"github.com/SigNoz/signoz/pkg/modules/spanmapper"
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/spantypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
@@ -19,161 +16,49 @@ func NewModule(store spantypes.Store) spanmapper.Module {
return &module{store: store}
}
func (m *module) ListGroups(ctx context.Context, orgID valuer.UUID, q *spantypes.ListSpanMapperGroupsQuery) ([]*spantypes.SpanMapperGroup, error) {
storables, err := m.store.ListSpanMapperGroups(ctx, orgID, q)
if err != nil {
return nil, err
}
return spantypes.NewSpanMapperGroupsFromStorableGroups(storables), nil
func (module *module) ListGroups(ctx context.Context, orgID valuer.UUID, q *spantypes.ListSpanMapperGroupsQuery) ([]*spantypes.SpanMapperGroup, error) {
return module.store.ListSpanMapperGroups(ctx, orgID, q)
}
func (m *module) GetGroup(ctx context.Context, orgID valuer.UUID, id valuer.UUID) (*spantypes.SpanMapperGroup, error) {
s, err := m.store.GetSpanMapperGroup(ctx, orgID, id)
if err != nil {
return nil, err
}
return spantypes.NewSpanMapperGroupFromStorable(s), nil
func (module *module) GetGroup(ctx context.Context, orgID, id valuer.UUID) (*spantypes.SpanMapperGroup, error) {
return module.store.GetSpanMapperGroup(ctx, orgID, id)
}
func (m *module) CreateGroup(ctx context.Context, orgID valuer.UUID, createdBy string, group *spantypes.SpanMapperGroup) error {
now := time.Now()
group.ID = valuer.GenerateUUID()
group.OrgID = orgID
group.CreatedAt = now
group.UpdatedAt = now
group.CreatedBy = createdBy
group.UpdatedBy = createdBy
storable := &spantypes.StorableSpanMapperGroup{
Identifiable: types.Identifiable{ID: group.ID},
TimeAuditable: types.TimeAuditable{CreatedAt: now, UpdatedAt: now},
UserAuditable: types.UserAuditable{CreatedBy: createdBy, UpdatedBy: createdBy},
OrgID: orgID,
Name: group.Name,
Category: group.Category,
Condition: group.Condition,
Enabled: group.Enabled,
}
if err := m.store.CreateSpanMapperGroup(ctx, storable); err != nil {
return err
}
agentConf.NotifyConfigUpdate(ctx)
return nil
func (module *module) CreateGroup(ctx context.Context, orgID valuer.UUID, group *spantypes.SpanMapperGroup) error {
return module.store.CreateSpanMapperGroup(ctx, group)
}
func (m *module) UpdateGroup(ctx context.Context, orgID valuer.UUID, id valuer.UUID, updatedBy string, group *spantypes.SpanMapperGroup) error {
existing, err := m.store.GetSpanMapperGroup(ctx, orgID, id)
if err != nil {
return err
}
if group.Name != "" {
existing.Name = group.Name
}
if len(group.Condition.Attributes) > 0 || len(group.Condition.Resource) > 0 {
existing.Condition = group.Condition
}
existing.Enabled = group.Enabled
existing.UpdatedAt = time.Now()
existing.UpdatedBy = updatedBy
if err := m.store.UpdateSpanMapperGroup(ctx, existing); err != nil {
return err
}
agentConf.NotifyConfigUpdate(ctx)
return nil
func (module *module) UpdateGroup(ctx context.Context, orgID, id valuer.UUID, group *spantypes.SpanMapperGroup) error {
return module.store.UpdateSpanMapperGroup(ctx, group)
}
func (m *module) DeleteGroup(ctx context.Context, orgID valuer.UUID, id valuer.UUID) error {
if err := m.store.DeleteSpanMapperGroup(ctx, orgID, id); err != nil {
return err
}
agentConf.NotifyConfigUpdate(ctx)
return nil
func (module *module) DeleteGroup(ctx context.Context, orgID, id valuer.UUID) error {
return module.store.DeleteSpanMapperGroup(ctx, orgID, id)
}
func (m *module) ListMappers(ctx context.Context, orgID valuer.UUID, groupID valuer.UUID) ([]*spantypes.SpanMapper, error) {
storables, err := m.store.ListSpanMappers(ctx, orgID, groupID)
if err != nil {
return nil, err
}
return spantypes.NewSpanMappersFromStorableSpanMappers(storables), nil
func (module *module) ListMappers(ctx context.Context, orgID, groupID valuer.UUID) ([]*spantypes.SpanMapper, error) {
return module.store.ListSpanMappers(ctx, orgID, groupID)
}
func (m *module) GetMapper(ctx context.Context, orgID valuer.UUID, groupID valuer.UUID, id valuer.UUID) (*spantypes.SpanMapper, error) {
s, err := m.store.GetSpanMapper(ctx, orgID, groupID, id)
if err != nil {
return nil, err
}
return spantypes.NewSpanMapperFromStorable(s), nil
func (module *module) GetMapper(ctx context.Context, orgID, groupID, id valuer.UUID) (*spantypes.SpanMapper, error) {
return module.store.GetSpanMapper(ctx, orgID, groupID, id)
}
func (m *module) CreateMapper(ctx context.Context, orgID valuer.UUID, groupID valuer.UUID, createdBy string, mapper *spantypes.SpanMapper) error {
func (module *module) CreateMapper(ctx context.Context, orgID, groupID valuer.UUID, mapper *spantypes.SpanMapper) error {
// Ensure the group belongs to the org before inserting the child row.
if _, err := m.store.GetSpanMapperGroup(ctx, orgID, groupID); err != nil {
if _, err := module.store.GetSpanMapperGroup(ctx, orgID, groupID); err != nil {
return err
}
now := time.Now()
mapper.ID = valuer.GenerateUUID()
mapper.GroupID = groupID
mapper.CreatedAt = now
mapper.UpdatedAt = now
mapper.CreatedBy = createdBy
mapper.UpdatedBy = createdBy
storable := &spantypes.StorableSpanMapper{
Identifiable: types.Identifiable{ID: mapper.ID},
TimeAuditable: types.TimeAuditable{CreatedAt: now, UpdatedAt: now},
UserAuditable: types.UserAuditable{CreatedBy: createdBy, UpdatedBy: createdBy},
GroupID: groupID,
Name: mapper.Name,
FieldContext: mapper.FieldContext,
Config: mapper.Config,
Enabled: mapper.Enabled,
}
if err := m.store.CreateSpanMapper(ctx, storable); err != nil {
return err
}
agentConf.NotifyConfigUpdate(ctx)
return nil
return module.store.CreateSpanMapper(ctx, mapper)
}
func (m *module) UpdateMapper(ctx context.Context, orgID valuer.UUID, groupID valuer.UUID, id valuer.UUID, updatedBy string, mapper *spantypes.SpanMapper) error {
existing, err := m.store.GetSpanMapper(ctx, orgID, groupID, id)
if err != nil {
func (module *module) UpdateMapper(ctx context.Context, orgID, groupID, id valuer.UUID, mapper *spantypes.SpanMapper) error {
if _, err := module.store.GetSpanMapperGroup(ctx, orgID, groupID); err != nil {
return err
}
if mapper.FieldContext != (spantypes.FieldContext{}) {
existing.FieldContext = mapper.FieldContext
}
if mapper.Config.Sources != nil {
existing.Config = mapper.Config
}
existing.Enabled = mapper.Enabled
existing.UpdatedAt = time.Now()
existing.UpdatedBy = updatedBy
if err := m.store.UpdateSpanMapper(ctx, existing); err != nil {
return err
}
agentConf.NotifyConfigUpdate(ctx)
return nil
return module.store.UpdateSpanMapper(ctx, mapper)
}
func (m *module) DeleteMapper(ctx context.Context, orgID valuer.UUID, groupID valuer.UUID, id valuer.UUID) error {
if err := m.store.DeleteSpanMapper(ctx, orgID, groupID, id); err != nil {
return err
}
agentConf.NotifyConfigUpdate(ctx)
return nil
func (module *module) DeleteMapper(ctx context.Context, orgID, groupID, id valuer.UUID) error {
return module.store.DeleteSpanMapper(ctx, orgID, groupID, id)
}

View File

@@ -18,19 +18,45 @@ func NewStore(sqlstore sqlstore.SQLStore) spantypes.Store {
return &store{sqlstore: sqlstore}
}
func (s *store) ListSpanMapperGroups(ctx context.Context, orgID valuer.UUID, q *spantypes.ListSpanMapperGroupsQuery) ([]*spantypes.StorableSpanMapperGroup, error) {
groups := make([]*spantypes.StorableSpanMapperGroup, 0)
func (s *store) CreateSpanMapperGroup(ctx context.Context, group *spantypes.SpanMapperGroup) error {
storable := spantypes.NewStorableSpanMapperGroupFromGroup(group)
_, err := s.sqlstore.
BunDBCtx(ctx).
NewInsert().
Model(storable).
Exec(ctx)
if err != nil {
return s.sqlstore.WrapAlreadyExistsErrf(err, spantypes.ErrCodeMappingGroupAlreadyExists, "span mapper group %q already exists", group.Name)
}
return nil
}
func (s *store) GetSpanMapperGroup(ctx context.Context, orgID, id valuer.UUID) (*spantypes.SpanMapperGroup, error) {
storable := new(spantypes.StorableSpanMapperGroup)
err := s.sqlstore.
BunDB().
NewSelect().
Model(storable).
Where("org_id = ?", orgID).
Where("id = ?", id).
Scan(ctx)
if err != nil {
return nil, s.sqlstore.WrapNotFoundErrf(err, spantypes.ErrCodeMappingGroupNotFound, "span mapper group %s not found", id)
}
return spantypes.NewSpanMapperGroupFromStorable(storable), nil
}
func (s *store) ListSpanMapperGroups(ctx context.Context, orgID valuer.UUID, q *spantypes.ListSpanMapperGroupsQuery) ([]*spantypes.SpanMapperGroup, error) {
storables := make([]*spantypes.StorableSpanMapperGroup, 0)
sel := s.sqlstore.
BunDB().
NewSelect().
Model(&groups).
Model(&storables).
Where("org_id = ?", orgID)
if q != nil {
if q.Category != nil {
sel = sel.Where("category = ?", valuer.String(*q.Category).StringValue())
}
if q.Enabled != nil {
sel = sel.Where("enabled = ?", *q.Enabled)
}
@@ -39,48 +65,17 @@ func (s *store) ListSpanMapperGroups(ctx context.Context, orgID valuer.UUID, q *
if err := sel.Order("created_at DESC").Scan(ctx); err != nil {
return nil, err
}
return groups, nil
return spantypes.NewSpanMapperGroupsFromStorableGroups(storables), nil
}
func (s *store) GetSpanMapperGroup(ctx context.Context, orgID, id valuer.UUID) (*spantypes.StorableSpanMapperGroup, error) {
group := new(spantypes.StorableSpanMapperGroup)
err := s.sqlstore.
BunDB().
NewSelect().
Model(group).
Where("org_id = ?", orgID).
Where("id = ?", id).
Scan(ctx)
if err != nil {
if err == sql.ErrNoRows {
return nil, s.sqlstore.WrapNotFoundErrf(err, spantypes.ErrCodeMappingGroupNotFound, "span mapper group %s not found", id)
}
return nil, err
}
return group, nil
}
func (s *store) CreateSpanMapperGroup(ctx context.Context, group *spantypes.StorableSpanMapperGroup) error {
_, err := s.sqlstore.
BunDBCtx(ctx).
NewInsert().
Model(group).
Exec(ctx)
if err != nil {
return s.sqlstore.WrapAlreadyExistsErrf(err, spantypes.ErrCodeMappingGroupAlreadyExists, "span mapper group %q already exists", group.Name)
}
return nil
}
func (s *store) UpdateSpanMapperGroup(ctx context.Context, group *spantypes.StorableSpanMapperGroup) error {
func (s *store) UpdateSpanMapperGroup(ctx context.Context, group *spantypes.SpanMapperGroup) error {
storable := spantypes.NewStorableSpanMapperGroupFromGroup(group)
res, err := s.sqlstore.
BunDBCtx(ctx).
NewUpdate().
Model(group).
Where("org_id = ?", group.OrgID).
Where("id = ?", group.ID).
ExcludeColumn("id", "org_id", "created_at", "created_by").
Model(storable).
Where("org_id = ?", storable.OrgID).
Where("id = ?", storable.ID).
Exec(ctx)
if err != nil {
return err
@@ -131,54 +126,12 @@ func (s *store) DeleteSpanMapperGroup(ctx context.Context, orgID, id valuer.UUID
return tx.Commit()
}
func (s *store) ListSpanMappers(ctx context.Context, orgID, groupID valuer.UUID) ([]*spantypes.StorableSpanMapper, error) {
mappers := make([]*spantypes.StorableSpanMapper, 0)
// Scope by org via the parent group's org_id.
if _, err := s.GetSpanMapperGroup(ctx, orgID, groupID); err != nil {
return nil, err
}
if err := s.sqlstore.
BunDB().
NewSelect().
Model(&mappers).
Where("group_id = ?", groupID).
Order("created_at DESC").
Scan(ctx); err != nil {
return nil, err
}
return mappers, nil
}
func (s *store) GetSpanMapper(ctx context.Context, orgID, groupID, id valuer.UUID) (*spantypes.StorableSpanMapper, error) {
// Ensure the group belongs to the org.
if _, err := s.GetSpanMapperGroup(ctx, orgID, groupID); err != nil {
return nil, err
}
mapper := new(spantypes.StorableSpanMapper)
err := s.sqlstore.
BunDB().
NewSelect().
Model(mapper).
Where("group_id = ?", groupID).
Where("id = ?", id).
Scan(ctx)
if err != nil {
if err == sql.ErrNoRows {
return nil, s.sqlstore.WrapNotFoundErrf(err, spantypes.ErrCodeMapperNotFound, "span mapper %s not found", id)
}
return nil, err
}
return mapper, nil
}
func (s *store) CreateSpanMapper(ctx context.Context, mapper *spantypes.StorableSpanMapper) error {
func (s *store) CreateSpanMapper(ctx context.Context, mapper *spantypes.SpanMapper) error {
storable := spantypes.NewStorableSpanMapperFromMapper(mapper)
_, err := s.sqlstore.
BunDBCtx(ctx).
NewInsert().
Model(mapper).
Model(storable).
Exec(ctx)
if err != nil {
return s.sqlstore.WrapAlreadyExistsErrf(err, spantypes.ErrCodeMapperAlreadyExists, "span mapper %q already exists", mapper.Name)
@@ -186,14 +139,56 @@ func (s *store) CreateSpanMapper(ctx context.Context, mapper *spantypes.Storable
return nil
}
func (s *store) UpdateSpanMapper(ctx context.Context, mapper *spantypes.StorableSpanMapper) error {
func (s *store) GetSpanMapper(ctx context.Context, orgID, groupID, id valuer.UUID) (*spantypes.SpanMapper, error) {
// Ensure the group belongs to the org.
if _, err := s.GetSpanMapperGroup(ctx, orgID, groupID); err != nil {
return nil, err
}
storable := new(spantypes.StorableSpanMapper)
err := s.sqlstore.
BunDB().
NewSelect().
Model(storable).
Where("group_id = ?", groupID).
Where("id = ?", id).
Scan(ctx)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, s.sqlstore.WrapNotFoundErrf(err, spantypes.ErrCodeMapperNotFound, "span mapper %s not found", id)
}
return nil, err
}
return spantypes.NewSpanMapperFromStorable(storable), nil
}
func (s *store) ListSpanMappers(ctx context.Context, orgID, groupID valuer.UUID) ([]*spantypes.SpanMapper, error) {
// Scope by org via the parent group's org_id.
if _, err := s.GetSpanMapperGroup(ctx, orgID, groupID); err != nil {
return nil, err
}
storables := make([]*spantypes.StorableSpanMapper, 0)
if err := s.sqlstore.
BunDB().
NewSelect().
Model(&storables).
Where("group_id = ?", groupID).
Order("created_at DESC").
Scan(ctx); err != nil {
return nil, err
}
return spantypes.NewSpanMappersFromStorableSpanMappers(storables), nil
}
func (s *store) UpdateSpanMapper(ctx context.Context, mapper *spantypes.SpanMapper) error {
storable := spantypes.NewStorableSpanMapperFromMapper(mapper)
res, err := s.sqlstore.
BunDBCtx(ctx).
NewUpdate().
Model(mapper).
Where("group_id = ?", mapper.GroupID).
Where("id = ?", mapper.ID).
ExcludeColumn("id", "group_id", "created_at", "created_by").
Model(storable).
Where("group_id = ?", storable.GroupID).
Where("id = ?", storable.ID).
Exec(ctx)
if err != nil {
return err

View File

@@ -12,17 +12,17 @@ import (
type Module interface {
// Group operations
ListGroups(ctx context.Context, orgID valuer.UUID, q *spantypes.ListSpanMapperGroupsQuery) ([]*spantypes.SpanMapperGroup, error)
GetGroup(ctx context.Context, orgID valuer.UUID, id valuer.UUID) (*spantypes.SpanMapperGroup, error)
CreateGroup(ctx context.Context, orgID valuer.UUID, createdBy string, group *spantypes.SpanMapperGroup) error
UpdateGroup(ctx context.Context, orgID valuer.UUID, id valuer.UUID, updatedBy string, group *spantypes.SpanMapperGroup) error
DeleteGroup(ctx context.Context, orgID valuer.UUID, id valuer.UUID) error
GetGroup(ctx context.Context, orgID, id valuer.UUID) (*spantypes.SpanMapperGroup, error)
CreateGroup(ctx context.Context, orgID valuer.UUID, group *spantypes.SpanMapperGroup) error
UpdateGroup(ctx context.Context, orgID, id valuer.UUID, group *spantypes.SpanMapperGroup) error
DeleteGroup(ctx context.Context, orgID, id valuer.UUID) error
// Mapper operations
ListMappers(ctx context.Context, orgID valuer.UUID, groupID valuer.UUID) ([]*spantypes.SpanMapper, error)
GetMapper(ctx context.Context, orgID valuer.UUID, groupID valuer.UUID, id valuer.UUID) (*spantypes.SpanMapper, error)
CreateMapper(ctx context.Context, orgID valuer.UUID, groupID valuer.UUID, createdBy string, mapper *spantypes.SpanMapper) error
UpdateMapper(ctx context.Context, orgID valuer.UUID, groupID valuer.UUID, id valuer.UUID, updatedBy string, mapper *spantypes.SpanMapper) error
DeleteMapper(ctx context.Context, orgID valuer.UUID, groupID valuer.UUID, id valuer.UUID) error
ListMappers(ctx context.Context, orgID, groupID valuer.UUID) ([]*spantypes.SpanMapper, error)
GetMapper(ctx context.Context, orgID, groupID, id valuer.UUID) (*spantypes.SpanMapper, error)
CreateMapper(ctx context.Context, orgID, groupID valuer.UUID, mapper *spantypes.SpanMapper) error
UpdateMapper(ctx context.Context, orgID, groupID, id valuer.UUID, mapper *spantypes.SpanMapper) error
DeleteMapper(ctx context.Context, orgID, groupID, id valuer.UUID) error
}
// Handler defines the HTTP handler interface for mapping group and mapper endpoints.

View File

@@ -9,7 +9,6 @@ import (
"github.com/SigNoz/signoz/pkg/cache/memorycache"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/modules/spanmapper/implspanmapper"
"github.com/SigNoz/signoz/pkg/queryparser"
"github.com/gorilla/handlers"
@@ -130,14 +129,11 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
opAmpModel.Init(signoz.SQLStore, signoz.Instrumentation.Logger(), signoz.Modules.OrgGetter)
spanAttrMappingFeature := implspanmapper.NewSpanAttrMappingFeature(signoz.Modules.SpanMapper)
agentConfMgr, err := agentConf.Initiate(
&agentConf.ManagerOptions{
Store: signoz.SQLStore,
AgentFeatures: []agentConf.AgentFeature{
logParsingPipelineController,
spanAttrMappingFeature,
},
},
)

View File

@@ -47,7 +47,6 @@ func (migration *addSpanMapper) Up(ctx context.Context, db *bun.DB) error {
{Name: "updated_by", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "org_id", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "name", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "category", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "condition", DataType: sqlschema.DataTypeText, Nullable: false},
{Name: "enabled", DataType: sqlschema.DataTypeBoolean, Nullable: false, Default: "true"},
},

View File

@@ -1,6 +1,8 @@
package spantypes
import (
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
@@ -69,9 +71,9 @@ type PostableSpanMapper struct {
// UpdatableSpanMapper is the HTTP request body for updating a span mapper.
// All fields are optional; only non-nil fields are applied.
type UpdatableSpanMapper struct {
FieldContext FieldContext `json:"field_context,omitempty"`
Config *SpanMapperConfig `json:"config,omitempty"`
Enabled *bool `json:"enabled,omitempty"`
FieldContext *FieldContext `json:"field_context" nullable:"true"`
Config *SpanMapperConfig `json:"config" nullable:"true"`
Enabled *bool `json:"enabled" nullable:"true"`
}
type GettableSpanMapper = SpanMapper
@@ -88,6 +90,53 @@ func (SpanMapperOperation) Enum() []any {
return []any{SpanMapperOperationMove, SpanMapperOperationCopy}
}
func NewSpanMapper(groupID valuer.UUID, createdBy string, p *PostableSpanMapper) *SpanMapper {
now := time.Now()
return &SpanMapper{
ID: valuer.GenerateUUID(),
GroupID: groupID,
Name: p.Name,
FieldContext: p.FieldContext,
Config: p.Config,
Enabled: p.Enabled,
TimeAuditable: types.TimeAuditable{
CreatedAt: now,
UpdatedAt: now,
},
UserAuditable: types.UserAuditable{
CreatedBy: createdBy,
UpdatedBy: createdBy,
},
}
}
func (m *SpanMapper) Update(u *UpdatableSpanMapper, updatedBy string) {
if u.FieldContext != nil {
m.FieldContext = *u.FieldContext
}
if u.Config != nil {
m.Config = *u.Config
}
if u.Enabled != nil {
m.Enabled = *u.Enabled
}
m.UpdatedAt = time.Now()
m.UpdatedBy = updatedBy
}
func NewStorableSpanMapperFromMapper(m *SpanMapper) *StorableSpanMapper {
return &StorableSpanMapper{
Identifiable: types.Identifiable{ID: m.ID},
TimeAuditable: m.TimeAuditable,
UserAuditable: m.UserAuditable,
GroupID: m.GroupID,
Name: m.Name,
FieldContext: m.FieldContext,
Config: m.Config,
Enabled: m.Enabled,
}
}
func NewSpanMapperFromStorable(s *StorableSpanMapper) *SpanMapper {
return &SpanMapper{
TimeAuditable: s.TimeAuditable,
@@ -101,29 +150,6 @@ func NewSpanMapperFromStorable(s *StorableSpanMapper) *SpanMapper {
}
}
func NewSpanMapperFromPostable(req *PostableSpanMapper) *SpanMapper {
return &SpanMapper{
Name: req.Name,
FieldContext: req.FieldContext,
Config: req.Config,
Enabled: req.Enabled,
}
}
func NewSpanMapperFromUpdatable(req *UpdatableSpanMapper) *SpanMapper {
m := &SpanMapper{}
if req.FieldContext != (FieldContext{}) {
m.FieldContext = req.FieldContext
}
if req.Config != nil {
m.Config = *req.Config
}
if req.Enabled != nil {
m.Enabled = *req.Enabled
}
return m
}
func NewSpanMappersFromStorableSpanMappers(ss []*StorableSpanMapper) []*SpanMapper {
mappers := make([]*SpanMapper, len(ss))
for i, s := range ss {

View File

@@ -1,6 +1,8 @@
package spantypes
import (
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
@@ -11,9 +13,6 @@ var (
ErrCodeMappingGroupAlreadyExists = errors.MustNewCode("span_attribute_mapping_group_already_exists")
)
// SpanMapperGroupCategory defaults will be llm, tool, agent but user can configure more as they want.
type SpanMapperGroupCategory valuer.String
// A group runs when any of the listed attribute/resource key patterns match.
type SpanMapperGroupCondition struct {
Attributes []string `json:"attributes" required:"true" nullable:"true"`
@@ -28,7 +27,6 @@ type SpanMapperGroup struct {
ID valuer.UUID `json:"id" required:"true"`
OrgID valuer.UUID `json:"orgId" required:"true"`
Name string `json:"name" required:"true"`
Category SpanMapperGroupCategory `json:"category" required:"true"`
Condition SpanMapperGroupCondition `json:"condition" required:"true"`
Enabled bool `json:"enabled" required:"true"`
}
@@ -38,7 +36,6 @@ type GettableSpanMapperGroup = SpanMapperGroup
type PostableSpanMapperGroup struct {
Name string `json:"name" required:"true"`
Category SpanMapperGroupCategory `json:"category" required:"true"`
Condition SpanMapperGroupCondition `json:"condition" required:"true"`
Enabled bool `json:"enabled"`
}
@@ -46,44 +43,39 @@ type PostableSpanMapperGroup struct {
// UpdatableSpanMapperGroup is the HTTP request body for updating a mapping group.
// All fields are optional; only non-nil fields are applied.
type UpdatableSpanMapperGroup struct {
Name *string `json:"name,omitempty"`
Condition *SpanMapperGroupCondition `json:"condition,omitempty"`
Enabled *bool `json:"enabled,omitempty"`
Name *string `json:"name" nullable:"true"`
Condition *SpanMapperGroupCondition `json:"condition" nullable:"true"`
Enabled *bool `json:"enabled" nullable:"true"`
}
type ListSpanMapperGroupsQuery struct {
Category *SpanMapperGroupCategory `query:"category"`
Enabled *bool `query:"enabled"`
Enabled *bool `query:"enabled"`
}
type GettableSpanMapperGroups struct {
Items []*GettableSpanMapperGroup `json:"items" required:"true" nullable:"false"`
}
func NewSpanMapperGroupFromStorable(s *StorableSpanMapperGroup) *SpanMapperGroup {
return &SpanMapperGroup{
TimeAuditable: s.TimeAuditable,
UserAuditable: s.UserAuditable,
ID: s.ID,
OrgID: s.OrgID,
Name: s.Name,
Category: s.Category,
Condition: s.Condition,
Enabled: s.Enabled,
}
}
func NewSpanMapperGroupFromPostable(p *PostableSpanMapperGroup) *SpanMapperGroup {
func NewSpanMapperGroup(orgID valuer.UUID, createdBy string, p *PostableSpanMapperGroup) *SpanMapperGroup {
now := time.Now()
return &SpanMapperGroup{
ID: valuer.GenerateUUID(),
OrgID: orgID,
Name: p.Name,
Category: p.Category,
Condition: p.Condition,
Enabled: p.Enabled,
TimeAuditable: types.TimeAuditable{
CreatedAt: now,
UpdatedAt: now,
},
UserAuditable: types.UserAuditable{
CreatedBy: createdBy,
UpdatedBy: createdBy,
},
}
}
func NewSpanMapperGroupFromUpdatable(u *UpdatableSpanMapperGroup) *SpanMapperGroup {
g := &SpanMapperGroup{}
func (g *SpanMapperGroup) Update(u *UpdatableSpanMapperGroup, updatedBy string) {
if u.Name != nil {
g.Name = *u.Name
}
@@ -93,7 +85,32 @@ func NewSpanMapperGroupFromUpdatable(u *UpdatableSpanMapperGroup) *SpanMapperGro
if u.Enabled != nil {
g.Enabled = *u.Enabled
}
return g
g.UpdatedAt = time.Now()
g.UpdatedBy = updatedBy
}
func NewStorableSpanMapperGroupFromGroup(g *SpanMapperGroup) *StorableSpanMapperGroup {
return &StorableSpanMapperGroup{
Identifiable: types.Identifiable{ID: g.ID},
TimeAuditable: g.TimeAuditable,
UserAuditable: g.UserAuditable,
OrgID: g.OrgID,
Name: g.Name,
Condition: g.Condition,
Enabled: g.Enabled,
}
}
func NewSpanMapperGroupFromStorable(s *StorableSpanMapperGroup) *SpanMapperGroup {
return &SpanMapperGroup{
TimeAuditable: s.TimeAuditable,
UserAuditable: s.UserAuditable,
ID: s.ID,
OrgID: s.OrgID,
Name: s.Name,
Condition: s.Condition,
Enabled: s.Enabled,
}
}
func NewSpanMapperGroupsFromStorableGroups(ss []*StorableSpanMapperGroup) []*SpanMapperGroup {

View File

@@ -1,23 +0,0 @@
package spantypes
type SpanMappingProcessorConfig struct {
Groups []SpanMappingGroup `yaml:"groups" json:"groups"`
}
type SpanMappingGroup struct {
ID string `yaml:"id" json:"id"`
ExistsAny SpanMappingExistsAny `yaml:"exists_any" json:"exists_any"`
Attributes []SpanMappingAttribute `yaml:"attributes" json:"attributes"`
}
type SpanMappingExistsAny struct {
Attributes []string `yaml:"attributes,omitempty" json:"attributes,omitempty"`
Resource []string `yaml:"resource,omitempty" json:"resource,omitempty"`
}
type SpanMappingAttribute struct {
Target string `yaml:"target" json:"target"`
Context string `yaml:"context,omitempty" json:"context,omitempty"`
Action string `yaml:"action,omitempty" json:"action,omitempty"`
Sources []string `yaml:"sources" json:"sources"`
}

View File

@@ -19,7 +19,6 @@ type StorableSpanMapperGroup struct {
OrgID valuer.UUID `bun:"org_id,type:text,notnull"`
Name string `bun:"name,type:text,notnull"`
Category SpanMapperGroupCategory `bun:"category,type:text,notnull"`
Condition SpanMapperGroupCondition `bun:"condition,type:jsonb,notnull"`
Enabled bool `bun:"enabled,notnull,default:true"`
}

View File

@@ -8,16 +8,16 @@ import (
type Store interface {
// Group operations
ListSpanMapperGroups(ctx context.Context, orgID valuer.UUID, q *ListSpanMapperGroupsQuery) ([]*StorableSpanMapperGroup, error)
GetSpanMapperGroup(ctx context.Context, orgID, id valuer.UUID) (*StorableSpanMapperGroup, error)
CreateSpanMapperGroup(ctx context.Context, group *StorableSpanMapperGroup) error
UpdateSpanMapperGroup(ctx context.Context, group *StorableSpanMapperGroup) error
ListSpanMapperGroups(ctx context.Context, orgID valuer.UUID, q *ListSpanMapperGroupsQuery) ([]*SpanMapperGroup, error)
GetSpanMapperGroup(ctx context.Context, orgID, id valuer.UUID) (*SpanMapperGroup, error)
CreateSpanMapperGroup(ctx context.Context, group *SpanMapperGroup) error
UpdateSpanMapperGroup(ctx context.Context, group *SpanMapperGroup) error
DeleteSpanMapperGroup(ctx context.Context, orgID, id valuer.UUID) error
// Mapper operations
ListSpanMappers(ctx context.Context, orgID, groupID valuer.UUID) ([]*StorableSpanMapper, error)
GetSpanMapper(ctx context.Context, orgID, groupID, id valuer.UUID) (*StorableSpanMapper, error)
CreateSpanMapper(ctx context.Context, mapper *StorableSpanMapper) error
UpdateSpanMapper(ctx context.Context, mapper *StorableSpanMapper) error
ListSpanMappers(ctx context.Context, orgID, groupID valuer.UUID) ([]*SpanMapper, error)
GetSpanMapper(ctx context.Context, orgID, groupID, id valuer.UUID) (*SpanMapper, error)
CreateSpanMapper(ctx context.Context, mapper *SpanMapper) error
UpdateSpanMapper(ctx context.Context, mapper *SpanMapper) error
DeleteSpanMapper(ctx context.Context, orgID, groupID, id valuer.UUID) error
}