Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add flattenfields + create indextemplate #868

Merged
merged 2 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build-image.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:

- uses: actions/setup-go@cdcb36043654635271a94b9a6d1392de5bb323a7 # v5.0.1
with:
go-version: '1.21'
go-version: '1.22'
check-latest: true
cache: true

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
- uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
- uses: actions/setup-go@cdcb36043654635271a94b9a6d1392de5bb323a7 # v5.0.1
with:
go-version: '1.21'
go-version: '1.22'
cache: false
check-latest: true
- name: golangci-lint
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/push-main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:

- uses: actions/setup-go@cdcb36043654635271a94b9a6d1392de5bb323a7 # v5.0.1
with:
go-version: '1.21'
go-version: '1.22'
check-latest: true
cache: true

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:

- uses: actions/setup-go@cdcb36043654635271a94b9a6d1392de5bb323a7 # v5.0.1
with:
go-version: '1.21'
go-version: '1.22'
check-latest: true

- name: Set up QEMU
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
- uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
- uses: actions/setup-go@cdcb36043654635271a94b9a6d1392de5bb323a7 # v5.0.1
with:
go-version: '1.21'
go-version: '1.22'
check-latest: true
cache: true
- name: Run Go tests
Expand Down
11 changes: 11 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ func getConfig() *types.Configuration {
v.SetDefault("Elasticsearch.CheckCert", true)
v.SetDefault("Elasticsearch.Username", "")
v.SetDefault("Elasticsearch.Password", "")
v.SetDefault("Elasticsearch.FlattenFields", false)
v.SetDefault("Elasticsearch.CreateIndexTemplate", false)
v.SetDefault("Elasticsearch.NumberOfShards", 3)
v.SetDefault("Elasticsearch.NumberOfReplicas", 3)

v.SetDefault("Quickwit.HostPort", "")
v.SetDefault("Quickwit.Index", "falco")
Expand Down Expand Up @@ -712,6 +716,13 @@ func getConfig() *types.Configuration {
c.Loki.ExtraLabelsList = strings.Split(strings.ReplaceAll(c.Loki.ExtraLabels, " ", ""), ",")
}

if c.Elasticsearch.NumberOfReplicas <= 0 {
c.Elasticsearch.NumberOfReplicas = 3
}
if c.Elasticsearch.NumberOfShards <= 0 {
c.Elasticsearch.NumberOfShards = 3
}

if c.Prometheus.ExtraLabels != "" {
c.Prometheus.ExtraLabelsList = strings.Split(strings.ReplaceAll(c.Prometheus.ExtraLabels, " ", ""), ",")
}
Expand Down
4 changes: 4 additions & 0 deletions config_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ elasticsearch:
# checkcert: true # check if ssl certificate of the output is valid (default: true)
# username: "" # use this username to authenticate to Elasticsearch if the username is not empty (default: "")
# password: "" # use this password to authenticate to Elasticsearch if the password is not empty (default: "")
# flattenfields: false # replace . by _ to avoid mapping conflicts, force to true if createindextemplate==true (default: false)
# createindextemplate: false # create an index template (default: false)
# numberofshards: 3 # number of shards set by the index template (default: 3)
# numberofreplicas: 3 # number of replicas set by the index template (default: 3)
# customHeaders: # Custom headers to add in POST, useful for Authentication
# key: value

Expand Down
31 changes: 19 additions & 12 deletions docs/outputs/elasticsearch.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,29 @@

## Configuration

| Setting | Env var | Default value | Description |
| ------------------------------- | ------------------------------- | ---------------- | ----------------------------------------------------------------------------------------------------------------------------------- |
| `elasticsearch.hosport` | `ELASTICSEARCH_HOSTPORT` | | http://{domain or ip}:{port}, if not empty, Elasticsearch output is **enabled** |
| `elasticsearch.index` | `ELASTICSEARCH_INDEX` | `falco` | Index |
| `elasticsearch.type` | `ELASTICSEARCH_TYPE` | `_doc` | Index |
| `elasticsearch.suffix` | `ELASTICSEARCH_SUFFIX` | `daily` | Date suffix for index rotation : `daily`, `monthly`, `annually`, `none` |
| `elasticsearch.username` | `ELASTICSEARCH_USERNAME` | | Use this username to authenticate to Elasticsearch |
| `elasticsearch.password` | `ELASTICSEARCH_PASSWORD` | | Use this password to authenticate to Elasticsearch |
| `elasticsearch.customheaders` | `ELASTICSEARCH_CUSTOMHEADERS` | | Custom headers to add in POST, useful for Authentication |
| `elasticsearch.mutualtls` | `ELASTICSEARCH_MUTUALTLS` | `false` | Authenticate to the output with TLS, if true, checkcert flag will be ignored (server cert will always be checked) |
| `elasticsearch.checkcert` | `ELASTICSEARCH_CHECKCERT` | `true` | Check if ssl certificate of the output is valid |
| `elasticsearch.minimumpriority` | `ELASTICSEARCH_MINIMUMPRIORITY` | `""` (= `debug`) | Minimum priority of event for using this output, order is `emergency,alert,critical,error,warning,notice,informational,debug or ""` |
| Setting | Env var | Default value | Description |
| ----------------------------------- | ----------------------------------- | ---------------- | ----------------------------------------------------------------------------------------------------------------------------------- |
| `elasticsearch.hosport` | `ELASTICSEARCH_HOSTPORT` | | http://{domain or ip}:{port}, if not empty, Elasticsearch output is **enabled** |
| `elasticsearch.index` | `ELASTICSEARCH_INDEX` | `falco` | Index |
| `elasticsearch.type` | `ELASTICSEARCH_TYPE` | `_doc` | Index |
| `elasticsearch.suffix` | `ELASTICSEARCH_SUFFIX` | `daily` | Date suffix for index rotation : `daily`, `monthly`, `annually`, `none` |
| `elasticsearch.username` | `ELASTICSEARCH_USERNAME` | | Use this username to authenticate to Elasticsearch |
| `elasticsearch.password` | `ELASTICSEARCH_PASSWORD` | | Use this password to authenticate to Elasticsearch |
| `elasticsearch.flattenfields` | `ELASTICSEARCH_FLATTENFIELDS` | `false` | Replace . by _ to avoid mapping conflicts, force to true if `createindextemplate=true` |
| `elasticsearch.createindextemplate` | `ELASTICSEARCH_CREATEINDEXTEMPLATE` | `false` | Create an index template |
| `elasticsearch.numberofshards` | `ELASTICSEARCH_NUMBEROFSHARDS` | `3` | Number of shards set by the index template |
| `elasticsearch.numberofreplicas` | `ELASTICSEARCH_REPLICAS` | `3` | Number of replicas set by the index template |
| `elasticsearch.customheaders` | `ELASTICSEARCH_CUSTOMHEADERS` | | Custom headers to add in POST, useful for Authentication |
| `elasticsearch.mutualtls` | `ELASTICSEARCH_MUTUALTLS` | `false` | Authenticate to the output with TLS, if true, checkcert flag will be ignored (server cert will always be checked) |
| `elasticsearch.checkcert` | `ELASTICSEARCH_CHECKCERT` | `true` | Check if ssl certificate of the output is valid |
| `elasticsearch.minimumpriority` | `ELASTICSEARCH_MINIMUMPRIORITY` | `""` (= `debug`) | Minimum priority of event for using this output, order is `emergency,alert,critical,error,warning,notice,informational,debug or ""` |

> [!NOTE]
The Env var values override the settings from yaml file.

> [!WARNING]
By enabling the creation of the index template with `elasticsearch.createindextemplate=true`, the output fields of the Falco events will be flatten to avoid any mapping conflict.

## Example of config.yaml

```yaml
Expand Down
10 changes: 10 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"log"
"net/http"
"net/url"
"os"
"regexp"
"strings"
Expand Down Expand Up @@ -232,6 +233,15 @@ func init() {
if err != nil {
config.Elasticsearch.HostPort = ""
} else {
if config.Elasticsearch.CreateIndexTemplate {
elasticsearchClient.EndpointURL, _ = url.Parse(fmt.Sprintf("%s/_index_template/falco", config.Elasticsearch.HostPort))
err = elasticsearchClient.ElasticsearchCreateIndexTemplate(config.Elasticsearch)
}
}
if err != nil {
config.Elasticsearch.HostPort = ""
} else {

outputs.EnabledOutputs = append(outputs.EnabledOutputs, "Elasticsearch")
}
}
Expand Down
29 changes: 22 additions & 7 deletions outputs/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ func NewClient(outputType string, defaultEndpointURL string, mutualTLSEnabled bo
return &Client{OutputType: outputType, EndpointURL: endpointURL, MutualTLSEnabled: mutualTLSEnabled, CheckCert: checkCert, HeaderList: []Header{}, ContentType: DefaultContentType, Config: params.Config, Stats: params.Stats, PromStats: params.PromStats, StatsdClient: params.StatsdClient, DogstatsdClient: params.DogstatsdClient}, nil
}

// Get get a payload from Output with GET http method.
func (c *Client) Get() error {
return c.sendRequest("GET", nil)
}

// Post sends event (payload) to Output with POST http method.
func (c *Client) Post(payload interface{}) error {
return c.sendRequest("POST", payload)
Expand All @@ -167,7 +172,7 @@ func getInlinedBodyAsString(resp *http.Response) string {
var compactedBody bytes.Buffer
err := json.Compact(&compactedBody, body)
if err == nil {
return string(compactedBody.Bytes())
return compactedBody.String()
}
}

Expand All @@ -177,11 +182,12 @@ func getInlinedBodyAsString(resp *http.Response) string {
// Post sends event (payload) to Output.
func (c *Client) sendRequest(method string, payload interface{}) error {
// defer + recover to catch panic if output doesn't respond
defer func() {
defer func(c *Client) {
if err := recover(); err != nil {
go c.CountMetric("outputs", 1, []string{"output:" + strings.ToLower(c.OutputType), "status:connectionrefused"})
log.Printf("[ERROR] : %v - %s", c.OutputType, err)
}
}()
}(c)

body := new(bytes.Buffer)
switch payload.(type) {
Expand Down Expand Up @@ -276,8 +282,13 @@ func (c *Client) sendRequest(method string, payload interface{}) error {
client := &http.Client{
Transport: customTransport,
}

req, err := http.NewRequest(method, c.EndpointURL.String(), body)
req := new(http.Request)
var err error
if method == "GET" {
req, err = http.NewRequest(method, c.EndpointURL.String(), nil)
} else {
req, err = http.NewRequest(method, c.EndpointURL.String(), body)
}
if err != nil {
log.Printf("[ERROR] : %v - %v\n", c.OutputType, err.Error())
}
Expand All @@ -304,13 +315,17 @@ func (c *Client) sendRequest(method string, payload interface{}) error {

switch resp.StatusCode {
case http.StatusOK, http.StatusCreated, http.StatusAccepted, http.StatusNoContent: //200, 201, 202, 204
log.Printf("[INFO] : %v - Post OK (%v)\n", c.OutputType, resp.StatusCode)
log.Printf("[INFO] : %v - %v OK (%v)\n", c.OutputType, method, resp.StatusCode)
if ot := c.OutputType; ot == Kubeless || ot == Openfaas || ot == Fission {
log.Printf("[INFO] : %v - Function Response : %s\n", ot, getInlinedBodyAsString(resp))
}
return nil
case http.StatusBadRequest: //400
log.Printf("[ERROR] : %v - %v (%v): %s\n", c.OutputType, ErrHeaderMissing, resp.StatusCode, getInlinedBodyAsString(resp))
msg := getInlinedBodyAsString(resp)
log.Printf("[ERROR] : %v - %v (%v): %s\n", c.OutputType, ErrHeaderMissing, resp.StatusCode, msg)
if msg != "" {
return fmt.Errorf(msg)
}
return ErrHeaderMissing
case http.StatusUnauthorized: //401
log.Printf("[ERROR] : %v - %v (%v): %s\n", c.OutputType, ErrClientAuthenticationError, resp.StatusCode, getInlinedBodyAsString(resp))
Expand Down
118 changes: 113 additions & 5 deletions outputs/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,42 @@
package outputs

import (
"encoding/json"
"fmt"
"log"
"net/url"
"regexp"
"strings"
"time"

"github.com/falcosecurity/falcosidekick/types"
)

type eSPayload struct {
types.FalcoPayload
Timestamp time.Time `json:"@timestamp"`
}

type mappingError struct {
Error struct {
RootCause []struct {
Type string `json:"type"`
Reason string `json:"reason"`
} `json:"root_cause"`
Type string `json:"type"`
Reason string `json:"reason"`
} `json:"error"`
Status int `json:"status"`
}

// ElasticsearchPost posts event to Elasticsearch
func (c *Client) ElasticsearchPost(falcopayload types.FalcoPayload) {
c.Stats.Elasticsearch.Add(Total, 1)

current := time.Now()
var eURL string
switch c.Config.Elasticsearch.Suffix {
case "none":
case None:
eURL = c.Config.Elasticsearch.HostPort + "/" + c.Config.Elasticsearch.Index + "/" + c.Config.Elasticsearch.Type
case "monthly":
eURL = c.Config.Elasticsearch.HostPort + "/" + c.Config.Elasticsearch.Index + "-" + current.Format("2006.01") + "/" + c.Config.Elasticsearch.Type
Expand Down Expand Up @@ -45,11 +66,47 @@ func (c *Client) ElasticsearchPost(falcopayload types.FalcoPayload) {
c.AddHeader(i, j)
}

err = c.Post(falcopayload)
payload := eSPayload{FalcoPayload: falcopayload, Timestamp: falcopayload.Time}
if c.Config.Elasticsearch.FlattenFields || c.Config.Elasticsearch.CreateIndexTemplate {
for i, j := range payload.OutputFields {
payload.OutputFields[strings.ReplaceAll(i, ".", "_")] = j
delete(payload.OutputFields, i)
}
}

err = c.Post(payload)
if err != nil {
c.setElasticSearchErrorMetrics()
log.Printf("[ERROR] : ElasticSearch - %v\n", err)
return
var mappingErr mappingError
if err2 := json.Unmarshal([]byte(err.Error()), &mappingErr); err2 != nil {
c.setElasticSearchErrorMetrics()
return
}
if mappingErr.Error.Type == "document_parsing_exception" {
reg := regexp.MustCompile(`\[\w+(\.\w+)+\]`)
k := reg.FindStringSubmatch(mappingErr.Error.Reason)
if len(k) == 0 {
c.setElasticSearchErrorMetrics()
return
}
if !strings.Contains(k[0], "output_fields") {
c.setElasticSearchErrorMetrics()
return
}
s := strings.ReplaceAll(k[0], "[output_fields.", "")
s = strings.ReplaceAll(s, "]", "")
for i := range payload.OutputFields {
if strings.HasPrefix(i, s) {
delete(payload.OutputFields, i)
}
}
fmt.Println(payload.OutputFields)
log.Printf("[INFO] : %v - %v\n", c.OutputType, "attempt to POST again the payload without the wrong field")
err = c.Post(payload)
if err != nil {
c.setElasticSearchErrorMetrics()
return
}
}
}

// Setting the success status
Expand All @@ -58,6 +115,57 @@ func (c *Client) ElasticsearchPost(falcopayload types.FalcoPayload) {
c.PromStats.Outputs.With(map[string]string{"destination": "elasticsearch", "status": OK}).Inc()
}

func (c *Client) ElasticsearchCreateIndexTemplate(config types.ElasticsearchOutputConfig) error {
d := c
indexExists, err := c.isIndexTemplateExist(config)
if err != nil {
log.Printf("[ERROR] : %v - %v\n", c.OutputType, err.Error())
return err
}
if indexExists {
log.Printf("[INFO] : %v - %v\n", c.OutputType, "Index template already exists")
return nil
}

pattern := "-*"
if config.Suffix == None {
pattern = ""
}
m := strings.ReplaceAll(ESmapping, "${INDEX}", config.Index)
m = strings.ReplaceAll(m, "${PATTERN}", pattern)
m = strings.ReplaceAll(m, "${SHARDS}", fmt.Sprintf("%v", config.NumberOfShards))
m = strings.ReplaceAll(m, "${REPLICAS}", fmt.Sprintf("%v", config.NumberOfReplicas))
j := make(map[string]interface{})
if err := json.Unmarshal([]byte(m), &j); err != nil {
log.Printf("[ERROR] : %v - %v\n", c.OutputType, err.Error())
return err
}
// create the index template by PUT
if d.Put(j) != nil {
log.Printf("[ERROR] : %v - %v\n", c.OutputType, err.Error())
return err
}

log.Printf("[INFO] : %v - %v\n", c.OutputType, "Index template created")
return nil
}

func (c *Client) isIndexTemplateExist(config types.ElasticsearchOutputConfig) (bool, error) {
clientCopy := c
var err error
u, err := url.Parse(fmt.Sprintf("%s/_index_template/falco", config.HostPort))
if err != nil {
return false, err
}
clientCopy.EndpointURL = u
if err := clientCopy.Get(); err != nil {
if err.Error() == "resource not found" {
return false, nil
}
}
return true, nil
}

// setElasticSearchErrorMetrics set the error stats
func (c *Client) setElasticSearchErrorMetrics() {
go c.CountMetric(Outputs, 1, []string{"output:elasticsearch", "status:error"})
Expand Down
Loading