mirror of
https://github.com/SigNoz/signoz.git
synced 2026-02-09 11:12:21 +00:00
Compare commits
3 Commits
chore/sync
...
v0.65.0-cl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
60dc479a19 | ||
|
|
85cf4f4e2e | ||
|
|
83aa48c721 |
@@ -6,6 +6,8 @@ import (
|
||||
|
||||
func generateConsumerSQL(start, end int64, topic, partition, consumerGroup, queueType string) string {
|
||||
timeRange := (end - start) / 1000000000
|
||||
tsBucketStart := (start / 1000000000) - 1800
|
||||
tsBucketEnd := end / 1000000000
|
||||
query := fmt.Sprintf(`
|
||||
WITH consumer_query AS (
|
||||
SELECT
|
||||
@@ -18,6 +20,8 @@ WITH consumer_query AS (
|
||||
WHERE
|
||||
timestamp >= '%d'
|
||||
AND timestamp <= '%d'
|
||||
AND ts_bucket_start >= '%d'
|
||||
AND ts_bucket_start <= '%d'
|
||||
AND kind = 5
|
||||
AND attribute_string_messaging$$system = '%s'
|
||||
AND attributes_string['messaging.destination.name'] = '%s'
|
||||
@@ -36,13 +40,15 @@ FROM
|
||||
consumer_query
|
||||
ORDER BY
|
||||
resource_string_service$$name;
|
||||
`, start, end, queueType, topic, partition, consumerGroup, timeRange)
|
||||
`, start, end, tsBucketStart, tsBucketEnd, queueType, topic, partition, consumerGroup, timeRange)
|
||||
return query
|
||||
}
|
||||
|
||||
// S1 landing
|
||||
func generatePartitionLatencySQL(start, end int64, queueType string) string {
|
||||
timeRange := (end - start) / 1000000000
|
||||
tsBucketStart := (start / 1000000000) - 1800
|
||||
tsBucketEnd := end / 1000000000
|
||||
query := fmt.Sprintf(`
|
||||
WITH partition_query AS (
|
||||
SELECT
|
||||
@@ -54,6 +60,8 @@ WITH partition_query AS (
|
||||
WHERE
|
||||
timestamp >= '%d'
|
||||
AND timestamp <= '%d'
|
||||
AND ts_bucket_start >= '%d'
|
||||
AND ts_bucket_start <= '%d'
|
||||
AND kind = 4
|
||||
AND attribute_string_messaging$$system = '%s'
|
||||
GROUP BY topic, partition
|
||||
@@ -68,13 +76,15 @@ FROM
|
||||
partition_query
|
||||
ORDER BY
|
||||
topic;
|
||||
`, start, end, queueType, timeRange)
|
||||
`, start, end, tsBucketStart, tsBucketEnd, queueType, timeRange)
|
||||
return query
|
||||
}
|
||||
|
||||
// S1 consumer
|
||||
func generateConsumerPartitionLatencySQL(start, end int64, topic, partition, queueType string) string {
|
||||
timeRange := (end - start) / 1000000000
|
||||
tsBucketStart := (start / 1000000000) - 1800
|
||||
tsBucketEnd := end / 1000000000
|
||||
query := fmt.Sprintf(`
|
||||
WITH consumer_pl AS (
|
||||
SELECT
|
||||
@@ -87,6 +97,8 @@ WITH consumer_pl AS (
|
||||
WHERE
|
||||
timestamp >= '%d'
|
||||
AND timestamp <= '%d'
|
||||
AND ts_bucket_start >= '%d'
|
||||
AND ts_bucket_start <= '%d'
|
||||
AND kind = 5
|
||||
AND attribute_string_messaging$$system = '%s'
|
||||
AND attributes_string['messaging.destination.name'] = '%s'
|
||||
@@ -104,14 +116,15 @@ FROM
|
||||
consumer_pl
|
||||
ORDER BY
|
||||
consumer_group;
|
||||
`, start, end, queueType, topic, partition, timeRange)
|
||||
`, start, end, tsBucketStart, tsBucketEnd, queueType, topic, partition, timeRange)
|
||||
return query
|
||||
}
|
||||
|
||||
// S3, producer overview
|
||||
func generateProducerPartitionThroughputSQL(start, end int64, queueType string) string {
|
||||
timeRange := (end - start) / 1000000000
|
||||
// t, svc, rps, byte*, p99, err
|
||||
tsBucketStart := (start / 1000000000) - 1800
|
||||
tsBucketEnd := end / 1000000000 // t, svc, rps, byte*, p99, err
|
||||
query := fmt.Sprintf(`
|
||||
WITH producer_latency AS (
|
||||
SELECT
|
||||
@@ -124,6 +137,8 @@ WITH producer_latency AS (
|
||||
WHERE
|
||||
timestamp >= '%d'
|
||||
AND timestamp <= '%d'
|
||||
AND ts_bucket_start >= '%d'
|
||||
AND ts_bucket_start <= '%d'
|
||||
AND kind = 4
|
||||
AND attribute_string_messaging$$system = '%s'
|
||||
GROUP BY topic, resource_string_service$$name
|
||||
@@ -137,13 +152,15 @@ SELECT
|
||||
COALESCE(total_requests / %d, 0) AS throughput
|
||||
FROM
|
||||
producer_latency
|
||||
`, start, end, queueType, timeRange)
|
||||
`, start, end, tsBucketStart, tsBucketEnd, queueType, timeRange)
|
||||
return query
|
||||
}
|
||||
|
||||
// S3, producer topic/service overview
|
||||
func generateProducerTopicLatencySQL(start, end int64, topic, service, queueType string) string {
|
||||
timeRange := (end - start) / 1000000000
|
||||
tsBucketStart := (start / 1000000000) - 1800
|
||||
tsBucketEnd := end / 1000000000
|
||||
query := fmt.Sprintf(`
|
||||
WITH consumer_latency AS (
|
||||
SELECT
|
||||
@@ -155,6 +172,8 @@ WITH consumer_latency AS (
|
||||
WHERE
|
||||
timestamp >= '%d'
|
||||
AND timestamp <= '%d'
|
||||
AND ts_bucket_start >= '%d'
|
||||
AND ts_bucket_start <= '%d'
|
||||
AND kind = 4
|
||||
AND resource_string_service$$name = '%s'
|
||||
AND attribute_string_messaging$$system = '%s'
|
||||
@@ -169,13 +188,15 @@ SELECT
|
||||
COALESCE(total_requests / %d, 0) AS throughput
|
||||
FROM
|
||||
consumer_latency
|
||||
`, start, end, service, queueType, topic, timeRange)
|
||||
`, start, end, tsBucketStart, tsBucketEnd, service, queueType, topic, timeRange)
|
||||
return query
|
||||
}
|
||||
|
||||
// S3 consumer overview
|
||||
func generateConsumerLatencySQL(start, end int64, queueType string) string {
|
||||
timeRange := (end - start) / 1000000000
|
||||
tsBucketStart := (start / 1000000000) - 1800
|
||||
tsBucketEnd := end / 1000000000
|
||||
query := fmt.Sprintf(`
|
||||
WITH consumer_latency AS (
|
||||
SELECT
|
||||
@@ -189,6 +210,8 @@ WITH consumer_latency AS (
|
||||
WHERE
|
||||
timestamp >= '%d'
|
||||
AND timestamp <= '%d'
|
||||
AND ts_bucket_start >= '%d'
|
||||
AND ts_bucket_start <= '%d'
|
||||
AND kind = 5
|
||||
AND attribute_string_messaging$$system = '%s'
|
||||
GROUP BY topic, resource_string_service$$name
|
||||
@@ -205,13 +228,15 @@ FROM
|
||||
consumer_latency
|
||||
ORDER BY
|
||||
topic;
|
||||
`, start, end, queueType, timeRange, timeRange)
|
||||
`, start, end, tsBucketStart, tsBucketEnd, queueType, timeRange, timeRange)
|
||||
return query
|
||||
}
|
||||
|
||||
// S3 consumer topic/service
|
||||
func generateConsumerServiceLatencySQL(start, end int64, topic, service, queueType string) string {
|
||||
timeRange := (end - start) / 1000000000
|
||||
tsBucketStart := (start / 1000000000) - 1800
|
||||
tsBucketEnd := end / 1000000000
|
||||
query := fmt.Sprintf(`
|
||||
WITH consumer_latency AS (
|
||||
SELECT
|
||||
@@ -223,6 +248,8 @@ WITH consumer_latency AS (
|
||||
WHERE
|
||||
timestamp >= '%d'
|
||||
AND timestamp <= '%d'
|
||||
AND ts_bucket_start >= '%d'
|
||||
AND ts_bucket_start <= '%d'
|
||||
AND kind = 5
|
||||
AND resource_string_service$$name = '%s'
|
||||
AND attribute_string_messaging$$system = '%s'
|
||||
@@ -237,7 +264,7 @@ SELECT
|
||||
COALESCE(total_requests / %d, 0) AS throughput
|
||||
FROM
|
||||
consumer_latency
|
||||
`, start, end, service, queueType, topic, timeRange)
|
||||
`, start, end, tsBucketStart, tsBucketEnd, service, queueType, topic, timeRange)
|
||||
return query
|
||||
}
|
||||
|
||||
@@ -293,6 +320,8 @@ GROUP BY
|
||||
|
||||
func generateProducerSQL(start, end int64, topic, partition, queueType string) string {
|
||||
timeRange := (end - start) / 1000000000
|
||||
tsBucketStart := (start / 1000000000) - 1800
|
||||
tsBucketEnd := end / 1000000000
|
||||
query := fmt.Sprintf(`
|
||||
WITH producer_query AS (
|
||||
SELECT
|
||||
@@ -304,6 +333,8 @@ WITH producer_query AS (
|
||||
WHERE
|
||||
timestamp >= '%d'
|
||||
AND timestamp <= '%d'
|
||||
AND ts_bucket_start >= '%d'
|
||||
AND ts_bucket_start <= '%d'
|
||||
AND kind = 4
|
||||
AND attribute_string_messaging$$system = '%s'
|
||||
AND attributes_string['messaging.destination.name'] = '%s'
|
||||
@@ -320,33 +351,39 @@ FROM
|
||||
producer_query
|
||||
ORDER BY
|
||||
resource_string_service$$name;
|
||||
`, start, end, queueType, topic, partition, timeRange)
|
||||
`, start, end, tsBucketStart, tsBucketEnd, queueType, topic, partition, timeRange)
|
||||
return query
|
||||
}
|
||||
|
||||
func generateNetworkLatencyThroughputSQL(start, end int64, consumerGroup, partitionID, queueType string) string {
|
||||
timeRange := (end - start) / 1000000000
|
||||
tsBucketStart := (start / 1000000000) - 1800
|
||||
tsBucketEnd := end / 1000000000
|
||||
query := fmt.Sprintf(`
|
||||
SELECT
|
||||
attributes_string['messaging.client_id'] AS client_id,
|
||||
attributes_string['service.instance.id'] AS service_instance_id,
|
||||
resources_string['service.instance.id'] AS service_instance_id,
|
||||
resource_string_service$$name AS service_name,
|
||||
count(*) / %d AS throughput
|
||||
FROM signoz_traces.distributed_signoz_index_v3
|
||||
WHERE
|
||||
timestamp >= '%d'
|
||||
AND timestamp <= '%d'
|
||||
AND ts_bucket_start >= '%d'
|
||||
AND ts_bucket_start <= '%d'
|
||||
AND kind = 5
|
||||
AND attribute_string_messaging$$system = '%s'
|
||||
AND attributes_string['messaging.kafka.consumer.group'] = '%s'
|
||||
AND attributes_string['messaging.destination.partition.id'] = '%s'
|
||||
GROUP BY service_name, client_id, service_instance_id
|
||||
ORDER BY throughput DESC
|
||||
`, timeRange, start, end, queueType, consumerGroup, partitionID)
|
||||
`, timeRange, start, end, tsBucketStart, tsBucketEnd, queueType, consumerGroup, partitionID)
|
||||
return query
|
||||
}
|
||||
|
||||
func onboardProducersSQL(start, end int64, queueType string) string {
|
||||
tsBucketStart := (start / 1000000000) - 1800
|
||||
tsBucketEnd := end / 1000000000
|
||||
query := fmt.Sprintf(`
|
||||
SELECT
|
||||
COUNT(*) = 0 AS entries,
|
||||
@@ -358,11 +395,15 @@ FROM
|
||||
signoz_traces.distributed_signoz_index_v3
|
||||
WHERE
|
||||
timestamp >= '%d'
|
||||
AND timestamp <= '%d';`, queueType, start, end)
|
||||
AND timestamp <= '%d'
|
||||
AND ts_bucket_start >= '%d'
|
||||
AND ts_bucket_start <= '%d';`, queueType, start, end, tsBucketStart, tsBucketEnd)
|
||||
return query
|
||||
}
|
||||
|
||||
func onboardConsumerSQL(start, end int64, queueType string) string {
|
||||
tsBucketStart := (start / 1000000000) - 1800
|
||||
tsBucketEnd := end / 1000000000
|
||||
query := fmt.Sprintf(`
|
||||
SELECT
|
||||
COUNT(*) = 0 AS entries,
|
||||
@@ -374,10 +415,12 @@ SELECT
|
||||
COUNT(IF(has(attributes_string, 'messaging.kafka.consumer.group'), 1, NULL)) = 0 AS cgroup,
|
||||
COUNT(IF(has(attributes_number, 'messaging.message.body.size'), 1, NULL)) = 0 AS bodysize,
|
||||
COUNT(IF(has(attributes_string, 'messaging.client_id'), 1, NULL)) = 0 AS clientid,
|
||||
COUNT(IF(has(attributes_string, 'service.instance.id'), 1, NULL)) = 0 AS instanceid
|
||||
COUNT(IF(has(resources_string, 'service.instance.id'), 1, NULL)) = 0 AS instanceid
|
||||
FROM signoz_traces.distributed_signoz_index_v3
|
||||
WHERE
|
||||
timestamp >= '%d'
|
||||
AND timestamp <= '%d';`, queueType, start, end)
|
||||
AND timestamp <= '%d'
|
||||
AND ts_bucket_start >= '%d'
|
||||
AND ts_bucket_start <= '%d' ;`, queueType, start, end, tsBucketStart, tsBucketEnd)
|
||||
return query
|
||||
}
|
||||
|
||||
@@ -190,7 +190,7 @@ func (q *querier) runBuilderQuery(
|
||||
ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil}
|
||||
return
|
||||
}
|
||||
query = fmt.Sprintf(placeholderQuery, limitQuery)
|
||||
query = strings.Replace(placeholderQuery, "#LIMIT_PLACEHOLDER", limitQuery, 1)
|
||||
} else {
|
||||
query, err = tracesQueryBuilder(
|
||||
start,
|
||||
|
||||
@@ -190,7 +190,7 @@ func (q *querier) runBuilderQuery(
|
||||
ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil}
|
||||
return
|
||||
}
|
||||
query = fmt.Sprintf(placeholderQuery, limitQuery)
|
||||
query = strings.Replace(placeholderQuery, "#LIMIT_PLACEHOLDER", limitQuery, 1)
|
||||
} else {
|
||||
query, err = tracesQueryBuilder(
|
||||
start,
|
||||
|
||||
@@ -34,8 +34,8 @@ func enrichKeyWithMetadata(key v3.AttributeKey, keys map[string]v3.AttributeKey)
|
||||
return v
|
||||
}
|
||||
|
||||
for _, key := range utils.GenerateEnrichmentKeys(key) {
|
||||
if val, ok := keys[key]; ok {
|
||||
for _, tkey := range utils.GenerateEnrichmentKeys(key) {
|
||||
if val, ok := keys[tkey]; ok {
|
||||
return val
|
||||
}
|
||||
}
|
||||
|
||||
@@ -74,6 +74,19 @@ func getSelectLabels(groupBy []v3.AttributeKey) string {
|
||||
return strings.Join(labels, ",")
|
||||
}
|
||||
|
||||
// TODO(nitya): use the _exists columns as well in the future similar to logs
|
||||
func existsSubQueryForFixedColumn(key v3.AttributeKey, op v3.FilterOperator) (string, error) {
|
||||
if key.DataType == v3.AttributeKeyDataTypeString {
|
||||
if op == v3.FilterOperatorExists {
|
||||
return fmt.Sprintf("%s %s ''", getColumnName(key), tracesOperatorMappingV3[v3.FilterOperatorNotEqual]), nil
|
||||
} else {
|
||||
return fmt.Sprintf("%s %s ''", getColumnName(key), tracesOperatorMappingV3[v3.FilterOperatorEqual]), nil
|
||||
}
|
||||
} else {
|
||||
return "", fmt.Errorf("unsupported operation, exists and not exists can only be applied on custom attributes or string type columns")
|
||||
}
|
||||
}
|
||||
|
||||
func buildTracesFilterQuery(fs *v3.FilterSet) (string, error) {
|
||||
var conditions []string
|
||||
|
||||
@@ -110,7 +123,7 @@ func buildTracesFilterQuery(fs *v3.FilterSet) (string, error) {
|
||||
conditions = append(conditions, fmt.Sprintf(operator, columnName, fmtVal))
|
||||
case v3.FilterOperatorExists, v3.FilterOperatorNotExists:
|
||||
if item.Key.IsColumn {
|
||||
subQuery, err := tracesV3.ExistsSubQueryForFixedColumn(item.Key, item.Operator)
|
||||
subQuery, err := existsSubQueryForFixedColumn(item.Key, item.Operator)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@@ -312,7 +325,7 @@ func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, panelType v3.
|
||||
}
|
||||
|
||||
if options.GraphLimitQtype == constants.SecondQueryGraphLimit {
|
||||
filterSubQuery = filterSubQuery + " AND " + fmt.Sprintf("(%s) GLOBAL IN (", tracesV3.GetSelectKeys(mq.AggregateOperator, mq.GroupBy)) + "%s)"
|
||||
filterSubQuery = filterSubQuery + " AND " + fmt.Sprintf("(%s) GLOBAL IN (", tracesV3.GetSelectKeys(mq.AggregateOperator, mq.GroupBy)) + "#LIMIT_PLACEHOLDER)"
|
||||
}
|
||||
|
||||
switch mq.AggregateOperator {
|
||||
@@ -350,7 +363,7 @@ func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, panelType v3.
|
||||
case v3.AggregateOperatorCount:
|
||||
if mq.AggregateAttribute.Key != "" {
|
||||
if mq.AggregateAttribute.IsColumn {
|
||||
subQuery, err := tracesV3.ExistsSubQueryForFixedColumn(mq.AggregateAttribute, v3.FilterOperatorExists)
|
||||
subQuery, err := existsSubQueryForFixedColumn(mq.AggregateAttribute, v3.FilterOperatorExists)
|
||||
if err == nil {
|
||||
filterSubQuery = fmt.Sprintf("%s AND %s", filterSubQuery, subQuery)
|
||||
}
|
||||
|
||||
@@ -265,9 +265,11 @@ func Test_buildTracesFilterQuery(t *testing.T) {
|
||||
{Key: v3.AttributeKey{Key: "isDone", DataType: v3.AttributeKeyDataTypeBool, Type: v3.AttributeKeyTypeTag}, Operator: v3.FilterOperatorNotExists},
|
||||
{Key: v3.AttributeKey{Key: "host1", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Operator: v3.FilterOperatorNotExists},
|
||||
{Key: v3.AttributeKey{Key: "path", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, Operator: v3.FilterOperatorNotExists},
|
||||
{Key: v3.AttributeKey{Key: "http_url", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, Operator: v3.FilterOperatorNotExists},
|
||||
{Key: v3.AttributeKey{Key: "http.route", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, Operator: v3.FilterOperatorNotExists},
|
||||
}},
|
||||
},
|
||||
want: "mapContains(attributes_string, 'host') AND mapContains(attributes_number, 'duration') AND NOT mapContains(attributes_bool, 'isDone') AND NOT mapContains(attributes_string, 'host1') AND path = ''",
|
||||
want: "mapContains(attributes_string, 'host') AND mapContains(attributes_number, 'duration') AND NOT mapContains(attributes_bool, 'isDone') AND NOT mapContains(attributes_string, 'host1') AND `attribute_string_path` = '' AND http_url = '' AND `attribute_string_http$$route` = ''",
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
@@ -683,7 +685,7 @@ func TestPrepareTracesQuery(t *testing.T) {
|
||||
},
|
||||
},
|
||||
want: "SELECT attributes_string['function'] as `function`, toFloat64(count(distinct(name))) as value from signoz_traces.distributed_signoz_index_v3 where " +
|
||||
"(timestamp >= '1680066360726210000' AND timestamp <= '1680066458000000000') AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) AND mapContains(attributes_string, 'function') AND (`function`) GLOBAL IN (%s) group by `function` order by value DESC",
|
||||
"(timestamp >= '1680066360726210000' AND timestamp <= '1680066458000000000') AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) AND mapContains(attributes_string, 'function') AND (`function`) GLOBAL IN (#LIMIT_PLACEHOLDER) group by `function` order by value DESC",
|
||||
},
|
||||
{
|
||||
name: "test with limit with resources- first",
|
||||
@@ -766,7 +768,7 @@ func TestPrepareTracesQuery(t *testing.T) {
|
||||
want: "SELECT `attribute_string_function` as `function`, serviceName as `serviceName`, toFloat64(count(distinct(name))) as value from signoz_traces.distributed_signoz_index_v3 " +
|
||||
"where (timestamp >= '1680066360726210000' AND timestamp <= '1680066458000000000') AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) AND attributes_number['line'] = 100 " +
|
||||
"AND (resource_fingerprint GLOBAL IN (SELECT fingerprint FROM signoz_traces.distributed_traces_v3_resource WHERE (seen_at_ts_bucket_start >= 1680064560) AND (seen_at_ts_bucket_start <= 1680066458) " +
|
||||
"AND simpleJSONExtractString(labels, 'hostname') = 'server1' AND labels like '%hostname%server1%')) AND (`function`,`serviceName`) GLOBAL IN (%s) group by `function`,`serviceName` order by value DESC",
|
||||
"AND simpleJSONExtractString(labels, 'hostname') = 'server1' AND labels like '%hostname%server1%')) AND (`function`,`serviceName`) GLOBAL IN (#LIMIT_PLACEHOLDER) group by `function`,`serviceName` order by value DESC",
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@@ -90,7 +90,7 @@ func EnableHostsInfraMonitoring() bool {
|
||||
return GetOrDefaultEnv("ENABLE_INFRA_METRICS", "true") == "true"
|
||||
}
|
||||
|
||||
var KafkaSpanEval = GetOrDefaultEnv("KAFKA_SPAN_EVAL", "true")
|
||||
var KafkaSpanEval = GetOrDefaultEnv("KAFKA_SPAN_EVAL", "false")
|
||||
|
||||
func IsDurationSortFeatureEnabled() bool {
|
||||
isDurationSortFeatureEnabledStr := DurationSortFeature
|
||||
|
||||
Reference in New Issue
Block a user