Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pg_exporter: predicate queries feature (WIP) #47

Merged
merged 2 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,20 @@ Config files are using YAML format, there are lots of examples in the [conf](htt
#┃ * '<tag>' means this query WILL be executed when exporter is tagged with '<tag>'
#┃ ( <tag> could not be cluster,primary,standby,master,replica,etc...)
#┃
#┃ # One or more "predicate queries" may be defined for a metric query. These
#┃ # are run before the main metric query (after any cache hit check). If all
#┃ # of them, when run sequentially, return a single row with a single column
#┃ # boolean true result, the main metric query is executed. If any of them
#┃ # return false or return zero rows, the main query is skipped. If any
#┃ # predicate query returns more than one row, a non-boolean result, or fails
#┃ # with an error, the whole query is marked failed. Predicate queries can be
#┃ # used to check for the presence of specific functions, tables, extensions,
#┃ # settings, vendor-specific postgres features etc before running the main
#┃ # query.
#┃ predicate_queries:
#┃ - name: predicate query name
#┃ predicate_query: |
#┃ SELECT EXISTS (SELECT 1 FROM information_schema.routines WHERE routine_schema = 'pg_catalog' AND routine_name = 'pg_backup_start_time');
#┃
#┃ metrics: <---- List of returned columns, each column must have a `name` and `usage`, `rename` and `description` are optional
#┃ - timestamp: <---- Column name, should be exactly the same as returned column name
Expand Down Expand Up @@ -476,4 +490,4 @@ Contributors: https://github.com/Vonng/pg_exporter/graphs/contributors

License: [Apache License Version 2.0](LICENSE)

Copyright: 2018-2024 [email protected]
Copyright: 2018-2024 [email protected]
14 changes: 14 additions & 0 deletions config/collector/000-doc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,20 @@
# * `<tag>` means this query WILL be executed when exporter is tagged with `<tag>`
# ( <tag> could not be cluster,primary,standby,master,replica,etc...)
#
# # One or more "predicate queries" may be defined for a metric query. These
# # are run before the main metric query (after any cache hit check). If all
# # of them, when run sequentially, return a single row with a single column
# # boolean true result, the main metric query is executed. If any of them
# # return false or return zero rows, the main query is skipped. If any
# # predicate query returns more than one row, a non-boolean result, or fails
# # with an error, the whole query is marked failed. Predicate queries can be
# # used to check for the presence of specific functions, tables, extensions,
# # settings, vendor-specific postgres features etc before running the main
# # query.
# predicate_queries:
# - name: predicate query name
# predicate_query: |
# SELECT EXISTS (SELECT 1 FROM information_schema.routines WHERE routine_schema = 'pg_catalog' AND routine_name = 'pg_backup_start_time');
#
# metrics: # List of returned columns, each column must have a `name` and `usage`, `rename` and `description` are optional
# - timestamp: # Column name, should be exactly the same as returned column name
Expand Down
94 changes: 88 additions & 6 deletions exporter/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Collector struct {
result []prometheus.Metric // cached metrics
descriptors map[string]*prometheus.Desc // maps column index to descriptor, build on init
cacheHit bool // indicate last scrape was served from cache or real execution
predicateSkip string // if nonempty, predicate query caused skip of this scrape
err error

// stats
Expand Down Expand Up @@ -74,11 +75,17 @@ func (q *Collector) ResultSize() int {
return len(q.result)
}

// Error returns the error recorded by the last scrape, if any
// (including errors raised by predicate queries).
func (q *Collector) Error() error {
	return q.err
}

// PredicateSkip reports whether the last scrape was skipped because a
// predicate query did not pass, and if so, the name (or index, when the
// predicate is unnamed) of the predicate query that caused the skip.
func (q *Collector) PredicateSkip() (bool, string) {
	return q.predicateSkip != "", q.predicateSkip
}

// Duration returns last scrape duration in float64 seconds
func (q *Collector) Duration() float64 {
return q.scrapeDone.Sub(q.scrapeBegin).Seconds()
Expand All @@ -89,23 +96,98 @@ func (q *Collector) CacheHit() bool {
return q.cacheHit
}

// executePredicateQueries runs any predicate queries configured for this
// query, sequentially, and returns true only if all of them pass. As a side
// effect it sets q.predicateSkip to the first predicate query that did not
// pass (using the predicate query name if specified, otherwise its index),
// and sets q.err when a predicate query errors out or returns a malformed
// result (wrong column count, non-boolean column, more than one row).
func (q *Collector) executePredicateQueries(ctx context.Context) bool {
	for i, predicateQuery := range q.PredicateQueries {
		predicateQueryName := predicateQuery.Name
		if predicateQueryName == "" {
			predicateQueryName = fmt.Sprintf("%d", i)
		}
		q.predicateSkip = predicateQueryName

		msgPrefix := fmt.Sprintf("predicate query [%s] for query [%s] @ server [%s]", predicateQueryName, q.Name, q.Server.Database)
		if !q.runPredicateQuery(ctx, predicateQuery, msgPrefix) {
			// q.predicateSkip (and q.err, if appropriate) were set above / inside.
			return false
		}
	}
	// If we get here, all predicate queries passed (or none were configured).
	q.predicateSkip = ""
	return true
}

// runPredicateQuery executes a single predicate query and reports whether it
// passed: exactly one row with a single BOOL column holding true. On error or
// malformed result it records the failure in q.err and returns false.
// Keeping this in its own function scopes the rows.Close defer to one query
// instead of accumulating open result sets across loop iterations.
func (q *Collector) runPredicateQuery(ctx context.Context, predicateQuery PredicateQuery, msgPrefix string) bool {
	logDebugf("%s executing predicate query", msgPrefix)
	rows, err := q.Server.QueryContext(ctx, predicateQuery.SQL)
	if err != nil {
		// A failed predicate query is treated as a skip, and the err flag is
		// set so Fatal will be respected if set.
		if err == context.DeadlineExceeded { // timeout
			q.err = fmt.Errorf("%s timeout because duration %v exceed limit %v",
				msgPrefix, time.Since(q.scrapeBegin), q.TimeoutDuration())
		} else {
			q.err = fmt.Errorf("%s failed: %w", msgPrefix, err)
		}
		return false
	}
	defer rows.Close()

	// The predicate passes only if it returns exactly one row with one column
	// that is a boolean true.
	colTypes, err := rows.ColumnTypes()
	if err != nil {
		q.err = fmt.Errorf("%s failed to get column types: %w", msgPrefix, err)
		return false
	}
	if len(colTypes) != 1 {
		q.err = fmt.Errorf("%s failed because it returned %d columns, expected 1", msgPrefix, len(colTypes))
		return false
	}
	if colTypes[0].DatabaseTypeName() != "BOOL" {
		q.err = fmt.Errorf("%s failed because it returned a column of type %s, expected BOOL. Consider a CAST(colname AS boolean) or colname::boolean in the query.", msgPrefix, colTypes[0].DatabaseTypeName())
		return false
	}
	firstRow := true
	predicatePass := sql.NullBool{}
	for rows.Next() {
		if !firstRow {
			q.err = fmt.Errorf("%s failed because it returned more than one row", msgPrefix)
			return false
		}
		firstRow = false
		if err = rows.Scan(&predicatePass); err != nil {
			q.err = fmt.Errorf("%s failed scanning in expected 1-row 1-column nullable boolean result: %w", msgPrefix, err)
			return false
		}
	}
	// Distinguish an iteration error from a legitimate empty result set.
	if err = rows.Err(); err != nil {
		q.err = fmt.Errorf("%s failed during row iteration: %w", msgPrefix, err)
		return false
	}
	if !(predicatePass.Valid && predicatePass.Bool) {
		// Successfully executed predicate query requested a skip
		// (false, NULL, or zero rows).
		logDebugf("%s returned false, null or zero rows, skipping query", msgPrefix)
		return false
	}
	logDebugf("%s returned true", msgPrefix)
	return true
}

// execute will run this query to registered server, result and err are registered
func (q *Collector) execute() {
q.result = q.result[:0] // reset cache
var rows *sql.Rows
var err error

// execution
ctx := context.Background()
if q.Timeout != 0 { // if timeout is provided, use context
logDebugf("query [%s] @ server [%s] executing begin with time limit: %v", q.Name, q.Server.Database, q.TimeoutDuration())
ctx, cancel := context.WithTimeout(context.Background(), q.TimeoutDuration())
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(context.Background(), q.TimeoutDuration())
defer cancel()
rows, err = q.Server.QueryContext(ctx, q.SQL)
} else {
logDebugf("query [%s] executing begin", q.Name)
rows, err = q.Server.Query(q.SQL)
logDebugf("query [%s] @ server [%s] executing begin", q.Server.Database, q.Name)
}

// Check predicate queries if any
if predicatePass := q.executePredicateQueries(ctx); !predicatePass {
// predicateSkip and err if appropriate were set as side-effects
return
}

// main query execution
rows, err = q.Server.QueryContext(ctx, q.SQL)

// error handling: if query failed because of timeout or error, record and return
if err != nil {
if err == context.DeadlineExceeded { // timeout
Expand Down
10 changes: 10 additions & 0 deletions exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type Exporter struct {
queryCacheTTL *prometheus.GaugeVec // {datname,query} query cache ttl
queryScrapeTotalCount *prometheus.GaugeVec // {datname,query} query level: how many errors the query triggers?
queryScrapeErrorCount *prometheus.GaugeVec // {datname,query} query level: how many errors the query triggers?
queryScrapePredicateSkipCount *prometheus.GaugeVec // {datname,query} query level: how many times was the query skipped due to predicate
queryScrapeDuration *prometheus.GaugeVec // {datname,query} query level: how many seconds the query spends?
queryScrapeMetricCount *prometheus.GaugeVec // {datname,query} query level: how many metrics the query returns?
queryScrapeHitCount *prometheus.GaugeVec // {datname,query} query level: how many errors the query triggers?
Expand Down Expand Up @@ -141,6 +142,7 @@ func (e *Exporter) collectServerMetrics(s *Server) {
e.queryCacheTTL.Reset()
e.queryScrapeTotalCount.Reset()
e.queryScrapeErrorCount.Reset()
e.queryScrapePredicateSkipCount.Reset()
e.queryScrapeDuration.Reset()
e.queryScrapeMetricCount.Reset()
e.queryScrapeHitCount.Reset()
Expand All @@ -167,6 +169,9 @@ func (e *Exporter) collectServerMetrics(s *Server) {
for queryName, counter := range s.queryScrapeErrorCount {
e.queryScrapeErrorCount.WithLabelValues(s.Database, queryName).Set(counter)
}
for queryName, counter := range s.queryScrapePredicateSkipCount {
e.queryScrapePredicateSkipCount.WithLabelValues(s.Database, queryName).Set(counter)
}
for queryName, counter := range s.queryScrapeMetricCount {
e.queryScrapeMetricCount.WithLabelValues(s.Database, queryName).Set(counter)
}
Expand Down Expand Up @@ -297,6 +302,10 @@ func (e *Exporter) setupInternalMetrics() {
Namespace: e.namespace, ConstLabels: e.constLabels,
Subsystem: "exporter_query", Name: "scrape_error_count", Help: "times the query failed",
}, []string{"datname", "query"})
e.queryScrapePredicateSkipCount = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: e.namespace, ConstLabels: e.constLabels,
Subsystem: "exporter_query", Name: "scrape_predicate_skip_count", Help: "times the query was skipped due to a predicate returning false",
}, []string{"datname", "query"})
e.queryScrapeDuration = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: e.namespace, ConstLabels: e.constLabels,
Subsystem: "exporter_query", Name: "scrape_duration", Help: "seconds query spending on scraping",
Expand Down Expand Up @@ -333,6 +342,7 @@ func (e *Exporter) collectInternalMetrics(ch chan<- prometheus.Metric) {
e.queryCacheTTL.Collect(ch)
e.queryScrapeTotalCount.Collect(ch)
e.queryScrapeErrorCount.Collect(ch)
e.queryScrapePredicateSkipCount.Collect(ch)
e.queryScrapeDuration.Collect(ch)
e.queryScrapeMetricCount.Collect(ch)
e.queryScrapeHitCount.Collect(ch)
Expand Down
32 changes: 25 additions & 7 deletions exporter/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Query struct {
Name string `yaml:"name,omitempty"` // actual query name, used as metric prefix
Desc string `yaml:"desc,omitempty"` // description of this metric query
SQL string `yaml:"query"` // SQL command to fetch metrics
PredicateQueries []PredicateQuery `yaml:"predicate_queries,omitempty"` // SQL command to filter metrics
Branch string `yaml:"-"` // branch name, top layer key of config file

// control query behaviour
Expand All @@ -39,6 +40,14 @@ type Query struct {
MetricNames []string `yaml:"-"` // column (name) that used as metric
}

// A PredicateQuery is a query that returns a 1-column resultset that's used to decide whether
// to run the main query. The main query runs only if every predicate query
// returns a single row holding a single boolean true value.
type PredicateQuery struct {
	Name string `yaml:"name,omitempty"` // predicate query name, only used for logging (index used when empty)
	SQL string `yaml:"predicate_query"` // SQL command that must yield one row with one boolean column
	TTL float64 `yaml:"ttl,omitempty"` // how long to cache results for, in seconds — NOTE(review): predicate result caching not visible in the collector code shown; confirm it is honored
}

var queryTemplate, _ = template.New("Query").Parse(`##
# SYNOPSIS
# {{ .Name }}{{ if ne .Name .Branch }}.{{ .Branch }}{{ end }}_*
Expand Down Expand Up @@ -68,12 +77,21 @@ var htmlTemplate, _ = template.New("Query").Parse(`

<h2>{{ .Name }}</h2>
<p>{{ .Desc }}</p>

{{ if len(.PredicateQueries) > 0 }}
<h4>Predicate queries</h4>
<table style="border-style: dotted;">
<thhead><th>Name</th> <th>SQL</th> <th>Cache TTL</th></thead>
<tbody>
{{ range .PredicateQueries }}
<tr><td>{{ .Name }}</td><td><code>{{ html .SQL }}</code></td><td>{{if ne .TTL 0}}{{ .TTL }}s{{else}}<i>not cached</i>{{end}}</td></tr>
{{ end }}
</tbody></table>
{{ end }}
<h4>Query</h4>
<code><pre>{{ .SQL }}</pre></code>

<h4>Attribution</h4>
<code><table style="border-style: dotted;"><tbdoy>
<code><table style="border-style: dotted;"><tbody>
<tr><td>Branch </td> <td> {{ .Branch }} </td></tr>
<tr><td>TTL </td> <td> {{ .TTL }} </td></tr>
<tr><td>Priority </td> <td> {{ .Priority }} </td></tr>
Expand All @@ -82,17 +100,17 @@ var htmlTemplate, _ = template.New("Query").Parse(`
<tr><td>Version </td> <td> {{if ne .MinVersion 0}}{{ .MinVersion }}{{else}}lower{{end}} ~ {{if ne .MaxVersion 0}}{{ .MaxVersion }}{{else}}higher{{end}} </td></tr>
<tr><td>Tags </td> <td> {{ .Tags }} </td></tr>
<tr><td>Source </td> <td> {{ .Path }} </td></tr>
<tbdoy></table></code>
<tbody></table></code>

<h4>Columns</h4>
<code><table "align="left" style="border-style: dotted;"><thead><th>Name</th> <th>Usage</th> <th>Rename</th> <th>Bucket</th> <th>Scale</th> <th>Default</th> <th>Description</th></thead>
<tbdoy>{{ range .ColumnList }}<tr><td>{{ .Name }}</td><td>{{ .Usage }}</td><td>{{ .Rename }}</td><td>{{ .Bucket }}</td><td>{{ .Scale }}</td><td>{{ .Default }}</td><td>{{ .Desc }}</td></tr>{{ end }}
<tbdoy></table></code>
<tbody>{{ range .ColumnList }}<tr><td>{{ .Name }}</td><td>{{ .Usage }}</td><td>{{ .Rename }}</td><td>{{ .Bucket }}</td><td>{{ .Scale }}</td><td>{{ .Default }}</td><td>{{ .Desc }}</td></tr>{{ end }}
<tbody></table></code>

<h4>Metrics</h4>
<code><table "align="left" style="border-style: dotted;"><thead><th>Name</th> <th>Usage</th> <th>Desc</th></th></thead><tbdoy>
<code><table "align="left" style="border-style: dotted;"><thead><th>Name</th> <th>Usage</th> <th>Desc</th></th></thead><tbody>
{{ range .MetricList }}<tr><td>{{ .Name }}</td><td>{{ .Column.Usage }}</td><td>{{ .Column.Desc }}</td></tr>{{ end }}
<tbdoy></table></code>
<tbody></table></code>
</div>
`)

Expand Down
19 changes: 17 additions & 2 deletions exporter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type Server struct {
queryScrapeTotalCount map[string]float64 // internal query metrics: total executed
queryScrapeHitCount map[string]float64 // internal query metrics: times serving from hit cache
queryScrapeErrorCount map[string]float64 // internal query metrics: times failed
queryScrapePredicateSkipCount map[string]float64 // internal query metrics: times skipped due to predicate
queryScrapeMetricCount map[string]float64 // internal query metrics: number of metrics scraped
queryScrapeDuration map[string]float64 // internal query metrics: time spend on executing
}
Expand Down Expand Up @@ -309,6 +310,7 @@ func (s *Server) ResetStats() {
s.queryScrapeTotalCount = make(map[string]float64, 0)
s.queryScrapeHitCount = make(map[string]float64, 0)
s.queryScrapeErrorCount = make(map[string]float64, 0)
s.queryScrapePredicateSkipCount = make(map[string]float64, 0)
s.queryScrapeMetricCount = make(map[string]float64, 0)
s.queryScrapeDuration = make(map[string]float64, 0)

Expand All @@ -317,6 +319,9 @@ func (s *Server) ResetStats() {
s.queryScrapeTotalCount[query.Name] = 0
s.queryScrapeHitCount[query.Name] = 0
s.queryScrapeErrorCount[query.Name] = 0
if len(query.PredicateQueries) > 0 {
s.queryScrapePredicateSkipCount[query.Name] = 0
}
s.queryScrapeMetricCount[query.Name] = 0
s.queryScrapeDuration[query.Name] = 0
}
Expand Down Expand Up @@ -441,13 +446,14 @@ func (s *Server) Stat() string {
// logErrorf("fail to generate server stats html")
// return fmt.Sprintf("fail to generate server stat html, %s", err.Error())
//}
buf.WriteString(fmt.Sprintf("%-24s %-10s %-10s %-10s %-10s %-6s %-10s\n", "name", "total", "hit", "error", "metric", "ttl/s", "duration/ms"))
buf.WriteString(fmt.Sprintf("%-24s %-10s %-10s %-10s %-10s %-10s %-6s %-10s\n", "name", "total", "hit", "error", "skip", "metric", "ttl/s", "duration/ms"))
for _, query := range s.Collectors {
buf.WriteString(fmt.Sprintf("%-24s %-10d %-10d %-10d %-10d %-6d %-10f\n",
buf.WriteString(fmt.Sprintf("%-24s %-10d %-10d %-10d %-10d %-10d %-6d %-10f\n",
query.Name,
int(s.queryScrapeTotalCount[query.Name]),
int(s.queryScrapeHitCount[query.Name]),
int(s.queryScrapeErrorCount[query.Name]),
int(s.queryScrapePredicateSkipCount[query.Name]),
int(s.queryScrapeMetricCount[query.Name]),
int(s.queryCacheTTL[query.Name]),
s.queryScrapeDuration[query.Name]*1000,
Expand Down Expand Up @@ -510,6 +516,15 @@ func (s *Server) Collect(ch chan<- prometheus.Metric) {
s.queryScrapeHitCount[query.Name]++
}
}
// TODO add label for which predicate caused skip?
if len(query.PredicateQueries) > 0 {
skipped, _ := query.PredicateSkip()
if skipped {
s.queryScrapePredicateSkipCount[query.Name]++
} else {
s.queryScrapePredicateSkipCount[query.Name] = 0
}
}
}

final:
Expand Down