Skip to content

Commit

Permalink
add MQTT output
Browse files Browse the repository at this point in the history
Signed-off-by: Issif <[email protected]>
  • Loading branch information
Issif committed Jun 28, 2022
1 parent 503b96a commit 20d0a15
Show file tree
Hide file tree
Showing 13 changed files with 196 additions and 20 deletions.
20 changes: 19 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ It works as a single endpoint for as many as you want `Falco` instances :
- [**RabbitMQ**](https://www.rabbitmq.com/)
- [**Azure Event Hubs**](https://azure.microsoft.com/en-in/services/event-hubs/)
- [**Yandex Data Streams**](https://cloud.yandex.com/en/docs/data-streams/)
- [**MQTT**](https://mqtt.org/)

### Email

Expand Down Expand Up @@ -469,6 +470,16 @@ syslog:
# port: "" # Syslog endpoint port number
# protocol: "" # Syslog transport protocol. It can be either "tcp" or "udp"
# minimumpriority: "debug" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)

mqtt:
broker: "" # Broker address, can start with tcp:// or ssl://, if not empty, MQTT output is enabled
# topic: "falco/events" # Topic for messages (default: falco/events)
# qos: 0 # QOS for messages (default: 0)
# retained: false # If true, messages are retained (default: false)
# user: "" # User if the authentication is enabled in the broker
# password: "" # Password if the authentication is enabled in the broker
# checkcert: true # check if ssl certificate of the output is valid (default: true)
# minimumpriority: "debug" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)
```

Usage :
Expand Down Expand Up @@ -864,12 +875,19 @@ care of lower/uppercases**) : `yaml: a.b --> envvar: A_B` :
- **SYSLOG_PORT**: Syslog endpoint port number
- **SYSLOG_PROTOCOL**: Syslog transport protocol. It can be either "tcp" or "udp"
- **SYSLOG_MINIMUMPRIORITY**: minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default: "debug")

- **POLICYREPORT_ENABLED**: if true policyreport output is enabled (default: `false`)
- **POLICYREPORT_KUBECONFIG**: Kubeconfig file to use (only if falcosidekick is running outside the cluster)
- **POLICYREPORT_MINIMUMPRIORITY**: events with priority above this are mapped to fail in PolicyReport summary and lower that those are mapped to warn
- **POLICYREPORT_MAXEVENTS**: the max number of events that can be per report (default: 1000)
- **POLICYREPORT_PRUNEBYPRIORITY**: if true; the events with lowest severity are pruned first, in FIFO order (default: `false`)
- **MQTT_BROKER**: broker address, can start with `tcp://` or `ssl://`, if not empty, MQTT output is enabled
- **MQTT_TOPIC**: topic for messages (default: `falco/events`)
- **MQTT_QOS**: QOS for messages (default: `0`)
- **MQTT_RETAINED**: if `true`, messages are retained (default: `false`)
- **MQTT_USER**: user if the authentication is enabled in the broker
- **MQTT_PASSWORD**: password if the authentication is enabled in the broker
- **MQTT_CHECKCERT**: check if ssl certificate of the output is valid (default: `true`)
- **MQTT_PRUNEBYPRIORITY**: if true; the events with lowest severity are pruned first, in FIFO order (default: `false`)

#### Slack/Rocketchat/Mattermost/Googlechat Message Formatting

Expand Down
12 changes: 12 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,15 @@ func getConfig() *types.Configuration {
v.SetDefault("Syslog.Protocol", "")
v.SetDefault("Syslog.MinimumPriority", "")

v.SetDefault("MQTT.Broker", "")
v.SetDefault("MQTT.Topic", "falco/events")
v.SetDefault("MQTT.QOS", 0)
v.SetDefault("MQTT.Retained", false)
v.SetDefault("MQTT.User", "")
v.SetDefault("MQTT.Password", "")
v.SetDefault("MQTT.CheckCert", true)
v.SetDefault("MQTT.MinimumPriority", "")

v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
v.AutomaticEnv()
if *configFile != "" {
Expand Down Expand Up @@ -428,7 +437,10 @@ func getConfig() *types.Configuration {
c.Rabbitmq.MinimumPriority = checkPriority(c.Rabbitmq.MinimumPriority)
c.Wavefront.MinimumPriority = checkPriority(c.Wavefront.MinimumPriority)
c.Yandex.S3.MinimumPriority = checkPriority(c.Yandex.S3.MinimumPriority)
c.Yandex.DataStreams.MinimumPriority = checkPriority(c.Yandex.DataStreams.MinimumPriority)
c.Syslog.MinimumPriority = checkPriority(c.Syslog.MinimumPriority)
c.MQTT.MinimumPriority = checkPriority(c.MQTT.MinimumPriority)
c.PolicyReport.MinimumPriority = checkPriority(c.PolicyReport.MinimumPriority)

c.Slack.MessageFormatTemplate = getMessageFormatTemplate("Slack", c.Slack.MessageFormat)
c.Rocketchat.MessageFormatTemplate = getMessageFormatTemplate("Rocketchat", c.Rocketchat.MessageFormat)
Expand Down
10 changes: 10 additions & 0 deletions config_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -304,3 +304,13 @@ yandex:
# endpoint: "" # Yandex Data Streams endpoint (default: https://yds.serverless.yandexcloud.net)
# streamname: "" # stream name in format /${region}/${folder_id}/${ydb_id}/${stream_name}
# minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug

mqtt:
broker: "" # Broker address, can start with tcp:// or ssl://, if not empty, MQTT output is enabled
# topic: "falco/events" # Topic for messages (default: falco/events)
# qos: 0 # QOS for messages (default: 0)
# retained: false # If true, messages are retained (default: false)
# user: "" # User if the authentication is enabled in the broker
# password: "" # Password if the authentication is enabled in the broker
# checkcert: true # check if ssl certificate of the output is valid (default: true)
# minimumpriority: "debug" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ require (
k8s.io/client-go v11.0.0+incompatible
)

require github.com/eclipse/paho.mqtt.golang v1.4.1

require (
github.com/Azure/azure-amqp-common-go/v3 v3.0.1 // indirect
github.com/Azure/azure-sdk-for-go v37.1.0+incompatible // indirect
Expand Down Expand Up @@ -65,6 +67,7 @@ require (
github.com/google/gofuzz v1.1.0 // indirect
github.com/googleapis/gax-go/v2 v2.0.5 // indirect
github.com/googleapis/gnostic v0.5.1 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/imdario/mergo v0.3.10 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
Expand Down Expand Up @@ -103,7 +106,7 @@ require (
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 // indirect
golang.org/x/mod v0.4.1 // indirect
golang.org/x/net v0.0.0-20201224014010-6772e930b67b // indirect
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073 // indirect
golang.org/x/text v0.3.4 // indirect
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5m
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/eclipse/paho.mqtt.golang v1.4.1 h1:tUSpviiL5G3P9SZZJPC4ZULZJsxQKXxfENpMvdbAXAI=
github.com/eclipse/paho.mqtt.golang v1.4.1/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA=
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emersion/go-sasl v0.0.0-20200509203442-7bfe0ed36a21 h1:OJyUGMJTzHTd1XQp98QTaHernxMYzRaOasRir9hUlFQ=
Expand Down Expand Up @@ -365,6 +367,7 @@ github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2z
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
Expand Down Expand Up @@ -841,6 +844,7 @@ golang.org/x/net v0.0.0-20200222125558-5a598a2470a0/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200301022130-244492dfa37a/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200501053045-e0ff5e5a1de5/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200506145744-7e3656a0809f/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200513185701-a91f0712d120/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
Expand Down Expand Up @@ -877,6 +881,8 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
4 changes: 4 additions & 0 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,4 +309,8 @@ func forwardEvent(falcopayload types.FalcoPayload) {
if config.Syslog.Host != "" && (falcopayload.Priority >= types.Priority(config.Syslog.MinimumPriority) || falcopayload.Rule == testRule) {
go syslogClient.SyslogPost(falcopayload)
}

if config.MQTT.Broker != "" && (falcopayload.Priority >= types.Priority(config.MQTT.MinimumPriority) || falcopayload.Rule == testRule) {
go mqttClient.MQTTPublish(falcopayload)
}
}
12 changes: 12 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ var (
grafanaClient *outputs.Client
yandexClient *outputs.Client
syslogClient *outputs.Client
mqttClient *outputs.Client

statsdClient, dogstatsdClient *statsd.Client
config *types.Configuration
Expand Down Expand Up @@ -539,6 +540,17 @@ func init() {
}
}

if config.MQTT.Broker != "" {
var err error
mqttClient, err = outputs.NewMQTTClient(config, stats, promStats, statsdClient, dogstatsdClient)
if err != nil {
config.MQTT.Broker = ""
log.Printf("[ERROR] : MQTT - %v\n", err)
} else {
outputs.EnabledOutputs = append(outputs.EnabledOutputs, "MQTT")
}
}

log.Printf("[INFO] : Falco Sidekick version: %s\n", GetVersionInfo().GitVersion)
log.Printf("[INFO] : Enabled Outputs : %s\n", outputs.EnabledOutputs)

Expand Down
25 changes: 20 additions & 5 deletions outputs/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,37 @@ import (
"github.com/segmentio/kafka-go"
"k8s.io/client-go/kubernetes"

mqtt "github.com/eclipse/paho.mqtt.golang"

"github.com/falcosecurity/falcosidekick/types"
)

// ErrHeaderMissing = 400
var ErrHeaderMissing = errors.New("header missing")

// ErrClientAuthenticationError = 401
var ErrClientAuthenticationError = errors.New("authentication Error")
var ErrClientAuthenticationError = errors.New("authentication error")

// ErrForbidden = 403
var ErrForbidden = errors.New("access Denied")
var ErrForbidden = errors.New("access denied")

// ErrNotFound = 404
var ErrNotFound = errors.New("resource not found")

// ErrUnprocessableEntityError = 422
var ErrUnprocessableEntityError = errors.New("bad Request")
var ErrUnprocessableEntityError = errors.New("bad request")

// ErrTooManyRequest = 429
var ErrTooManyRequest = errors.New("exceeding post rate limit")

// ErrInternalServer = 500
var ErrInternalServer = errors.New("internal server error")

// ErrBadGateway = 502
var ErrBadGateway = errors.New("bad gateway")

// ErrClientCreation is returned if client can't be created
var ErrClientCreation = errors.New("client creation Error")
var ErrClientCreation = errors.New("client creation error")

// EnabledOutputs list all enabled outputs
var EnabledOutputs []string
Expand Down Expand Up @@ -100,6 +108,7 @@ type Client struct {
RabbitmqClient *amqp.Channel
WavefrontSender *wavefront.Sender
Crdclient *crdClient.Clientset
MQTTClient mqtt.Client
}

// NewClient returns a new output.Client for accessing the different API.
Expand Down Expand Up @@ -235,8 +244,14 @@ func (c *Client) Post(payload interface{}) error {
case http.StatusTooManyRequests: //429
log.Printf("[ERROR] : %v - %v (%v)\n", c.OutputType, ErrTooManyRequest, resp.StatusCode)
return ErrTooManyRequest
case http.StatusInternalServerError: //500
log.Printf("[ERROR] : %v - %v (%v)\n", c.OutputType, ErrTooManyRequest, resp.StatusCode)
return ErrInternalServer
case http.StatusBadGateway: //502
log.Printf("[ERROR] : %v - %v (%v)\n", c.OutputType, ErrTooManyRequest, resp.StatusCode)
return ErrBadGateway
default:
log.Printf("[ERROR] : %v - Unexpected Response (%v)\n", c.OutputType, resp.StatusCode)
log.Printf("[ERROR] : %v - unexpected Response (%v)\n", c.OutputType, resp.StatusCode)
return errors.New(resp.Status)
}
}
Expand Down
3 changes: 1 addition & 2 deletions outputs/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"crypto/x509/pkix"
"encoding/base64"
"encoding/pem"
"errors"
"io/ioutil"
"math/big"
"net"
Expand Down Expand Up @@ -76,7 +75,7 @@ func TestPost(t *testing.T) {
"/404": ErrNotFound,
"/422": ErrUnprocessableEntityError,
"/429": ErrTooManyRequest,
"/502": errors.New("502 Bad Gateway"),
"/502": ErrBadGateway,
} {
nc, err := NewClient("", ts.URL+i, false, true, &types.Configuration{}, &types.Statistics{}, &types.PromStatistics{}, nil, nil)
require.Nil(t, err)
Expand Down
1 change: 1 addition & 0 deletions outputs/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const (
Openfaas string = "OpenFaas"
Fission string = "Fission"
Falco string = "Falco"
MQTT string = "MQTT"

UDP string = "udp"
TCP string = "tcp"
Expand Down
74 changes: 74 additions & 0 deletions outputs/mqtt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package outputs

import (
"crypto/tls"
"log"

"github.com/DataDog/datadog-go/statsd"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/google/uuid"

"github.com/falcosecurity/falcosidekick/types"
)

// NewMQTTClient returns a new output.Client for accessing Kubernetes.
func NewMQTTClient(config *types.Configuration, stats *types.Statistics, promStats *types.PromStatistics,
statsdClient, dogstatsdClient *statsd.Client) (*Client, error) {

options := mqtt.NewClientOptions()
options.AddBroker(config.MQTT.Broker)
options.SetClientID("falcosidekick-" + uuid.NewString()[:6])
if config.MQTT.User != "" && config.MQTT.Password != "" {
options.Username = config.MQTT.User
options.Password = config.MQTT.Password
}
if !config.MQTT.CheckCert {
// #nosec G402 This is only set as a result of explicit configuration
options.TLSConfig = &tls.Config{
InsecureSkipVerify: true,
}
}
options.OnConnectionLost = func(client mqtt.Client, err error) {
log.Printf("[ERROR] : MQTT - Connection lost: %v\n", err.Error())
}

client := mqtt.NewClient(options)

return &Client{
OutputType: MQTT,
Config: config,
MQTTClient: client,
Stats: stats,
PromStats: promStats,
StatsdClient: statsdClient,
DogstatsdClient: dogstatsdClient,
}, nil
}

// MQTTPublish .
func (c *Client) MQTTPublish(falcopayload types.FalcoPayload) {
c.Stats.MQTT.Add(Total, 1)

t := c.MQTTClient.Connect()
t.Wait()
if err := t.Error(); err != nil {
go c.CountMetric(Outputs, 1, []string{"output:mqtt", "status:error"})
c.Stats.MQTT.Add(Error, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "mqtt", "status": err.Error()}).Inc()
log.Printf("[ERROR] : %s - %v\n", MQTT, err.Error())
return
}
defer c.MQTTClient.Disconnect(100)
if err := c.MQTTClient.Publish(c.Config.MQTT.Topic, byte(c.Config.MQTT.QOS), c.Config.MQTT.Retained, falcopayload.String()).Error(); err != nil {
go c.CountMetric(Outputs, 1, []string{"output:mqtt", "status:error"})
c.Stats.MQTT.Add(Error, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "mqtt", "status": Error}).Inc()
log.Printf("[ERROR] : %s - %v\n", MQTT, err.Error())
return
}

log.Printf("[INFO] : %s - Message published\n", MQTT)
go c.CountMetric(Outputs, 1, []string{"output:mqtt", "status:ok"})
c.Stats.MQTT.Add(OK, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "mqtt", "status": OK}).Inc()
}
1 change: 1 addition & 0 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func getInitStats() *types.Statistics {
YandexS3: getOutputNewMap("yandexs3"),
YandexDataStreams: getOutputNewMap("yandexdatastreams"),
Syslog: getOutputNewMap("syslog"),
MQTT: getOutputNewMap("mqtt"),
PolicyReport: getOutputNewMap("policyreport"),
}
stats.Falco.Add(outputs.Emergency, 0)
Expand Down
Loading

0 comments on commit 20d0a15

Please sign in to comment.