Skip to content

Commit

Permalink
sebrecords: add and use Batch type
Browse files Browse the repository at this point in the history
  • Loading branch information
micvbang committed Jul 16, 2024
1 parent 03b6e0b commit 8ddb415
Show file tree
Hide file tree
Showing 9 changed files with 433 additions and 122 deletions.
5 changes: 5 additions & 0 deletions cmd/seb/app/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ var dumpCmd = &cobra.Command{

if dumpFlags.dumpRecords {
fmt.Printf("Records:\n")
records, err := batch.IndividualRecords(0, batch.Len())

Check failure on line 104 in cmd/seb/app/dump.go

View workflow job for this annotation

GitHub Actions / test

undefined: batch
if err != nil {
return fmt.Errorf("parsing individual records: %w", err)
}

for i, record := range records {
dumpBytes := helpy.Clamp(dumpFlags.dumpRecordBytes, 1, len(record))
if dumpFlags.dumpRecordBytes == 0 {
Expand Down
1 change: 1 addition & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ var (
ErrNotInStorage = fmt.Errorf("not in storage")
ErrUnauthorized = fmt.Errorf("unauthorized")
ErrPayloadTooLarge = fmt.Errorf("payload too large")
ErrBadInput = fmt.Errorf("bad input")
)
60 changes: 56 additions & 4 deletions internal/infrastructure/tester/recordbatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"

"github.com/micvbang/go-helpy/inty"
"github.com/micvbang/go-helpy/slicey"
"github.com/micvbang/go-helpy/stringy"
"github.com/micvbang/simple-event-broker/internal/sebrecords"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -33,19 +34,70 @@ func MakeRandomRecordsSize(records int, recordSize int) []sebrecords.Record {
return expectedRecordBatch
}

func RecordsConcat(records []sebrecords.Record) ([]uint32, []byte) {
recordsRaw := make([]byte, 0, 4096)
recordSizes := make([]uint32, len(records))
for i, record := range records {
recordSizes[i] = uint32(len(record))
recordsRaw = append(recordsRaw, record...)
}

return recordSizes, recordsRaw
}

func RequireOffsets(t *testing.T, start uint64, stop uint64, offsets []uint64) {
for offset := start; offset < stop; offset++ {
require.Equal(t, offset, offsets[offset-start])
}
}

func RecordsConcat(records []sebrecords.Record) ([]uint32, []byte) {
recordsRaw := make([]byte, 0, 4096)
func MakeRandomRecordBatch(numRecords int) sebrecords.Batch {
records := make([][]byte, numRecords)
for i := 0; i < len(records); i++ {
records[i] = []byte(stringy.RandomN(1 + inty.RandomN(50)))
}
return RecordsToBatch(records)
}

func MakeRandomRecordBatchSize(numRecords int, recordSize int) sebrecords.Batch {
records := make([][]byte, numRecords)
for i := 0; i < len(records); i++ {
records[i] = []byte(stringy.RandomN(recordSize))
}
return RecordsToBatch(records)
}

func RecordsToBatch(records [][]byte) sebrecords.Batch {
data := make([]byte, 0, 4096)
recordSizes := make([]uint32, len(records))
for i, record := range records {
recordSizes[i] = uint32(len(record))
recordsRaw = append(recordsRaw, record...)
data = append(data, record...)
}

return recordSizes, recordsRaw
return sebrecords.NewBatch(recordSizes, data)
}

func RecordsAsBytes(records []sebrecords.Record) [][]byte {
return slicey.Map(records, func(r sebrecords.Record) []byte {
return r
})
}

func BatchRecords(t testing.TB, batch sebrecords.Batch, start int, end int) []byte {
records, err := batch.Records(start, end)
if err != nil {
t.Fatalf(err.Error())
}

return records
}

func BatchIndividualRecords(t testing.TB, batch sebrecords.Batch, start int, end int) [][]byte {
records, err := batch.IndividualRecords(start, end)
if err != nil {
t.Fatalf(err.Error())
}

return records
}
71 changes: 71 additions & 0 deletions internal/infrastructure/tester/recordbatch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package tester_test

import (
"testing"

"github.com/micvbang/simple-event-broker/internal/infrastructure/tester"
"github.com/micvbang/simple-event-broker/internal/sebrecords"
)

func TestBatchIndividualRecords(t *testing.T) {
batch := sebrecords.BatchFromRecords([][]byte{{1}, {2}, {3}})

tests := map[string]struct {
start int
end int
err error
expected [][]byte
}{
// "start > end": {
// start: 1,
// end: 0,
// err: seb.ErrBadInput,
// },
// "end out of bounds": {
// start: 0,
// end: batch.Len() + 1,
// err: seb.ErrOutOfBounds,
// },
// "both out of bounds": {
// start: batch.Len() + 1,
// end: batch.Len() + 2,
// err: seb.ErrOutOfBounds,
// },
"first": {
start: 0,
end: 1,
expected: [][]byte{{1}},
},
"second": {
start: 1,
end: 2,
expected: [][]byte{{2}},
},
"third": {
start: 2,
end: 3,
expected: [][]byte{{3}},
},
"first two": {
start: 0,
end: 2,
expected: [][]byte{{1}, {2}},
},
"second two": {
start: 1,
end: 3,
expected: [][]byte{{2}, {3}},
},
"all": {
start: 0,
end: 3,
expected: [][]byte{{1}, {2}, {3}},
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
tester.BatchIndividualRecords(t, batch, test.start, test.end)
})
}
}
86 changes: 86 additions & 0 deletions internal/sebrecords/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package sebrecords

import (
"fmt"

seb "github.com/micvbang/simple-event-broker"
)

type Batch struct {
sizes []uint32
data []byte
indexes []int32
}

func NewBatch(recordSizes []uint32, data []byte) Batch {
indexes := make([]int32, len(recordSizes)+1)
index := int32(0)
for i, recordSize := range recordSizes {
indexes[i] = index
index += int32(recordSize)
}
indexes[len(recordSizes)] = int32(len(data))

return Batch{
sizes: recordSizes,
data: data,
indexes: indexes,
}
}

func BatchFromRecords(records [][]byte) Batch {
totalSize := 0
recordSizes := make([]uint32, len(records))
for i, record := range records {
recordSizes[i] = uint32(len(record))
totalSize += len(record)
}

data := make([]byte, 0, totalSize)
for _, record := range records {
data = append(data, record...)
}

return NewBatch(recordSizes, data)
}

func (b Batch) Len() int {
return len(b.sizes)
}

func (b Batch) Sizes() []uint32 {
return b.sizes
}

func (b Batch) Data() []byte {
return b.data
}

func (b Batch) Records(startIndex int, endIndex int) ([]byte, error) {
if startIndex >= len(b.sizes) || endIndex > len(b.sizes) {
return nil, seb.ErrOutOfBounds
}

if startIndex >= endIndex {
return nil, fmt.Errorf("%w: start (%d) must be smaller than end (%d)", seb.ErrBadInput, startIndex, endIndex)
}

startByte, endByte := b.indexes[startIndex], b.indexes[endIndex]
return b.data[startByte:endByte], nil
}

func (b Batch) IndividualRecords(startIndex int, endIndex int) ([][]byte, error) {
recordsData, err := b.Records(startIndex, endIndex)
if err != nil {
return nil, err
}

records := make([][]byte, endIndex-startIndex)
bytesUsed := uint32(0)
for i := range records {
size := b.sizes[startIndex+i]
records[i] = recordsData[bytesUsed : bytesUsed+size]
bytesUsed += size
}
return records, nil
}
Loading

0 comments on commit 8ddb415

Please sign in to comment.