Skip to content

Commit

Permalink
fix: Error handling in AWS Security Lake
Browse files Browse the repository at this point in the history
Closes: #389
Signed-off-by: Michael Gasch <[email protected]>
  • Loading branch information
embano1 committed Feb 2, 2023
1 parent c7c3904 commit 952f92f
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 51 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/aws/aws-sdk-go v1.44.89
github.com/cloudevents/sdk-go/v2 v2.11.0
github.com/eclipse/paho.mqtt.golang v1.4.1
github.com/embano1/memlog v0.4.3
github.com/embano1/memlog v0.4.4
github.com/emersion/go-sasl v0.0.0-20211008083017-0b9dcfb154ac
github.com/emersion/go-smtp v0.15.0
github.com/google/uuid v1.3.0
Expand Down Expand Up @@ -77,7 +77,7 @@ require (
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/google/gnostic v0.6.9 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.1.0 // indirect
Expand Down Expand Up @@ -129,7 +129,7 @@ require (
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/net v0.0.0-20220826154423-83b083e8dc8b // indirect
golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 // indirect
golang.org/x/term v0.0.0-20220722155259-a9ba230a4035 // indirect
golang.org/x/text v0.3.8 // indirect
Expand Down
19 changes: 11 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ github.com/aws/aws-sdk-go-v2/service/s3 v1.11.1/go.mod h1:XLAGFrEjbvMCLvAtWLLP32
github.com/aws/aws-sdk-go-v2/service/sso v1.3.1/go.mod h1:J3A3RGUvuCZjvSuZEcOpHDnzZP/sKbhDWV2T1EOzFIM=
github.com/aws/aws-sdk-go-v2/service/sts v1.6.0/go.mod h1:q7o0j7d7HrJk/vr9uUt3BVRASvcU7gYZB9PUgPiByXg=
github.com/aws/smithy-go v1.6.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
Expand Down Expand Up @@ -268,8 +267,8 @@ github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFP
github.com/eclipse/paho.mqtt.golang v1.4.1 h1:tUSpviiL5G3P9SZZJPC4ZULZJsxQKXxfENpMvdbAXAI=
github.com/eclipse/paho.mqtt.golang v1.4.1/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/embano1/memlog v0.4.3 h1:riVTumM3e3/qBrsKYsml7sSJYvCm6z4V4+4C3hRk0aA=
github.com/embano1/memlog v0.4.3/go.mod h1:yDNCaHBS5MqgHjF5SRlCibLaGqSrcC9DjG3AomLj+bk=
github.com/embano1/memlog v0.4.4 h1:t12/1vR1RXYs/kRB0/Yl6d/CwwSmmJeEGF2sPcKUPR0=
github.com/embano1/memlog v0.4.4/go.mod h1:KwZp72rqDg8jn7LgwwPST1VHuQbEACEpr23SaM0REbw=
github.com/emersion/go-sasl v0.0.0-20200509203442-7bfe0ed36a21/go.mod h1:iL2twTeMvZnrg54ZoPDNfJaJaqy0xIQFuBdrLsmspwQ=
github.com/emersion/go-sasl v0.0.0-20211008083017-0b9dcfb154ac h1:tn/OQ2PmwQ0XFVgAHfjlLyqMewry25Rz7jWnVoh4Ggs=
github.com/emersion/go-sasl v0.0.0-20211008083017-0b9dcfb154ac/go.mod h1:iL2twTeMvZnrg54ZoPDNfJaJaqy0xIQFuBdrLsmspwQ=
Expand Down Expand Up @@ -407,6 +406,7 @@ github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA=
github.com/google/flatbuffers v1.11.0 h1:O7CEyB8Cb3/DmtxODGtLHcEvpr81Jm5qLg/hsHnxA2A=
github.com/google/flatbuffers v1.11.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/gnostic v0.5.7-v3refs/go.mod h1:73MKFl6jIHelAJNaBGFzt3SPtZULs9dYrGFt8OiIsHQ=
github.com/google/gnostic v0.6.9 h1:ZK/5VhkoX835RikCHpSUJV9a+S3e1zLh59YnyWeBW+0=
Expand All @@ -424,8 +424,9 @@ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8=
github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU=
Expand Down Expand Up @@ -587,8 +588,6 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.9.7/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.15.7/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.15.12 h1:YClS/PImqYbn+UILDnqxQCZ3RehC9N318SU3kElDUEM=
github.com/klauspost/compress v1.15.12/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
Expand Down Expand Up @@ -731,9 +730,11 @@ github.com/pelletier/go-toml/v2 v2.0.5 h1:ipoSadvV8oGUjnUbMub59IDPPwfxF694nG/jwb
github.com/pelletier/go-toml/v2 v2.0.5/go.mod h1:OMHamSCAODeSsVrwwvcJOaoN0LIUIaFVNZzmWyNfXas=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc=
github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4/go.mod h1:N6UoU20jOqggOuDwUaBQpluzLNDqif3kq9z2wpdYEfQ=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down Expand Up @@ -1083,8 +1084,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde h1:ejfdSekXMDxDLbRrJMwUk6KnSLZ2McaUCVcIKM+N6jc=
golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down Expand Up @@ -1522,8 +1523,10 @@ gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk=
gotest.tools/v3 v3.4.0 h1:ZazjZUfuVeZGLAmlKKuyv3IKP5orXcwtOwDQH6YVr6o=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
122 changes: 82 additions & 40 deletions outputs/awssecuritylake.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package outputs
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
Expand All @@ -11,10 +12,11 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/embano1/memlog"
"github.com/falcosecurity/falcosidekick/types"
"github.com/google/uuid"
"github.com/xitongsys/parquet-go-source/mem"
"github.com/xitongsys/parquet-go/writer"

"github.com/falcosecurity/falcosidekick/types"
)

const (
Expand Down Expand Up @@ -196,56 +198,96 @@ func getAWSSecurityLakeSeverity(priority types.PriorityType) (int32, string) {
func (c *Client) EnqueueSecurityLake(falcopayload types.FalcoPayload) {
offset, err := c.Config.AWS.SecurityLake.Memlog.Write(c.Config.AWS.SecurityLake.Ctx, []byte(falcopayload.String()))
if err != nil {
if err.Error() != "future offset" {
go c.CountMetric(Outputs, 1, []string{"output:awssecuritylake.", "status:error"})
c.Stats.AWSSecurityLake.Add(Error, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "awssecuritylake.", "status": Error}).Inc()
log.Printf("[ERROR] : %v SecurityLake. - %v\n", c.OutputType, err)
return
}
go c.CountMetric(Outputs, 1, []string{"output:awssecuritylake.", "status:error"})
c.Stats.AWSSecurityLake.Add(Error, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "awssecuritylake.", "status": Error}).Inc()
log.Printf("[ERROR] : %v SecurityLake. - %v\n", c.OutputType, err)
return
}
log.Printf("[INFO] : %v SecurityLake. - Event queued (%v)\n", c.OutputType, falcopayload.UUID)
*c.Config.AWS.SecurityLake.WriteOffset = offset
}

func (c *Client) StartSecurityLakeWorker() {
for {
if err := c.processNextBatch(); errors.Is(err, memlog.ErrOutOfRange) {
// don't sleep if we're too slow reading
continue
}

time.Sleep(time.Duration(c.Config.AWS.SecurityLake.Interval) * time.Minute)
// time.Sleep(5 * time.Second)
batch := make([]memlog.Record, c.Config.AWS.SecurityLake.BatchSize)
count, err := c.Config.AWS.SecurityLake.Memlog.ReadBatch(c.Config.AWS.SecurityLake.Ctx, *c.Config.AWS.SecurityLake.ReadOffset+1, batch)
if err != nil {
if err.Error() != "future offset" {
go c.CountMetric(Outputs, 1, []string{"output:awssecuritylake.", "status:error"})
c.Stats.AWSSecurityLake.Add(Error, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "awssecuritylake.", "status": Error}).Inc()
log.Printf("[ERROR] : %v SecurityLake. - %v\n", c.OutputType, err)
continue
}
}
}

func (c *Client) processNextBatch() error {
awslake := c.Config.AWS.SecurityLake // assumes no concurrent r/w
ctx := awslake.Ctx
ml := awslake.Memlog

sleep := time.Duration(awslake.Interval) * time.Minute
time.Sleep(sleep)

batch := make([]memlog.Record, awslake.BatchSize)
count, err := ml.ReadBatch(ctx, *awslake.ReadOffset+1, batch)
if err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
go c.CountMetric(Outputs, 1, []string{"output:awssecuritylake.", "status:error"})
c.Stats.AWSSecurityLake.Add(Error, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "awssecuritylake.", "status": Error}).Inc()
log.Printf("[ERROR] : %v SecurityLake. - %v\n", c.OutputType, err)
// ctx currently not handled in main
// https://github.com/falcosecurity/falcosidekick/pull/390#discussion_r1081690326
return err
}
if count > 0 {
var err error
*c.Config.AWS.SecurityLake.ReadOffset = batch[count-1].Metadata.Offset
uid := uuid.New().String()
if count == 1 {
err = c.writeParquet(uid, []memlog.Record{batch[0]})
}
if count > 1 {
err = c.writeParquet(uid, batch[:count-1])
}
if err != nil {
go c.CountMetric(Outputs, 1, []string{"output:awssecuritylake.", "status:error"})
c.Stats.AWSSecurityLake.Add(Error, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "awssecuritylake.", "status": Error}).Inc()
continue
}
if err != nil {
go c.CountMetric(Outputs, 1, []string{"output:awssecuritylake.", "status:ok"})
c.Stats.AWSSecurityLake.Add(OK, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "awssecuritylake.", "status": "ok"}).Inc()
}

if errors.Is(err, memlog.ErrOutOfRange) {
earliest, _ := ml.Range(ctx)

go c.CountMetric(Outputs, 1, []string{"output:awssecuritylake.", "status:error"})
c.Stats.AWSSecurityLake.Add(Error, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "awssecuritylake.", "status": Error}).Inc()

msg := fmt.Errorf("slow batch reader: resetting read offset from %d to %d: %v",
*awslake.ReadOffset,
earliest,
err,
)
log.Printf("[ERROR] : %v SecurityLake. - %v\n", c.OutputType, msg)
earliest = earliest - 1 // to ensure next batch includes earliest as we read from ReadOffset+1
awslake.ReadOffset = &earliest
return err
}

// catch all other errors besides ErrFutureOffset which could contain a partial batch
if !errors.Is(err, memlog.ErrFutureOffset) {
go c.CountMetric(Outputs, 1, []string{"output:awssecuritylake.", "status:error"})
c.Stats.AWSSecurityLake.Add(Error, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "awssecuritylake.", "status": Error}).Inc()
log.Printf("[ERROR] : %v SecurityLake. - %v\n", c.OutputType, err)
return err
}
}

if count > 0 {
uid := uuid.New().String()

if err := c.writeParquet(uid, batch[:count]); err != nil {
go c.CountMetric(Outputs, 1, []string{"output:awssecuritylake.", "status:error"})
c.Stats.AWSSecurityLake.Add(Error, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "awssecuritylake.", "status": Error}).Inc()
// we don't update ReadOffset to retry and not skip records
return err
}

go c.CountMetric(Outputs, 1, []string{"output:awssecuritylake.", "status:ok"})
c.Stats.AWSSecurityLake.Add(OK, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "awssecuritylake.", "status": "ok"}).Inc()

// update offset
*awslake.ReadOffset = batch[count-1].Metadata.Offset
}

return nil
}

func (c *Client) writeParquet(uid string, records []memlog.Record) error {
Expand Down

0 comments on commit 952f92f

Please sign in to comment.