Skip to content

Commit

Permalink
lsm: fix scan
Browse files Browse the repository at this point in the history
  • Loading branch information
gernest committed Feb 24, 2024
1 parent f1cd820 commit a42947d
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 30 deletions.
23 changes: 3 additions & 20 deletions internal/lsm/lsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,28 +210,11 @@ func (lsm *Tree) Scan(ctx context.Context, start, end int64, fs *v1.Filters) (ar
if len(fs.Projection) == 0 {
return nil, errors.New("missing projections")
}
project := make([]int, 0, len(fs.Projection))
project := make([]string, 0, len(fs.Projection))
for _, name := range fs.Projection {
col := events.Mapping[name.String()]
project = append(project, col)
project = append(project, name.String())
}
fields := make([]arrow.Field, len(project))
for i := range project {
fields[i] = events.Schema.Field(project[i])
}
schema := arrow.NewSchema(fields, nil)
tr, tk := staples.NewTaker(lsm.mem, schema)
defer tr.Release()

lsm.ScanCold(ctx, start, end, compiled, project, func(r arrow.Record, ts *roaring.Bitmap) {
tk(r, project, ts.ToArray())
})
lsm.ps.Scan(start, end, func(p index.Part) {
lsm.processPart(p, start, end, compiled, func(r arrow.Record, ts *roaring.Bitmap) {
tk(r, project, ts.ToArray())
})
})
return tr.NewRecord(), nil
return lsm.ps.Scan(start, end, compiled, project), nil
}

func (lsm *Tree) ScanCold(ctx context.Context, start, end int64,
Expand Down
35 changes: 33 additions & 2 deletions internal/lsm/part.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
package lsm

import (
"slices"
"sync/atomic"
"time"

"github.com/RoaringBitmap/roaring"
"github.com/apache/arrow/go/v15/arrow"
"github.com/apache/arrow/go/v15/arrow/array"
"github.com/apache/arrow/go/v15/arrow/memory"
"github.com/vinceanalytics/vince/internal/closter/events"
"github.com/vinceanalytics/vince/internal/filters"
"github.com/vinceanalytics/vince/internal/index"
"github.com/vinceanalytics/vince/internal/staples"
)

// PartStore is in memory storage of parts
type PartStore struct {
mem memory.Allocator
size atomic.Uint64
tree *RecordNode
merger *staples.Merger
Expand All @@ -21,6 +26,7 @@ type PartStore struct {

func NewPartStore(mem memory.Allocator) *PartStore {
return &PartStore{
mem: mem,
tree: new(RecordNode),
merger: staples.NewMerger(mem, events.Schema),
}
Expand All @@ -39,7 +45,11 @@ func (p *PartStore) Add(r *RecordPart) {
p.tree.Prepend(r)
}

func (p *PartStore) Scan(start, end int64, f func(index.Part)) {
func (p *PartStore) Scan(start, end int64,
compiled []*filters.CompiledFilter,
projected []string) arrow.Record {
b, take := staples.NewTaker(p.mem, projected)
defer b.Release()
p.tree.Iterate(func(n *RecordNode) bool {
if n.part == nil {
return true
Expand All @@ -49,10 +59,31 @@ func (p *PartStore) Scan(start, end int64, f func(index.Part)) {
int64(n.part.Max()),
start, end,
func() {
f(n.part)
r := n.part.Record()
r.Retain()
defer r.Release()
tsCol := r.Column(0).(*array.Int64)
ts := scanRange(tsCol.Int64Values(), start, end)
n.part.Match(ts, compiled)
if ts.IsEmpty() {
return
}
take(r, ts.ToArray())
},
)
})
r := b.NewRecord()
return r
}

// scanRange returns the positions in ls whose values fall in the half-open
// interval [start, end).
//
// ls must be sorted in ascending order, because both bounds are located with
// slices.BinarySearch. The returned bitmap holds indices into ls, not the
// values themselves, and is empty when nothing is in range (including when
// end <= start).
func scanRange(ls []int64, start, end int64) *roaring.Bitmap {
	b := new(roaring.Bitmap)
	// On sorted input BinarySearch yields the first position whose value is
	// >= the target, so from is the first row in range and to is one past
	// the last row in range.
	from, _ := slices.BinarySearch(ls, start)
	to, _ := slices.BinarySearch(ls, end)
	if from < to {
		// The matching rows are contiguous: insert the whole span in one call
		// instead of one Add per row.
		b.AddRange(uint64(from), uint64(to))
	}
	return b
}

type CompactStats struct {
Expand Down
23 changes: 15 additions & 8 deletions internal/staples/arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/apache/arrow/go/v15/arrow/array"
"github.com/apache/arrow/go/v15/arrow/memory"
"github.com/vinceanalytics/vince/internal/camel"
"github.com/vinceanalytics/vince/internal/closter/events"
)

type Arrow[T any] struct {
Expand Down Expand Up @@ -245,15 +246,21 @@ func merge(b array.Builder) func(arrow.Array) {
}
}

func NewTaker(mem memory.Allocator, as *arrow.Schema) (*array.RecordBuilder, func(arrow.Record, []int, []uint32)) {
b := array.NewRecordBuilder(mem, as)
fields := make([]func(arrow.Array, []uint32), len(b.Fields()))
for i := range fields {
fields[i] = take(b.Field(i))
func NewTaker(mem memory.Allocator, projected []string) (*array.RecordBuilder, func(arrow.Record, []uint32)) {
cols := make([]int, len(projected))
fields := make([]arrow.Field, len(projected))
for i, v := range projected {
cols[i] = events.Mapping[v]
fields[i] = events.Schema.Field(events.Mapping[v])
}
b := array.NewRecordBuilder(mem, arrow.NewSchema(fields, nil))
tf := make([]func(arrow.Array, []uint32), len(projected))
for i, f := range b.Fields() {
tf[i] = take(f)
}
return b, func(v arrow.Record, columns []int, rows []uint32) {
for idx, col := range columns {
fields[idx](v.Column(col), rows)
return b, func(v arrow.Record, rows []uint32) {
for idx, col := range cols {
tf[idx](v.Column(col), rows)
}
}
}
Expand Down

0 comments on commit a42947d

Please sign in to comment.