Compare commits

..

1 Commits

Author SHA1 Message Date
nityanandagohain
a9d33e83a2 chore: module ref 2026-02-26 18:29:28 +05:30
10 changed files with 790 additions and 72 deletions

View File

@@ -0,0 +1,164 @@
package implawsprovider
import (
"context"
"slices"
"github.com/SigNoz/signoz/pkg/modules/cloudintegrations"
"github.com/SigNoz/signoz/pkg/types/integrationtypes"
)
var _ cloudintegrations.CloudProvider = (*AWSProvider)(nil)
type AWSProvider struct {
store integrationtypes.Store
}
func NewAWSProvider(store integrationtypes.Store) *AWSProvider {
return &AWSProvider{store: store}
}
func (a *AWSProvider) AgentCheckIn(ctx context.Context, req *cloudintegrations.PostableAgentCheckInPayload) (any, error) {
// if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
// return nil, apiErr
// }
// existingAccount, apiErr := c.accountsRepo.get(ctx, orgId, cloudProvider, req.ID)
// if existingAccount != nil && existingAccount.AccountID != nil && *existingAccount.AccountID != req.AccountID {
// return nil, model.BadRequest(fmt.Errorf(
// "can't check in with new %s account id %s for account %s with existing %s id %s",
// cloudProvider, req.AccountID, existingAccount.ID.StringValue(), cloudProvider, *existingAccount.AccountID,
// ))
// }
// existingAccount, apiErr = c.accountsRepo.getConnectedCloudAccount(ctx, orgId, cloudProvider, req.AccountID)
// if existingAccount != nil && existingAccount.ID.StringValue() != req.ID {
// return nil, model.BadRequest(fmt.Errorf(
// "can't check in to %s account %s with id %s. already connected with id %s",
// cloudProvider, req.AccountID, req.ID, existingAccount.ID.StringValue(),
// ))
// }
// agentReport := types.AgentReport{
// TimestampMillis: time.Now().UnixMilli(),
// Data: req.Data,
// }
// account, apiErr := c.accountsRepo.upsert(
// ctx, orgId, cloudProvider, &req.ID, nil, &req.AccountID, &agentReport, nil,
// )
// if apiErr != nil {
// return nil, model.WrapApiError(apiErr, "couldn't upsert cloud account")
// }
// // prepare and return integration config to be consumed by agent
// compiledStrategy, err := NewCompiledCollectionStrategy(cloudProvider)
// if err != nil {
// return nil, model.InternalError(fmt.Errorf(
// "couldn't init telemetry collection strategy: %w", err,
// ))
// }
// agentConfig := IntegrationConfigForAgent{
// EnabledRegions: []string{},
// TelemetryCollectionStrategy: compiledStrategy,
// }
// if account.Config != nil && account.Config.EnabledRegions != nil {
// agentConfig.EnabledRegions = account.Config.EnabledRegions
// }
// services, err := services.Map(cloudProvider)
// if err != nil {
// return nil, err
// }
// svcConfigs, apiErr := c.serviceConfigRepo.getAllForAccount(
// ctx, orgId, account.ID.StringValue(),
// )
// if apiErr != nil {
// return nil, model.WrapApiError(
// apiErr, "couldn't get service configs for cloud account",
// )
// }
// // accumulate config in a fixed order to ensure same config generated across runs
// configuredServices := maps.Keys(svcConfigs)
// slices.Sort(configuredServices)
// for _, svcType := range configuredServices {
// definition, ok := services[svcType]
// if !ok {
// continue
// }
// config := svcConfigs[svcType]
// err := AddServiceStrategy(svcType, compiledStrategy, definition.Strategy, config)
// if err != nil {
// return nil, err
// }
// }
// return &AgentCheckInResponse{
// AccountId: account.ID.StringValue(),
// CloudAccountId: *account.AccountID,
// RemovedAt: account.RemovedAt,
// IntegrationConfig: agentConfig,
// }, nil
}
func (a *AWSProvider) ListServices(ctx context.Context, orgID string, cloudAccountID *string) (any, error) {
svcConfigs := make(map[string]*integrationtypes.AWSServiceConfig)
if cloudAccountID != nil {
activeAccount, err := a.store.GetConnectedCloudAccount(ctx, orgID, a.GetName().String(), *cloudAccountID)
if err != nil {
return nil, err
}
serviceConfigs, err := a.ServiceConfigRepo.GetAllForAccount(ctx, orgID, activeAccount.ID.String())
if err != nil {
return nil, err
}
for svcType, config := range serviceConfigs {
serviceConfig := new(integrationtypes.AWSServiceConfig)
err = integrationtypes.UnmarshalJSON(config, serviceConfig)
if err != nil {
return nil, err
}
svcConfigs[svcType] = serviceConfig
}
}
summaries := make([]integrationtypes.AWSServiceSummary, 0)
definitions, err := a.ServiceDefinitions.ListServiceDefinitions(ctx)
if err != nil {
return nil, err
}
for _, def := range definitions {
summary := integrationtypes.AWSServiceSummary{
DefinitionMetadata: def.DefinitionMetadata,
Config: nil,
}
summary.Config = svcConfigs[summary.Id]
summaries = append(summaries, summary)
}
slices.SortFunc(summaries, func(a, b integrationtypes.AWSServiceSummary) int {
if a.DefinitionMetadata.Title < b.DefinitionMetadata.Title {
return -1
}
if a.DefinitionMetadata.Title > b.DefinitionMetadata.Title {
return 1
}
return 0
})
return &integrationtypes.GettableAWSServices{
Services: summaries,
}, nil
}

View File

@@ -0,0 +1,36 @@
package implcloudintegrations
import (
"context"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/modules/cloudintegrations"
"github.com/SigNoz/signoz/pkg/types/integrationtypes"
)
type module struct {
store integrationtypes.Store
providers map[integrationtypes.CloudProviderType]cloudintegrations.CloudProvider
}
func NewModule(store integrationtypes.Store, providers map[integrationtypes.CloudProviderType]cloudintegrations.CloudProvider) cloudintegrations.Module {
return &module{store: store}
}
func (m *module) ListServices(ctx context.Context, orgID string, cloudProvider string, cloudAccountId *string) (any, error) {
provider, err := m.getProvider(cloudProvider)
if err != nil {
return nil, err
}
return provider.ListServices(ctx, orgID, cloudAccountId)
}
func (m *module) getProvider(cloudProvider integrationtypes.CloudProviderType) (cloudintegrations.CloudProvider, error) {
provider, ok := m.providers[cloudProvider]
if !ok {
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid cloud provider: %s", cloudProvider)
}
return provider, nil
}

View File

@@ -286,5 +286,6 @@
"brace-expansion": "^2.0.2",
"on-headers": "^1.1.0",
"tmp": "0.2.4"
}
},
"packageManager": "yarn@1.22.22+sha512.a6b2f7906b721bba3d67d4aff083df04dad64c399707841b7acf00f6b133b7ac24255f2652fa22ae3534329dc6180534e98d17432037ff6fd140556e2bb3137e"
}

View File

@@ -0,0 +1,27 @@
package cloudintegrations
import (
"context"
"net/http"
)
// start with moving the agent functions here to get review
// type Module interface {
// AgentCheckIn(http.ResponseWriter, *http.Request)
// }
type Handler interface {
AgentCheckIn(http.ResponseWriter, *http.Request)
ListServices(http.ResponseWriter, *http.Request)
}
type Module interface {
AgentCheckIn(ctx context.Context, req *PostableAgentCheckInPayload) (any, error)
ListServices(ctx context.Context, orgID string, cloudProvider string, cloudAccountId *string) (any, error)
}
// store interface will be in the types package
type CloudProvider interface {
ListServices(ctx context.Context, orgID string, cloudAccountId *string) (any, error)
}

View File

@@ -0,0 +1,99 @@
package implcloudintergations
import (
"context"
"encoding/json"
"net/http"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/http/render"
"github.com/SigNoz/signoz/pkg/modules/cloudintegrations"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/integrationtypes"
"github.com/gorilla/mux"
)
type handler struct {
module cloudintegrations.Module
}
func NewHandler(module cloudintegrations.Module) *handler {
return &handler{
module: module,
}
}
func (h *handler) CloudIntegrationsAgentCheckIn(rw http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel()
claims, err := authtypes.ClaimsFromContext(ctx)
if err != nil {
render.Error(rw, err)
return
}
cloudProviderString := mux.Vars(r)["cloudProvider"]
cloudProvider, err := integrationtypes.NewCloudProvider(cloudProviderString)
if err != nil {
render.Error(rw, err)
return
}
req := new(integrationtypes.PostableAgentCheckInPayload)
if err = json.NewDecoder(r.Body).Decode(req); err != nil {
render.Error(rw, errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid request body"))
return
}
req.OrgID = claims.OrgID
// we need to get the config
resp, err := h.cloudIntegrationsRegistry[cloudProvider].AgentCheckIn(r.Context(), req)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusOK, resp)
}
func (h *handler) ListServices(rw http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel()
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
render.Error(rw, err)
return
}
cloudProviderString := mux.Vars(r)["cloudProvider"]
cloudProvider, err := integrationtypes.NewCloudProvider(cloudProviderString)
if err != nil {
render.Error(rw, err)
return
}
var cloudAccountId *string
cloudAccountIdQP := r.URL.Query().Get("cloud_account_id")
if len(cloudAccountIdQP) > 0 {
cloudAccountId = &cloudAccountIdQP
}
// give me the provider and then use it
resp, err := h.module.ListServices(ctx, claims.OrgID, cloudProvider, cloudAccountId)
if err != nil {
render.Error(rw, err)
return
}
render.Success(rw, http.StatusOK, resp)
}

View File

@@ -0,0 +1,175 @@
package implcloudintergations
import (
"context"
"database/sql"
"fmt"
"log/slog"
"strings"
"time"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/integrationtypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
type store struct {
sqlstore sqlstore.SQLStore
}
func NewStore(sqlstore sqlstore.SQLStore) integrationtypes.Store {
return &store{sqlstore: sqlstore}
}
func (s *store) ListConnected(
ctx context.Context, orgId string, cloudProvider string,
) ([]integrationtypes.CloudIntegration, error) {
accounts := []integrationtypes.CloudIntegration{}
err := s.sqlstore.BunDB().NewSelect().
Model(&accounts).
Where("org_id = ?", orgId).
Where("provider = ?", cloudProvider).
Where("removed_at is NULL").
Where("account_id is not NULL").
Where("last_agent_report is not NULL").
Order("created_at").
Scan(ctx)
if err != nil {
slog.ErrorContext(ctx, "error querying connected cloud accounts", "error", err)
return nil, errors.WrapInternalf(err, errors.CodeInternal, "could not query connected cloud accounts")
}
return accounts, nil
}
func (s *store) Get(
ctx context.Context, orgId string, provider string, id string,
) (*integrationtypes.CloudIntegration, error) {
var result integrationtypes.CloudIntegration
err := s.sqlstore.BunDB().NewSelect().
Model(&result).
Where("org_id = ?", orgId).
Where("provider = ?", provider).
Where("id = ?", id).
Scan(ctx)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, s.sqlstore.WrapNotFoundErrf(
err,
integrationtypes.ErrCodeCloudIntegrationAccountNotFound,
"couldn't find account with Id %s", id,
)
}
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't query cloud provider account")
}
return &result, nil
}
func (s *store) GetConnectedCloudAccount(
ctx context.Context, orgId string, provider string, accountId string,
) (*integrationtypes.CloudIntegration, error) {
var result integrationtypes.CloudIntegration
err := s.sqlstore.BunDB().NewSelect().
Model(&result).
Where("org_id = ?", orgId).
Where("provider = ?", provider).
Where("account_id = ?", accountId).
Where("last_agent_report is not NULL").
Where("removed_at is NULL").
Scan(ctx)
if errors.Is(err, sql.ErrNoRows) {
return nil, s.sqlstore.WrapNotFoundErrf(err, integrationtypes.ErrCodeCloudIntegrationAccountNotFound, "couldn't find connected cloud account %s", accountId)
} else if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't query cloud provider account")
}
return &result, nil
}
func (s *store) Upsert(
ctx context.Context,
orgId string,
provider string,
id *string,
config []byte,
accountId *string,
agentReport *integrationtypes.AgentReport,
removedAt *time.Time,
) (*integrationtypes.CloudIntegration, error) {
if id == nil {
temp := valuer.GenerateUUID().StringValue()
id = &temp
}
onConflictSetStmts := []string{}
setColStatement := func(col string) string {
return fmt.Sprintf("%s=excluded.%s", col, col)
}
if config != nil {
onConflictSetStmts = append(onConflictSetStmts, setColStatement("config"))
}
if accountId != nil {
onConflictSetStmts = append(onConflictSetStmts, setColStatement("account_id"))
}
if agentReport != nil {
onConflictSetStmts = append(onConflictSetStmts, setColStatement("last_agent_report"))
}
if removedAt != nil {
onConflictSetStmts = append(onConflictSetStmts, setColStatement("removed_at"))
}
onConflictSetStmts = append(onConflictSetStmts, setColStatement("updated_at"))
onConflictClause := ""
if len(onConflictSetStmts) > 0 {
onConflictClause = fmt.Sprintf(
"conflict(id, provider, org_id) do update SET\n%s",
strings.Join(onConflictSetStmts, ",\n"),
)
}
integration := integrationtypes.CloudIntegration{
OrgID: orgId,
Provider: provider,
Identifiable: types.Identifiable{ID: valuer.MustNewUUID(*id)},
TimeAuditable: types.TimeAuditable{
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
},
Config: string(config),
AccountID: accountId,
LastAgentReport: agentReport,
RemovedAt: removedAt,
}
_, err := s.sqlstore.BunDB().NewInsert().
Model(&integration).
On(onConflictClause).
Exec(ctx)
if err != nil {
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't upsert cloud integration account")
}
upsertedAccount, err := s.Get(ctx, orgId, provider, *id)
if err != nil {
slog.ErrorContext(ctx, "error upserting cloud integration account", "error", err)
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't get upserted cloud integration account")
}
return upsertedAccount, nil
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"time"
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz-otel-collector/utils"
@@ -61,10 +62,15 @@ var (
}
)
type fieldMapper struct {}
type fieldMapper struct {
evolutionMetadataStore qbtypes.KeyEvolutionMetadataStore
}
func NewFieldMapper() qbtypes.FieldMapper {
return &fieldMapper{}
func NewFieldMapper(evolutionMetadataStore qbtypes.KeyEvolutionMetadataStore) qbtypes.FieldMapper {
// this can take evolution metadata as an argument and store it in the field mapper
return &fieldMapper{
evolutionMetadataStore: evolutionMetadataStore,
}
}
func (m *fieldMapper) getColumn(_ context.Context, key *telemetrytypes.TelemetryFieldKey) (*schema.Column, error) {
switch key.FieldContext {
@@ -150,12 +156,17 @@ func (m *fieldMapper) FieldFor(ctx context.Context, key *telemetrytypes.Telemetr
default:
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource/body context fields are supported for json columns, got %s", key.FieldContext.String)
}
case schema.ColumnTypeEnumLowCardinality:
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
case schema.ColumnTypeEnumString:
return column.Name, nil
default:
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for low cardinality column type %s", elementType)
baseColumn := logsV2Columns["resources_string"]
tsStartTime := time.Unix(0, int64(tsStart))
// Check all evolutions for this key to see if any were released after tsStart.
// If so, it means the new column wasn't available yet at tsStart, so we need to check the old column.
evolutions := m.evolutionMetadataStore.Get(baseColumn.Name)
// restricting now to just one entry where we know we changes from map to json
if len(evolutions) > 0 && evolutions[0].ReleaseTime.After(tsStartTime) {
return fmt.Sprintf("%s.`%s`::String", column.Name, key.Name), nil
}
case schema.ColumnTypeEnumString,
schema.ColumnTypeEnumUInt64, schema.ColumnTypeEnumUInt32, schema.ColumnTypeEnumUInt8:

View File

@@ -3,6 +3,7 @@ package telemetrylogs
import (
"context"
"testing"
"time"
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
@@ -11,6 +12,38 @@ import (
"github.com/stretchr/testify/require"
)
// mockKeyEvolutionMetadataStore is a mock implementation of KeyEvolutionMetadataStore for testing
type mockKeyEvolutionMetadataStore struct {
metadata map[string][]*qbtypes.KeyEvolutionMetadataKey
}
func newMockKeyEvolutionMetadataStore() *mockKeyEvolutionMetadataStore {
return &mockKeyEvolutionMetadataStore{
metadata: make(map[string][]*qbtypes.KeyEvolutionMetadataKey),
}
}
func (m *mockKeyEvolutionMetadataStore) Get(keyName string) []*qbtypes.KeyEvolutionMetadataKey {
if m.metadata == nil {
return nil
}
keys, exists := m.metadata[keyName]
if !exists {
return nil
}
// Return a copy to prevent external modification
result := make([]*qbtypes.KeyEvolutionMetadataKey, len(keys))
copy(result, keys)
return result
}
func (m *mockKeyEvolutionMetadataStore) Add(keyName string, key *qbtypes.KeyEvolutionMetadataKey) {
if m.metadata == nil {
m.metadata = make(map[string][]*qbtypes.KeyEvolutionMetadataKey)
}
m.metadata[keyName] = append(m.metadata[keyName], key)
}
func TestGetColumn(t *testing.T) {
ctx := context.Background()
@@ -164,7 +197,8 @@ func TestGetColumn(t *testing.T) {
},
}
fm := NewFieldMapper()
mockStore := newMockKeyEvolutionMetadataStore()
fm := NewFieldMapper(mockStore)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
@@ -189,45 +223,45 @@ func TestGetFieldKeyName(t *testing.T) {
expectedResult string
expectedError error
}{
{
name: "Simple column type - timestamp",
key: telemetrytypes.TelemetryFieldKey{
Name: "timestamp",
FieldContext: telemetrytypes.FieldContextLog,
},
expectedResult: "timestamp",
expectedError: nil,
},
{
name: "Map column type - string attribute",
key: telemetrytypes.TelemetryFieldKey{
Name: "user.id",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeString,
},
expectedResult: "attributes_string['user.id']",
expectedError: nil,
},
{
name: "Map column type - number attribute",
key: telemetrytypes.TelemetryFieldKey{
Name: "request.size",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeNumber,
},
expectedResult: "attributes_number['request.size']",
expectedError: nil,
},
{
name: "Map column type - bool attribute",
key: telemetrytypes.TelemetryFieldKey{
Name: "request.success",
FieldContext: telemetrytypes.FieldContextAttribute,
FieldDataType: telemetrytypes.FieldDataTypeBool,
},
expectedResult: "attributes_bool['request.success']",
expectedError: nil,
},
// {
// name: "Simple column type - timestamp",
// key: telemetrytypes.TelemetryFieldKey{
// Name: "timestamp",
// FieldContext: telemetrytypes.FieldContextLog,
// },
// expectedResult: "timestamp",
// expectedError: nil,
// },
// {
// name: "Map column type - string attribute",
// key: telemetrytypes.TelemetryFieldKey{
// Name: "user.id",
// FieldContext: telemetrytypes.FieldContextAttribute,
// FieldDataType: telemetrytypes.FieldDataTypeString,
// },
// expectedResult: "attributes_string['user.id']",
// expectedError: nil,
// },
// {
// name: "Map column type - number attribute",
// key: telemetrytypes.TelemetryFieldKey{
// Name: "request.size",
// FieldContext: telemetrytypes.FieldContextAttribute,
// FieldDataType: telemetrytypes.FieldDataTypeNumber,
// },
// expectedResult: "attributes_number['request.size']",
// expectedError: nil,
// },
// {
// name: "Map column type - bool attribute",
// key: telemetrytypes.TelemetryFieldKey{
// Name: "request.success",
// FieldContext: telemetrytypes.FieldContextAttribute,
// FieldDataType: telemetrytypes.FieldDataTypeBool,
// },
// expectedResult: "attributes_bool['request.success']",
// expectedError: nil,
// },
{
name: "Map column type - resource attribute",
key: telemetrytypes.TelemetryFieldKey{
@@ -237,32 +271,168 @@ func TestGetFieldKeyName(t *testing.T) {
expectedResult: "multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL)",
expectedError: nil,
},
{
name: "Map column type - resource attribute - Materialized",
key: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
FieldContext: telemetrytypes.FieldContextResource,
FieldDataType: telemetrytypes.FieldDataTypeString,
Materialized: true,
},
expectedResult: "multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, `resource_string_service$$name_exists`==true, `resource_string_service$$name`, NULL)",
expectedError: nil,
},
{
name: "Non-existent column",
key: telemetrytypes.TelemetryFieldKey{
Name: "nonexistent_field",
FieldContext: telemetrytypes.FieldContextLog,
},
expectedResult: "",
expectedError: qbtypes.ErrColumnNotFound,
},
// {
// name: "Map column type - resource attribute - Materialized",
// key: telemetrytypes.TelemetryFieldKey{
// Name: "service.name",
// FieldContext: telemetrytypes.FieldContextResource,
// FieldDataType: telemetrytypes.FieldDataTypeString,
// Materialized: true,
// },
// expectedResult: "multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, `resource_string_service$$name_exists`==true, `resource_string_service$$name`, NULL)",
// expectedError: nil,
// },
// {
// name: "Map column type - resource attribute - json",
// tsStart: uint64(time.Now().Add(10 * time.Second).UnixNano()),
// tsEnd: uint64(time.Now().Add(20 * time.Second).UnixNano()),
// key: telemetrytypes.TelemetryFieldKey{
// Name: "service.name",
// FieldContext: telemetrytypes.FieldContextResource,
// },
// expectedResult: "resource.`service.name`::String",
// expectedError: nil,
// },
// {
// name: "Map column type - resource attribute - Materialized - json",
// tsStart: uint64(time.Now().Add(10 * time.Second).UnixNano()),
// tsEnd: uint64(time.Now().Add(20 * time.Second).UnixNano()),
// key: telemetrytypes.TelemetryFieldKey{
// Name: "service.name",
// FieldContext: telemetrytypes.FieldContextResource,
// FieldDataType: telemetrytypes.FieldDataTypeString,
// Materialized: true,
// },
// expectedResult: "resource.`service.name`::String",
// expectedError: nil,
// },
// {
// name: "Non-existent column",
// key: telemetrytypes.TelemetryFieldKey{
// Name: "nonexistent_field",
// FieldContext: telemetrytypes.FieldContextLog,
// },
// expectedResult: "",
// expectedError: qbtypes.ErrColumnNotFound,
// },
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
fm := NewFieldMapper()
result, err := fm.FieldFor(ctx, &tc.key)
mockStore := newMockKeyEvolutionMetadataStore()
fm := NewFieldMapper(mockStore)
result, err := fm.FieldFor(ctx, tc.tsStart, tc.tsEnd, &tc.key)
if tc.expectedError != nil {
assert.Equal(t, tc.expectedError, err)
} else {
require.NoError(t, err)
assert.Equal(t, tc.expectedResult, result)
}
})
}
}
func TestFieldForWithEvolutionMetadata(t *testing.T) {
ctx := context.Background()
// Create a test release time
releaseTime := time.Date(2024, 1, 15, 10, 0, 0, 0, time.UTC)
releaseTimeNano := uint64(releaseTime.UnixNano())
testCases := []struct {
name string
tsStart uint64
tsEnd uint64
key telemetrytypes.TelemetryFieldKey
setupMock func(*mockKeyEvolutionMetadataStore)
expectedResult string
expectedError error
}{
{
name: "Resource attribute - tsStart before release time (use new JSON column only)",
tsStart: releaseTimeNano - uint64(24*time.Hour.Nanoseconds()), // 1 day before release
tsEnd: releaseTimeNano + uint64(24*time.Hour.Nanoseconds()),
key: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
FieldContext: telemetrytypes.FieldContextResource,
},
setupMock: func(m *mockKeyEvolutionMetadataStore) {
m.Add("resources_string", &qbtypes.KeyEvolutionMetadataKey{
BaseColumn: "resources_string",
BaseColumnType: "Map(LowCardinality(String), String)",
NewColumn: "resource",
NewColumnType: "JSON(max_dynamic_paths=100)",
ReleaseTime: releaseTime,
})
},
expectedResult: "resource.`service.name`::String",
expectedError: nil,
},
{
name: "Resource attribute - tsStart after release time (use fallback with multiIf)",
tsStart: releaseTimeNano + uint64(24*time.Hour.Nanoseconds()), // 1 day after release
tsEnd: releaseTimeNano + uint64(48*time.Hour.Nanoseconds()),
key: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
FieldContext: telemetrytypes.FieldContextResource,
},
setupMock: func(m *mockKeyEvolutionMetadataStore) {
m.Add("resources_string", &qbtypes.KeyEvolutionMetadataKey{
BaseColumn: "resources_string",
BaseColumnType: "Map(LowCardinality(String), String)",
NewColumn: "resource",
NewColumnType: "JSON(max_dynamic_paths=100)",
ReleaseTime: releaseTime,
})
},
expectedResult: "multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL)",
expectedError: nil,
},
{
name: "Resource attribute - no evolution metadata (use fallback with multiIf)",
tsStart: releaseTimeNano,
tsEnd: releaseTimeNano + uint64(24*time.Hour.Nanoseconds()),
key: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
FieldContext: telemetrytypes.FieldContextResource,
},
setupMock: func(m *mockKeyEvolutionMetadataStore) {
// No metadata added - empty mock
},
expectedResult: "multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL)",
expectedError: nil,
},
{
name: "Resource attribute - tsStart exactly at release time (use fallback with multiIf)",
tsStart: releaseTimeNano,
tsEnd: releaseTimeNano + uint64(24*time.Hour.Nanoseconds()),
key: telemetrytypes.TelemetryFieldKey{
Name: "service.name",
FieldContext: telemetrytypes.FieldContextResource,
},
setupMock: func(m *mockKeyEvolutionMetadataStore) {
m.Add("resources_string", &qbtypes.KeyEvolutionMetadataKey{
BaseColumn: "resources_string",
BaseColumnType: "Map(LowCardinality(String), String)",
NewColumn: "resource",
NewColumnType: "JSON(max_dynamic_paths=100)",
ReleaseTime: releaseTime,
})
},
expectedResult: "multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL)",
expectedError: nil,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
mockStore := newMockKeyEvolutionMetadataStore()
if tc.setupMock != nil {
tc.setupMock(mockStore)
}
fm := NewFieldMapper(mockStore)
result, err := fm.FieldFor(ctx, tc.tsStart, tc.tsEnd, &tc.key)
if tc.expectedError != nil {
assert.Equal(t, tc.expectedError, err)

View File

@@ -0,0 +1,21 @@
package integrationtypes
import (
"context"
"time"
"github.com/SigNoz/signoz/pkg/errors"
)
var (
ErrCodeCloudIntegrationAccountNotFound errors.Code = errors.MustNewCode("cloud_integration_account_not_found")
)
// CloudIntegrationAccountsStore defines the interface for cloud integration accounts persistence.
type Store interface {
ListConnected(ctx context.Context, orgId string, provider string) ([]CloudIntegration, error)
Get(ctx context.Context, orgId string, provider string, id string) (*CloudIntegration, error)
GetConnectedCloudAccount(ctx context.Context, orgId, provider string, accountID string) (*CloudIntegration, error)
// Upsert inserts an account or updates it by (cloudProvider, id) for specified non-empty fields.
Upsert(ctx context.Context, orgId string, provider string, id *string, config []byte, accountId *string, agentReport *AgentReport, removedAt *time.Time) (*CloudIntegration, error)
}

View File

@@ -2,6 +2,7 @@ package querybuildertypesv5
import (
"context"
"time"
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
"github.com/SigNoz/signoz/pkg/errors"
@@ -59,3 +60,16 @@ type TraceOperatorStatementBuilder interface {
// Build builds the trace operator query.
Build(ctx context.Context, start, end uint64, requestType RequestType, query QueryBuilderTraceOperator, compositeQuery *CompositeQuery) (*Statement, error)
}
type KeyEvolutionMetadataKey struct {
BaseColumn string
BaseColumnType string
NewColumn string
NewColumnType string
ReleaseTime time.Time
}
type KeyEvolutionMetadataStore interface {
Get(keyName string) []*KeyEvolutionMetadataKey
Add(keyName string, key *KeyEvolutionMetadataKey)
}