mirror of
https://github.com/SigNoz/signoz.git
synced 2026-04-28 06:30:33 +01:00
Compare commits
16 Commits
fix/recurr
...
issue_4361
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3a90c0ed66 | ||
|
|
8f531b3937 | ||
|
|
b0f4760b78 | ||
|
|
bb67754425 | ||
|
|
0d626185fa | ||
|
|
a281b108ce | ||
|
|
33480937d9 | ||
|
|
3b3c3dd421 | ||
|
|
2d6d343e68 | ||
|
|
4b58a16280 | ||
|
|
224f053c8d | ||
|
|
c4051791f9 | ||
|
|
ecb566dca3 | ||
|
|
118835b54d | ||
|
|
774eff4780 | ||
|
|
4e996c2be8 |
1
.go-version
Normal file
1
.go-version
Normal file
@@ -0,0 +1 @@
|
||||
1.25.7
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/cache/memorycache"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/modules/spanmapper/implspanmapper"
|
||||
|
||||
"github.com/gorilla/handlers"
|
||||
|
||||
@@ -111,9 +112,11 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
|
||||
}
|
||||
|
||||
// initiate agent config handler
|
||||
spanAttrMappingFeature := implspanmapper.NewSpanAttrMappingFeature(signoz.Modules.SpanMapper)
|
||||
|
||||
agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{
|
||||
Store: signoz.SQLStore,
|
||||
AgentFeatures: []agentConf.AgentFeature{logParsingPipelineController},
|
||||
AgentFeatures: []agentConf.AgentFeature{logParsingPipelineController, spanAttrMappingFeature},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
85
pkg/modules/spanmapper/implspanmapper/agent_feature.go
Normal file
85
pkg/modules/spanmapper/implspanmapper/agent_feature.go
Normal file
@@ -0,0 +1,85 @@
|
||||
package implspanmapper
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/modules/spanmapper"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
|
||||
"github.com/SigNoz/signoz/pkg/types/opamptypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/spantypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
const SpanAttrMappingFeatureType agentConf.AgentFeatureType = "span_attr_mapping"
|
||||
|
||||
// SpanAttrMappingFeature implements agentConf.AgentFeature. It reads enabled
|
||||
// mapping groups and mappers from the module and generates the
|
||||
// signozspanmappingprocessor config for deployment via OpAMP.
|
||||
type SpanAttrMappingFeature struct {
|
||||
module spanmapper.Module
|
||||
}
|
||||
|
||||
func NewSpanAttrMappingFeature(module spanmapper.Module) *SpanAttrMappingFeature {
|
||||
return &SpanAttrMappingFeature{module: module}
|
||||
}
|
||||
|
||||
func (f *SpanAttrMappingFeature) AgentFeatureType() agentConf.AgentFeatureType {
|
||||
return SpanAttrMappingFeatureType
|
||||
}
|
||||
|
||||
func (f *SpanAttrMappingFeature) RecommendAgentConfig(
|
||||
orgId valuer.UUID,
|
||||
currentConfYaml []byte,
|
||||
configVersion *opamptypes.AgentConfigVersion,
|
||||
) ([]byte, string, error) {
|
||||
ctx := context.Background()
|
||||
|
||||
groups, err := f.getEnabled(ctx, orgId)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
updatedConf, err := generateCollectorConfigWithSpanMapping(currentConfYaml, groups)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
serialized, err := json.Marshal(groups)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
return updatedConf, string(serialized), nil
|
||||
}
|
||||
|
||||
// getEnabled returns enabled groups alongside their enabled mappers. Groups
|
||||
// with no enabled mappers are still included so the collector sees the
|
||||
// exists_any condition, even if the attributes list is empty.
|
||||
func (f *SpanAttrMappingFeature) getEnabled(ctx context.Context, orgId valuer.UUID) ([]enabledGroup, error) {
|
||||
if f.module == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
enabled := true
|
||||
groups, err := f.module.ListGroups(ctx, orgId, &spantypes.ListSpanMapperGroupsQuery{Enabled: &enabled})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
out := make([]enabledGroup, 0, len(groups))
|
||||
for _, g := range groups {
|
||||
mappers, err := f.module.ListMappers(ctx, orgId, g.ID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
enabledMappers := make([]*spantypes.SpanMapper, 0, len(mappers))
|
||||
for _, m := range mappers {
|
||||
if m.Enabled {
|
||||
enabledMappers = append(enabledMappers, m)
|
||||
}
|
||||
}
|
||||
out = append(out, enabledGroup{group: g, mappers: enabledMappers})
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
137
pkg/modules/spanmapper/implspanmapper/collector_config.go
Normal file
137
pkg/modules/spanmapper/implspanmapper/collector_config.go
Normal file
@@ -0,0 +1,137 @@
|
||||
package implspanmapper
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"sort"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/types/spantypes"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
const processorName = "signozspanmappingprocessor"
|
||||
|
||||
const (
|
||||
// Collector context values (see signozspanmappingprocessor.ContextAttributes/ContextResource).
|
||||
ctxAttributes = "attributes"
|
||||
ctxResource = "resource"
|
||||
|
||||
// Collector action values.
|
||||
actionCopy = "copy"
|
||||
actionMove = "move"
|
||||
|
||||
// Source key prefix the collector treats as "read from resource".
|
||||
resourcePrefix = "resource."
|
||||
)
|
||||
|
||||
// enabledGroup pairs an enabled group with its enabled mappers.
|
||||
type enabledGroup struct {
|
||||
group *spantypes.SpanMapperGroup
|
||||
mappers []*spantypes.SpanMapper
|
||||
}
|
||||
|
||||
// buildProcessorConfig converts enabled groups + mappers into the
|
||||
// signozspanmappingprocessor config shape.
|
||||
func buildProcessorConfig(groups []enabledGroup) *spantypes.SpanMappingProcessorConfig {
|
||||
out := make([]spantypes.SpanMappingGroup, 0, len(groups))
|
||||
|
||||
for _, eg := range groups {
|
||||
rules := make([]spantypes.SpanMappingAttribute, 0, len(eg.mappers))
|
||||
for _, m := range eg.mappers {
|
||||
rules = append(rules, buildAttributeRule(m))
|
||||
}
|
||||
|
||||
out = append(out, spantypes.SpanMappingGroup{
|
||||
ID: eg.group.Name,
|
||||
ExistsAny: spantypes.SpanMappingExistsAny{
|
||||
Attributes: eg.group.Condition.Attributes,
|
||||
Resource: eg.group.Condition.Resource,
|
||||
},
|
||||
Attributes: rules,
|
||||
})
|
||||
}
|
||||
|
||||
return &spantypes.SpanMappingProcessorConfig{Groups: out}
|
||||
}
|
||||
|
||||
// buildAttributeRule maps a single SpanMapper to a collector AttributeRule.
|
||||
// Sources are sorted by Priority DESC (highest-priority first), and read-from-
|
||||
// resource sources are encoded via the "resource." prefix. The rule-level
|
||||
// action is derived from the sources' operations (all sources within one
|
||||
// mapper are expected to share the same operation; the highest-priority
|
||||
// source's operation is used).
|
||||
func buildAttributeRule(m *spantypes.SpanMapper) spantypes.SpanMappingAttribute {
|
||||
sources := make([]spantypes.SpanMapperSource, len(m.Config.Sources))
|
||||
copy(sources, m.Config.Sources)
|
||||
sort.SliceStable(sources, func(i, j int) bool { return sources[i].Priority > sources[j].Priority })
|
||||
|
||||
keys := make([]string, 0, len(sources))
|
||||
for _, s := range sources {
|
||||
if s.Context == spantypes.FieldContextResource {
|
||||
keys = append(keys, resourcePrefix+s.Key)
|
||||
} else {
|
||||
keys = append(keys, s.Key)
|
||||
}
|
||||
}
|
||||
|
||||
action := actionCopy
|
||||
if len(sources) > 0 && sources[0].Operation == spantypes.SpanMapperOperationMove {
|
||||
action = actionMove
|
||||
}
|
||||
|
||||
ctx := ctxAttributes
|
||||
if m.FieldContext == spantypes.FieldContextResource {
|
||||
ctx = ctxResource
|
||||
}
|
||||
|
||||
return spantypes.SpanMappingAttribute{
|
||||
Target: m.Name,
|
||||
Context: ctx,
|
||||
Action: action,
|
||||
Sources: keys,
|
||||
}
|
||||
}
|
||||
|
||||
// generateCollectorConfigWithSpanMapping injects (or replaces) the
|
||||
// signozspanmappingprocessor block in the collector YAML. Pipeline wiring is
|
||||
// handled by the collector's baseline config, not here.
|
||||
func generateCollectorConfigWithSpanMapping(
|
||||
currentConfYaml []byte,
|
||||
groups []enabledGroup,
|
||||
) ([]byte, error) {
|
||||
// Empty input: nothing to inject into. Pass through unchanged so we don't
|
||||
// turn it into "null\n" or fail on yaml.v3's EOF.
|
||||
if len(bytes.TrimSpace(currentConfYaml)) == 0 {
|
||||
return currentConfYaml, nil
|
||||
}
|
||||
|
||||
var collectorConf map[string]any
|
||||
if err := yaml.Unmarshal(currentConfYaml, &collectorConf); err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal collector config: %w", err)
|
||||
}
|
||||
if collectorConf == nil {
|
||||
collectorConf = map[string]any{}
|
||||
}
|
||||
|
||||
processors := map[string]any{}
|
||||
if collectorConf["processors"] != nil {
|
||||
if p, ok := collectorConf["processors"].(map[string]any); ok {
|
||||
processors = p
|
||||
}
|
||||
}
|
||||
|
||||
procConfig := buildProcessorConfig(groups)
|
||||
configBytes, err := yaml.Marshal(procConfig)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal span attr mapping processor config: %w", err)
|
||||
}
|
||||
var configMap any
|
||||
if err := yaml.Unmarshal(configBytes, &configMap); err != nil {
|
||||
return nil, fmt.Errorf("failed to re-unmarshal span attr mapping processor config: %w", err)
|
||||
}
|
||||
|
||||
processors[processorName] = configMap
|
||||
collectorConf["processors"] = processors
|
||||
|
||||
return yaml.Marshal(collectorConf)
|
||||
}
|
||||
179
pkg/modules/spanmapper/implspanmapper/module.go
Normal file
179
pkg/modules/spanmapper/implspanmapper/module.go
Normal file
@@ -0,0 +1,179 @@
|
||||
package implspanmapper
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/modules/spanmapper"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/types/spantypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
type module struct {
|
||||
store spantypes.Store
|
||||
}
|
||||
|
||||
func NewModule(store spantypes.Store) spanmapper.Module {
|
||||
return &module{store: store}
|
||||
}
|
||||
|
||||
func (m *module) ListGroups(ctx context.Context, orgID valuer.UUID, q *spantypes.ListSpanMapperGroupsQuery) ([]*spantypes.SpanMapperGroup, error) {
|
||||
storables, err := m.store.ListSpanMapperGroups(ctx, orgID, q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return spantypes.NewSpanMapperGroupsFromStorableGroups(storables), nil
|
||||
}
|
||||
|
||||
func (m *module) GetGroup(ctx context.Context, orgID valuer.UUID, id valuer.UUID) (*spantypes.SpanMapperGroup, error) {
|
||||
s, err := m.store.GetSpanMapperGroup(ctx, orgID, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return spantypes.NewSpanMapperGroupFromStorable(s), nil
|
||||
}
|
||||
|
||||
func (m *module) CreateGroup(ctx context.Context, orgID valuer.UUID, createdBy string, group *spantypes.SpanMapperGroup) error {
|
||||
now := time.Now()
|
||||
group.ID = valuer.GenerateUUID()
|
||||
group.OrgID = orgID
|
||||
group.CreatedAt = now
|
||||
group.UpdatedAt = now
|
||||
group.CreatedBy = createdBy
|
||||
group.UpdatedBy = createdBy
|
||||
|
||||
storable := &spantypes.StorableSpanMapperGroup{
|
||||
Identifiable: types.Identifiable{ID: group.ID},
|
||||
TimeAuditable: types.TimeAuditable{CreatedAt: now, UpdatedAt: now},
|
||||
UserAuditable: types.UserAuditable{CreatedBy: createdBy, UpdatedBy: createdBy},
|
||||
OrgID: orgID,
|
||||
Name: group.Name,
|
||||
Category: group.Category,
|
||||
Condition: group.Condition,
|
||||
Enabled: group.Enabled,
|
||||
}
|
||||
|
||||
if err := m.store.CreateSpanMapperGroup(ctx, storable); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
agentConf.NotifyConfigUpdate(ctx)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *module) UpdateGroup(ctx context.Context, orgID valuer.UUID, id valuer.UUID, updatedBy string, group *spantypes.SpanMapperGroup) error {
|
||||
existing, err := m.store.GetSpanMapperGroup(ctx, orgID, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if group.Name != "" {
|
||||
existing.Name = group.Name
|
||||
}
|
||||
if len(group.Condition.Attributes) > 0 || len(group.Condition.Resource) > 0 {
|
||||
existing.Condition = group.Condition
|
||||
}
|
||||
existing.Enabled = group.Enabled
|
||||
existing.UpdatedAt = time.Now()
|
||||
existing.UpdatedBy = updatedBy
|
||||
|
||||
if err := m.store.UpdateSpanMapperGroup(ctx, existing); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
agentConf.NotifyConfigUpdate(ctx)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *module) DeleteGroup(ctx context.Context, orgID valuer.UUID, id valuer.UUID) error {
|
||||
if err := m.store.DeleteSpanMapperGroup(ctx, orgID, id); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
agentConf.NotifyConfigUpdate(ctx)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *module) ListMappers(ctx context.Context, orgID valuer.UUID, groupID valuer.UUID) ([]*spantypes.SpanMapper, error) {
|
||||
storables, err := m.store.ListSpanMappers(ctx, orgID, groupID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return spantypes.NewSpanMappersFromStorableSpanMappers(storables), nil
|
||||
}
|
||||
|
||||
func (m *module) GetMapper(ctx context.Context, orgID valuer.UUID, groupID valuer.UUID, id valuer.UUID) (*spantypes.SpanMapper, error) {
|
||||
s, err := m.store.GetSpanMapper(ctx, orgID, groupID, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return spantypes.NewSpanMapperFromStorable(s), nil
|
||||
}
|
||||
|
||||
func (m *module) CreateMapper(ctx context.Context, orgID valuer.UUID, groupID valuer.UUID, createdBy string, mapper *spantypes.SpanMapper) error {
|
||||
// Ensure the group belongs to the org before inserting the child row.
|
||||
if _, err := m.store.GetSpanMapperGroup(ctx, orgID, groupID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
mapper.ID = valuer.GenerateUUID()
|
||||
mapper.GroupID = groupID
|
||||
mapper.CreatedAt = now
|
||||
mapper.UpdatedAt = now
|
||||
mapper.CreatedBy = createdBy
|
||||
mapper.UpdatedBy = createdBy
|
||||
|
||||
storable := &spantypes.StorableSpanMapper{
|
||||
Identifiable: types.Identifiable{ID: mapper.ID},
|
||||
TimeAuditable: types.TimeAuditable{CreatedAt: now, UpdatedAt: now},
|
||||
UserAuditable: types.UserAuditable{CreatedBy: createdBy, UpdatedBy: createdBy},
|
||||
GroupID: groupID,
|
||||
Name: mapper.Name,
|
||||
FieldContext: mapper.FieldContext,
|
||||
Config: mapper.Config,
|
||||
Enabled: mapper.Enabled,
|
||||
}
|
||||
|
||||
if err := m.store.CreateSpanMapper(ctx, storable); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
agentConf.NotifyConfigUpdate(ctx)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *module) UpdateMapper(ctx context.Context, orgID valuer.UUID, groupID valuer.UUID, id valuer.UUID, updatedBy string, mapper *spantypes.SpanMapper) error {
|
||||
existing, err := m.store.GetSpanMapper(ctx, orgID, groupID, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if mapper.FieldContext != (spantypes.FieldContext{}) {
|
||||
existing.FieldContext = mapper.FieldContext
|
||||
}
|
||||
if mapper.Config.Sources != nil {
|
||||
existing.Config = mapper.Config
|
||||
}
|
||||
existing.Enabled = mapper.Enabled
|
||||
existing.UpdatedAt = time.Now()
|
||||
existing.UpdatedBy = updatedBy
|
||||
|
||||
if err := m.store.UpdateSpanMapper(ctx, existing); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
agentConf.NotifyConfigUpdate(ctx)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *module) DeleteMapper(ctx context.Context, orgID valuer.UUID, groupID valuer.UUID, id valuer.UUID) error {
|
||||
if err := m.store.DeleteSpanMapper(ctx, orgID, groupID, id); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
agentConf.NotifyConfigUpdate(ctx)
|
||||
return nil
|
||||
}
|
||||
236
pkg/modules/spanmapper/implspanmapper/store.go
Normal file
236
pkg/modules/spanmapper/implspanmapper/store.go
Normal file
@@ -0,0 +1,236 @@
|
||||
package implspanmapper
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/types/spantypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
type store struct {
|
||||
sqlstore sqlstore.SQLStore
|
||||
}
|
||||
|
||||
func NewStore(sqlstore sqlstore.SQLStore) spantypes.Store {
|
||||
return &store{sqlstore: sqlstore}
|
||||
}
|
||||
|
||||
func (s *store) ListSpanMapperGroups(ctx context.Context, orgID valuer.UUID, q *spantypes.ListSpanMapperGroupsQuery) ([]*spantypes.StorableSpanMapperGroup, error) {
|
||||
groups := make([]*spantypes.StorableSpanMapperGroup, 0)
|
||||
|
||||
sel := s.sqlstore.
|
||||
BunDB().
|
||||
NewSelect().
|
||||
Model(&groups).
|
||||
Where("org_id = ?", orgID)
|
||||
|
||||
if q != nil {
|
||||
if q.Category != nil {
|
||||
sel = sel.Where("category = ?", valuer.String(*q.Category).StringValue())
|
||||
}
|
||||
if q.Enabled != nil {
|
||||
sel = sel.Where("enabled = ?", *q.Enabled)
|
||||
}
|
||||
}
|
||||
|
||||
if err := sel.Order("created_at DESC").Scan(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return groups, nil
|
||||
}
|
||||
|
||||
func (s *store) GetSpanMapperGroup(ctx context.Context, orgID, id valuer.UUID) (*spantypes.StorableSpanMapperGroup, error) {
|
||||
group := new(spantypes.StorableSpanMapperGroup)
|
||||
|
||||
err := s.sqlstore.
|
||||
BunDB().
|
||||
NewSelect().
|
||||
Model(group).
|
||||
Where("org_id = ?", orgID).
|
||||
Where("id = ?", id).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, s.sqlstore.WrapNotFoundErrf(err, spantypes.ErrCodeMappingGroupNotFound, "span mapper group %s not found", id)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
return group, nil
|
||||
}
|
||||
|
||||
func (s *store) CreateSpanMapperGroup(ctx context.Context, group *spantypes.StorableSpanMapperGroup) error {
|
||||
_, err := s.sqlstore.
|
||||
BunDBCtx(ctx).
|
||||
NewInsert().
|
||||
Model(group).
|
||||
Exec(ctx)
|
||||
if err != nil {
|
||||
return s.sqlstore.WrapAlreadyExistsErrf(err, spantypes.ErrCodeMappingGroupAlreadyExists, "span mapper group %q already exists", group.Name)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *store) UpdateSpanMapperGroup(ctx context.Context, group *spantypes.StorableSpanMapperGroup) error {
|
||||
res, err := s.sqlstore.
|
||||
BunDBCtx(ctx).
|
||||
NewUpdate().
|
||||
Model(group).
|
||||
Where("org_id = ?", group.OrgID).
|
||||
Where("id = ?", group.ID).
|
||||
ExcludeColumn("id", "org_id", "created_at", "created_by").
|
||||
Exec(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rowsAffected, err := res.RowsAffected()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if rowsAffected == 0 {
|
||||
return errors.Newf(errors.TypeNotFound, spantypes.ErrCodeMappingGroupNotFound, "span mapper group %s not found", group.ID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *store) DeleteSpanMapperGroup(ctx context.Context, orgID, id valuer.UUID) error {
|
||||
tx, err := s.sqlstore.BunDBCtx(ctx).BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() { _ = tx.Rollback() }()
|
||||
|
||||
// Cascade: remove mappers belonging to this group first.
|
||||
if _, err := tx.NewDelete().
|
||||
Model((*spantypes.StorableSpanMapper)(nil)).
|
||||
Where("group_id = ?", id).
|
||||
Exec(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
res, err := tx.NewDelete().
|
||||
Model((*spantypes.StorableSpanMapperGroup)(nil)).
|
||||
Where("org_id = ?", orgID).
|
||||
Where("id = ?", id).
|
||||
Exec(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rowsAffected, err := res.RowsAffected()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if rowsAffected == 0 {
|
||||
return errors.Newf(errors.TypeNotFound, spantypes.ErrCodeMappingGroupNotFound, "span mapper group %s not found", id)
|
||||
}
|
||||
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
func (s *store) ListSpanMappers(ctx context.Context, orgID, groupID valuer.UUID) ([]*spantypes.StorableSpanMapper, error) {
|
||||
mappers := make([]*spantypes.StorableSpanMapper, 0)
|
||||
|
||||
// Scope by org via the parent group's org_id.
|
||||
if _, err := s.GetSpanMapperGroup(ctx, orgID, groupID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := s.sqlstore.
|
||||
BunDB().
|
||||
NewSelect().
|
||||
Model(&mappers).
|
||||
Where("group_id = ?", groupID).
|
||||
Order("created_at DESC").
|
||||
Scan(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return mappers, nil
|
||||
}
|
||||
|
||||
func (s *store) GetSpanMapper(ctx context.Context, orgID, groupID, id valuer.UUID) (*spantypes.StorableSpanMapper, error) {
|
||||
// Ensure the group belongs to the org.
|
||||
if _, err := s.GetSpanMapperGroup(ctx, orgID, groupID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
mapper := new(spantypes.StorableSpanMapper)
|
||||
err := s.sqlstore.
|
||||
BunDB().
|
||||
NewSelect().
|
||||
Model(mapper).
|
||||
Where("group_id = ?", groupID).
|
||||
Where("id = ?", id).
|
||||
Scan(ctx)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, s.sqlstore.WrapNotFoundErrf(err, spantypes.ErrCodeMapperNotFound, "span mapper %s not found", id)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
return mapper, nil
|
||||
}
|
||||
|
||||
func (s *store) CreateSpanMapper(ctx context.Context, mapper *spantypes.StorableSpanMapper) error {
|
||||
_, err := s.sqlstore.
|
||||
BunDBCtx(ctx).
|
||||
NewInsert().
|
||||
Model(mapper).
|
||||
Exec(ctx)
|
||||
if err != nil {
|
||||
return s.sqlstore.WrapAlreadyExistsErrf(err, spantypes.ErrCodeMapperAlreadyExists, "span mapper %q already exists", mapper.Name)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *store) UpdateSpanMapper(ctx context.Context, mapper *spantypes.StorableSpanMapper) error {
|
||||
res, err := s.sqlstore.
|
||||
BunDBCtx(ctx).
|
||||
NewUpdate().
|
||||
Model(mapper).
|
||||
Where("group_id = ?", mapper.GroupID).
|
||||
Where("id = ?", mapper.ID).
|
||||
ExcludeColumn("id", "group_id", "created_at", "created_by").
|
||||
Exec(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rowsAffected, err := res.RowsAffected()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if rowsAffected == 0 {
|
||||
return errors.Newf(errors.TypeNotFound, spantypes.ErrCodeMapperNotFound, "span mapper %s not found", mapper.ID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *store) DeleteSpanMapper(ctx context.Context, orgID, groupID, id valuer.UUID) error {
|
||||
if _, err := s.GetSpanMapperGroup(ctx, orgID, groupID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
res, err := s.sqlstore.
|
||||
BunDBCtx(ctx).
|
||||
NewDelete().
|
||||
Model((*spantypes.StorableSpanMapper)(nil)).
|
||||
Where("group_id = ?", groupID).
|
||||
Where("id = ?", id).
|
||||
Exec(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rowsAffected, err := res.RowsAffected()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if rowsAffected == 0 {
|
||||
return errors.Newf(errors.TypeNotFound, spantypes.ErrCodeMapperNotFound, "span mapper %s not found", id)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/cache/memorycache"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/modules/spanmapper/implspanmapper"
|
||||
"github.com/SigNoz/signoz/pkg/queryparser"
|
||||
|
||||
"github.com/gorilla/handlers"
|
||||
@@ -129,11 +130,14 @@ func NewServer(config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
|
||||
|
||||
opAmpModel.Init(signoz.SQLStore, signoz.Instrumentation.Logger(), signoz.Modules.OrgGetter)
|
||||
|
||||
spanAttrMappingFeature := implspanmapper.NewSpanAttrMappingFeature(signoz.Modules.SpanMapper)
|
||||
|
||||
agentConfMgr, err := agentConf.Initiate(
|
||||
&agentConf.ManagerOptions{
|
||||
Store: signoz.SQLStore,
|
||||
AgentFeatures: []agentConf.AgentFeature{
|
||||
logParsingPipelineController,
|
||||
spanAttrMappingFeature,
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
@@ -117,7 +117,7 @@ func NewHandlers(
|
||||
RegistryHandler: registryHandler,
|
||||
RuleStateHistory: implrulestatehistory.NewHandler(modules.RuleStateHistory),
|
||||
CloudIntegrationHandler: implcloudintegration.NewHandler(modules.CloudIntegration),
|
||||
SpanMapperHandler: implspanmapper.NewHandler(nil, providerSettings), // todo(nitya): will update this in future PR
|
||||
SpanMapperHandler: implspanmapper.NewHandler(modules.SpanMapper, providerSettings),
|
||||
AlertmanagerHandler: signozalertmanager.NewHandler(alertmanagerService),
|
||||
TraceDetail: impltracedetail.NewHandler(modules.TraceDetail),
|
||||
RulerHandler: signozruler.NewHandler(rulerService),
|
||||
|
||||
@@ -37,6 +37,8 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/modules/services/implservices"
|
||||
"github.com/SigNoz/signoz/pkg/modules/session"
|
||||
"github.com/SigNoz/signoz/pkg/modules/session/implsession"
|
||||
"github.com/SigNoz/signoz/pkg/modules/spanmapper"
|
||||
"github.com/SigNoz/signoz/pkg/modules/spanmapper/implspanmapper"
|
||||
"github.com/SigNoz/signoz/pkg/modules/spanpercentile"
|
||||
"github.com/SigNoz/signoz/pkg/modules/spanpercentile/implspanpercentile"
|
||||
"github.com/SigNoz/signoz/pkg/modules/tracedetail"
|
||||
@@ -79,6 +81,7 @@ type Modules struct {
|
||||
CloudIntegration cloudintegration.Module
|
||||
RuleStateHistory rulestatehistory.Module
|
||||
TraceDetail tracedetail.Module
|
||||
SpanMapper spanmapper.Module
|
||||
}
|
||||
|
||||
func NewModules(
|
||||
@@ -131,5 +134,6 @@ func NewModules(
|
||||
RuleStateHistory: implrulestatehistory.NewModule(implrulestatehistory.NewStore(telemetryStore, telemetryMetadataStore, providerSettings.Logger)),
|
||||
CloudIntegration: cloudIntegrationModule,
|
||||
TraceDetail: impltracedetail.NewModule(impltracedetail.NewTraceStore(telemetryStore), providerSettings, config.TraceDetail),
|
||||
SpanMapper: implspanmapper.NewModule(implspanmapper.NewStore(sqlstore)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -195,6 +195,7 @@ func NewSQLMigrationProviderFactories(
|
||||
sqlmigration.NewServiceAccountAuthzactory(sqlstore),
|
||||
sqlmigration.NewDropUserDeletedAtFactory(sqlstore, sqlschema),
|
||||
sqlmigration.NewMigrateAWSAllRegionsFactory(sqlstore),
|
||||
sqlmigration.NewAddSpanMapperFactory(sqlstore, sqlschema),
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
119
pkg/sqlmigration/082_add_span_mapper.go
Normal file
119
pkg/sqlmigration/082_add_span_mapper.go
Normal file
@@ -0,0 +1,119 @@
|
||||
package sqlmigration
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/factory"
|
||||
"github.com/SigNoz/signoz/pkg/sqlschema"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/uptrace/bun"
|
||||
"github.com/uptrace/bun/migrate"
|
||||
)
|
||||
|
||||
type addSpanMapper struct {
|
||||
sqlschema sqlschema.SQLSchema
|
||||
sqlstore sqlstore.SQLStore
|
||||
}
|
||||
|
||||
func NewAddSpanMapperFactory(sqlstore sqlstore.SQLStore, sqlschema sqlschema.SQLSchema) factory.ProviderFactory[SQLMigration, Config] {
|
||||
return factory.NewProviderFactory(factory.MustNewName("add_span_mapper"), func(_ context.Context, _ factory.ProviderSettings, _ Config) (SQLMigration, error) {
|
||||
return &addSpanMapper{sqlschema: sqlschema, sqlstore: sqlstore}, nil
|
||||
})
|
||||
}
|
||||
|
||||
func (migration *addSpanMapper) Register(migrations *migrate.Migrations) error {
|
||||
return migrations.Register(migration.Up, migration.Down)
|
||||
}
|
||||
|
||||
func (migration *addSpanMapper) Up(ctx context.Context, db *bun.DB) error {
|
||||
tx, err := db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
_ = tx.Rollback()
|
||||
}()
|
||||
|
||||
sqls := [][]byte{}
|
||||
|
||||
groupSQLs := migration.sqlschema.Operator().CreateTable(&sqlschema.Table{
|
||||
Name: "span_mapper_group",
|
||||
Columns: []*sqlschema.Column{
|
||||
{Name: "id", DataType: sqlschema.DataTypeText, Nullable: false},
|
||||
{Name: "created_at", DataType: sqlschema.DataTypeTimestamp, Nullable: false},
|
||||
{Name: "updated_at", DataType: sqlschema.DataTypeTimestamp, Nullable: false},
|
||||
{Name: "created_by", DataType: sqlschema.DataTypeText, Nullable: false},
|
||||
{Name: "updated_by", DataType: sqlschema.DataTypeText, Nullable: false},
|
||||
{Name: "org_id", DataType: sqlschema.DataTypeText, Nullable: false},
|
||||
{Name: "name", DataType: sqlschema.DataTypeText, Nullable: false},
|
||||
{Name: "category", DataType: sqlschema.DataTypeText, Nullable: false},
|
||||
{Name: "condition", DataType: sqlschema.DataTypeText, Nullable: false},
|
||||
{Name: "enabled", DataType: sqlschema.DataTypeBoolean, Nullable: false, Default: "true"},
|
||||
},
|
||||
PrimaryKeyConstraint: &sqlschema.PrimaryKeyConstraint{
|
||||
ColumnNames: []sqlschema.ColumnName{"id"},
|
||||
},
|
||||
ForeignKeyConstraints: []*sqlschema.ForeignKeyConstraint{
|
||||
{
|
||||
ReferencingColumnName: sqlschema.ColumnName("org_id"),
|
||||
ReferencedTableName: sqlschema.TableName("organizations"),
|
||||
ReferencedColumnName: sqlschema.ColumnName("id"),
|
||||
},
|
||||
},
|
||||
})
|
||||
sqls = append(sqls, groupSQLs...)
|
||||
|
||||
groupIdxSQLs := migration.sqlschema.Operator().CreateIndex(
|
||||
&sqlschema.UniqueIndex{
|
||||
TableName: "span_mapper_group",
|
||||
ColumnNames: []sqlschema.ColumnName{"org_id", "name"},
|
||||
})
|
||||
sqls = append(sqls, groupIdxSQLs...)
|
||||
|
||||
mapperSQLs := migration.sqlschema.Operator().CreateTable(&sqlschema.Table{
|
||||
Name: "span_mapper",
|
||||
Columns: []*sqlschema.Column{
|
||||
{Name: "id", DataType: sqlschema.DataTypeText, Nullable: false},
|
||||
{Name: "created_at", DataType: sqlschema.DataTypeTimestamp, Nullable: false},
|
||||
{Name: "updated_at", DataType: sqlschema.DataTypeTimestamp, Nullable: false},
|
||||
{Name: "created_by", DataType: sqlschema.DataTypeText, Nullable: false},
|
||||
{Name: "updated_by", DataType: sqlschema.DataTypeText, Nullable: false},
|
||||
{Name: "group_id", DataType: sqlschema.DataTypeText, Nullable: false},
|
||||
{Name: "name", DataType: sqlschema.DataTypeText, Nullable: false},
|
||||
{Name: "field_context", DataType: sqlschema.DataTypeText, Nullable: false},
|
||||
{Name: "config", DataType: sqlschema.DataTypeText, Nullable: false},
|
||||
{Name: "enabled", DataType: sqlschema.DataTypeBoolean, Nullable: false, Default: "true"},
|
||||
},
|
||||
PrimaryKeyConstraint: &sqlschema.PrimaryKeyConstraint{
|
||||
ColumnNames: []sqlschema.ColumnName{"id"},
|
||||
},
|
||||
ForeignKeyConstraints: []*sqlschema.ForeignKeyConstraint{
|
||||
{
|
||||
ReferencingColumnName: sqlschema.ColumnName("group_id"),
|
||||
ReferencedTableName: sqlschema.TableName("span_mapper_group"),
|
||||
ReferencedColumnName: sqlschema.ColumnName("id"),
|
||||
},
|
||||
},
|
||||
})
|
||||
sqls = append(sqls, mapperSQLs...)
|
||||
|
||||
mapperIdxSQLs := migration.sqlschema.Operator().CreateIndex(
|
||||
&sqlschema.UniqueIndex{
|
||||
TableName: "span_mapper",
|
||||
ColumnNames: []sqlschema.ColumnName{"group_id", "name"},
|
||||
})
|
||||
sqls = append(sqls, mapperIdxSQLs...)
|
||||
|
||||
for _, sql := range sqls {
|
||||
if _, err := tx.ExecContext(ctx, string(sql)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
func (migration *addSpanMapper) Down(context.Context, *bun.DB) error {
|
||||
return nil
|
||||
}
|
||||
@@ -11,7 +11,9 @@ import (
|
||||
"github.com/uptrace/bun"
|
||||
)
|
||||
|
||||
var ErrCodeInvalidPlannedMaintenancePayload = errors.MustNewCode("invalid_planned_maintenance_payload")
|
||||
var (
|
||||
ErrCodeInvalidPlannedMaintenancePayload = errors.MustNewCode("invalid_planned_maintenance_payload")
|
||||
)
|
||||
|
||||
type MaintenanceStatus struct {
|
||||
valuer.String
|
||||
@@ -61,15 +63,15 @@ type StorablePlannedMaintenance struct {
|
||||
}
|
||||
|
||||
type PlannedMaintenance struct {
|
||||
ID valuer.UUID `json:"id" required:"true"`
|
||||
Name string `json:"name" required:"true"`
|
||||
Description string `json:"description"`
|
||||
Schedule *Schedule `json:"schedule" required:"true"`
|
||||
RuleIDs []string `json:"alertIds"`
|
||||
CreatedAt time.Time `json:"createdAt"`
|
||||
CreatedBy string `json:"createdBy"`
|
||||
UpdatedAt time.Time `json:"updatedAt"`
|
||||
UpdatedBy string `json:"updatedBy"`
|
||||
ID valuer.UUID `json:"id" required:"true"`
|
||||
Name string `json:"name" required:"true"`
|
||||
Description string `json:"description"`
|
||||
Schedule *Schedule `json:"schedule" required:"true"`
|
||||
RuleIDs []string `json:"alertIds"`
|
||||
CreatedAt time.Time `json:"createdAt"`
|
||||
CreatedBy string `json:"createdBy"`
|
||||
UpdatedAt time.Time `json:"updatedAt"`
|
||||
UpdatedBy string `json:"updatedBy"`
|
||||
Status MaintenanceStatus `json:"status" required:"true"`
|
||||
Kind MaintenanceKind `json:"kind" required:"true"`
|
||||
}
|
||||
@@ -159,11 +161,8 @@ func (m *PlannedMaintenance) ShouldSkip(ruleID string, now time.Time) bool {
|
||||
|
||||
currentTime := now.In(loc)
|
||||
|
||||
// fixed schedule — only when no recurrence is configured.
|
||||
// When recurrence is set, the recurring check below handles everything;
|
||||
// falling through here would cause the window to match the absolute
|
||||
// StartTime–EndTime range instead of the daily/weekly/monthly pattern.
|
||||
if m.Schedule.Recurrence == nil && !m.Schedule.StartTime.IsZero() && !m.Schedule.EndTime.IsZero() {
|
||||
// fixed schedule
|
||||
if !m.Schedule.StartTime.IsZero() && !m.Schedule.EndTime.IsZero() {
|
||||
startTime := m.Schedule.StartTime.In(loc)
|
||||
endTime := m.Schedule.EndTime.In(loc)
|
||||
if currentTime.Equal(startTime) || currentTime.Equal(endTime) ||
|
||||
|
||||
@@ -13,6 +13,7 @@ func timePtr(t time.Time) *time.Time {
|
||||
}
|
||||
|
||||
func TestShouldSkipMaintenance(t *testing.T) {
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
maintenance *PlannedMaintenance
|
||||
@@ -498,7 +499,7 @@ func TestShouldSkipMaintenance(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
ts: time.Date(2024, 4, 1, 12, 10, 0, 0, time.UTC),
|
||||
ts: time.Date(2024, 04, 1, 12, 10, 0, 0, time.UTC),
|
||||
skip: true,
|
||||
},
|
||||
{
|
||||
@@ -507,14 +508,14 @@ func TestShouldSkipMaintenance(t *testing.T) {
|
||||
Schedule: &Schedule{
|
||||
Timezone: "UTC",
|
||||
Recurrence: &Recurrence{
|
||||
StartTime: time.Date(2024, 4, 1, 12, 0, 0, 0, time.UTC),
|
||||
StartTime: time.Date(2024, 04, 01, 12, 0, 0, 0, time.UTC),
|
||||
Duration: valuer.MustParseTextDuration("2h"),
|
||||
RepeatType: RepeatTypeWeekly,
|
||||
RepeatOn: []RepeatOn{RepeatOnMonday},
|
||||
},
|
||||
},
|
||||
},
|
||||
ts: time.Date(2024, 4, 15, 12, 10, 0, 0, time.UTC),
|
||||
ts: time.Date(2024, 04, 15, 12, 10, 0, 0, time.UTC),
|
||||
skip: true,
|
||||
},
|
||||
{
|
||||
@@ -523,14 +524,14 @@ func TestShouldSkipMaintenance(t *testing.T) {
|
||||
Schedule: &Schedule{
|
||||
Timezone: "UTC",
|
||||
Recurrence: &Recurrence{
|
||||
StartTime: time.Date(2024, 4, 1, 12, 0, 0, 0, time.UTC),
|
||||
StartTime: time.Date(2024, 04, 01, 12, 0, 0, 0, time.UTC),
|
||||
Duration: valuer.MustParseTextDuration("2h"),
|
||||
RepeatType: RepeatTypeWeekly,
|
||||
RepeatOn: []RepeatOn{RepeatOnMonday},
|
||||
},
|
||||
},
|
||||
},
|
||||
ts: time.Date(2024, 4, 14, 12, 10, 0, 0, time.UTC), // 14th 04 is sunday
|
||||
ts: time.Date(2024, 04, 14, 12, 10, 0, 0, time.UTC), // 14th 04 is sunday
|
||||
skip: false,
|
||||
},
|
||||
{
|
||||
@@ -539,14 +540,14 @@ func TestShouldSkipMaintenance(t *testing.T) {
|
||||
Schedule: &Schedule{
|
||||
Timezone: "UTC",
|
||||
Recurrence: &Recurrence{
|
||||
StartTime: time.Date(2024, 4, 1, 12, 0, 0, 0, time.UTC),
|
||||
StartTime: time.Date(2024, 04, 01, 12, 0, 0, 0, time.UTC),
|
||||
Duration: valuer.MustParseTextDuration("2h"),
|
||||
RepeatType: RepeatTypeWeekly,
|
||||
RepeatOn: []RepeatOn{RepeatOnMonday},
|
||||
},
|
||||
},
|
||||
},
|
||||
ts: time.Date(2024, 4, 16, 12, 10, 0, 0, time.UTC), // 16th 04 is tuesday
|
||||
ts: time.Date(2024, 04, 16, 12, 10, 0, 0, time.UTC), // 16th 04 is tuesday
|
||||
skip: false,
|
||||
},
|
||||
{
|
||||
@@ -555,14 +556,14 @@ func TestShouldSkipMaintenance(t *testing.T) {
|
||||
Schedule: &Schedule{
|
||||
Timezone: "UTC",
|
||||
Recurrence: &Recurrence{
|
||||
StartTime: time.Date(2024, 4, 1, 12, 0, 0, 0, time.UTC),
|
||||
StartTime: time.Date(2024, 04, 01, 12, 0, 0, 0, time.UTC),
|
||||
Duration: valuer.MustParseTextDuration("2h"),
|
||||
RepeatType: RepeatTypeWeekly,
|
||||
RepeatOn: []RepeatOn{RepeatOnMonday},
|
||||
},
|
||||
},
|
||||
},
|
||||
ts: time.Date(2024, 5, 6, 12, 10, 0, 0, time.UTC),
|
||||
ts: time.Date(2024, 05, 06, 12, 10, 0, 0, time.UTC),
|
||||
skip: true,
|
||||
},
|
||||
{
|
||||
@@ -571,14 +572,14 @@ func TestShouldSkipMaintenance(t *testing.T) {
|
||||
Schedule: &Schedule{
|
||||
Timezone: "UTC",
|
||||
Recurrence: &Recurrence{
|
||||
StartTime: time.Date(2024, 4, 1, 12, 0, 0, 0, time.UTC),
|
||||
StartTime: time.Date(2024, 04, 01, 12, 0, 0, 0, time.UTC),
|
||||
Duration: valuer.MustParseTextDuration("2h"),
|
||||
RepeatType: RepeatTypeWeekly,
|
||||
RepeatOn: []RepeatOn{RepeatOnMonday},
|
||||
},
|
||||
},
|
||||
},
|
||||
ts: time.Date(2024, 5, 6, 14, 0, 0, 0, time.UTC),
|
||||
ts: time.Date(2024, 05, 06, 14, 00, 0, 0, time.UTC),
|
||||
skip: true,
|
||||
},
|
||||
{
|
||||
@@ -587,13 +588,13 @@ func TestShouldSkipMaintenance(t *testing.T) {
|
||||
Schedule: &Schedule{
|
||||
Timezone: "UTC",
|
||||
Recurrence: &Recurrence{
|
||||
StartTime: time.Date(2024, 4, 4, 12, 0, 0, 0, time.UTC),
|
||||
StartTime: time.Date(2024, 04, 04, 12, 0, 0, 0, time.UTC),
|
||||
Duration: valuer.MustParseTextDuration("2h"),
|
||||
RepeatType: RepeatTypeMonthly,
|
||||
},
|
||||
},
|
||||
},
|
||||
ts: time.Date(2024, 4, 4, 12, 10, 0, 0, time.UTC),
|
||||
ts: time.Date(2024, 04, 04, 12, 10, 0, 0, time.UTC),
|
||||
skip: true,
|
||||
},
|
||||
{
|
||||
@@ -602,13 +603,13 @@ func TestShouldSkipMaintenance(t *testing.T) {
|
||||
Schedule: &Schedule{
|
||||
Timezone: "UTC",
|
||||
Recurrence: &Recurrence{
|
||||
StartTime: time.Date(2024, 4, 4, 12, 0, 0, 0, time.UTC),
|
||||
StartTime: time.Date(2024, 04, 04, 12, 0, 0, 0, time.UTC),
|
||||
Duration: valuer.MustParseTextDuration("2h"),
|
||||
RepeatType: RepeatTypeMonthly,
|
||||
},
|
||||
},
|
||||
},
|
||||
ts: time.Date(2024, 4, 4, 14, 10, 0, 0, time.UTC),
|
||||
ts: time.Date(2024, 04, 04, 14, 10, 0, 0, time.UTC),
|
||||
skip: false,
|
||||
},
|
||||
{
|
||||
@@ -617,52 +618,13 @@ func TestShouldSkipMaintenance(t *testing.T) {
|
||||
Schedule: &Schedule{
|
||||
Timezone: "UTC",
|
||||
Recurrence: &Recurrence{
|
||||
StartTime: time.Date(2024, 4, 4, 12, 0, 0, 0, time.UTC),
|
||||
StartTime: time.Date(2024, 04, 04, 12, 0, 0, 0, time.UTC),
|
||||
Duration: valuer.MustParseTextDuration("2h"),
|
||||
RepeatType: RepeatTypeMonthly,
|
||||
},
|
||||
},
|
||||
},
|
||||
ts: time.Date(2024, 5, 4, 12, 10, 0, 0, time.UTC),
|
||||
skip: true,
|
||||
},
|
||||
// The recurrence should govern, when set. Not the fixed range.
|
||||
{
|
||||
name: "recurring-daily-with-fixed-times-outside-daily-window",
|
||||
maintenance: &PlannedMaintenance{
|
||||
Schedule: &Schedule{
|
||||
Timezone: "UTC",
|
||||
// These fixed fields should be ignored when Recurrence is set.
|
||||
StartTime: time.Date(2026, 4, 1, 10, 0, 0, 0, time.UTC),
|
||||
EndTime: time.Date(2026, 4, 30, 18, 0, 0, 0, time.UTC),
|
||||
Recurrence: &Recurrence{
|
||||
StartTime: time.Date(2026, 4, 1, 14, 0, 0, 0, time.UTC), // daily at 14:00
|
||||
Duration: valuer.MustParseTextDuration("2h"), // until 16:00
|
||||
RepeatType: RepeatTypeDaily,
|
||||
},
|
||||
},
|
||||
},
|
||||
// 11:00 is inside the fixed range but outside the daily 14:00-16:00 window.
|
||||
// Before the fix this returned true (bug); after fix it returns false.
|
||||
ts: time.Date(2026, 4, 15, 11, 0, 0, 0, time.UTC),
|
||||
skip: false,
|
||||
},
|
||||
{
|
||||
name: "recurring-daily-with-fixed-times-inside-daily-window",
|
||||
maintenance: &PlannedMaintenance{
|
||||
Schedule: &Schedule{
|
||||
Timezone: "UTC",
|
||||
StartTime: time.Date(2026, 4, 1, 10, 0, 0, 0, time.UTC),
|
||||
EndTime: time.Date(2026, 4, 30, 18, 0, 0, 0, time.UTC),
|
||||
Recurrence: &Recurrence{
|
||||
StartTime: time.Date(2026, 4, 1, 14, 0, 0, 0, time.UTC),
|
||||
Duration: valuer.MustParseTextDuration("2h"),
|
||||
RepeatType: RepeatTypeDaily,
|
||||
},
|
||||
},
|
||||
},
|
||||
// 15:00 is inside the daily 14:00-16:00 window — should skip.
|
||||
ts: time.Date(2026, 4, 15, 15, 0, 0, 0, time.UTC),
|
||||
ts: time.Date(2024, 05, 04, 12, 10, 0, 0, time.UTC),
|
||||
skip: true,
|
||||
},
|
||||
}
|
||||
|
||||
23
pkg/types/spantypes/processor_config.go
Normal file
23
pkg/types/spantypes/processor_config.go
Normal file
@@ -0,0 +1,23 @@
|
||||
package spantypes
|
||||
|
||||
type SpanMappingProcessorConfig struct {
|
||||
Groups []SpanMappingGroup `yaml:"groups" json:"groups"`
|
||||
}
|
||||
|
||||
type SpanMappingGroup struct {
|
||||
ID string `yaml:"id" json:"id"`
|
||||
ExistsAny SpanMappingExistsAny `yaml:"exists_any" json:"exists_any"`
|
||||
Attributes []SpanMappingAttribute `yaml:"attributes" json:"attributes"`
|
||||
}
|
||||
|
||||
type SpanMappingExistsAny struct {
|
||||
Attributes []string `yaml:"attributes,omitempty" json:"attributes,omitempty"`
|
||||
Resource []string `yaml:"resource,omitempty" json:"resource,omitempty"`
|
||||
}
|
||||
|
||||
type SpanMappingAttribute struct {
|
||||
Target string `yaml:"target" json:"target"`
|
||||
Context string `yaml:"context,omitempty" json:"context,omitempty"`
|
||||
Action string `yaml:"action,omitempty" json:"action,omitempty"`
|
||||
Sources []string `yaml:"sources" json:"sources"`
|
||||
}
|
||||
Reference in New Issue
Block a user