Merge pull request #111 from DCSO/full-flow-aggregation
implement full flow aggregation
satta committed Jul 3, 2024
2 parents c015441 + ac6a091 commit ae899ec
Showing 5 changed files with 94 additions and 15 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,12 @@

All notable changes to FEVER will be documented in this file.

## [1.3.6] - 2024-07-03

### Added
- Add support for sending aggregations from all flows, not just TCP
bidirectional ones.

## [1.3.5] - 2023-03-27

### Fixed
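
The changelog entry above sums up the whole change: aggregation is no longer limited to bidirectional TCP flows. The switch is a single predicate in the aggregator's Consume step (see the processing/unicorn_aggregator.go hunk further down). A minimal standalone sketch of that condition, using the Entry fields shown in the diff; the helper name shouldAggregate is hypothetical and not part of the commit:

// shouldAggregate mirrors the condition added to Consume: with allFlows
// enabled every flow event is counted, otherwise only bidirectional TCP
// flows (bytes seen towards the client) are aggregated.
func shouldAggregate(eventType string, proto string, bytesToClient int64, allFlows bool) bool {
	if eventType != "flow" {
		return false
	}
	return allFlows || (proto == "TCP" && bytesToClient > 0)
}
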
4 changes: 3 additions & 1 deletion cmd/fever/cmds/run.go
@@ -409,7 +409,9 @@ func mainfunc(cmd *cobra.Command, args []string) {
"state": "disabled",
}).Info("compression of flow stats")
}
ua := processing.MakeUnicornAggregator(submitter, unicornSleep, dummyMode)
allFlows := viper.GetBool("flowreport.all")
ua := processing.MakeUnicornAggregator(submitter, unicornSleep, dummyMode,
allFlows)
testSrcIP := viper.GetString("flowreport.testdata-srcip")
testDestIP := viper.GetString("flowreport.testdata-destip")
testDestPort := viper.GetInt64("flowreport.testdata-destport")
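
The run.go hunk above is all the wiring the new option needs: viper.GetBool falls back to false when flowreport.all is unset, so existing configurations keep the TCP-only behavior. A hedged sketch of that read, wrapped in a helper; the explicit SetDefault call and the function name flowReportAllEnabled are illustrative additions, not part of this commit:

import "github.com/spf13/viper"

// flowReportAllEnabled reads the new flowreport.all switch; an unset key
// falls back to false, i.e. the previous TCP-only behavior.
func flowReportAllEnabled() bool {
	// Hypothetical explicit default; the commit itself relies on GetBool's
	// zero-value fallback for unset keys.
	viper.SetDefault("flowreport.all", false)
	return viper.GetBool("flowreport.all")
}
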
4 changes: 3 additions & 1 deletion fever.yaml
@@ -85,6 +85,8 @@ flowreport:
#testdata-srcip: 0.0.0.1
#testdata-destip: 0.0.0.2
#testdata-destport: 99999
# Set to true to count _all_ flows, not just TCP bidirectional ones.
all: false

# Configuration for metrics (i.e. InfluxDB) submission.
metrics:
@@ -165,4 +167,4 @@ mgmt:
socket: /tmp/fever-mgmt.sock
# Use network server for gRPC communication.
#network: tcp
#host: localhost:9999
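
With all: true under flowreport, the aggregator counts every flow event. Driving the aggregator directly from code takes the same boolean as the last constructor argument, as the new test at the bottom does; a minimal hedged usage sketch, where submitter and the ten-second submit period are placeholder values:

// allFlows=true as the last argument: aggregate every flow, not just
// bidirectional TCP ones with bytes towards the client.
ua := processing.MakeUnicornAggregator(submitter, 10*time.Second, false, true)
ua.Run()
// ... feed *types.Entry values via ua.Consume(...) ...
done := make(chan bool)
ua.Stop(done)
<-done
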
6 changes: 4 additions & 2 deletions processing/unicorn_aggregator.go
@@ -45,6 +45,7 @@ type UnicornAggregator struct {
TestFlowSrcIP string
TestFlowDestIP string
TestFlowDestPort int64
AllFlows bool
}

// MakeUnicornAggregate creates a new empty UnicornAggregate object.
@@ -58,7 +59,7 @@ func MakeUnicornAggregate() *UnicornAggregate {

// MakeUnicornAggregator creates a new empty UnicornAggregator object.
func MakeUnicornAggregator(statsSubmitter util.StatsSubmitter,
submitPeriod time.Duration, dummyMode bool) *UnicornAggregator {
submitPeriod time.Duration, dummyMode bool, allFlows bool) *UnicornAggregator {
a := &UnicornAggregator{
Logger: log.WithFields(log.Fields{
"domain": "aggregate",
@@ -70,6 +71,7 @@ func MakeUnicornAggregator(statsSubmitter util.StatsSubmitter,
ClosedChan: make(chan bool),
Aggregate: *MakeUnicornAggregate(),
TestFlowDestPort: 99999,
AllFlows: allFlows,
}
return a
}
@@ -197,7 +199,7 @@ func (a *UnicornAggregator) Stop(stopChan chan bool) {
// aggregated state
func (a *UnicornAggregator) Consume(e *types.Entry) error {
// Unicorn flow aggregation update
if e.EventType == "flow" && e.Proto == "TCP" && e.BytesToClient > 0 {
if e.EventType == "flow" && (a.AllFlows || (e.Proto == "TCP" && e.BytesToClient > 0)) {
a.StringBuf.Write([]byte(e.SrcIP))
a.StringBuf.Write([]byte("_"))
a.StringBuf.Write([]byte(e.DestIP))
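
Only the selection condition in Consume changes here; the grouping key is still built from source IP, destination IP and destination port joined by underscores, the same format the tests reconstruct with fmt.Sprintf. A small hedged sketch of that key format; flowKey is a hypothetical helper, not part of the diff:

import "fmt"

// flowKey reproduces the "<srcip>_<destip>_<destport>" grouping key that
// Consume writes into the aggregator's string buffer.
func flowKey(srcIP string, destIP string, destPort int64) string {
	return fmt.Sprintf("%s_%s_%d", srcIP, destIP, destPort)
}
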
89 changes: 78 additions & 11 deletions processing/unicorn_aggregator_test.go
@@ -15,15 +15,15 @@ import (
log "github.com/sirupsen/logrus"
)

func makeUnicornFlowEvent() types.Entry {
func makeUnicornFlowEvent(proto string) types.Entry {
e := types.Entry{
SrcIP: fmt.Sprintf("10.%d.%d.%d", rand.Intn(250), rand.Intn(250), rand.Intn(250)),
SrcPort: []int64{1, 2, 3, 4, 5}[rand.Intn(5)],
DestIP: fmt.Sprintf("10.0.0.%d", rand.Intn(250)),
DestPort: []int64{11, 12, 13, 14, 15}[rand.Intn(5)],
Timestamp: time.Now().Format(types.SuricataTimestampFormat),
EventType: "flow",
Proto: "TCP",
Proto: proto,
BytesToClient: int64(rand.Intn(10000)),
BytesToServer: int64(rand.Intn(10000)),
PktsToClient: int64(rand.Intn(100)),
@@ -101,7 +101,7 @@ func TestUnicornAggregatorNoSubmission(t *testing.T) {
dsub := &testSubmitter{
Data: make([]string, 0),
}
f := MakeUnicornAggregator(dsub, 100*time.Millisecond, false)
f := MakeUnicornAggregator(dsub, 100*time.Millisecond, false, false)
f.Run()

time.Sleep(1 * time.Second)
@@ -128,12 +128,12 @@ func TestUnicornAggregator(t *testing.T) {
dsub := &testSubmitter{
Data: make([]string, 0),
}
f := MakeUnicornAggregator(dsub, 500*time.Millisecond, false)
f := MakeUnicornAggregator(dsub, 500*time.Millisecond, false, false)
f.Run()

createdFlows := make(map[string]int)
for i := 0; i < 200000; i++ {
ev := makeUnicornFlowEvent()
ev := makeUnicornFlowEvent("TCP")
if ev.BytesToClient > 0 {
key := fmt.Sprintf("%s_%s_%d", ev.SrcIP, ev.DestIP, ev.DestPort)
createdFlows[key]++
@@ -189,7 +189,7 @@ func TestUnicornAggregatorWithTestdata(t *testing.T) {
dsub := &testSubmitter{
Data: make([]string, 0),
}
f := MakeUnicornAggregator(dsub, 500*time.Millisecond, false)
f := MakeUnicornAggregator(dsub, 500*time.Millisecond, false, false)
f.EnableTestFlow("1.2.3.4", "5.6.7.8", 33333)
f.Run()

@@ -239,7 +239,7 @@ func TestUnicornAggregatorWithDispatch(t *testing.T) {
dsub := &testSubmitter{
Data: make([]string, 0),
}
f := MakeUnicornAggregator(dsub, 500*time.Millisecond, false)
f := MakeUnicornAggregator(dsub, 500*time.Millisecond, false, false)
feedWaitChan := make(chan bool)
outChan := make(chan types.Entry)

@@ -256,17 +256,21 @@ func TestUnicornAggregatorWithDispatch(t *testing.T) {
f.Run()

createdFlows := make(map[string]int)
for i := 0; i < 200000; i++ {
ev := makeUnicornFlowEvent()
if ev.BytesToClient > 0 {
for i := 0; i < 400000; i++ {
proto := "TCP"
if i%2 == 0 {
proto = "UDP"
}
ev := makeUnicornFlowEvent(proto)
if proto == "TCP" && ev.BytesToClient > 0 {
key := fmt.Sprintf("%s_%s_%d", ev.SrcIP, ev.DestIP, ev.DestPort)
createdFlows[key]++
}
d.Dispatch(&ev)
}

for {
if dsub.GetTotalAggs() < len(createdFlows) {
if dsub.GetTotalAggs() < (len(createdFlows) / 2) {
log.Debug(dsub.GetTotalAggs())
time.Sleep(100 * time.Millisecond)
} else {
@@ -309,3 +313,66 @@ func TestUnicornAggregatorWithDispatch(t *testing.T) {
}
}
}

func TestUnicornMixedUDPTCP(t *testing.T) {
rand.Seed(time.Now().UTC().UnixNano())
dsub := &testSubmitter{
Data: make([]string, 0),
}
f := MakeUnicornAggregator(dsub, 500*time.Millisecond, false, true)
f.Run()

createdFlows := make(map[string]int)
for i := 0; i < 200000; i++ {
proto := "TCP"
if i%2 == 0 {
proto = "UDP"
}
ev := makeUnicornFlowEvent(proto)
key := fmt.Sprintf("%s_%s_%d", ev.SrcIP, ev.DestIP, ev.DestPort)
createdFlows[key]++
f.Consume(&ev)
}

for {
if dsub.GetTotalAggs() < len(createdFlows) {
log.Debug(dsub.GetTotalAggs())
time.Sleep(100 * time.Millisecond)
} else {
break
}
}

consumeWaitChan := make(chan bool)
f.Stop(consumeWaitChan)
<-consumeWaitChan

if len(dsub.Data) == 0 {
t.Fatalf("collected aggregations are empty")
}

log.Info(dsub.GetTotalAggs(), len(createdFlows), len(dsub.Data))

var totallen int
for _, v := range dsub.Data {
totallen += len(v)
}
if totallen == 0 {
t.Fatalf("length of collected aggregations is zero")
}

if dsub.GetTotalAggs() != len(createdFlows) {
t.Fatalf("unexpected number of flow aggregates: %d/%d", dsub.GetTotalAggs(),
len(createdFlows))
}

for k, v := range dsub.GetFlowTuples() {
if _, ok := createdFlows[k]; !ok {
t.Fatalf("missing flow aggregate: %s", k)
}
if v["count"] != int64(createdFlows[k]) {
t.Fatalf("unexpected number of flows for %s: %d/%d",
k, v["count"], createdFlows[k])
}
}
}
