Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Yandex Data Streams support #336

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ It works as a single endpoint for as many as you want `Falco` instances :
- [**Kafka Rest Proxy**](https://docs.confluent.io/platform/current/kafka-rest/index.html)
- [**RabbitMQ**](https://www.rabbitmq.com/)
- [**Azure Event Hubs**](https://azure.microsoft.com/en-in/services/event-hubs/)
- [**Yandex Data Streams**](https://cloud.yandex.com/en/docs/data-streams/)
preved911 marked this conversation as resolved.
Show resolved Hide resolved

### Email

Expand Down Expand Up @@ -454,10 +455,14 @@ yandex:
# secretaccesskey: "" # yandex secret access key
# region: "" # yandex storage region (default: ru-central-1)
s3:
# endpoint: "" yandex storage endpoint (default: https://storage.yandexcloud.net)
# endpoint: "" # yandex storage endpoint (default: https://storage.yandexcloud.net)
# bucket: "falcosidekick" # Yandex storage, bucket name
# prefix: "" # name of prefix, keys will have format: s3://<bucket>/<prefix>/YYYY-MM-DD/YYYY-MM-DDTHH:mm:ss.s+01:00.json
# minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|erro
# minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug
datastreams:
# endpoint: "" # Yandex Data Streams endpoint (default: https://yds.serverless.yandexcloud.net)
# streamname: "" # stream name in format /${region}/${folder_id}/${ydb_id}/${stream_name}
# minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug

syslog:
# host: "" # Syslog host, if not empty, Syslog output is enabled
Expand Down Expand Up @@ -851,7 +856,10 @@ care of lower/uppercases**) : `yaml: a.b --> envvar: A_B` :
- **YANDEX_S3_ENDPOINT**: Yandex storage endpoint (default: https://storage.yandexcloud.net)
- **YANDEX_S3_BUCKET**: Yandex storage, bucket name
- **YANDEX_S3_PREFIX**: name of prefix, keys will have format: s3://<bucket>/<prefix>/YYYY-MM-DD/YYYY-MM-DDTHH:mm:ss.s+01:00.json
- **YANDEX_S3_MINIMUMPRIORITY**: # minimum priority of event for using this output, order is emergency|alert|critical|erro
- **YANDEX_S3_MINIMUMPRIORITY**: # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug
- **YANDEX_DATASTREAMS_ENDPOINT**: Yandex Data Streams endpoint (default: https://yds.serverless.yandexcloud.net)
- **YANDEX_DATASTREAMS_STREAMNAME**: Stream name in format /${region}/${folder_id}/${ydb_id}/${stream_name}
- **YANDEX_DATASTREAMS_MINIMUMPRIORITY**: # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug
- **SYSLOG_HOST**: Syslog Host, if not empty, Syslog output is enabled
- **SYSLOG_PORT**: Syslog endpoint port number
- **SYSLOG_PROTOCOL**: Syslog transport protocol. It can be either "tcp" or "udp"
Expand Down
5 changes: 5 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,11 +309,16 @@ func getConfig() *types.Configuration {
v.SetDefault("Yandex.AccessKeyID", "")
v.SetDefault("Yandex.SecretAccessKey", "")
v.SetDefault("Yandex.Region", "ru-central1")

v.SetDefault("Yandex.S3.Endpoint", "https://storage.yandexcloud.net")
v.SetDefault("Yandex.S3.Bucket", "")
v.SetDefault("Yandex.S3.Prefix", "falco")
v.SetDefault("Yamdex.S3.MinimumPriority", "")

v.SetDefault("Yandex.DataStreams.Endpoint", "https://yds.serverless.yandexcloud.net")
v.SetDefault("Yandex.DataStreams.StreamName", "")
v.SetDefault("Yandex.DataStreams.MinimumPriority", "")

v.SetDefault("Syslog.Host", "")
v.SetDefault("Syslog.Port", "")
v.SetDefault("Syslog.Protocol", "")
Expand Down
14 changes: 14 additions & 0 deletions config_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,17 @@ policyreport:
failthreshold: 4 # events with priority above this threshold are mapped to fail in PolicyReport Summary and lower that those are mapped to warn (default=4)
maxevents: 1000 # the max number of events per report(default: 1000)
prunebypriority: false # if true; the events with lowest severity are pruned first, in FIFO order (default: false)

yandex:
# accesskeyid: "" # yandex access key
# secretaccesskey: "" # yandex secret access key
# region: "" # yandex storage region (default: ru-central-1)
s3:
# endpoint: "" # yandex storage endpoint (default: https://storage.yandexcloud.net)
# bucket: "falcosidekick" # Yandex storage, bucket name
# prefix: "" # name of prefix, keys will have format: s3://<bucket>/<prefix>/YYYY-MM-DD/YYYY-MM-DDTHH:mm:ss.s+01:00.json
# minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug
datastreams:
# endpoint: "" # Yandex Data Streams endpoint (default: https://yds.serverless.yandexcloud.net)
# streamname: "" # stream name in format /${region}/${folder_id}/${ydb_id}/${stream_name}
# minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug
4 changes: 4 additions & 0 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,10 @@ func forwardEvent(falcopayload types.FalcoPayload) {
go yandexClient.UploadYandexS3(falcopayload)
}

if config.Yandex.DataStreams.StreamName != "" && (falcopayload.Priority >= types.Priority(config.Yandex.DataStreams.MinimumPriority) || falcopayload.Rule == testRule) {
go yandexClient.UploadYandexDataStreams(falcopayload)
}

if config.Syslog.Host != "" && (falcopayload.Priority >= types.Priority(config.Syslog.MinimumPriority) || falcopayload.Rule == testRule) {
go syslogClient.SyslogPost(falcopayload)
}
Expand Down
13 changes: 13 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,19 @@ func init() {
}
}

if config.Yandex.DataStreams.StreamName != "" {
var err error
yandexClient, err = outputs.NewYandexClient(config, stats, promStats, statsdClient, dogstatsdClient)
if err != nil {
config.Yandex.DataStreams.StreamName = ""
log.Printf("[ERROR] : Yandex - %v\n", err)
} else {
if config.Yandex.DataStreams.StreamName != "" {
outputs.EnabledOutputs = append(outputs.EnabledOutputs, "YandexDataStreams")
}
}
}

if config.Syslog.Host != "" {
var err error
syslogClient, err = outputs.NewSyslogClient(config, stats, promStats, statsdClient, dogstatsdClient)
Expand Down
50 changes: 47 additions & 3 deletions outputs/yandex.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,39 @@ import (

"github.com/DataDog/datadog-go/statsd"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/endpoints"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/google/uuid"

"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/falcosecurity/falcosidekick/types"
)

// NewYandexClient returns a new output.Client for accessing the Yandex API.
func NewYandexClient(config *types.Configuration, stats *types.Statistics, promStats *types.PromStatistics, statsdClient, dogstatsdClient *statsd.Client) (*Client, error) {
resolverFn := func(service, region string, optFns ...func(*endpoints.Options)) (endpoints.ResolvedEndpoint, error) {
switch service {
case endpoints.S3ServiceID:
return endpoints.ResolvedEndpoint{
URL: config.Yandex.S3.Endpoint,
SigningRegion: "ru-central1",
}, nil
case endpoints.KinesisServiceID:
return endpoints.ResolvedEndpoint{
URL: config.Yandex.DataStreams.Endpoint,
SigningRegion: "ru-central1",
}, nil
}

return endpoints.DefaultResolver().EndpointFor(service, region, optFns...)
}

sess, err := session.NewSession(&aws.Config{
Region: aws.String(config.Yandex.Region),
Endpoint: aws.String(config.Yandex.S3.Endpoint),
Credentials: credentials.NewStaticCredentials(config.Yandex.AccessKeyID, config.Yandex.SecretAccessKey, ""),
Region: aws.String(config.Yandex.Region),
Credentials: credentials.NewStaticCredentials(config.Yandex.AccessKeyID, config.Yandex.SecretAccessKey, ""),
EndpointResolver: endpoints.ResolverFunc(resolverFn),
})
if err != nil {
log.Printf("[ERROR] : Yandex - %v\n", "Error while creating Yandex Session")
Expand Down Expand Up @@ -68,3 +87,28 @@ func (c *Client) UploadYandexS3(falcopayload types.FalcoPayload) {
go c.CountMetric("outputs", 1, []string{"output:yandexs3", "status:ok"})
c.PromStats.Outputs.With(map[string]string{"destination": "yandexs3", "status": "ok"}).Inc()
}

// UploadYandexDataStreams uploads payload to Yandex Data Streams
func (c *Client) UploadYandexDataStreams(falcoPayLoad types.FalcoPayload) {
svc := kinesis.New(c.AWSSession)

f, _ := json.Marshal(falcoPayLoad)
input := &kinesis.PutRecordInput{
Data: f,
PartitionKey: aws.String(uuid.NewString()),
StreamName: aws.String(c.Config.Yandex.DataStreams.StreamName),
}

resp, err := svc.PutRecord(input)
if err != nil {
go c.CountMetric("outputs", 1, []string{"output:yandexdatastreams", "status:error"})
c.PromStats.Outputs.With(map[string]string{"destination": "yandexdatastreams", "status": Error}).Inc()
log.Printf("[ERROR] : %v Data Streams - %v\n", c.OutputType, err.Error())
return
}

log.Printf("[INFO] : %v Data Streams - Put Record OK (%v)\n", c.OutputType, resp.SequenceNumber)
go c.CountMetric("outputs", 1, []string{"output:yandexdatastreams", "status:ok"})
c.Stats.YandexDataStreams.Add(OK, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "yandexdatastreams", "status": "ok"}).Inc()
}
1 change: 1 addition & 0 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func getInitStats() *types.Statistics {
Fission: getOutputNewMap("fission"),
Grafana: getOutputNewMap("grafana"),
YandexS3: getOutputNewMap("yandexs3"),
YandexDataStreams: getOutputNewMap("yandexdatastreams"),
Syslog: getOutputNewMap("syslog"),
PolicyReport: getOutputNewMap("policyreport"),
}
Expand Down
7 changes: 7 additions & 0 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,13 +446,19 @@ type YandexOutputConfig struct {
SecretAccessKey string
Region string
S3 YandexS3Config
DataStreams YandexDataStreamsConfig
}
type YandexS3Config struct {
Endpoint string
Prefix string
Bucket string
MinimumPriority string
}
type YandexDataStreamsConfig struct {
Endpoint string
StreamName string
MinimumPriority string
}

// SyslogConfig represents config parameters for the syslog client
// Host: the remote syslog host. It can be either an IP address or a domain.
Expand Down Expand Up @@ -512,6 +518,7 @@ type Statistics struct {
Fission *expvar.Map
Grafana *expvar.Map
YandexS3 *expvar.Map
YandexDataStreams *expvar.Map
Syslog *expvar.Map
Cliq *expvar.Map
PolicyReport *expvar.Map
Expand Down