mirror of
https://github.com/SigNoz/signoz.git
synced 2026-02-27 10:42:53 +00:00
Compare commits
1 Commits
feat/cloud
...
nitya/clou
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a9d33e83a2 |
164
ee/modules/cloudintegrations/implawsprovider/provider.go
Normal file
164
ee/modules/cloudintegrations/implawsprovider/provider.go
Normal file
@@ -0,0 +1,164 @@
|
||||
package implawsprovider
|
||||
|
||||
import (
|
||||
"context"
|
||||
"slices"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/modules/cloudintegrations"
|
||||
"github.com/SigNoz/signoz/pkg/types/integrationtypes"
|
||||
)
|
||||
|
||||
var _ cloudintegrations.CloudProvider = (*AWSProvider)(nil)
|
||||
|
||||
type AWSProvider struct {
|
||||
store integrationtypes.Store
|
||||
}
|
||||
|
||||
func NewAWSProvider(store integrationtypes.Store) *AWSProvider {
|
||||
return &AWSProvider{store: store}
|
||||
}
|
||||
|
||||
func (a *AWSProvider) AgentCheckIn(ctx context.Context, req *cloudintegrations.PostableAgentCheckInPayload) (any, error) {
|
||||
// if apiErr := validateCloudProviderName(cloudProvider); apiErr != nil {
|
||||
// return nil, apiErr
|
||||
// }
|
||||
|
||||
// existingAccount, apiErr := c.accountsRepo.get(ctx, orgId, cloudProvider, req.ID)
|
||||
// if existingAccount != nil && existingAccount.AccountID != nil && *existingAccount.AccountID != req.AccountID {
|
||||
// return nil, model.BadRequest(fmt.Errorf(
|
||||
// "can't check in with new %s account id %s for account %s with existing %s id %s",
|
||||
// cloudProvider, req.AccountID, existingAccount.ID.StringValue(), cloudProvider, *existingAccount.AccountID,
|
||||
// ))
|
||||
// }
|
||||
|
||||
// existingAccount, apiErr = c.accountsRepo.getConnectedCloudAccount(ctx, orgId, cloudProvider, req.AccountID)
|
||||
// if existingAccount != nil && existingAccount.ID.StringValue() != req.ID {
|
||||
// return nil, model.BadRequest(fmt.Errorf(
|
||||
// "can't check in to %s account %s with id %s. already connected with id %s",
|
||||
// cloudProvider, req.AccountID, req.ID, existingAccount.ID.StringValue(),
|
||||
// ))
|
||||
// }
|
||||
|
||||
// agentReport := types.AgentReport{
|
||||
// TimestampMillis: time.Now().UnixMilli(),
|
||||
// Data: req.Data,
|
||||
// }
|
||||
|
||||
// account, apiErr := c.accountsRepo.upsert(
|
||||
// ctx, orgId, cloudProvider, &req.ID, nil, &req.AccountID, &agentReport, nil,
|
||||
// )
|
||||
// if apiErr != nil {
|
||||
// return nil, model.WrapApiError(apiErr, "couldn't upsert cloud account")
|
||||
// }
|
||||
|
||||
// // prepare and return integration config to be consumed by agent
|
||||
// compiledStrategy, err := NewCompiledCollectionStrategy(cloudProvider)
|
||||
// if err != nil {
|
||||
// return nil, model.InternalError(fmt.Errorf(
|
||||
// "couldn't init telemetry collection strategy: %w", err,
|
||||
// ))
|
||||
// }
|
||||
|
||||
// agentConfig := IntegrationConfigForAgent{
|
||||
// EnabledRegions: []string{},
|
||||
// TelemetryCollectionStrategy: compiledStrategy,
|
||||
// }
|
||||
|
||||
// if account.Config != nil && account.Config.EnabledRegions != nil {
|
||||
// agentConfig.EnabledRegions = account.Config.EnabledRegions
|
||||
// }
|
||||
|
||||
// services, err := services.Map(cloudProvider)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
|
||||
// svcConfigs, apiErr := c.serviceConfigRepo.getAllForAccount(
|
||||
// ctx, orgId, account.ID.StringValue(),
|
||||
// )
|
||||
// if apiErr != nil {
|
||||
// return nil, model.WrapApiError(
|
||||
// apiErr, "couldn't get service configs for cloud account",
|
||||
// )
|
||||
// }
|
||||
|
||||
// // accumulate config in a fixed order to ensure same config generated across runs
|
||||
// configuredServices := maps.Keys(svcConfigs)
|
||||
// slices.Sort(configuredServices)
|
||||
|
||||
// for _, svcType := range configuredServices {
|
||||
// definition, ok := services[svcType]
|
||||
// if !ok {
|
||||
// continue
|
||||
// }
|
||||
// config := svcConfigs[svcType]
|
||||
|
||||
// err := AddServiceStrategy(svcType, compiledStrategy, definition.Strategy, config)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// }
|
||||
|
||||
// return &AgentCheckInResponse{
|
||||
// AccountId: account.ID.StringValue(),
|
||||
// CloudAccountId: *account.AccountID,
|
||||
// RemovedAt: account.RemovedAt,
|
||||
// IntegrationConfig: agentConfig,
|
||||
// }, nil
|
||||
}
|
||||
|
||||
func (a *AWSProvider) ListServices(ctx context.Context, orgID string, cloudAccountID *string) (any, error) {
|
||||
svcConfigs := make(map[string]*integrationtypes.AWSServiceConfig)
|
||||
if cloudAccountID != nil {
|
||||
activeAccount, err := a.store.GetConnectedCloudAccount(ctx, orgID, a.GetName().String(), *cloudAccountID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
serviceConfigs, err := a.ServiceConfigRepo.GetAllForAccount(ctx, orgID, activeAccount.ID.String())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for svcType, config := range serviceConfigs {
|
||||
serviceConfig := new(integrationtypes.AWSServiceConfig)
|
||||
err = integrationtypes.UnmarshalJSON(config, serviceConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
svcConfigs[svcType] = serviceConfig
|
||||
}
|
||||
}
|
||||
|
||||
summaries := make([]integrationtypes.AWSServiceSummary, 0)
|
||||
|
||||
definitions, err := a.ServiceDefinitions.ListServiceDefinitions(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, def := range definitions {
|
||||
summary := integrationtypes.AWSServiceSummary{
|
||||
DefinitionMetadata: def.DefinitionMetadata,
|
||||
Config: nil,
|
||||
}
|
||||
|
||||
summary.Config = svcConfigs[summary.Id]
|
||||
|
||||
summaries = append(summaries, summary)
|
||||
}
|
||||
|
||||
slices.SortFunc(summaries, func(a, b integrationtypes.AWSServiceSummary) int {
|
||||
if a.DefinitionMetadata.Title < b.DefinitionMetadata.Title {
|
||||
return -1
|
||||
}
|
||||
if a.DefinitionMetadata.Title > b.DefinitionMetadata.Title {
|
||||
return 1
|
||||
}
|
||||
return 0
|
||||
})
|
||||
|
||||
return &integrationtypes.GettableAWSServices{
|
||||
Services: summaries,
|
||||
}, nil
|
||||
}
|
||||
36
ee/modules/cloudintegrations/implcloudintegrations/module.go
Normal file
36
ee/modules/cloudintegrations/implcloudintegrations/module.go
Normal file
@@ -0,0 +1,36 @@
|
||||
package implcloudintegrations
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/modules/cloudintegrations"
|
||||
"github.com/SigNoz/signoz/pkg/types/integrationtypes"
|
||||
)
|
||||
|
||||
type module struct {
|
||||
store integrationtypes.Store
|
||||
providers map[integrationtypes.CloudProviderType]cloudintegrations.CloudProvider
|
||||
}
|
||||
|
||||
func NewModule(store integrationtypes.Store, providers map[integrationtypes.CloudProviderType]cloudintegrations.CloudProvider) cloudintegrations.Module {
|
||||
return &module{store: store}
|
||||
}
|
||||
|
||||
func (m *module) ListServices(ctx context.Context, orgID string, cloudProvider string, cloudAccountId *string) (any, error) {
|
||||
|
||||
provider, err := m.getProvider(cloudProvider)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return provider.ListServices(ctx, orgID, cloudAccountId)
|
||||
}
|
||||
|
||||
func (m *module) getProvider(cloudProvider integrationtypes.CloudProviderType) (cloudintegrations.CloudProvider, error) {
|
||||
provider, ok := m.providers[cloudProvider]
|
||||
if !ok {
|
||||
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid cloud provider: %s", cloudProvider)
|
||||
}
|
||||
return provider, nil
|
||||
}
|
||||
@@ -286,5 +286,6 @@
|
||||
"brace-expansion": "^2.0.2",
|
||||
"on-headers": "^1.1.0",
|
||||
"tmp": "0.2.4"
|
||||
}
|
||||
},
|
||||
"packageManager": "yarn@1.22.22+sha512.a6b2f7906b721bba3d67d4aff083df04dad64c399707841b7acf00f6b133b7ac24255f2652fa22ae3534329dc6180534e98d17432037ff6fd140556e2bb3137e"
|
||||
}
|
||||
|
||||
27
pkg/modules/cloudintegrations/cloudintegrations.go
Normal file
27
pkg/modules/cloudintegrations/cloudintegrations.go
Normal file
@@ -0,0 +1,27 @@
|
||||
package cloudintegrations
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
// start with moving the agent functions here to get review
|
||||
|
||||
// type Module interface {
|
||||
// AgentCheckIn(http.ResponseWriter, *http.Request)
|
||||
// }
|
||||
|
||||
type Handler interface {
|
||||
AgentCheckIn(http.ResponseWriter, *http.Request)
|
||||
ListServices(http.ResponseWriter, *http.Request)
|
||||
}
|
||||
|
||||
type Module interface {
|
||||
AgentCheckIn(ctx context.Context, req *PostableAgentCheckInPayload) (any, error)
|
||||
ListServices(ctx context.Context, orgID string, cloudProvider string, cloudAccountId *string) (any, error)
|
||||
}
|
||||
|
||||
// store interface will be in the types package
|
||||
type CloudProvider interface {
|
||||
ListServices(ctx context.Context, orgID string, cloudAccountId *string) (any, error)
|
||||
}
|
||||
@@ -0,0 +1,99 @@
|
||||
package implcloudintergations
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/http/render"
|
||||
"github.com/SigNoz/signoz/pkg/modules/cloudintegrations"
|
||||
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
||||
"github.com/SigNoz/signoz/pkg/types/integrationtypes"
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
type handler struct {
|
||||
module cloudintegrations.Module
|
||||
}
|
||||
|
||||
func NewHandler(module cloudintegrations.Module) *handler {
|
||||
return &handler{
|
||||
module: module,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *handler) CloudIntegrationsAgentCheckIn(rw http.ResponseWriter, r *http.Request) {
|
||||
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
claims, err := authtypes.ClaimsFromContext(ctx)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
cloudProviderString := mux.Vars(r)["cloudProvider"]
|
||||
|
||||
cloudProvider, err := integrationtypes.NewCloudProvider(cloudProviderString)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
req := new(integrationtypes.PostableAgentCheckInPayload)
|
||||
if err = json.NewDecoder(r.Body).Decode(req); err != nil {
|
||||
render.Error(rw, errors.WrapInvalidInputf(err, errors.CodeInvalidInput, "invalid request body"))
|
||||
return
|
||||
}
|
||||
|
||||
req.OrgID = claims.OrgID
|
||||
|
||||
// we need to get the config
|
||||
|
||||
resp, err := h.cloudIntegrationsRegistry[cloudProvider].AgentCheckIn(r.Context(), req)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
render.Success(rw, http.StatusOK, resp)
|
||||
}
|
||||
|
||||
func (h *handler) ListServices(rw http.ResponseWriter, r *http.Request) {
|
||||
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
claims, err := authtypes.ClaimsFromContext(r.Context())
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
cloudProviderString := mux.Vars(r)["cloudProvider"]
|
||||
|
||||
cloudProvider, err := integrationtypes.NewCloudProvider(cloudProviderString)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
var cloudAccountId *string
|
||||
|
||||
cloudAccountIdQP := r.URL.Query().Get("cloud_account_id")
|
||||
if len(cloudAccountIdQP) > 0 {
|
||||
cloudAccountId = &cloudAccountIdQP
|
||||
}
|
||||
|
||||
// give me the provider and then use it
|
||||
|
||||
resp, err := h.module.ListServices(ctx, claims.OrgID, cloudProvider, cloudAccountId)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
render.Success(rw, http.StatusOK, resp)
|
||||
|
||||
}
|
||||
175
pkg/modules/cloudintegrations/implcloudintergations/store.go
Normal file
175
pkg/modules/cloudintegrations/implcloudintergations/store.go
Normal file
@@ -0,0 +1,175 @@
|
||||
package implcloudintergations
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/sqlstore"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
"github.com/SigNoz/signoz/pkg/types/integrationtypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
)
|
||||
|
||||
type store struct {
|
||||
sqlstore sqlstore.SQLStore
|
||||
}
|
||||
|
||||
func NewStore(sqlstore sqlstore.SQLStore) integrationtypes.Store {
|
||||
return &store{sqlstore: sqlstore}
|
||||
}
|
||||
|
||||
func (s *store) ListConnected(
|
||||
ctx context.Context, orgId string, cloudProvider string,
|
||||
) ([]integrationtypes.CloudIntegration, error) {
|
||||
accounts := []integrationtypes.CloudIntegration{}
|
||||
|
||||
err := s.sqlstore.BunDB().NewSelect().
|
||||
Model(&accounts).
|
||||
Where("org_id = ?", orgId).
|
||||
Where("provider = ?", cloudProvider).
|
||||
Where("removed_at is NULL").
|
||||
Where("account_id is not NULL").
|
||||
Where("last_agent_report is not NULL").
|
||||
Order("created_at").
|
||||
Scan(ctx)
|
||||
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "error querying connected cloud accounts", "error", err)
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "could not query connected cloud accounts")
|
||||
}
|
||||
|
||||
return accounts, nil
|
||||
}
|
||||
|
||||
func (s *store) Get(
|
||||
ctx context.Context, orgId string, provider string, id string,
|
||||
) (*integrationtypes.CloudIntegration, error) {
|
||||
var result integrationtypes.CloudIntegration
|
||||
|
||||
err := s.sqlstore.BunDB().NewSelect().
|
||||
Model(&result).
|
||||
Where("org_id = ?", orgId).
|
||||
Where("provider = ?", provider).
|
||||
Where("id = ?", id).
|
||||
Scan(ctx)
|
||||
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return nil, s.sqlstore.WrapNotFoundErrf(
|
||||
err,
|
||||
integrationtypes.ErrCodeCloudIntegrationAccountNotFound,
|
||||
"couldn't find account with Id %s", id,
|
||||
)
|
||||
}
|
||||
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't query cloud provider account")
|
||||
}
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
func (s *store) GetConnectedCloudAccount(
|
||||
ctx context.Context, orgId string, provider string, accountId string,
|
||||
) (*integrationtypes.CloudIntegration, error) {
|
||||
var result integrationtypes.CloudIntegration
|
||||
|
||||
err := s.sqlstore.BunDB().NewSelect().
|
||||
Model(&result).
|
||||
Where("org_id = ?", orgId).
|
||||
Where("provider = ?", provider).
|
||||
Where("account_id = ?", accountId).
|
||||
Where("last_agent_report is not NULL").
|
||||
Where("removed_at is NULL").
|
||||
Scan(ctx)
|
||||
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return nil, s.sqlstore.WrapNotFoundErrf(err, integrationtypes.ErrCodeCloudIntegrationAccountNotFound, "couldn't find connected cloud account %s", accountId)
|
||||
} else if err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't query cloud provider account")
|
||||
}
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
func (s *store) Upsert(
|
||||
ctx context.Context,
|
||||
orgId string,
|
||||
provider string,
|
||||
id *string,
|
||||
config []byte,
|
||||
accountId *string,
|
||||
agentReport *integrationtypes.AgentReport,
|
||||
removedAt *time.Time,
|
||||
) (*integrationtypes.CloudIntegration, error) {
|
||||
if id == nil {
|
||||
temp := valuer.GenerateUUID().StringValue()
|
||||
id = &temp
|
||||
}
|
||||
|
||||
onConflictSetStmts := []string{}
|
||||
setColStatement := func(col string) string {
|
||||
return fmt.Sprintf("%s=excluded.%s", col, col)
|
||||
}
|
||||
|
||||
if config != nil {
|
||||
onConflictSetStmts = append(onConflictSetStmts, setColStatement("config"))
|
||||
}
|
||||
|
||||
if accountId != nil {
|
||||
onConflictSetStmts = append(onConflictSetStmts, setColStatement("account_id"))
|
||||
}
|
||||
|
||||
if agentReport != nil {
|
||||
onConflictSetStmts = append(onConflictSetStmts, setColStatement("last_agent_report"))
|
||||
}
|
||||
|
||||
if removedAt != nil {
|
||||
onConflictSetStmts = append(onConflictSetStmts, setColStatement("removed_at"))
|
||||
}
|
||||
|
||||
onConflictSetStmts = append(onConflictSetStmts, setColStatement("updated_at"))
|
||||
|
||||
onConflictClause := ""
|
||||
if len(onConflictSetStmts) > 0 {
|
||||
onConflictClause = fmt.Sprintf(
|
||||
"conflict(id, provider, org_id) do update SET\n%s",
|
||||
strings.Join(onConflictSetStmts, ",\n"),
|
||||
)
|
||||
}
|
||||
|
||||
integration := integrationtypes.CloudIntegration{
|
||||
OrgID: orgId,
|
||||
Provider: provider,
|
||||
Identifiable: types.Identifiable{ID: valuer.MustNewUUID(*id)},
|
||||
TimeAuditable: types.TimeAuditable{
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
},
|
||||
Config: string(config),
|
||||
AccountID: accountId,
|
||||
LastAgentReport: agentReport,
|
||||
RemovedAt: removedAt,
|
||||
}
|
||||
|
||||
_, err := s.sqlstore.BunDB().NewInsert().
|
||||
Model(&integration).
|
||||
On(onConflictClause).
|
||||
Exec(ctx)
|
||||
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't upsert cloud integration account")
|
||||
}
|
||||
|
||||
upsertedAccount, err := s.Get(ctx, orgId, provider, *id)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "error upserting cloud integration account", "error", err)
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "couldn't get upserted cloud integration account")
|
||||
}
|
||||
|
||||
return upsertedAccount, nil
|
||||
}
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
|
||||
"github.com/SigNoz/signoz-otel-collector/utils"
|
||||
@@ -61,10 +62,15 @@ var (
|
||||
}
|
||||
)
|
||||
|
||||
type fieldMapper struct {}
|
||||
type fieldMapper struct {
|
||||
evolutionMetadataStore qbtypes.KeyEvolutionMetadataStore
|
||||
}
|
||||
|
||||
func NewFieldMapper() qbtypes.FieldMapper {
|
||||
return &fieldMapper{}
|
||||
func NewFieldMapper(evolutionMetadataStore qbtypes.KeyEvolutionMetadataStore) qbtypes.FieldMapper {
|
||||
// this can take evolution metadata as an argument and store it in the field mapper
|
||||
return &fieldMapper{
|
||||
evolutionMetadataStore: evolutionMetadataStore,
|
||||
}
|
||||
}
|
||||
func (m *fieldMapper) getColumn(_ context.Context, key *telemetrytypes.TelemetryFieldKey) (*schema.Column, error) {
|
||||
switch key.FieldContext {
|
||||
@@ -150,12 +156,17 @@ func (m *fieldMapper) FieldFor(ctx context.Context, key *telemetrytypes.Telemetr
|
||||
default:
|
||||
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource/body context fields are supported for json columns, got %s", key.FieldContext.String)
|
||||
}
|
||||
case schema.ColumnTypeEnumLowCardinality:
|
||||
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
|
||||
case schema.ColumnTypeEnumString:
|
||||
return column.Name, nil
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for low cardinality column type %s", elementType)
|
||||
|
||||
baseColumn := logsV2Columns["resources_string"]
|
||||
tsStartTime := time.Unix(0, int64(tsStart))
|
||||
|
||||
// Check all evolutions for this key to see if any were released after tsStart.
|
||||
// If so, it means the new column wasn't available yet at tsStart, so we need to check the old column.
|
||||
evolutions := m.evolutionMetadataStore.Get(baseColumn.Name)
|
||||
|
||||
// restricting now to just one entry where we know we changes from map to json
|
||||
if len(evolutions) > 0 && evolutions[0].ReleaseTime.After(tsStartTime) {
|
||||
return fmt.Sprintf("%s.`%s`::String", column.Name, key.Name), nil
|
||||
}
|
||||
case schema.ColumnTypeEnumString,
|
||||
schema.ColumnTypeEnumUInt64, schema.ColumnTypeEnumUInt32, schema.ColumnTypeEnumUInt8:
|
||||
|
||||
@@ -3,6 +3,7 @@ package telemetrylogs
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
@@ -11,6 +12,38 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// mockKeyEvolutionMetadataStore is a mock implementation of KeyEvolutionMetadataStore for testing
|
||||
type mockKeyEvolutionMetadataStore struct {
|
||||
metadata map[string][]*qbtypes.KeyEvolutionMetadataKey
|
||||
}
|
||||
|
||||
func newMockKeyEvolutionMetadataStore() *mockKeyEvolutionMetadataStore {
|
||||
return &mockKeyEvolutionMetadataStore{
|
||||
metadata: make(map[string][]*qbtypes.KeyEvolutionMetadataKey),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockKeyEvolutionMetadataStore) Get(keyName string) []*qbtypes.KeyEvolutionMetadataKey {
|
||||
if m.metadata == nil {
|
||||
return nil
|
||||
}
|
||||
keys, exists := m.metadata[keyName]
|
||||
if !exists {
|
||||
return nil
|
||||
}
|
||||
// Return a copy to prevent external modification
|
||||
result := make([]*qbtypes.KeyEvolutionMetadataKey, len(keys))
|
||||
copy(result, keys)
|
||||
return result
|
||||
}
|
||||
|
||||
func (m *mockKeyEvolutionMetadataStore) Add(keyName string, key *qbtypes.KeyEvolutionMetadataKey) {
|
||||
if m.metadata == nil {
|
||||
m.metadata = make(map[string][]*qbtypes.KeyEvolutionMetadataKey)
|
||||
}
|
||||
m.metadata[keyName] = append(m.metadata[keyName], key)
|
||||
}
|
||||
|
||||
func TestGetColumn(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
@@ -164,7 +197,8 @@ func TestGetColumn(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
fm := NewFieldMapper()
|
||||
mockStore := newMockKeyEvolutionMetadataStore()
|
||||
fm := NewFieldMapper(mockStore)
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
@@ -189,45 +223,45 @@ func TestGetFieldKeyName(t *testing.T) {
|
||||
expectedResult string
|
||||
expectedError error
|
||||
}{
|
||||
{
|
||||
name: "Simple column type - timestamp",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "timestamp",
|
||||
FieldContext: telemetrytypes.FieldContextLog,
|
||||
},
|
||||
expectedResult: "timestamp",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Map column type - string attribute",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "user.id",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
},
|
||||
expectedResult: "attributes_string['user.id']",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Map column type - number attribute",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "request.size",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeNumber,
|
||||
},
|
||||
expectedResult: "attributes_number['request.size']",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Map column type - bool attribute",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "request.success",
|
||||
FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeBool,
|
||||
},
|
||||
expectedResult: "attributes_bool['request.success']",
|
||||
expectedError: nil,
|
||||
},
|
||||
// {
|
||||
// name: "Simple column type - timestamp",
|
||||
// key: telemetrytypes.TelemetryFieldKey{
|
||||
// Name: "timestamp",
|
||||
// FieldContext: telemetrytypes.FieldContextLog,
|
||||
// },
|
||||
// expectedResult: "timestamp",
|
||||
// expectedError: nil,
|
||||
// },
|
||||
// {
|
||||
// name: "Map column type - string attribute",
|
||||
// key: telemetrytypes.TelemetryFieldKey{
|
||||
// Name: "user.id",
|
||||
// FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
// FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
// },
|
||||
// expectedResult: "attributes_string['user.id']",
|
||||
// expectedError: nil,
|
||||
// },
|
||||
// {
|
||||
// name: "Map column type - number attribute",
|
||||
// key: telemetrytypes.TelemetryFieldKey{
|
||||
// Name: "request.size",
|
||||
// FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
// FieldDataType: telemetrytypes.FieldDataTypeNumber,
|
||||
// },
|
||||
// expectedResult: "attributes_number['request.size']",
|
||||
// expectedError: nil,
|
||||
// },
|
||||
// {
|
||||
// name: "Map column type - bool attribute",
|
||||
// key: telemetrytypes.TelemetryFieldKey{
|
||||
// Name: "request.success",
|
||||
// FieldContext: telemetrytypes.FieldContextAttribute,
|
||||
// FieldDataType: telemetrytypes.FieldDataTypeBool,
|
||||
// },
|
||||
// expectedResult: "attributes_bool['request.success']",
|
||||
// expectedError: nil,
|
||||
// },
|
||||
{
|
||||
name: "Map column type - resource attribute",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
@@ -237,32 +271,168 @@ func TestGetFieldKeyName(t *testing.T) {
|
||||
expectedResult: "multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL)",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Map column type - resource attribute - Materialized",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
Materialized: true,
|
||||
},
|
||||
expectedResult: "multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, `resource_string_service$$name_exists`==true, `resource_string_service$$name`, NULL)",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Non-existent column",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "nonexistent_field",
|
||||
FieldContext: telemetrytypes.FieldContextLog,
|
||||
},
|
||||
expectedResult: "",
|
||||
expectedError: qbtypes.ErrColumnNotFound,
|
||||
},
|
||||
// {
|
||||
// name: "Map column type - resource attribute - Materialized",
|
||||
// key: telemetrytypes.TelemetryFieldKey{
|
||||
// Name: "service.name",
|
||||
// FieldContext: telemetrytypes.FieldContextResource,
|
||||
// FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
// Materialized: true,
|
||||
// },
|
||||
// expectedResult: "multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, `resource_string_service$$name_exists`==true, `resource_string_service$$name`, NULL)",
|
||||
// expectedError: nil,
|
||||
// },
|
||||
// {
|
||||
// name: "Map column type - resource attribute - json",
|
||||
// tsStart: uint64(time.Now().Add(10 * time.Second).UnixNano()),
|
||||
// tsEnd: uint64(time.Now().Add(20 * time.Second).UnixNano()),
|
||||
// key: telemetrytypes.TelemetryFieldKey{
|
||||
// Name: "service.name",
|
||||
// FieldContext: telemetrytypes.FieldContextResource,
|
||||
// },
|
||||
// expectedResult: "resource.`service.name`::String",
|
||||
// expectedError: nil,
|
||||
// },
|
||||
// {
|
||||
// name: "Map column type - resource attribute - Materialized - json",
|
||||
// tsStart: uint64(time.Now().Add(10 * time.Second).UnixNano()),
|
||||
// tsEnd: uint64(time.Now().Add(20 * time.Second).UnixNano()),
|
||||
// key: telemetrytypes.TelemetryFieldKey{
|
||||
// Name: "service.name",
|
||||
// FieldContext: telemetrytypes.FieldContextResource,
|
||||
// FieldDataType: telemetrytypes.FieldDataTypeString,
|
||||
// Materialized: true,
|
||||
// },
|
||||
// expectedResult: "resource.`service.name`::String",
|
||||
// expectedError: nil,
|
||||
// },
|
||||
// {
|
||||
// name: "Non-existent column",
|
||||
// key: telemetrytypes.TelemetryFieldKey{
|
||||
// Name: "nonexistent_field",
|
||||
// FieldContext: telemetrytypes.FieldContextLog,
|
||||
// },
|
||||
// expectedResult: "",
|
||||
// expectedError: qbtypes.ErrColumnNotFound,
|
||||
// },
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
fm := NewFieldMapper()
|
||||
result, err := fm.FieldFor(ctx, &tc.key)
|
||||
mockStore := newMockKeyEvolutionMetadataStore()
|
||||
fm := NewFieldMapper(mockStore)
|
||||
result, err := fm.FieldFor(ctx, tc.tsStart, tc.tsEnd, &tc.key)
|
||||
|
||||
if tc.expectedError != nil {
|
||||
assert.Equal(t, tc.expectedError, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, tc.expectedResult, result)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestFieldForWithEvolutionMetadata(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
// Create a test release time
|
||||
releaseTime := time.Date(2024, 1, 15, 10, 0, 0, 0, time.UTC)
|
||||
releaseTimeNano := uint64(releaseTime.UnixNano())
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
tsStart uint64
|
||||
tsEnd uint64
|
||||
key telemetrytypes.TelemetryFieldKey
|
||||
setupMock func(*mockKeyEvolutionMetadataStore)
|
||||
expectedResult string
|
||||
expectedError error
|
||||
}{
|
||||
{
|
||||
name: "Resource attribute - tsStart before release time (use new JSON column only)",
|
||||
tsStart: releaseTimeNano - uint64(24*time.Hour.Nanoseconds()), // 1 day before release
|
||||
tsEnd: releaseTimeNano + uint64(24*time.Hour.Nanoseconds()),
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
},
|
||||
setupMock: func(m *mockKeyEvolutionMetadataStore) {
|
||||
m.Add("resources_string", &qbtypes.KeyEvolutionMetadataKey{
|
||||
BaseColumn: "resources_string",
|
||||
BaseColumnType: "Map(LowCardinality(String), String)",
|
||||
NewColumn: "resource",
|
||||
NewColumnType: "JSON(max_dynamic_paths=100)",
|
||||
ReleaseTime: releaseTime,
|
||||
})
|
||||
},
|
||||
expectedResult: "resource.`service.name`::String",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Resource attribute - tsStart after release time (use fallback with multiIf)",
|
||||
tsStart: releaseTimeNano + uint64(24*time.Hour.Nanoseconds()), // 1 day after release
|
||||
tsEnd: releaseTimeNano + uint64(48*time.Hour.Nanoseconds()),
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
},
|
||||
setupMock: func(m *mockKeyEvolutionMetadataStore) {
|
||||
m.Add("resources_string", &qbtypes.KeyEvolutionMetadataKey{
|
||||
BaseColumn: "resources_string",
|
||||
BaseColumnType: "Map(LowCardinality(String), String)",
|
||||
NewColumn: "resource",
|
||||
NewColumnType: "JSON(max_dynamic_paths=100)",
|
||||
ReleaseTime: releaseTime,
|
||||
})
|
||||
},
|
||||
expectedResult: "multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL)",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Resource attribute - no evolution metadata (use fallback with multiIf)",
|
||||
tsStart: releaseTimeNano,
|
||||
tsEnd: releaseTimeNano + uint64(24*time.Hour.Nanoseconds()),
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
},
|
||||
setupMock: func(m *mockKeyEvolutionMetadataStore) {
|
||||
// No metadata added - empty mock
|
||||
},
|
||||
expectedResult: "multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL)",
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "Resource attribute - tsStart exactly at release time (use fallback with multiIf)",
|
||||
tsStart: releaseTimeNano,
|
||||
tsEnd: releaseTimeNano + uint64(24*time.Hour.Nanoseconds()),
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "service.name",
|
||||
FieldContext: telemetrytypes.FieldContextResource,
|
||||
},
|
||||
setupMock: func(m *mockKeyEvolutionMetadataStore) {
|
||||
m.Add("resources_string", &qbtypes.KeyEvolutionMetadataKey{
|
||||
BaseColumn: "resources_string",
|
||||
BaseColumnType: "Map(LowCardinality(String), String)",
|
||||
NewColumn: "resource",
|
||||
NewColumnType: "JSON(max_dynamic_paths=100)",
|
||||
ReleaseTime: releaseTime,
|
||||
})
|
||||
},
|
||||
expectedResult: "multiIf(resource.`service.name` IS NOT NULL, resource.`service.name`::String, mapContains(resources_string, 'service.name'), resources_string['service.name'], NULL)",
|
||||
expectedError: nil,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
mockStore := newMockKeyEvolutionMetadataStore()
|
||||
if tc.setupMock != nil {
|
||||
tc.setupMock(mockStore)
|
||||
}
|
||||
fm := NewFieldMapper(mockStore)
|
||||
result, err := fm.FieldFor(ctx, tc.tsStart, tc.tsEnd, &tc.key)
|
||||
|
||||
if tc.expectedError != nil {
|
||||
assert.Equal(t, tc.expectedError, err)
|
||||
|
||||
21
pkg/types/integrationtypes/store.go
Normal file
21
pkg/types/integrationtypes/store.go
Normal file
@@ -0,0 +1,21 @@
|
||||
package integrationtypes
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrCodeCloudIntegrationAccountNotFound errors.Code = errors.MustNewCode("cloud_integration_account_not_found")
|
||||
)
|
||||
|
||||
// CloudIntegrationAccountsStore defines the interface for cloud integration accounts persistence.
|
||||
type Store interface {
|
||||
ListConnected(ctx context.Context, orgId string, provider string) ([]CloudIntegration, error)
|
||||
Get(ctx context.Context, orgId string, provider string, id string) (*CloudIntegration, error)
|
||||
GetConnectedCloudAccount(ctx context.Context, orgId, provider string, accountID string) (*CloudIntegration, error)
|
||||
// Upsert inserts an account or updates it by (cloudProvider, id) for specified non-empty fields.
|
||||
Upsert(ctx context.Context, orgId string, provider string, id *string, config []byte, accountId *string, agentReport *AgentReport, removedAt *time.Time) (*CloudIntegration, error)
|
||||
}
|
||||
@@ -2,6 +2,7 @@ package querybuildertypesv5
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
@@ -59,3 +60,16 @@ type TraceOperatorStatementBuilder interface {
|
||||
// Build builds the trace operator query.
|
||||
Build(ctx context.Context, start, end uint64, requestType RequestType, query QueryBuilderTraceOperator, compositeQuery *CompositeQuery) (*Statement, error)
|
||||
}
|
||||
|
||||
type KeyEvolutionMetadataKey struct {
|
||||
BaseColumn string
|
||||
BaseColumnType string
|
||||
NewColumn string
|
||||
NewColumnType string
|
||||
ReleaseTime time.Time
|
||||
}
|
||||
|
||||
type KeyEvolutionMetadataStore interface {
|
||||
Get(keyName string) []*KeyEvolutionMetadataKey
|
||||
Add(keyName string, key *KeyEvolutionMetadataKey)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user