Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add MQTT output #338

Merged
merged 1 commit into from
Jul 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ It works as a single endpoint for as many as you want `Falco` instances :
- [**RabbitMQ**](https://www.rabbitmq.com/)
- [**Azure Event Hubs**](https://azure.microsoft.com/en-in/services/event-hubs/)
- [**Yandex Data Streams**](https://cloud.yandex.com/en/docs/data-streams/)
- [**MQTT**](https://mqtt.org/)

### Email

Expand Down Expand Up @@ -479,6 +480,16 @@ syslog:
# port: "" # Syslog endpoint port number
# protocol: "" # Syslog transport protocol. It can be either "tcp" or "udp"
# minimumpriority: "debug" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)

mqtt:
broker: "" # Broker address, can start with tcp:// or ssl://, if not empty, MQTT output is enabled
# topic: "falco/events" # Topic for messages (default: falco/events)
# qos: 0 # QOS for messages (default: 0)
# retained: false # If true, messages are retained (default: false)
# user: "" # User if the authentication is enabled in the broker
# password: "" # Password if the authentication is enabled in the broker
# checkcert: true # check if ssl certificate of the output is valid (default: true)
# minimumpriority: "debug" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)
```

Usage :
Expand Down Expand Up @@ -887,12 +898,19 @@ care of lower/uppercases**) : `yaml: a.b --> envvar: A_B` :
- **SYSLOG_PORT**: Syslog endpoint port number
- **SYSLOG_PROTOCOL**: Syslog transport protocol. It can be either "tcp" or "udp"
- **SYSLOG_MINIMUMPRIORITY**: minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default: "debug")

- **POLICYREPORT_ENABLED**: if true policyreport output is enabled (default: `false`)
- **POLICYREPORT_KUBECONFIG**: Kubeconfig file to use (only if falcosidekick is running outside the cluster)
- **POLICYREPORT_MINIMUMPRIORITY**: events with priority above this are mapped to fail in PolicyReport summary and lower that those are mapped to warn
- **POLICYREPORT_MAXEVENTS**: the max number of events that can be per report (default: 1000)
- **POLICYREPORT_PRUNEBYPRIORITY**: if true; the events with lowest severity are pruned first, in FIFO order (default: `false`)
- **MQTT_BROKER**: broker address, can start with `tcp://` or `ssl://`, if not empty, MQTT output is enabled
- **MQTT_TOPIC**: topic for messages (default: `falco/events`)
- **MQTT_QOS**: QOS for messages (default: `0`)
- **MQTT_RETAINED**: if `true`, messages are retained (default: `false`)
- **MQTT_USER**: user if the authentication is enabled in the broker
- **MQTT_PASSWORD**: password if the authentication is enabled in the broker
- **MQTT_CHECKCERT**: check if ssl certificate of the output is valid (default: `true`)
- **MQTT_PRUNEBYPRIORITY**: if true; the events with lowest severity are pruned first, in FIFO order (default: `false`)

#### Slack/Rocketchat/Mattermost/Googlechat Message Formatting

Expand Down
12 changes: 12 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,15 @@ func getConfig() *types.Configuration {
v.SetDefault("Syslog.Protocol", "")
v.SetDefault("Syslog.MinimumPriority", "")

v.SetDefault("MQTT.Broker", "")
v.SetDefault("MQTT.Topic", "falco/events")
v.SetDefault("MQTT.QOS", 0)
v.SetDefault("MQTT.Retained", false)
v.SetDefault("MQTT.User", "")
v.SetDefault("MQTT.Password", "")
v.SetDefault("MQTT.CheckCert", true)
v.SetDefault("MQTT.MinimumPriority", "")

v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
v.AutomaticEnv()
if *configFile != "" {
Expand Down Expand Up @@ -434,7 +443,10 @@ func getConfig() *types.Configuration {
c.Rabbitmq.MinimumPriority = checkPriority(c.Rabbitmq.MinimumPriority)
c.Wavefront.MinimumPriority = checkPriority(c.Wavefront.MinimumPriority)
c.Yandex.S3.MinimumPriority = checkPriority(c.Yandex.S3.MinimumPriority)
c.Yandex.DataStreams.MinimumPriority = checkPriority(c.Yandex.DataStreams.MinimumPriority)
c.Syslog.MinimumPriority = checkPriority(c.Syslog.MinimumPriority)
c.MQTT.MinimumPriority = checkPriority(c.MQTT.MinimumPriority)
c.PolicyReport.MinimumPriority = checkPriority(c.PolicyReport.MinimumPriority)

c.Slack.MessageFormatTemplate = getMessageFormatTemplate("Slack", c.Slack.MessageFormat)
c.Rocketchat.MessageFormatTemplate = getMessageFormatTemplate("Rocketchat", c.Rocketchat.MessageFormat)
Expand Down
10 changes: 10 additions & 0 deletions config_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -313,3 +313,13 @@ yandex:
# endpoint: "" # Yandex Data Streams endpoint (default: https://yds.serverless.yandexcloud.net)
# streamname: "" # stream name in format /${region}/${folder_id}/${ydb_id}/${stream_name}
# minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug

mqtt:
broker: "" # Broker address, can start with tcp:// or ssl://, if not empty, MQTT output is enabled
# topic: "falco/events" # Topic for messages (default: falco/events)
# qos: 0 # QOS for messages (default: 0)
# retained: false # If true, messages are retained (default: false)
# user: "" # User if the authentication is enabled in the broker
# password: "" # Password if the authentication is enabled in the broker
# checkcert: true # check if ssl certificate of the output is valid (default: true)
# minimumpriority: "debug" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ require (
k8s.io/client-go v11.0.0+incompatible
)

require github.com/eclipse/paho.mqtt.golang v1.4.1

require (
github.com/Azure/azure-amqp-common-go/v3 v3.0.1 // indirect
github.com/Azure/azure-sdk-for-go v37.1.0+incompatible // indirect
Expand Down Expand Up @@ -65,6 +67,7 @@ require (
github.com/google/gofuzz v1.1.0 // indirect
github.com/googleapis/gax-go/v2 v2.0.5 // indirect
github.com/googleapis/gnostic v0.5.1 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/imdario/mergo v0.3.10 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
Expand Down Expand Up @@ -103,7 +106,7 @@ require (
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 // indirect
golang.org/x/mod v0.4.1 // indirect
golang.org/x/net v0.0.0-20201224014010-6772e930b67b // indirect
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073 // indirect
golang.org/x/text v0.3.4 // indirect
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5m
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/eclipse/paho.mqtt.golang v1.4.1 h1:tUSpviiL5G3P9SZZJPC4ZULZJsxQKXxfENpMvdbAXAI=
github.com/eclipse/paho.mqtt.golang v1.4.1/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA=
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emersion/go-sasl v0.0.0-20200509203442-7bfe0ed36a21 h1:OJyUGMJTzHTd1XQp98QTaHernxMYzRaOasRir9hUlFQ=
Expand Down Expand Up @@ -365,6 +367,7 @@ github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2z
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
Expand Down Expand Up @@ -841,6 +844,7 @@ golang.org/x/net v0.0.0-20200222125558-5a598a2470a0/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200301022130-244492dfa37a/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200501053045-e0ff5e5a1de5/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200506145744-7e3656a0809f/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200513185701-a91f0712d120/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
Expand Down Expand Up @@ -877,6 +881,8 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
4 changes: 4 additions & 0 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,4 +313,8 @@ func forwardEvent(falcopayload types.FalcoPayload) {
if config.Syslog.Host != "" && (falcopayload.Priority >= types.Priority(config.Syslog.MinimumPriority) || falcopayload.Rule == testRule) {
go syslogClient.SyslogPost(falcopayload)
}

if config.MQTT.Broker != "" && (falcopayload.Priority >= types.Priority(config.MQTT.MinimumPriority) || falcopayload.Rule == testRule) {
go mqttClient.MQTTPublish(falcopayload)
}
}
12 changes: 12 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ var (
grafanaClient *outputs.Client
yandexClient *outputs.Client
syslogClient *outputs.Client
mqttClient *outputs.Client

statsdClient, dogstatsdClient *statsd.Client
config *types.Configuration
Expand Down Expand Up @@ -550,6 +551,17 @@ func init() {
}
}

if config.MQTT.Broker != "" {
var err error
mqttClient, err = outputs.NewMQTTClient(config, stats, promStats, statsdClient, dogstatsdClient)
if err != nil {
config.MQTT.Broker = ""
log.Printf("[ERROR] : MQTT - %v\n", err)
} else {
outputs.EnabledOutputs = append(outputs.EnabledOutputs, "MQTT")
}
}

log.Printf("[INFO] : Falco Sidekick version: %s\n", GetVersionInfo().GitVersion)
log.Printf("[INFO] : Enabled Outputs : %s\n", outputs.EnabledOutputs)

Expand Down
25 changes: 20 additions & 5 deletions outputs/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,37 @@ import (
"github.com/segmentio/kafka-go"
"k8s.io/client-go/kubernetes"

mqtt "github.com/eclipse/paho.mqtt.golang"

"github.com/falcosecurity/falcosidekick/types"
)

// ErrHeaderMissing = 400
var ErrHeaderMissing = errors.New("header missing")

// ErrClientAuthenticationError = 401
var ErrClientAuthenticationError = errors.New("authentication Error")
var ErrClientAuthenticationError = errors.New("authentication error")

// ErrForbidden = 403
var ErrForbidden = errors.New("access Denied")
var ErrForbidden = errors.New("access denied")

// ErrNotFound = 404
var ErrNotFound = errors.New("resource not found")

// ErrUnprocessableEntityError = 422
var ErrUnprocessableEntityError = errors.New("bad Request")
var ErrUnprocessableEntityError = errors.New("bad request")

// ErrTooManyRequest = 429
var ErrTooManyRequest = errors.New("exceeding post rate limit")

// ErrInternalServer = 500
var ErrInternalServer = errors.New("internal server error")

// ErrBadGateway = 502
var ErrBadGateway = errors.New("bad gateway")

// ErrClientCreation is returned if client can't be created
var ErrClientCreation = errors.New("client creation Error")
var ErrClientCreation = errors.New("client creation error")

// EnabledOutputs list all enabled outputs
var EnabledOutputs []string
Expand Down Expand Up @@ -100,6 +108,7 @@ type Client struct {
RabbitmqClient *amqp.Channel
WavefrontSender *wavefront.Sender
Crdclient *crdClient.Clientset
MQTTClient mqtt.Client
}

// NewClient returns a new output.Client for accessing the different API.
Expand Down Expand Up @@ -235,8 +244,14 @@ func (c *Client) Post(payload interface{}) error {
case http.StatusTooManyRequests: //429
log.Printf("[ERROR] : %v - %v (%v)\n", c.OutputType, ErrTooManyRequest, resp.StatusCode)
return ErrTooManyRequest
case http.StatusInternalServerError: //500
log.Printf("[ERROR] : %v - %v (%v)\n", c.OutputType, ErrTooManyRequest, resp.StatusCode)
return ErrInternalServer
case http.StatusBadGateway: //502
log.Printf("[ERROR] : %v - %v (%v)\n", c.OutputType, ErrTooManyRequest, resp.StatusCode)
return ErrBadGateway
default:
log.Printf("[ERROR] : %v - Unexpected Response (%v)\n", c.OutputType, resp.StatusCode)
log.Printf("[ERROR] : %v - unexpected Response (%v)\n", c.OutputType, resp.StatusCode)
return errors.New(resp.Status)
}
}
Expand Down
3 changes: 1 addition & 2 deletions outputs/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"crypto/x509/pkix"
"encoding/base64"
"encoding/pem"
"errors"
"io/ioutil"
"math/big"
"net"
Expand Down Expand Up @@ -76,7 +75,7 @@ func TestPost(t *testing.T) {
"/404": ErrNotFound,
"/422": ErrUnprocessableEntityError,
"/429": ErrTooManyRequest,
"/502": errors.New("502 Bad Gateway"),
"/502": ErrBadGateway,
} {
nc, err := NewClient("", ts.URL+i, false, true, &types.Configuration{}, &types.Statistics{}, &types.PromStatistics{}, nil, nil)
require.Nil(t, err)
Expand Down
1 change: 1 addition & 0 deletions outputs/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const (
Openfaas string = "OpenFaas"
Fission string = "Fission"
Falco string = "Falco"
MQTT string = "MQTT"

UDP string = "udp"
TCP string = "tcp"
Expand Down
74 changes: 74 additions & 0 deletions outputs/mqtt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package outputs

import (
"crypto/tls"
"log"

"github.com/DataDog/datadog-go/statsd"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/google/uuid"

"github.com/falcosecurity/falcosidekick/types"
)

// NewMQTTClient returns a new output.Client for accessing Kubernetes.
func NewMQTTClient(config *types.Configuration, stats *types.Statistics, promStats *types.PromStatistics,
statsdClient, dogstatsdClient *statsd.Client) (*Client, error) {

options := mqtt.NewClientOptions()
options.AddBroker(config.MQTT.Broker)
options.SetClientID("falcosidekick-" + uuid.NewString()[:6])
if config.MQTT.User != "" && config.MQTT.Password != "" {
options.Username = config.MQTT.User
options.Password = config.MQTT.Password
}
if !config.MQTT.CheckCert {
// #nosec G402 This is only set as a result of explicit configuration
options.TLSConfig = &tls.Config{
InsecureSkipVerify: true,
}
}
options.OnConnectionLost = func(client mqtt.Client, err error) {
log.Printf("[ERROR] : MQTT - Connection lost: %v\n", err.Error())
}

client := mqtt.NewClient(options)

return &Client{
OutputType: MQTT,
Config: config,
MQTTClient: client,
Stats: stats,
PromStats: promStats,
StatsdClient: statsdClient,
DogstatsdClient: dogstatsdClient,
}, nil
}

// MQTTPublish .
func (c *Client) MQTTPublish(falcopayload types.FalcoPayload) {
c.Stats.MQTT.Add(Total, 1)

t := c.MQTTClient.Connect()
t.Wait()
if err := t.Error(); err != nil {
go c.CountMetric(Outputs, 1, []string{"output:mqtt", "status:error"})
c.Stats.MQTT.Add(Error, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "mqtt", "status": err.Error()}).Inc()
log.Printf("[ERROR] : %s - %v\n", MQTT, err.Error())
return
}
defer c.MQTTClient.Disconnect(100)
if err := c.MQTTClient.Publish(c.Config.MQTT.Topic, byte(c.Config.MQTT.QOS), c.Config.MQTT.Retained, falcopayload.String()).Error(); err != nil {
go c.CountMetric(Outputs, 1, []string{"output:mqtt", "status:error"})
c.Stats.MQTT.Add(Error, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "mqtt", "status": Error}).Inc()
log.Printf("[ERROR] : %s - %v\n", MQTT, err.Error())
return
}

log.Printf("[INFO] : %s - Message published\n", MQTT)
go c.CountMetric(Outputs, 1, []string{"output:mqtt", "status:ok"})
c.Stats.MQTT.Add(OK, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "mqtt", "status": OK}).Inc()
}
1 change: 1 addition & 0 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func getInitStats() *types.Statistics {
YandexS3: getOutputNewMap("yandexs3"),
YandexDataStreams: getOutputNewMap("yandexdatastreams"),
Syslog: getOutputNewMap("syslog"),
MQTT: getOutputNewMap("mqtt"),
PolicyReport: getOutputNewMap("policyreport"),
NodeRed: getOutputNewMap("nodered"),
}
Expand Down
Loading