|
|
|
|
@@ -10,7 +10,6 @@ import (
|
|
|
|
|
"github.com/ClickHouse/clickhouse-go/v2/lib/chcol"
|
|
|
|
|
schemamigrator "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
|
|
|
|
|
"github.com/SigNoz/signoz-otel-collector/constants"
|
|
|
|
|
"github.com/SigNoz/signoz-otel-collector/utils"
|
|
|
|
|
"github.com/SigNoz/signoz/pkg/errors"
|
|
|
|
|
"github.com/SigNoz/signoz/pkg/querybuilder"
|
|
|
|
|
"github.com/SigNoz/signoz/pkg/telemetrylogs"
|
|
|
|
|
@@ -113,7 +112,7 @@ func (t *telemetryMetaStore) buildBodyJSONPaths(ctx context.Context,
|
|
|
|
|
|
|
|
|
|
for _, fieldKey := range fieldKeys {
|
|
|
|
|
promotedKey := strings.Split(fieldKey.Name, telemetrytypes.ArraySep)[0]
|
|
|
|
|
fieldKey.Materialized = promoted.Contains(promotedKey)
|
|
|
|
|
fieldKey.Materialized = promoted[promotedKey]
|
|
|
|
|
fieldKey.Indexes = indexes[fieldKey.Name]
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -295,33 +294,6 @@ func (t *telemetryMetaStore) ListLogsJSONIndexes(ctx context.Context, filters ..
|
|
|
|
|
return indexes, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *telemetryMetaStore) ListPromotedPaths(ctx context.Context, paths ...string) (map[string]struct{}, error) {
|
|
|
|
|
sb := sqlbuilder.Select("path").From(fmt.Sprintf("%s.%s", DBName, PromotedPathsTableName))
|
|
|
|
|
pathConditions := []string{}
|
|
|
|
|
for _, path := range paths {
|
|
|
|
|
pathConditions = append(pathConditions, sb.Equal("path", path))
|
|
|
|
|
}
|
|
|
|
|
sb.Where(sb.Or(pathConditions...))
|
|
|
|
|
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
|
|
|
|
|
|
|
|
|
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, errors.WrapInternalf(err, CodeFailLoadPromotedPaths, "failed to load promoted paths")
|
|
|
|
|
}
|
|
|
|
|
defer rows.Close()
|
|
|
|
|
|
|
|
|
|
next := make(map[string]struct{})
|
|
|
|
|
for rows.Next() {
|
|
|
|
|
var path string
|
|
|
|
|
if err := rows.Scan(&path); err != nil {
|
|
|
|
|
return nil, errors.WrapInternalf(err, CodeFailLoadPromotedPaths, "failed to scan promoted path")
|
|
|
|
|
}
|
|
|
|
|
next[path] = struct{}{}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return next, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// TODO(Piyush): Remove this if not used in future
|
|
|
|
|
func (t *telemetryMetaStore) ListJSONValues(ctx context.Context, path string, limit int) (*telemetrytypes.TelemetryFieldValues, bool, error) {
|
|
|
|
|
path = CleanPathPrefixes(path)
|
|
|
|
|
@@ -484,11 +456,12 @@ func derefValue(v any) any {
|
|
|
|
|
return val.Interface()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// IsPathPromoted checks if a specific path is promoted
|
|
|
|
|
// IsPathPromoted checks if a specific path is promoted (Column Evolution table: field_name for logs body).
|
|
|
|
|
func (t *telemetryMetaStore) IsPathPromoted(ctx context.Context, path string) (bool, error) {
|
|
|
|
|
split := strings.Split(path, telemetrytypes.ArraySep)
|
|
|
|
|
query := fmt.Sprintf("SELECT 1 FROM %s.%s WHERE path = ? LIMIT 1", DBName, PromotedPathsTableName)
|
|
|
|
|
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, split[0])
|
|
|
|
|
pathSegment := split[0]
|
|
|
|
|
query := fmt.Sprintf("SELECT 1 FROM %s.%s WHERE signal = ? AND column_name = ? AND field_context = ? AND field_name = ? LIMIT 1", DBName, PromotedPathsTableName)
|
|
|
|
|
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, telemetrytypes.SignalLogs, telemetrylogs.LogsV2BodyPromotedColumn, telemetrytypes.FieldContextBody, pathSegment)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return false, errors.WrapInternalf(err, CodeFailCheckPathPromoted, "failed to check if path %s is promoted", path)
|
|
|
|
|
}
|
|
|
|
|
@@ -497,15 +470,24 @@ func (t *telemetryMetaStore) IsPathPromoted(ctx context.Context, path string) (b
|
|
|
|
|
return rows.Next(), nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// GetPromotedPaths checks if a specific path is promoted
|
|
|
|
|
func (t *telemetryMetaStore) GetPromotedPaths(ctx context.Context, paths ...string) (*utils.ConcurrentSet[string], error) {
|
|
|
|
|
sb := sqlbuilder.Select("path").From(fmt.Sprintf("%s.%s", DBName, PromotedPathsTableName))
|
|
|
|
|
pathConditions := []string{}
|
|
|
|
|
for _, path := range paths {
|
|
|
|
|
split := strings.Split(path, telemetrytypes.ArraySep)
|
|
|
|
|
pathConditions = append(pathConditions, sb.Equal("path", split[0]))
|
|
|
|
|
// GetPromotedPaths returns promoted paths from the Column Evolution table (field_name for logs body).
|
|
|
|
|
func (t *telemetryMetaStore) GetPromotedPaths(ctx context.Context, paths ...string) (map[string]bool, error) {
|
|
|
|
|
sb := sqlbuilder.Select("field_name").From(fmt.Sprintf("%s.%s", DBName, PromotedPathsTableName))
|
|
|
|
|
conditions := []string{
|
|
|
|
|
sb.Equal("signal", telemetrytypes.SignalLogs),
|
|
|
|
|
sb.Equal("column_name", telemetrylogs.LogsV2BodyPromotedColumn),
|
|
|
|
|
sb.Equal("field_context", telemetrytypes.FieldContextBody),
|
|
|
|
|
sb.NotEqual("field_name", "__all__"),
|
|
|
|
|
}
|
|
|
|
|
sb.Where(sb.Or(pathConditions...))
|
|
|
|
|
if len(paths) > 0 {
|
|
|
|
|
pathArgs := make([]interface{}, len(paths))
|
|
|
|
|
for i, path := range paths {
|
|
|
|
|
split := strings.Split(path, telemetrytypes.ArraySep)
|
|
|
|
|
pathArgs[i] = split[0]
|
|
|
|
|
}
|
|
|
|
|
conditions = append(conditions, sb.In("field_name", pathArgs))
|
|
|
|
|
}
|
|
|
|
|
sb.Where(sb.And(conditions...))
|
|
|
|
|
|
|
|
|
|
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
|
|
|
|
rows, err := t.telemetrystore.ClickhouseDB().Query(ctx, query, args...)
|
|
|
|
|
@@ -514,13 +496,13 @@ func (t *telemetryMetaStore) GetPromotedPaths(ctx context.Context, paths ...stri
|
|
|
|
|
}
|
|
|
|
|
defer rows.Close()
|
|
|
|
|
|
|
|
|
|
promotedPaths := utils.NewConcurrentSet[string]()
|
|
|
|
|
promotedPaths := make(map[string]bool)
|
|
|
|
|
for rows.Next() {
|
|
|
|
|
var path string
|
|
|
|
|
if err := rows.Scan(&path); err != nil {
|
|
|
|
|
var fieldName string
|
|
|
|
|
if err := rows.Scan(&fieldName); err != nil {
|
|
|
|
|
return nil, errors.WrapInternalf(err, CodeFailCheckPathPromoted, "failed to scan promoted path")
|
|
|
|
|
}
|
|
|
|
|
promotedPaths.Insert(path)
|
|
|
|
|
promotedPaths[fieldName] = true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return promotedPaths, nil
|
|
|
|
|
@@ -534,21 +516,22 @@ func CleanPathPrefixes(path string) string {
|
|
|
|
|
return path
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// PromotePaths inserts promoted paths into the Column Evolution table (same schema as signoz-otel-collector metadata_migrations).
|
|
|
|
|
func (t *telemetryMetaStore) PromotePaths(ctx context.Context, paths ...string) error {
|
|
|
|
|
batch, err := t.telemetrystore.ClickhouseDB().PrepareBatch(ctx,
|
|
|
|
|
fmt.Sprintf("INSERT INTO %s.%s (path, created_at) VALUES", DBName,
|
|
|
|
|
fmt.Sprintf("INSERT INTO %s.%s (signal, column_name, column_type, field_context, field_name, version, release_time) VALUES", DBName,
|
|
|
|
|
PromotedPathsTableName))
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.WrapInternalf(err, CodeFailedToPrepareBatch, "failed to prepare batch")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
nowMs := uint64(time.Now().UnixMilli())
|
|
|
|
|
releaseTime := time.Now().UnixNano()
|
|
|
|
|
for _, p := range paths {
|
|
|
|
|
trimmed := strings.TrimSpace(p)
|
|
|
|
|
if trimmed == "" {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if err := batch.Append(trimmed, nowMs); err != nil {
|
|
|
|
|
if err := batch.Append(telemetrytypes.SignalLogs, telemetrylogs.LogsV2BodyPromotedColumn, "JSON()", telemetrytypes.FieldContextBody, trimmed, 0, releaseTime); err != nil {
|
|
|
|
|
_ = batch.Abort()
|
|
|
|
|
return errors.WrapInternalf(err, CodeFailedToAppendPath, "failed to append path")
|
|
|
|
|
}
|
|
|
|
|
|