Skip to content

Commit

Permalink
feat(outputs): add support for hostname, tags, custom fields, and tem…
Browse files Browse the repository at this point in the history
…plated fields to TimesacleDB output

Signed-off-by: alika <[email protected]>
  • Loading branch information
alika authored and poiana committed Apr 26, 2023
1 parent d0eda67 commit 215930f
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 15 deletions.
34 changes: 30 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1254,17 +1254,43 @@ permissions to access the resources you selected to use, like `SQS`, `Lambda`,
To use TimescaleDB you should create the Hypertable first, following this example

```sql
CREATE TABLE falco_events (
CREATE TABLE falcosidekick_events (
time TIMESTAMPTZ NOT NULL,
rule TEXT,
priority VARCHAR(20),
source VARCHAR(20),
output TEXT
output TEXT,
tags TEXT,
hostname TEXT,
);
SELECT create_hypertable('falco_events', 'time');
SELECT create_hypertable('falcosidekick_events', 'time');
```

The name from the table should match with the `hypertable` output configuration.
To support [`customfields` or `templatedfields`](#yaml-file) you need to ensure you add the corresponding fields to the Hypertable, for example:

```yaml
customfields:
custom_field_1: "custom-value-1"
templatedfields:
k8s_namespace: '{{ or (index . "k8s.ns.name") "null" }}'
```
```sql
CREATE TABLE falcosidekick_events (
time TIMESTAMPTZ NOT NULL,
rule TEXT,
priority VARCHAR(20),
source VARCHAR(20),
output TEXT,
tags TEXT,
hostname TEXT,
custom_field_1 TEXT,
k8s_namespace TEXT
);
SELECT create_hypertable('falcosidekick_events', 'time');
```

The name from the table should match with the `hypertable` output configuration. The TimescaleDB output processor will insert SQL nulls when it encounters a string field value of `"null"`.

## Examples

Expand Down
16 changes: 9 additions & 7 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ import (

func getConfig() *types.Configuration {
c := &types.Configuration{
Customfields: make(map[string]string),
Grafana: types.GrafanaOutputConfig{CustomHeaders: make(map[string]string)},
Loki: types.LokiOutputConfig{CustomHeaders: make(map[string]string)},
Elasticsearch: types.ElasticsearchOutputConfig{CustomHeaders: make(map[string]string)},
Webhook: types.WebhookOutputConfig{CustomHeaders: make(map[string]string)},
Alertmanager: types.AlertmanagerOutputConfig{ExtraLabels: make(map[string]string), ExtraAnnotations: make(map[string]string)},
CloudEvents: types.CloudEventsOutputConfig{Extensions: make(map[string]string)},
Customfields: make(map[string]string),
Templatedfields: make(map[string]string),
Grafana: types.GrafanaOutputConfig{CustomHeaders: make(map[string]string)},
Loki: types.LokiOutputConfig{CustomHeaders: make(map[string]string)},
Elasticsearch: types.ElasticsearchOutputConfig{CustomHeaders: make(map[string]string)},
Webhook: types.WebhookOutputConfig{CustomHeaders: make(map[string]string)},
Alertmanager: types.AlertmanagerOutputConfig{ExtraLabels: make(map[string]string), ExtraAnnotations: make(map[string]string)},
CloudEvents: types.CloudEventsOutputConfig{Extensions: make(map[string]string)},
}

configFile := kingpin.Flag("config-file", "config file").Short('c').ExistingFile()
Expand Down Expand Up @@ -445,6 +446,7 @@ func getConfig() *types.Configuration {
}

v.GetStringMapString("Customfields")
v.GetStringMapString("Templatedfields")
v.GetStringMapString("Webhook.CustomHeaders")
v.GetStringMapString("CloudEvents.Extensions")
v.GetStringMapString("AlertManager.ExtraLabels")
Expand Down
79 changes: 75 additions & 4 deletions outputs/timescaledb.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,18 @@ import (
"context"
"fmt"
"log"
"strings"

"github.com/DataDog/datadog-go/statsd"
"github.com/falcosecurity/falcosidekick/types"
"github.com/jackc/pgx/v5/pgxpool"
)

type timescaledbPayload struct {
SQL string `json:"sql"`
Values []any `json:"values"`
}

func NewTimescaleDBClient(config *types.Configuration, stats *types.Statistics, promStats *types.PromStatistics,
statsdClient, dogstatsdClient *statsd.Client) (*Client, error) {

Expand Down Expand Up @@ -39,14 +45,75 @@ func NewTimescaleDBClient(config *types.Configuration, stats *types.Statistics,
}, nil
}

func newTimescaleDBPayload(falcopayload types.FalcoPayload, config *types.Configuration) timescaledbPayload {
vals := make(map[string]any, 7+len(config.Customfields)+len(config.Templatedfields))
vals[Time] = falcopayload.Time
vals[Rule] = falcopayload.Rule
vals[Priority] = falcopayload.Priority.String()
vals[Source] = falcopayload.Source
vals["output"] = falcopayload.Output

if len(falcopayload.Tags) != 0 {
vals[Tags] = strings.Join(falcopayload.Tags, ",")
}

if falcopayload.Hostname != "" {
vals[Hostname] = falcopayload.Hostname
}

for i, j := range falcopayload.OutputFields {
switch v := j.(type) {
case string:
for k := range config.Customfields {
if i == k {
vals[i] = strings.ReplaceAll(v, "\"", "")
}
}
for k := range config.Templatedfields {
if i == k {
vals[i] = strings.ReplaceAll(v, "\"", "")
}
}
default:
continue
}
}

i := 0
retVals := make([]any, len(vals))
var cols strings.Builder
var args strings.Builder
for k, v := range vals {
cols.WriteString(k)
fmt.Fprintf(&args, "$%d", i+1)
if i < (len(vals) - 1) {
cols.WriteString(",")
args.WriteString(",")
}

str, isString := v.(string)
if isString && (strings.ToLower(str) == "null") {
retVals[i] = nil
} else {
retVals[i] = v
}
i++
}

sql := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)",
config.TimescaleDB.HypertableName,
cols.String(),
args.String())

return timescaledbPayload{SQL: sql, Values: retVals}
}

func (c *Client) TimescaleDBPost(falcopayload types.FalcoPayload) {
c.Stats.TimescaleDB.Add(Total, 1)

hypertable := c.Config.TimescaleDB.HypertableName
queryInsertData := fmt.Sprintf("INSERT INTO %s (time, rule, priority, source, output) VALUES ($1, $2, $3, $4, $5)", hypertable)

var ctx = context.Background()
_, err := c.TimescaleDBClient.Exec(ctx, queryInsertData, falcopayload.Time, falcopayload.Rule, falcopayload.Priority.String(), falcopayload.Source, falcopayload.Output)
tsdbPayload := newTimescaleDBPayload(falcopayload, c.Config)
_, err := c.TimescaleDBClient.Exec(ctx, tsdbPayload.SQL, tsdbPayload.Values...)
if err != nil {
go c.CountMetric(Outputs, 1, []string{"output:timescaledb", "status:error"})
c.Stats.TimescaleDB.Add(Error, 1)
Expand All @@ -58,4 +125,8 @@ func (c *Client) TimescaleDBPost(falcopayload types.FalcoPayload) {
go c.CountMetric(Outputs, 1, []string{"output:timescaledb", "status:ok"})
c.Stats.TimescaleDB.Add(OK, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "timescaledb", "status": OK}).Inc()

if c.Config.Debug {
log.Printf("[DEBUG] : TimescaleDB payload : %v\n", tsdbPayload)
}
}
62 changes: 62 additions & 0 deletions outputs/timescaledb_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package outputs

import (
"encoding/json"
"regexp"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/falcosecurity/falcosidekick/types"
)

func TestNewTimescaleDBPayload(t *testing.T) {
expectedTableName := "test_hypertable"
expectedTime, _ := time.Parse(time.RFC3339, "2001-01-01T01:10:00Z")
expectedValues := map[string]any{
"time": expectedTime,
"rule": "Test rule",
"priority": "Debug",
"source": "syscalls",
"output": "This is a test from falcosidekick",
"tags": "test,example",
"hostname": "test-host",
"custom_field_1": "test-custom-value-1",
"template_field_1": "falcosidekick",
}

var f types.FalcoPayload
require.Nil(t, json.Unmarshal([]byte(falcoTestInput), &f))
f.OutputFields["custom_field_1"] = "test-custom-value-1"
f.OutputFields["template_field_1"] = "falcosidekick"

config := &types.Configuration{
Customfields: map[string]string{
"custom_field_1": "test-custom-value-1",
},
Templatedfields: map[string]string{
"template_field_1": `{{ or (index . "proc.name") "null" }}`,
},
TimescaleDB: types.TimescaleDBConfig{
HypertableName: "test_hypertable",
},
}
output := newTimescaleDBPayload(f, config)

re := regexp.MustCompile(`INSERT\s+INTO\s+(test_hypertable)\s+\((.*)\)\s+VALUES\s+\((.*)\)`)
submatches := re.FindStringSubmatch(output.SQL)
tablename := submatches[1]
cols := strings.Split(submatches[2], ",")

require.Equal(t, expectedTableName, tablename)
require.Equal(t, 9, len(cols))
for i, v := range cols {
if val, exist := expectedValues[v]; exist {
require.Equal(t, val, output.Values[i])
} else {
require.Fail(t, "Missing expected column: %s", v)
}
}
}

0 comments on commit 215930f

Please sign in to comment.