lsm: initial cold storage scans
Not very efficient; we load the full table into memory.
gernest committed Feb 18, 2024
1 parent 770e378 commit 8761772
Showing 5 changed files with 194 additions and 29 deletions.
107 changes: 107 additions & 0 deletions db/part.go
@@ -0,0 +1,107 @@
package db

import (
"bytes"
"context"

"github.com/apache/arrow/go/v15/arrow"
"github.com/apache/arrow/go/v15/arrow/array"
"github.com/apache/arrow/go/v15/arrow/compute"
"github.com/apache/arrow/go/v15/parquet"
"github.com/apache/arrow/go/v15/parquet/file"
"github.com/apache/arrow/go/v15/parquet/pqarrow"
"github.com/vinceanalytics/vince/buffers"
"github.com/vinceanalytics/vince/index"
)

type Part struct {
id string
*index.FileIndex
record arrow.Record
}

var _ index.Part = (*Part)(nil)

func NewPart(ctx context.Context, db Storage, resource, id string) (*Part, error) {
b := buffers.Bytes()
defer b.Release()

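// Key layout used below (assumption based on the writes that follow): the serialized
// index blob is stored under the granule key (resource joined with id by a separator),
// and the parquet data blob under a further data suffix appended to that same key.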
b.WriteString(resource)
b.Write(slash)
b.WriteString(id)
var fdx *index.FileIndex
err := db.Get(b.Bytes(), func(b []byte) error {
var err error
fdx, err = index.NewFileIndex(bytes.NewReader(b))
return err
})
if err != nil {
return nil, err
}
b.Write(slash)
b.Write(dataPath)
var r arrow.Record
err = db.Get(b.Bytes(), func(b []byte) error {
f, err := file.NewParquetReader(bytes.NewReader(b),
file.WithReadProps(parquet.NewReaderProperties(
compute.GetAllocator(ctx),
)),
)
if err != nil {
return err
}
defer f.Close()
pr, err := pqarrow.NewFileReader(f, pqarrow.ArrowReadProperties{
BatchSize: int64(fdx.NumRows()),
Parallel: true,
},
compute.GetAllocator(ctx),
)
if err != nil {
return err
}
table, err := pr.ReadTable(ctx)
if err != nil {
return err
}
defer table.Release()
r = tableToRecord(table)
return nil
})
if err != nil {
return nil, err
}
return &Part{
id: id,
FileIndex: fdx,
record: r,
}, nil
}

func tableToRecord(table arrow.Table) arrow.Record {
a := make([]arrow.Array, 0, table.NumCols())
for i := 0; i < int(table.NumCols()); i++ {
col := table.Column(i)
// We read the table as a single batch, so each chunked column holds exactly one array.
x := col.Data().Chunks()[0]
x.Retain()
a = append(a, x)
}
return array.NewRecord(
table.Schema(), a, table.NumRows(),
)
}
func (p *Part) ID() string {
return p.id
}

func (p *Part) Record() arrow.Record {
return p.record
}

func (p *Part) Release() {
p.record.Release()
p.record = nil
}
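A minimal usage sketch of the new loader (not part of this commit); the helper name, the storage value, and the granule id are hypothetical, and only the calls shown in the diff above are assumed:

package example

import (
	"context"
	"fmt"

	"github.com/vinceanalytics/vince/db"
)

// loadColdGranule loads one persisted granule, reads its materialized record,
// and releases it. NewPart pulls the whole parquet table into memory, which is
// why the commit message notes this path is not yet efficient.
func loadColdGranule(ctx context.Context, store db.Storage, resource, id string) error {
	part, err := db.NewPart(ctx, store, resource, id)
	if err != nil {
		return fmt.Errorf("load granule %s: %w", id, err)
	}
	defer part.Release()
	fmt.Println("rows loaded:", part.Record().NumRows())
	return nil
}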
5 changes: 5 additions & 0 deletions db/persistance.go
@@ -2,6 +2,7 @@ package db

import (
"bytes"
"context"
"errors"
"fmt"
"log/slog"
@@ -51,6 +52,10 @@ func NewStore(db Storage, mem memory.Allocator, resource string, ttl time.Durati
}
}

func (s *Store) LoadPart(ctx context.Context, id string) (*Part, error) {
return NewPart(ctx, s.db, s.resource, id)
}

func (s *Store) Save(r arrow.Record, idx index.Full) (*v1.Granule, error) {

// We don't call this frequently. So make sure we run GC when we are done. This
4 changes: 4 additions & 0 deletions index/file.go
@@ -55,6 +55,10 @@ var _ Full = (*FileIndex)(nil)

var baseFileIdxSize = uint64(unsafe.Sizeof(FileIndex{}))

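// NumRows reports the number of rows in the indexed data, taken from the first
// column's metadata (every column carries the same row count).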
func (f *FileIndex) NumRows() int {
return int(f.meta.Columns[0].NumRows)
}

func (f *FileIndex) Columns(_ func(column Column) error) error {
logger.Fail("FileIndex does not support Columns indexing")
return nil
8 changes: 8 additions & 0 deletions index/index.go
@@ -7,6 +7,13 @@ import (
v1 "github.com/vinceanalytics/vince/gen/go/staples/v1"
)

type Part interface {
Full
ID() string
Record() arrow.Record
Release()
}

type Full interface {
Columns(f func(column Column) error) error
Match(b *roaring.Bitmap, m []*filters.CompiledFilter)
@@ -30,4 +37,5 @@ type Index interface {

type Primary interface {
Add(resource string, granule *v1.Granule)
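// FindGranules returns ids of persisted granules for the resource whose data
// overlaps the [start, end] time range (inferred from its use in the lsm ScanCold path).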
FindGranules(resource string, start int64, end int64) []string
}
99 changes: 70 additions & 29 deletions lsm/lsm.go
@@ -15,13 +15,15 @@ import (
"github.com/apache/arrow/go/v15/arrow/compute"
"github.com/apache/arrow/go/v15/arrow/memory"
"github.com/apache/arrow/go/v15/arrow/util"
"github.com/dgraph-io/ristretto"
"github.com/docker/go-units"
"github.com/oklog/ulid/v2"
"github.com/vinceanalytics/vince/camel"
"github.com/vinceanalytics/vince/db"
"github.com/vinceanalytics/vince/filters"
v1 "github.com/vinceanalytics/vince/gen/go/staples/v1"
"github.com/vinceanalytics/vince/index"
"github.com/vinceanalytics/vince/logger"
"github.com/vinceanalytics/vince/staples"
)

@@ -32,7 +34,7 @@ type RecordPart struct {
size uint64
}

var _ Part = (*RecordPart)(nil)
var _ index.Part = (*RecordPart)(nil)

func (r *RecordPart) Record() arrow.Record {
return r.record
@@ -50,13 +52,6 @@ func (r *RecordPart) Release() {
r.record.Release()
}

type Part interface {
index.Full
ID() string
Record() arrow.Record
Release()
}

func NewPart(r arrow.Record, idx index.Full) *RecordPart {
r.Retain()
return &RecordPart{
@@ -67,7 +62,7 @@ func NewPart(r arrow.Record, idx index.Full) *RecordPart {
}
}

type RecordNode = Node[*RecordPart]
type RecordNode = Node[index.Part]

type Tree[T any] struct {
tree *RecordNode
@@ -87,6 +82,8 @@ type Tree[T any] struct {

nodes []*RecordNode
records []arrow.Record

cache *ristretto.Cache
}

type Options struct {
@@ -130,6 +127,21 @@ func NewTree[T any](mem memory.Allocator, resource string, storage db.Storage, i
for _, f := range opts {
f(&o)
}

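// Cold parts loaded from storage are cached in ristretto, costed by their
// in-memory size; on eviction or rejection the underlying Arrow record is
// released so the memory is reclaimed.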
cache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: 1e7,
MaxCost: int64(o.compactSize) * 2,
BufferItems: 64,
OnEvict: func(item *ristretto.Item) {
item.Value.(index.Part).Release()
},
OnReject: func(item *ristretto.Item) {
item.Value.(index.Part).Release()
},
})
if err != nil {
logger.Fail("Failed creating parts cache", "err", err)
}
return &Tree[T]{
tree: &RecordNode{},
index: indexer,
@@ -147,6 +159,7 @@
slog.String("component", "lsm-tree"),
slog.String("resource", resource),
),
cache: cache,
}
}

@@ -178,13 +191,7 @@ func (lsm *Tree[T]) findNode(node *RecordNode) (list *RecordNode) {
return
}

type ScanCallback func(context.Context, arrow.Record) error

func (lsm *Tree[T]) Scan(
ctx context.Context,
start, end int64,
fs *v1.Filters,
) (arrow.Record, error) {
func (lsm *Tree[T]) Scan(ctx context.Context, start, end int64, fs *v1.Filters) (arrow.Record, error) {
ctx = compute.WithAllocator(ctx, lsm.mem)
compiled, err := filters.CompileFilters(fs)
if err != nil {
@@ -209,22 +216,18 @@ func (lsm *Tree[T]) Scan(
schema := arrow.NewSchema(fields, nil)
tr, tk := staples.NewTaker(lsm.mem, schema)
defer tr.Release()

lsm.ScanCold(ctx, start, end, compiled, func(r arrow.Record, ts *roaring.Bitmap) {
tk(r, project, ts.ToArray())
})
lsm.tree.Iterate(func(n *RecordNode) bool {
if n.part == nil {
return true
}
if n.part.Min() <= uint64(end) {
if uint64(start) <= n.part.Max() {
r := n.part.record
r.Retain()
defer r.Release()
ts := ScanTimestamp(r, lsm.mapping[v1.Filters_Timestamp.String()], start, end)
n.part.Match(ts, compiled)
if ts.IsEmpty() {
return true
}
tk(r, project, ts.ToArray())
lsm.processPart(n.part, start, end, compiled, func(r arrow.Record, ts *roaring.Bitmap) {
tk(r, project, ts.ToArray())
})
return true
}
return true
@@ -234,6 +237,44 @@
return tr.NewRecord(), nil
}

func (lsm *Tree[T]) ScanCold(ctx context.Context, start, end int64,
compiled []*filters.CompiledFilter, fn func(r arrow.Record, ts *roaring.Bitmap)) {
granules := lsm.primary.FindGranules(lsm.resource, start, end)
for _, granule := range granules {
part := lsm.loadPart(ctx, granule)
if part != nil {
lsm.processPart(part, start, end, compiled, fn)
}
}
}

func (lsm *Tree[T]) loadPart(ctx context.Context, id string) index.Part {
v, ok := lsm.cache.Get(id)
if ok {
return v.(index.Part)
}
part, err := lsm.store.LoadPart(ctx, id)
if err != nil {
lsm.log.Error("Failed loading granule to memory", "id", id, "err", err)
return nil
}
lsm.cache.Set(id, part, int64(part.Size()))
return part
}

func (lsm *Tree[T]) processPart(part index.Part, start, end int64,
compiled []*filters.CompiledFilter, fn func(r arrow.Record, ts *roaring.Bitmap)) {
r := part.Record()
r.Retain()
defer r.Release()
ts := ScanTimestamp(r, lsm.mapping[v1.Filters_Timestamp.String()], start, end)
part.Match(ts, compiled)
if ts.IsEmpty() {
return
}
fn(r, ts)
}

func ScanTimestamp(r arrow.Record, timestampColumn int, start, end int64) *roaring.Bitmap {
b := new(roaring.Bitmap)
ls := r.Column(timestampColumn).(*array.Int64).Int64Values()
@@ -279,7 +320,7 @@ func (lsm *Tree[T]) Compact(persist ...bool) {
start := time.Now()
defer func() {
for _, r := range lsm.nodes {
r.part.record.Release()
r.part.Release()
}
clear(lsm.nodes)
clear(lsm.records)
@@ -293,8 +334,8 @@ func (lsm *Tree[T]) Compact(persist ...bool) {
return true
}
lsm.nodes = append(lsm.nodes, n)
lsm.records = append(lsm.records, n.part.record)
oldSizes += n.part.size
lsm.records = append(lsm.records, n.part.Record())
oldSizes += n.part.Size()
return true
})
if oldSizes == 0 {
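For context, a hypothetical caller of the combined hot and cold scan path (not part of the diff); the helper and the way the filters value is obtained are assumptions, only the Scan signature added in this commit is taken from the diff:

package example

import (
	"context"
	"fmt"

	v1 "github.com/vinceanalytics/vince/gen/go/staples/v1"
	"github.com/vinceanalytics/vince/lsm"
)

// queryRange scans both the in-memory tree and any cold granules that the
// primary index reports for [start, end], then releases the merged record.
func queryRange[T any](ctx context.Context, tree *lsm.Tree[T], fs *v1.Filters, start, end int64) error {
	r, err := tree.Scan(ctx, start, end, fs)
	if err != nil {
		return err
	}
	defer r.Release()
	fmt.Println("matched rows:", r.NumRows())
	return nil
}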
