Skip to content

Commit

Permalink
add flattenfields + create indexte
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Labarussias <[email protected]>
  • Loading branch information
Issif committed Jun 24, 2024
1 parent d00a069 commit d083849
Show file tree
Hide file tree
Showing 21 changed files with 955 additions and 51 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-image.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:

- uses: actions/setup-go@cdcb36043654635271a94b9a6d1392de5bb323a7 # v5.0.1
with:
go-version: '1.21'
go-version: '1.22'
check-latest: true
cache: true

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
- uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
- uses: actions/setup-go@cdcb36043654635271a94b9a6d1392de5bb323a7 # v5.0.1
with:
go-version: '1.21'
go-version: '1.22'
cache: false
check-latest: true
- name: golangci-lint
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/push-main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:

- uses: actions/setup-go@cdcb36043654635271a94b9a6d1392de5bb323a7 # v5.0.1
with:
go-version: '1.21'
go-version: '1.22'
check-latest: true
cache: true

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:

- uses: actions/setup-go@cdcb36043654635271a94b9a6d1392de5bb323a7 # v5.0.1
with:
go-version: '1.21'
go-version: '1.22'
check-latest: true

- name: Set up QEMU
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
- uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
- uses: actions/setup-go@cdcb36043654635271a94b9a6d1392de5bb323a7 # v5.0.1
with:
go-version: '1.21'
go-version: '1.22'
check-latest: true
cache: true
- name: Run Go tests
Expand Down
11 changes: 11 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ func getConfig() *types.Configuration {
v.SetDefault("Elasticsearch.CheckCert", true)
v.SetDefault("Elasticsearch.Username", "")
v.SetDefault("Elasticsearch.Password", "")
v.SetDefault("Elasticsearch.FlattenFields", false)
v.SetDefault("Elasticsearch.CreateIndexTemplate", false)
v.SetDefault("Elasticsearch.NumberOfShards", 3)
v.SetDefault("Elasticsearch.NumberOfReplicas", 3)

v.SetDefault("Quickwit.HostPort", "")
v.SetDefault("Quickwit.Index", "falco")
Expand Down Expand Up @@ -712,6 +716,13 @@ func getConfig() *types.Configuration {
c.Loki.ExtraLabelsList = strings.Split(strings.ReplaceAll(c.Loki.ExtraLabels, " ", ""), ",")
}

if c.Elasticsearch.NumberOfReplicas <= 0 {
c.Elasticsearch.NumberOfReplicas = 3
}
if c.Elasticsearch.NumberOfShards <= 0 {
c.Elasticsearch.NumberOfShards = 3
}

if c.Prometheus.ExtraLabels != "" {
c.Prometheus.ExtraLabelsList = strings.Split(strings.ReplaceAll(c.Prometheus.ExtraLabels, " ", ""), ",")
}
Expand Down
4 changes: 4 additions & 0 deletions config_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ elasticsearch:
# checkcert: true # check if ssl certificate of the output is valid (default: true)
# username: "" # use this username to authenticate to Elasticsearch if the username is not empty (default: "")
# password: "" # use this password to authenticate to Elasticsearch if the password is not empty (default: "")
# flattenfields: false # replace . by _ to avoid mapping conflicts, force to true if createindextemplate==true (default: false)
# createindextemplate: false # create an index template (default: false)
# numberofshards: 3 # number of shards set by the index template (default: 3)
# numberofreplicas: 3 # number of replicas set by the index template (default: 3)
# customHeaders: # Custom headers to add in POST, useful for Authentication
# key: value

Expand Down
31 changes: 19 additions & 12 deletions docs/outputs/elasticsearch.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,29 @@

## Configuration

| Setting | Env var | Default value | Description |
| ------------------------------- | ------------------------------- | ---------------- | ----------------------------------------------------------------------------------------------------------------------------------- |
| `elasticsearch.hosport` | `ELASTICSEARCH_HOSTPORT` | | http://{domain or ip}:{port}, if not empty, Elasticsearch output is **enabled** |
| `elasticsearch.index` | `ELASTICSEARCH_INDEX` | `falco` | Index |
| `elasticsearch.type` | `ELASTICSEARCH_TYPE` | `_doc` | Index |
| `elasticsearch.suffix` | `ELASTICSEARCH_SUFFIX` | `daily` | Date suffix for index rotation : `daily`, `monthly`, `annually`, `none` |
| `elasticsearch.username` | `ELASTICSEARCH_USERNAME` | | Use this username to authenticate to Elasticsearch |
| `elasticsearch.password` | `ELASTICSEARCH_PASSWORD` | | Use this password to authenticate to Elasticsearch |
| `elasticsearch.customheaders` | `ELASTICSEARCH_CUSTOMHEADERS` | | Custom headers to add in POST, useful for Authentication |
| `elasticsearch.mutualtls` | `ELASTICSEARCH_MUTUALTLS` | `false` | Authenticate to the output with TLS, if true, checkcert flag will be ignored (server cert will always be checked) |
| `elasticsearch.checkcert` | `ELASTICSEARCH_CHECKCERT` | `true` | Check if ssl certificate of the output is valid |
| `elasticsearch.minimumpriority` | `ELASTICSEARCH_MINIMUMPRIORITY` | `""` (= `debug`) | Minimum priority of event for using this output, order is `emergency,alert,critical,error,warning,notice,informational,debug or ""` |
| Setting | Env var | Default value | Description |
| ----------------------------------- | ----------------------------------- | ---------------- | ----------------------------------------------------------------------------------------------------------------------------------- |
| `elasticsearch.hosport` | `ELASTICSEARCH_HOSTPORT` | | http://{domain or ip}:{port}, if not empty, Elasticsearch output is **enabled** |
| `elasticsearch.index` | `ELASTICSEARCH_INDEX` | `falco` | Index |
| `elasticsearch.type` | `ELASTICSEARCH_TYPE` | `_doc` | Index |
| `elasticsearch.suffix` | `ELASTICSEARCH_SUFFIX` | `daily` | Date suffix for index rotation : `daily`, `monthly`, `annually`, `none` |
| `elasticsearch.username` | `ELASTICSEARCH_USERNAME` | | Use this username to authenticate to Elasticsearch |
| `elasticsearch.password` | `ELASTICSEARCH_PASSWORD` | | Use this password to authenticate to Elasticsearch |
| `elasticsearch.flattenfields` | `ELASTICSEARCH_FLATTENFIELDS` | `false` | Replace . by _ to avoid mapping conflicts, force to true if `createindextemplate=true` |
| `elasticsearch.createindextemplate` | `ELASTICSEARCH_CREATEINDEXTEMPLATE` | `false` | Create an index template |
| `elasticsearch.numberofshards` | `ELASTICSEARCH_NUMBEROFSHARDS` | `3` | Number of shards set by the index template |
| `elasticsearch.numberofreplicas` | `ELASTICSEARCH_REPLICAS` | `3` | Number of replicas set by the index template |
| `elasticsearch.customheaders` | `ELASTICSEARCH_CUSTOMHEADERS` | | Custom headers to add in POST, useful for Authentication |
| `elasticsearch.mutualtls` | `ELASTICSEARCH_MUTUALTLS` | `false` | Authenticate to the output with TLS, if true, checkcert flag will be ignored (server cert will always be checked) |
| `elasticsearch.checkcert` | `ELASTICSEARCH_CHECKCERT` | `true` | Check if ssl certificate of the output is valid |
| `elasticsearch.minimumpriority` | `ELASTICSEARCH_MINIMUMPRIORITY` | `""` (= `debug`) | Minimum priority of event for using this output, order is `emergency,alert,critical,error,warning,notice,informational,debug or ""` |

> [!NOTE]
The Env var values override the settings from yaml file.

> [!WARNING]
By enabling the creation of the index template with `elasticsearch.createindextemplate=true`, the output fields of the Falco events will be flatten to avoid any mapping conflict.

## Example of config.yaml

```yaml
Expand Down
10 changes: 10 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"log"
"net/http"
"net/url"
"os"
"regexp"
"strings"
Expand Down Expand Up @@ -232,6 +233,15 @@ func init() {
if err != nil {
config.Elasticsearch.HostPort = ""
} else {
if config.Elasticsearch.CreateIndexTemplate {
elasticsearchClient.EndpointURL, _ = url.Parse(fmt.Sprintf("%s/_index_template/falco", config.Elasticsearch.HostPort))
err = elasticsearchClient.ElasticsearchCreateIndexTemplate(config.Elasticsearch)
}
}
if err != nil {
config.Elasticsearch.HostPort = ""
} else {

outputs.EnabledOutputs = append(outputs.EnabledOutputs, "Elasticsearch")
}
}
Expand Down
29 changes: 22 additions & 7 deletions outputs/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ func NewClient(outputType string, defaultEndpointURL string, mutualTLSEnabled bo
return &Client{OutputType: outputType, EndpointURL: endpointURL, MutualTLSEnabled: mutualTLSEnabled, CheckCert: checkCert, HeaderList: []Header{}, ContentType: DefaultContentType, Config: params.Config, Stats: params.Stats, PromStats: params.PromStats, StatsdClient: params.StatsdClient, DogstatsdClient: params.DogstatsdClient}, nil
}

// Get get a payload from Output with GET http method.
func (c *Client) Get() error {
return c.sendRequest("GET", nil)
}

// Post sends event (payload) to Output with POST http method.
func (c *Client) Post(payload interface{}) error {
return c.sendRequest("POST", payload)
Expand All @@ -167,7 +172,7 @@ func getInlinedBodyAsString(resp *http.Response) string {
var compactedBody bytes.Buffer
err := json.Compact(&compactedBody, body)
if err == nil {
return string(compactedBody.Bytes())
return compactedBody.String()
}
}

Expand All @@ -177,11 +182,12 @@ func getInlinedBodyAsString(resp *http.Response) string {
// Post sends event (payload) to Output.
func (c *Client) sendRequest(method string, payload interface{}) error {
// defer + recover to catch panic if output doesn't respond
defer func() {
defer func(c *Client) {
if err := recover(); err != nil {
go c.CountMetric("outputs", 1, []string{"output:" + strings.ToLower(c.OutputType), "status:connectionrefused"})
log.Printf("[ERROR] : %v - %s", c.OutputType, err)
}
}()
}(c)

body := new(bytes.Buffer)
switch payload.(type) {
Expand Down Expand Up @@ -276,8 +282,13 @@ func (c *Client) sendRequest(method string, payload interface{}) error {
client := &http.Client{
Transport: customTransport,
}

req, err := http.NewRequest(method, c.EndpointURL.String(), body)
req := new(http.Request)
var err error
if method == "GET" {
req, err = http.NewRequest(method, c.EndpointURL.String(), nil)
} else {
req, err = http.NewRequest(method, c.EndpointURL.String(), body)
}
if err != nil {
log.Printf("[ERROR] : %v - %v\n", c.OutputType, err.Error())
}
Expand All @@ -304,13 +315,17 @@ func (c *Client) sendRequest(method string, payload interface{}) error {

switch resp.StatusCode {
case http.StatusOK, http.StatusCreated, http.StatusAccepted, http.StatusNoContent: //200, 201, 202, 204
log.Printf("[INFO] : %v - Post OK (%v)\n", c.OutputType, resp.StatusCode)
log.Printf("[INFO] : %v - %v OK (%v)\n", c.OutputType, method, resp.StatusCode)
if ot := c.OutputType; ot == Kubeless || ot == Openfaas || ot == Fission {
log.Printf("[INFO] : %v - Function Response : %s\n", ot, getInlinedBodyAsString(resp))
}
return nil
case http.StatusBadRequest: //400
log.Printf("[ERROR] : %v - %v (%v): %s\n", c.OutputType, ErrHeaderMissing, resp.StatusCode, getInlinedBodyAsString(resp))
msg := getInlinedBodyAsString(resp)
log.Printf("[ERROR] : %v - %v (%v): %s\n", c.OutputType, ErrHeaderMissing, resp.StatusCode, msg)
if msg != "" {
return fmt.Errorf(msg)
}
return ErrHeaderMissing
case http.StatusUnauthorized: //401
log.Printf("[ERROR] : %v - %v (%v): %s\n", c.OutputType, ErrClientAuthenticationError, resp.StatusCode, getInlinedBodyAsString(resp))
Expand Down
106 changes: 101 additions & 5 deletions outputs/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,42 @@
package outputs

import (
"encoding/json"
"fmt"
"log"
"net/url"
"regexp"
"strings"
"time"

"github.com/falcosecurity/falcosidekick/types"
)

type eSPayload struct {
types.FalcoPayload
Timestamp time.Time `json:"@timestamp"`
}

type mappingError struct {
Error struct {
RootCause []struct {
Type string `json:"type"`
Reason string `json:"reason"`
} `json:"root_cause"`
Type string `json:"type"`
Reason string `json:"reason"`
} `json:"error"`
Status int `json:"status"`
}

// ElasticsearchPost posts event to Elasticsearch
func (c *Client) ElasticsearchPost(falcopayload types.FalcoPayload) {
c.Stats.Elasticsearch.Add(Total, 1)

current := time.Now()
var eURL string
switch c.Config.Elasticsearch.Suffix {
case "none":
case None:
eURL = c.Config.Elasticsearch.HostPort + "/" + c.Config.Elasticsearch.Index + "/" + c.Config.Elasticsearch.Type
case "monthly":
eURL = c.Config.Elasticsearch.HostPort + "/" + c.Config.Elasticsearch.Index + "-" + current.Format("2006.01") + "/" + c.Config.Elasticsearch.Type
Expand Down Expand Up @@ -45,11 +66,47 @@ func (c *Client) ElasticsearchPost(falcopayload types.FalcoPayload) {
c.AddHeader(i, j)
}

err = c.Post(falcopayload)
payload := eSPayload{FalcoPayload: falcopayload, Timestamp: falcopayload.Time}
if c.Config.Elasticsearch.FlattenFields || c.Config.Elasticsearch.CreateIndexTemplate {
for i, j := range payload.OutputFields {
payload.OutputFields[strings.ReplaceAll(i, ".", "_")] = j
delete(payload.OutputFields, i)
}
}

err = c.Post(payload)
if err != nil {
c.setElasticSearchErrorMetrics()
log.Printf("[ERROR] : ElasticSearch - %v\n", err)
return
var mappingErr mappingError
if err2 := json.Unmarshal([]byte(err.Error()), &mappingErr); err2 != nil {
c.setElasticSearchErrorMetrics()
return
}
if mappingErr.Error.Type == "document_parsing_exception" {
reg := regexp.MustCompile(`\[\w+(\.\w+)+\]`)
k := reg.FindStringSubmatch(mappingErr.Error.Reason)
if len(k) == 0 {
c.setElasticSearchErrorMetrics()
return
}
if !strings.Contains(k[0], "output_fields") {
c.setElasticSearchErrorMetrics()
return
}
s := strings.ReplaceAll(k[0], "[output_fields.", "")
s = strings.ReplaceAll(s, "]", "")
for i := range payload.OutputFields {
if strings.HasPrefix(i, s) {
delete(payload.OutputFields, i)
}
}
fmt.Println(payload.OutputFields)
log.Printf("[INFO] : %v - %v\n", c.OutputType, "attempt to POST again the payload without the wrong field")
err = c.Post(payload)
if err != nil {
c.setElasticSearchErrorMetrics()
return
}
}
}

// Setting the success status
Expand All @@ -58,6 +115,45 @@ func (c *Client) ElasticsearchPost(falcopayload types.FalcoPayload) {
c.PromStats.Outputs.With(map[string]string{"destination": "elasticsearch", "status": OK}).Inc()
}

func (c *Client) ElasticsearchCreateIndexTemplate(config types.ElasticsearchOutputConfig) error {
d := c
var err error
u, err := url.Parse(fmt.Sprintf("%s/_index_template/falco", config.HostPort))
if err != nil {
log.Printf("[ERROR] : %v - %v\n", c.OutputType, err.Error())
return err
}
d.EndpointURL = u
if err := d.Get(); err != nil {
if err.Error() == "resource not found" {
pattern := "-*"
if config.Suffix == None {
pattern = ""
}
m := strings.ReplaceAll(ESmapping, "${INDEX}", config.Index)
m = strings.ReplaceAll(m, "${PATTERN}", pattern)
m = strings.ReplaceAll(m, "${SHARDS}", fmt.Sprintf("%v", config.NumberOfShards))
m = strings.ReplaceAll(m, "${REPLICAS}", fmt.Sprintf("%v", config.NumberOfReplicas))
j := make(map[string]interface{})
if err := json.Unmarshal([]byte(m), &j); err != nil {
log.Printf("[ERROR] : %v - %v\n", c.OutputType, err.Error())
return err
}
if d.Put(j) != nil {
log.Printf("[ERROR] : %v - %v\n", c.OutputType, err.Error())
return err
}
log.Printf("[INFO] : %v - %v\n", c.OutputType, "Index template created")
} else {
log.Printf("[ERROR] : %v - %v\n", c.OutputType, err.Error())
return err
}
} else {
log.Printf("[INFO] : %v - %v\n", c.OutputType, "Index template already exists")
}
return nil
}

// setElasticSearchErrorMetrics set the error stats
func (c *Client) setElasticSearchErrorMetrics() {
go c.CountMetric(Outputs, 1, []string{"output:elasticsearch", "status:error"})
Expand Down
Loading

0 comments on commit d083849

Please sign in to comment.