Skip to content

Commit

Permalink
added Yandex Data Streams support
Browse files Browse the repository at this point in the history
  • Loading branch information
preved911 committed Jun 20, 2022
1 parent b036954 commit 90f1a4b
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 3 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ It works as a single endpoint for as many as you want `Falco` instances :
- [**Kafka Rest Proxy**](https://docs.confluent.io/platform/current/kafka-rest/index.html)
- [**RabbitMQ**](https://www.rabbitmq.com/)
- [**Azure Event Hubs**](https://azure.microsoft.com/en-in/services/event-hubs/)
- [**Yandex Data Streams**](https://cloud.yandex.com/en/docs/data-streams/)

### Email

Expand Down
7 changes: 6 additions & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,12 +308,17 @@ func getConfig() *types.Configuration {

v.SetDefault("Yandex.AccessKeyID", "")
v.SetDefault("Yandex.SecretAccessKey", "")
v.SetDefault("Yandex.Endpoint", "")
v.SetDefault("Yandex.Region", "ru-central1")
v.SetDefault("Yandex.S3.Endpoint", "https://storage.yandexcloud.net")

v.SetDefault("Yandex.S3.Bucket", "")
v.SetDefault("Yandex.S3.Prefix", "falco")
v.SetDefault("Yamdex.S3.MinimumPriority", "")

v.SetDefault("Yandex.DataStreams.Endpoint", "")
v.SetDefault("Yandex.DataStreams.StreamName", "")
v.SetDefault("Yandex.DataStreams.MinimumPriority", "")

v.SetDefault("Syslog.Host", "")
v.SetDefault("Syslog.Port", "")
v.SetDefault("Syslog.Protocol", "")
Expand Down
4 changes: 4 additions & 0 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,10 @@ func forwardEvent(falcopayload types.FalcoPayload) {
go yandexClient.UploadYandexS3(falcopayload)
}

if config.Yandex.DataStreams.StreamName != "" && (falcopayload.Priority >= types.Priority(config.Yandex.DataStreams.MinimumPriority) || falcopayload.Rule == testRule) {
go yandexClient.UploadYandexDataStreams(falcopayload)
}

if config.Syslog.Host != "" && (falcopayload.Priority >= types.Priority(config.Syslog.MinimumPriority) || falcopayload.Rule == testRule) {
go syslogClient.SyslogPost(falcopayload)
}
Expand Down
13 changes: 13 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,19 @@ func init() {
}
}

if config.Yandex.DataStreams.StreamName != "" {
var err error
yandexClient, err = outputs.NewYandexClient(config, stats, promStats, statsdClient, dogstatsdClient)
if err != nil {
config.Yandex.DataStreams.StreamName = ""
log.Printf("[ERROR] : Yandex - %v\n", err)
} else {
if config.Yandex.DataStreams.StreamName != "" {
outputs.EnabledOutputs = append(outputs.EnabledOutputs, "YandexDataStreams")
}
}
}

if config.Syslog.Host != "" {
var err error
syslogClient, err = outputs.NewSyslogClient(config, stats, promStats, statsdClient, dogstatsdClient)
Expand Down
29 changes: 28 additions & 1 deletion outputs/yandex.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
"github.com/DataDog/datadog-go/statsd"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/google/uuid"

"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/falcosecurity/falcosidekick/types"
Expand All @@ -22,7 +24,7 @@ func NewYandexClient(config *types.Configuration, stats *types.Statistics, promS

sess, err := session.NewSession(&aws.Config{
Region: aws.String(config.Yandex.Region),
Endpoint: aws.String(config.Yandex.S3.Endpoint),
Endpoint: aws.String(config.Yandex.Endpoint),
Credentials: credentials.NewStaticCredentials(config.Yandex.AccessKeyID, config.Yandex.SecretAccessKey, ""),
})
if err != nil {
Expand Down Expand Up @@ -68,3 +70,28 @@ func (c *Client) UploadYandexS3(falcopayload types.FalcoPayload) {
go c.CountMetric("outputs", 1, []string{"output:yandexs3", "status:ok"})
c.PromStats.Outputs.With(map[string]string{"destination": "yandexs3", "status": "ok"}).Inc()
}

// UploadYandexDataStreams uploads payload to Yandex Data Streams
func (c *Client) UploadYandexDataStreams(falcoPayLoad types.FalcoPayload) {
svc := kinesis.New(c.AWSSession)

f, _ := json.Marshal(falcoPayLoad)
input := &kinesis.PutRecordInput{
Data: f,
PartitionKey: aws.String(uuid.NewString()),
StreamName: aws.String(c.Config.Yandex.DataStreams.StreamName),
}

resp, err := svc.PutRecord(input)
if err != nil {
go c.CountMetric("outputs", 1, []string{"output:yandexdatastreams", "status:error"})
c.PromStats.Outputs.With(map[string]string{"destination": "yandexdatastreams", "status": Error}).Inc()
log.Printf("[ERROR] : %v Data Streams - %v\n", c.OutputType, err.Error())
return
}

log.Printf("[INFO] : %v Data Streams - Put Record OK (%v)\n", c.OutputType, resp.SequenceNumber)
go c.CountMetric("outputs", 1, []string{"output:yandexdatastreams", "status:ok"})
c.Stats.AWSKinesis.Add(OK, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "yandexdatastreams", "status": "ok"}).Inc()
}
8 changes: 7 additions & 1 deletion types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,15 +444,20 @@ type grafanaOutputConfig struct {
type YandexOutputConfig struct {
AccessKeyID string
SecretAccessKey string
Endpoint string
Region string
S3 YandexS3Config
DataStreams YandexDataStreamsConfig
}
type YandexS3Config struct {
Endpoint string
Prefix string
Bucket string
MinimumPriority string
}
type YandexDataStreamsConfig struct {
StreamName string
MinimumPriority string
}

// SyslogConfig represents config parameters for the syslog client
// Host: the remote syslog host. It can be either an IP address or a domain.
Expand Down Expand Up @@ -512,6 +517,7 @@ type Statistics struct {
Fission *expvar.Map
Grafana *expvar.Map
YandexS3 *expvar.Map
YandexDataStreams *expvar.Map
Syslog *expvar.Map
Cliq *expvar.Map
PolicyReport *expvar.Map
Expand Down

0 comments on commit 90f1a4b

Please sign in to comment.