pg_exporter: predicate queries feature (WIP)
**WIP**:

* has a TODO about adding name of failing predicate query to metric set
* defines the TTL field but does not implement it
* lacks test coverage

Add "predicate queries" to decide whether a given metric query should
be scraped at runtime. Fixes #41.

These can be used to express complex conditions that may change over
time, such as "collect this metric set if the foo extension is installed
and is at least version 2.1", "collect this metric only when running on
(some specific vendor postgres flavour)" or "only try to collect this if
this specific table exists and has this specific column".

This helps with maintaining a single configuration to support multiple
postgres versions, vendor flavours, access privilege levels, installed
extensions and extension versions, etc. E.g. you can selectively query
`pg_stat_statements` only if the extension is installed in the current
DB.

The new syntax is added at query level in the configuration and looks
like this:

  rowcount_in_foo_table:
    predicate_queries:
      - name: foo_table exists
        predicate_query: |
          SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'foo')
    query: |
      SELECT count(1) FROM public.foo;
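For example (an illustrative sketch, not part of this commit: the metric query name and columns here are assumptions), the `pg_stat_statements` case mentioned above could be expressed as:

```yaml
  pg_stat_statements:
    predicate_queries:
      - name: pg_stat_statements extension installed
        predicate_query: |
          SELECT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pg_stat_statements')
    query: |
      SELECT calls, total_exec_time FROM pg_stat_statements;
```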
ringerc committed Jun 17, 2024
1 parent 01bbe00 commit 03465d7
Showing 6 changed files with 169 additions and 16 deletions.
16 changes: 15 additions & 1 deletion README.md
@@ -335,6 +335,20 @@ Config files are using YAML format, there are lots of examples in the [conf](htt
#┃ * '<tag>' means this query WILL be executed when exporter is tagged with '<tag>'
#┃ ( <tag> could not be cluster,primary,standby,master,replica,etc...)
#
#┃ # One or more "predicate queries" may be defined for a metric query. These
#┃ # are run before the main metric query (after any cache hit check). If all
#┃ # of them, when run sequentially, return a single row with a single column
#┃ # boolean true result, the main metric query is executed. If any of them
#┃ # return false or return zero rows, the main query is skipped. If any
#┃ # predicate query returns more than one row, a non-boolean result, or fails
#┃ # with an error, the whole query is marked as failed. Predicate queries can be
#┃ # used to check for the presence of specific functions, tables, extensions,
#┃ # settings, vendor-specific postgres features etc before running the main
#┃ # query.
#┃ predicate_queries:
#┃ - name: predicate query name
#┃ predicate_query: |
#┃ SELECT EXISTS (SELECT 1 FROM information_schema.routines WHERE routine_schema = 'pg_catalog' AND routine_name = 'pg_backup_start_time');
#
#┃ metrics: <---- List of returned columns, each column must have a `name` and `usage`, `rename` and `description` are optional
#┃ - timestamp: <---- Column name, should be exactly the same as returned column name
@@ -473,4 +487,4 @@ Author: [Vonng](https://vonng.com/en) ([[email protected]](mailto:[email protected]))

License: [Apache License Version 2.0](LICENSE)

Copyright: 2018-2023 [email protected]
14 changes: 14 additions & 0 deletions config/collector/000-doc.yml
@@ -70,6 +70,20 @@
# * `<tag>` means this query WILL be executed when exporter is tagged with `<tag>`
# ( <tag> could not be cluster,primary,standby,master,replica,etc...)
#
# # One or more "predicate queries" may be defined for a metric query. These
# # are run before the main metric query (after any cache hit check). If all
# # of them, when run sequentially, return a single row with a single column
# # boolean true result, the main metric query is executed. If any of them
# # return false or return zero rows, the main query is skipped. If any
# # predicate query returns more than one row, a non-boolean result, or fails
# # with an error, the whole query is marked as failed. Predicate queries can be
# # used to check for the presence of specific functions, tables, extensions,
# # settings, vendor-specific postgres features etc before running the main
# # query.
# predicate_queries:
# - name: predicate query name
# predicate_query: |
# SELECT EXISTS (SELECT 1 FROM information_schema.routines WHERE routine_schema = 'pg_catalog' AND routine_name = 'pg_backup_start_time');
#
# metrics: # List of returned columns, each column must have a `name` and `usage`, `rename` and `description` are optional
# - timestamp: # Column name, should be exactly the same as returned column name
94 changes: 88 additions & 6 deletions exporter/collector.go
@@ -24,6 +24,7 @@ type Collector struct {
result []prometheus.Metric // cached metrics
descriptors map[string]*prometheus.Desc // maps column index to descriptor, build on init
cacheHit bool // indicate last scrape was served from cache or real execution
predicateSkip string // if nonempty, predicate query caused skip of this scrape
err error

// stats
@@ -74,11 +75,17 @@ func (q *Collector) ResultSize() int {
return len(q.result)
}

// Error wraps query error (including error in predicate query)
func (q *Collector) Error() error {
return q.err
}

// PredicateSkip reports whether the last scrape was skipped due to a
// predicate query and, if so, which predicate query caused the skip.
func (q *Collector) PredicateSkip() (bool, string) {
return q.predicateSkip != "", q.predicateSkip
}

// Duration returns last scrape duration in float64 seconds
func (q *Collector) Duration() float64 {
return q.scrapeDone.Sub(q.scrapeBegin).Seconds()
@@ -89,23 +96,98 @@ func (q *Collector) CacheHit() bool {
return q.cacheHit
}

// executePredicateQueries runs any predicate queries for this query, returning
// true only if all of them pass. As a side effect it sets predicateSkip to the
// first predicate query that did not pass, using the predicate query name if
// specified, otherwise its index.
func (q *Collector) executePredicateQueries(ctx context.Context) bool {
for i, predicateQuery := range q.PredicateQueries {
predicateQueryName := predicateQuery.Name
if predicateQueryName == "" {
predicateQueryName = fmt.Sprintf("%d", i)
}
q.predicateSkip = predicateQueryName

msgPrefix := fmt.Sprintf("predicate query [%s] for query [%s] @ server [%s]", predicateQueryName, q.Name, q.Server.Database)

// Execute the predicate query.
logDebugf("%s executing predicate query", msgPrefix)
rows, err := q.Server.QueryContext(ctx, predicateQuery.SQL)
if err != nil {
// If a predicate query fails, that's treated as a skip, and the err
// flag is set so Fatal will be respected if set.
if err == context.DeadlineExceeded { // timeout
q.err = fmt.Errorf("%s timeout because duration %v exceed limit %v",
msgPrefix, time.Since(q.scrapeBegin), q.TimeoutDuration())
} else {
q.err = fmt.Errorf("%s failed: %w", msgPrefix, err)
}
return false
}
defer rows.Close()

// The predicate passes if it returns exactly one row with one column
// that is a boolean true.
colTypes, err := rows.ColumnTypes()
if err != nil {
q.err = fmt.Errorf("%s failed to get column types: %w", msgPrefix, err)
return false
}
if len(colTypes) != 1 {
q.err = fmt.Errorf("%s failed because it returned %d columns, expected 1", msgPrefix, len(colTypes))
return false
}
if colTypes[0].DatabaseTypeName() != "BOOL" {
q.err = fmt.Errorf("%s failed because it returned a column of type %s, expected BOOL. Consider a CAST(colname AS boolean) or colname::boolean in the query.", msgPrefix, colTypes[0].DatabaseTypeName())
return false
}
firstRow := true
predicatePass := sql.NullBool{}
for rows.Next() {
if !firstRow {
q.err = fmt.Errorf("%s failed because it returned more than one row", msgPrefix)
return false
}
firstRow = false
err = rows.Scan(&predicatePass)
if err != nil {
q.err = fmt.Errorf("%s failed scanning in expected 1-row 1-column nullable boolean result: %w", msgPrefix, err)
return false
}
}
if !(predicatePass.Valid && predicatePass.Bool) {
// successfully executed predicate query requested a skip
logDebugf("%s returned false, null or zero rows, skipping query", msgPrefix)
return false
}
logDebugf("%s returned true", msgPrefix)
}
// If we get here, all predicate queries passed.
q.predicateSkip = ""
return true
}

// execute will run this query to registered server, result and err are registered
func (q *Collector) execute() {
q.result = q.result[:0] // reset cache
var rows *sql.Rows
var err error

// execution
ctx := context.Background()
if q.Timeout != 0 { // if timeout is provided, use context
logDebugf("query [%s] @ server [%s] executing begin with time limit: %v", q.Name, q.Server.Database, q.TimeoutDuration())
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(context.Background(), q.TimeoutDuration())
defer cancel()
} else {
logDebugf("query [%s] @ server [%s] executing begin", q.Name, q.Server.Database)
}

// Check predicate queries if any
if predicatePass := q.executePredicateQueries(ctx); !predicatePass {
// predicateSkip and err if appropriate were set as side-effects
return
}

// main query execution
rows, err = q.Server.QueryContext(ctx, q.SQL)

// error handling: if query failed because of timeout or error, record and return
if err != nil {
if err == context.DeadlineExceeded { // timeout
10 changes: 10 additions & 0 deletions exporter/exporter.go
@@ -60,6 +60,7 @@ type Exporter struct {
queryCacheTTL *prometheus.GaugeVec // {datname,query} query cache ttl
queryScrapeTotalCount *prometheus.GaugeVec // {datname,query} query level: how many errors the query triggers?
queryScrapeErrorCount *prometheus.GaugeVec // {datname,query} query level: how many errors the query triggers?
queryScrapePredicateSkipCount *prometheus.GaugeVec // {datname,query} query level: how many times was the query skipped due to predicate
queryScrapeDuration *prometheus.GaugeVec // {datname,query} query level: how many seconds the query spends?
queryScrapeMetricCount *prometheus.GaugeVec // {datname,query} query level: how many metrics the query returns?
queryScrapeHitCount *prometheus.GaugeVec // {datname,query} query level: how many errors the query triggers?
@@ -141,6 +142,7 @@ func (e *Exporter) collectServerMetrics(s *Server) {
e.queryCacheTTL.Reset()
e.queryScrapeTotalCount.Reset()
e.queryScrapeErrorCount.Reset()
e.queryScrapePredicateSkipCount.Reset()
e.queryScrapeDuration.Reset()
e.queryScrapeMetricCount.Reset()
e.queryScrapeHitCount.Reset()
@@ -167,6 +169,9 @@ func (e *Exporter) collectServerMetrics(s *Server) {
for queryName, counter := range s.queryScrapeErrorCount {
e.queryScrapeErrorCount.WithLabelValues(s.Database, queryName).Set(counter)
}
for queryName, counter := range s.queryScrapePredicateSkipCount {
e.queryScrapePredicateSkipCount.WithLabelValues(s.Database, queryName).Set(counter)
}
for queryName, counter := range s.queryScrapeMetricCount {
e.queryScrapeMetricCount.WithLabelValues(s.Database, queryName).Set(counter)
}
@@ -295,6 +300,10 @@ func (e *Exporter) setupInternalMetrics() {
Namespace: e.namespace, ConstLabels: e.constLabels,
Subsystem: "exporter_query", Name: "scrape_error_count", Help: "times the query failed",
}, []string{"datname", "query"})
e.queryScrapePredicateSkipCount = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: e.namespace, ConstLabels: e.constLabels,
Subsystem: "exporter_query", Name: "scrape_predicate_skip_count", Help: "times the query was skipped due to a predicate returning false",
}, []string{"datname", "query"})
e.queryScrapeDuration = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: e.namespace, ConstLabels: e.constLabels,
Subsystem: "exporter_query", Name: "scrape_duration", Help: "seconds query spending on scrapping",
@@ -331,6 +340,7 @@ func (e *Exporter) collectInternalMetrics(ch chan<- prometheus.Metric) {
e.queryCacheTTL.Collect(ch)
e.queryScrapeTotalCount.Collect(ch)
e.queryScrapeErrorCount.Collect(ch)
e.queryScrapePredicateSkipCount.Collect(ch)
e.queryScrapeDuration.Collect(ch)
e.queryScrapeMetricCount.Collect(ch)
e.queryScrapeHitCount.Collect(ch)
32 changes: 25 additions & 7 deletions exporter/query.go
@@ -17,6 +17,7 @@ type Query struct {
Name string `yaml:"name"` // actual query name, used as metric prefix
Desc string `yaml:"desc"` // description of this metric query
SQL string `yaml:"query"` // SQL command to fetch metrics
PredicateQueries []PredicateQuery `yaml:"predicate_queries,omitempty"` // queries that decide whether to run the main query
Branch string `yaml:"-"` // branch name, top layer key of config file

// control query behaviour
@@ -39,6 +40,14 @@ type Query struct {
MetricNames []string `yaml:"-"` // column (name) that used as metric
}

// A PredicateQuery is a query that returns a 1-row, 1-column boolean resultset
// that's used to decide whether to run the main query.
type PredicateQuery struct {
Name string `yaml:"name,omitempty"` // predicate query name, only used for logging
SQL string `yaml:"predicate_query"` // SQL command to return a predicate
TTL float64 `yaml:"ttl,omitempty"` // How long to cache results for (defined but not yet implemented)
}
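As an illustration (field values here are hypothetical; per the WIP note above, `ttl` is parsed into the `TTL` field but not yet honoured), a config fragment for this struct could look like:

```yaml
predicate_queries:
  - name: pg_stat_statements extension installed  # optional, used only in logs
    predicate_query: |
      SELECT count(*) = 1 FROM pg_extension WHERE extname = 'pg_stat_statements'
    ttl: 60  # defined but not yet implemented
```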

var queryTemplate, _ = template.New("Query").Parse(`##
# SYNOPSIS
# {{ .Name }}{{ if ne .Name .Branch }}.{{ .Branch }}{{ end }}_*
@@ -68,12 +77,21 @@ var htmlTemplate, _ = template.New("Query").Parse(`
<h2>{{ .Name }}</h2>
<p>{{ .Desc }}</p>
{{ if .PredicateQueries }}
<h4>Predicate queries</h4>
<table style="border-style: dotted;">
<thead><th>Name</th> <th>SQL</th> <th>Cache TTL</th></thead>
<tbody>
{{ range .PredicateQueries }}
<tr><td>{{ .Name }}</td><td><code>{{ html .SQL }}</code></td><td>{{if ne .TTL 0}}{{ .TTL }}s{{else}}<i>not cached</i>{{end}}</td></tr>
{{ end }}
</tbody></table>
{{ end }}
<h4>Query</h4>
<code><pre>{{ .SQL }}</pre></code>
<h4>Attribution</h4>
<code><table style="border-style: dotted;"><tbody>
<tr><td>Branch </td> <td> {{ .Branch }} </td></tr>
<tr><td>TTL </td> <td> {{ .TTL }} </td></tr>
<tr><td>Priority </td> <td> {{ .Priority }} </td></tr>
@@ -82,17 +100,17 @@ var htmlTemplate, _ = template.New("Query").Parse(`
<tr><td>Version </td> <td> {{if ne .MinVersion 0}}{{ .MinVersion }}{{else}}lower{{end}} ~ {{if ne .MaxVersion 0}}{{ .MaxVersion }}{{else}}higher{{end}} </td></tr>
<tr><td>Tags </td> <td> {{ .Tags }} </td></tr>
<tr><td>Source </td> <td> {{ .Path }} </td></tr>
</tbody></table></code>
<h4>Columns</h4>
<code><table "align="left" style="border-style: dotted;"><thead><th>Name</th> <th>Usage</th> <th>Rename</th> <th>Bucket</th> <th>Scale</th> <th>Default</th> <th>Description</th></thead>
<tbody>{{ range .ColumnList }}<tr><td>{{ .Name }}</td><td>{{ .Usage }}</td><td>{{ .Rename }}</td><td>{{ .Bucket }}</td><td>{{ .Scale }}</td><td>{{ .Default }}</td><td>{{ .Desc }}</td></tr>{{ end }}
</tbody></table></code>
<h4>Metrics</h4>
<code><table align="left" style="border-style: dotted;"><thead><th>Name</th> <th>Usage</th> <th>Desc</th></thead><tbody>
{{ range .MetricList }}<tr><td>{{ .Name }}</td><td>{{ .Column.Usage }}</td><td>{{ .Column.Desc }}</td></tr>{{ end }}
</tbody></table></code>
</div>
`)

19 changes: 17 additions & 2 deletions exporter/server.go
@@ -77,6 +77,7 @@ type Server struct {
queryScrapeTotalCount map[string]float64 // internal query metrics: total executed
queryScrapeHitCount map[string]float64 // internal query metrics: times serving from hit cache
queryScrapeErrorCount map[string]float64 // internal query metrics: times failed
queryScrapePredicateSkipCount map[string]float64 // internal query metrics: times skipped due to predicate
queryScrapeMetricCount map[string]float64 // internal query metrics: number of metrics scrapped
queryScrapeDuration map[string]float64 // internal query metrics: time spend on executing
}
@@ -308,6 +309,7 @@ func (s *Server) ResetStats() {
s.queryScrapeTotalCount = make(map[string]float64, 0)
s.queryScrapeHitCount = make(map[string]float64, 0)
s.queryScrapeErrorCount = make(map[string]float64, 0)
s.queryScrapePredicateSkipCount = make(map[string]float64, 0)
s.queryScrapeMetricCount = make(map[string]float64, 0)
s.queryScrapeDuration = make(map[string]float64, 0)

@@ -316,6 +318,9 @@ func (s *Server) ResetStats() {
s.queryScrapeTotalCount[query.Name] = 0
s.queryScrapeHitCount[query.Name] = 0
s.queryScrapeErrorCount[query.Name] = 0
if len(query.PredicateQueries) > 0 {
s.queryScrapePredicateSkipCount[query.Name] = 0
}
s.queryScrapeMetricCount[query.Name] = 0
s.queryScrapeDuration[query.Name] = 0
}
@@ -440,13 +445,14 @@ func (s *Server) Stat() string {
// logErrorf("fail to generate server stats html")
// return fmt.Sprintf("fail to generate server stat html, %s", err.Error())
//}
buf.WriteString(fmt.Sprintf("%-24s %-10s %-10s %-10s %-10s %-10s %-6s %-10s\n", "name", "total", "hit", "error", "skip", "metric", "ttl/s", "duration/ms"))
for _, query := range s.Collectors {
buf.WriteString(fmt.Sprintf("%-24s %-10d %-10d %-10d %-10d %-10d %-6d %-10f\n",
query.Name,
int(s.queryScrapeTotalCount[query.Name]),
int(s.queryScrapeHitCount[query.Name]),
int(s.queryScrapeErrorCount[query.Name]),
int(s.queryScrapePredicateSkipCount[query.Name]),
int(s.queryScrapeMetricCount[query.Name]),
int(s.queryCacheTTL[query.Name]),
s.queryScrapeDuration[query.Name]*1000,
@@ -509,6 +515,15 @@ func (s *Server) Collect(ch chan<- prometheus.Metric) {
s.queryScrapeHitCount[query.Name]++
}
}
// TODO add label for which predicate caused skip?
if len(query.PredicateQueries) > 0 {
skipped, _ := query.PredicateSkip()
if skipped {
s.queryScrapePredicateSkipCount[query.Name]++
} else {
s.queryScrapePredicateSkipCount[query.Name] = 0
}
}
}

final:
