Compare commits

...

2 Commits

Author SHA1 Message Date
Nityananda Gohain
2613313b9f Merge branch 'main' into issue_3812 2026-02-19 19:26:51 +05:30
nityanandagohain
991854680e fix: instrumentation changes to capture query duration properly 2026-02-19 19:14:53 +05:30

View File

@@ -40,7 +40,6 @@ func New(ctx context.Context, providerSettings factory.ProviderSettings, config
return nil, err
}
hooks := make([]telemetrystore.TelemetryStoreHook, len(hookFactories))
for i, hookFactory := range hookFactories {
hook, err := hookFactory.New(ctx, providerSettings, config)
@@ -78,16 +77,52 @@ func (p *provider) Stats() driver.Stats {
return p.clickHouseConn.Stats()
}
// rowsWithHooks wraps driver.Rows and defers AfterQuery hooks until Close(),
// so the instrumentation span covers the full query lifecycle including row consumption.
type rowsWithHooks struct {
driver.Rows
ctx context.Context
event *telemetrystore.QueryEvent
onClose func()
closed bool
}
func (r *rowsWithHooks) Close() error {
// delegate to the original rows.Close() if already closed
if r.closed {
return r.Rows.Close()
}
// mark as closed and run the onClose hook
r.closed = true
if err := r.Rows.Err(); err != nil {
r.event.Err = err
}
closeErr := r.Rows.Close()
if closeErr != nil {
r.event.Err = closeErr
}
r.onClose()
return closeErr
}
func (p *provider) Query(ctx context.Context, query string, args ...interface{}) (driver.Rows, error) {
event := telemetrystore.NewQueryEvent(query, args)
ctx = telemetrystore.WrapBeforeQuery(p.hooks, ctx, event)
rows, err := p.clickHouseConn.Query(ctx, query, args...)
if err != nil {
event.Err = err
telemetrystore.WrapAfterQuery(p.hooks, ctx, event)
return nil, err
}
event.Err = err
telemetrystore.WrapAfterQuery(p.hooks, ctx, event)
return rows, err
return &rowsWithHooks{
Rows: rows,
ctx: ctx,
event: event,
onClose: func() { telemetrystore.WrapAfterQuery(p.hooks, ctx, event) },
}, nil
}
func (p *provider) QueryRow(ctx context.Context, query string, args ...interface{}) driver.Row {