Skip to content

Commit

Permalink
lsm: add test for part store
Browse files Browse the repository at this point in the history
  • Loading branch information
gernest committed Feb 24, 2024
1 parent 0e094ff commit 4f4173f
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 0 deletions.
4 changes: 4 additions & 0 deletions internal/lsm/part.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ func NewPartStore(mem memory.Allocator) *PartStore {
}
}

func (p *PartStore) Release() {
p.merger.Release()
}

func (p *PartStore) Size() uint64 {
return p.size.Load()
}
Expand Down
56 changes: 56 additions & 0 deletions internal/lsm/part_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package lsm

import (
"testing"
"time"

"github.com/apache/arrow/go/v15/arrow"
"github.com/apache/arrow/go/v15/arrow/memory"
"github.com/stretchr/testify/require"
"github.com/vinceanalytics/vince/internal/closter/events"
"github.com/vinceanalytics/vince/internal/indexer"
)

func TestPartStore(t *testing.T) {
ps := NewPartStore(memory.NewGoAllocator())
defer ps.Release()

now := events.Now()()
// Three records starting 1 hour apart
first := events.SampleRecord(events.WithNow(nowFunc(now)), events.WithStep(time.Minute))
second := events.SampleRecord(events.WithNow(nowFunc(now.Add(time.Hour))), events.WithStep(time.Minute))
third := events.SampleRecord(events.WithNow(nowFunc(now.Add(2*time.Hour))), events.WithStep(time.Minute))

p1 := mustPart(t, first)
p2 := mustPart(t, second)
p3 := mustPart(t, third)

ps.Add(p1)
ps.Add(p2)
ps.Add(p3)

t.Run("Size", func(t *testing.T) {
wantSize := p1.Size() + p2.Size() + p3.Size()
require.Equal(t, wantSize, ps.Size())
})
t.Run("Compact", func(t *testing.T) {
old := ps.Size()
r, stats := ps.Compact()
require.Equal(t, old, stats.OldSize)
wantRows := first.NumRows() + second.NumRows() + third.NumRows()
require.Equal(t, wantRows, r.NumRows())
require.Zero(t, ps.Size())
})
}

func mustPart(t *testing.T, r arrow.Record) *RecordPart {
t.Helper()
idx, err := indexer.New().Index(r)
if err != nil {
t.Fatal(err)
}
return NewPart(r, idx)
}
func nowFunc(now time.Time) func() time.Time {
return func() time.Time { return now }
}
4 changes: 4 additions & 0 deletions internal/staples/arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ func (m *Merger) NewRecord() arrow.Record {
return m.b.NewRecord()
}

func (m *Merger) Release() {
m.b.Release()
}

func NewMerger(mem memory.Allocator, as *arrow.Schema) *Merger {
b := array.NewRecordBuilder(mem, as)
fields := make([]func(arrow.Array), len(b.Fields()))
Expand Down

0 comments on commit 4f4173f

Please sign in to comment.