lsm: efficiently load from cold storage
- load and cache index and record separately
- load only projected columns
gernest committed Feb 18, 2024
1 parent 8761772 commit 4755649
Showing 3 changed files with 97 additions and 37 deletions.
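The second bullet is the core of the change: readers now receive an explicit list of parquet column indices, so a scan decodes only the columns it projects. A minimal sketch of how a caller might derive those indices from an Arrow schema — projectedColumns and the field names are illustrative, not part of this commit:

package main

import (
	"fmt"

	"github.com/apache/arrow/go/v15/arrow"
)

// projectedColumns maps projected field names to their positions in the
// schema; NewRecord passes such a []int through to pqarrow so only those
// columns are read from parquet.
func projectedColumns(schema *arrow.Schema, names []string) []int {
	cols := make([]int, 0, len(names))
	for _, name := range names {
		// FieldIndices returns every column with this name (usually one).
		cols = append(cols, schema.FieldIndices(name)...)
	}
	return cols
}

func main() {
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "timestamp", Type: arrow.PrimitiveTypes.Int64},
		{Name: "path", Type: arrow.BinaryTypes.String},
		{Name: "status", Type: arrow.PrimitiveTypes.Int64},
	}, nil)
	fmt.Println(projectedColumns(schema, []string{"timestamp", "status"})) // [0 2]
}
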
55 changes: 31 additions & 24 deletions db/part.go
@@ -15,33 +15,23 @@ import (
 )
 
 type Part struct {
-	id string
+	GranuleID string
 	*index.FileIndex
-	record arrow.Record
+	Data arrow.Record
 }
 
 var _ index.Part = (*Part)(nil)
 
-func NewPart(ctx context.Context, db Storage, resource, id string) (*Part, error) {
+func NewRecord(ctx context.Context, db Storage, resource, id string, numRows int64, columns []int) (arrow.Record, error) {
 	b := buffers.Bytes()
 	defer b.Release()
 
 	b.WriteString(resource)
 	b.Write(slash)
 	b.WriteString(id)
-	var fdx *index.FileIndex
-	err := db.Get(b.Bytes(), func(b []byte) error {
-		var err error
-		fdx, err = index.NewFileIndex(bytes.NewReader(b))
-		return err
-	})
-	if err != nil {
-		return nil, err
-	}
 	b.Write(slash)
 	b.Write(dataPath)
 	var r arrow.Record
-	err = db.Get(b.Bytes(), func(b []byte) error {
+	err := db.Get(b.Bytes(), func(b []byte) error {
@@ -52,15 +42,16 @@ func NewPart(ctx context.Context, db Storage, resource, id string) (*Part, error)
 		}
 		defer f.Close()
 		pr, err := pqarrow.NewFileReader(f, pqarrow.ArrowReadProperties{
-			BatchSize: int64(fdx.NumRows()),
+			BatchSize: numRows,
 			Parallel:  true,
 		},
 			compute.GetAllocator(ctx),
 		)
 		if err != nil {
 			return err
 		}
-		table, err := pr.ReadTable(ctx)
+		// There is only one row group
+		table, err := pr.ReadRowGroups(ctx, columns, []int{0})
 		if err != nil {
 			return err
 		}
@@ -74,10 +65,26 @@ func NewPart(ctx context.Context, db Storage, resource, id string) (*Part, error)
 	if err != nil {
 		return nil, err
 	}
-	return &Part{
-		FileIndex: fdx,
-		record:    r,
-	}, nil
+	return r, nil
+}
+
+func NewIndex(ctx context.Context, db Storage, resource, id string) (*index.FileIndex, error) {
+	b := buffers.Bytes()
+	defer b.Release()
+
+	b.WriteString(resource)
+	b.Write(slash)
+	b.WriteString(id)
+	var fdx *index.FileIndex
+	err := db.Get(b.Bytes(), func(b []byte) error {
+		var err error
+		fdx, err = index.NewFileIndex(bytes.NewReader(b))
+		return err
+	})
+	if err != nil {
+		return nil, err
+	}
+	return fdx, err
 }
 
 func tableToRecord(table arrow.Table) arrow.Record {
@@ -94,14 +101,14 @@ func tableToRecord(table arrow.Table) arrow.Record {
 	)
 }
 func (p *Part) ID() string {
-	return p.id
+	return p.GranuleID
 }
 
 func (p *Part) Record() arrow.Record {
-	return p.record
+	return p.Data
 }
 
 func (p *Part) Release() {
-	p.record.Release()
-	p.record = nil
+	p.Data.Release()
+	p.Data = nil
 }
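
Splitting NewPart into NewIndex and NewRecord lets a caller pay for the small index read first and skip the parquet decode entirely when the metadata rules a granule out. A sketch of the pattern, assuming a *Store as wired in persistance.go below, with ctx, id, and columns already in scope:

// Load the lightweight index first; it is cached independently of the
// record, so repeated metadata checks never touch parquet data.
idx, err := store.LoadIndex(ctx, id)
if err != nil {
	return err
}
if idx.NumRows() == 0 {
	return nil // empty granule: the record is never loaded
}
rec, err := store.LoadRecord(ctx, id, int64(idx.NumRows()), columns)
if err != nil {
	return err
}
defer rec.Release()
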
8 changes: 6 additions & 2 deletions db/persistance.go
@@ -52,8 +52,12 @@ func NewStore(db Storage, mem memory.Allocator, resource string, ttl time.Duration
 	}
 }
 
-func (s *Store) LoadPart(ctx context.Context, id string) (*Part, error) {
-	return NewPart(ctx, s.db, s.resource, id)
+func (s *Store) LoadRecord(ctx context.Context, id string, numRows int64, columns []int) (arrow.Record, error) {
+	return NewRecord(ctx, s.db, s.resource, id, numRows, columns)
+}
+
+func (s *Store) LoadIndex(ctx context.Context, id string) (*index.FileIndex, error) {
+	return NewIndex(ctx, s.db, s.resource, id)
 }
 
 func (s *Store) Save(r arrow.Record, idx index.Full) (*v1.Granule, error) {
71 changes: 60 additions & 11 deletions lsm/lsm.go
@@ -2,6 +2,7 @@ package lsm

 import (
 	"context"
+	"encoding/binary"
 	"errors"
 	"fmt"
 	"log/slog"
@@ -15,6 +16,7 @@ import (
"github.com/apache/arrow/go/v15/arrow/compute"
"github.com/apache/arrow/go/v15/arrow/memory"
"github.com/apache/arrow/go/v15/arrow/util"
"github.com/cespare/xxhash/v2"
"github.com/dgraph-io/ristretto"
"github.com/docker/go-units"
"github.com/oklog/ulid/v2"
@@ -133,10 +135,14 @@ func NewTree[T any](mem memory.Allocator, resource string, storage db.Storage, i
 		MaxCost:     int64(o.compactSize) * 2,
 		BufferItems: 64,
 		OnEvict: func(item *ristretto.Item) {
-			item.Value.(index.Part).Release()
+			if r, ok := item.Value.(arrow.Record); ok {
+				r.Release()
+			}
 		},
 		OnReject: func(item *ristretto.Item) {
-			item.Value.(index.Part).Release()
+			if r, ok := item.Value.(arrow.Record); ok {
+				r.Release()
+			}
 		},
 	})
 	if err != nil {
@@ -216,9 +222,11 @@ func (lsm *Tree[T]) Scan(ctx context.Context, start, end int64, fs *v1.Filters)
 	schema := arrow.NewSchema(fields, nil)
 	tr, tk := staples.NewTaker(lsm.mem, schema)
 	defer tr.Release()
-	lsm.ScanCold(ctx, start, end, compiled, func(r arrow.Record, ts *roaring.Bitmap) {
+
+	lsm.ScanCold(ctx, start, end, compiled, project, func(r arrow.Record, ts *roaring.Bitmap) {
 		tk(r, project, ts.ToArray())
 	})
+
 	lsm.tree.Iterate(func(n *RecordNode) bool {
 		if n.part == nil {
 			return true
@@ -238,27 +246,68 @@ }
 }
 
 func (lsm *Tree[T]) ScanCold(ctx context.Context, start, end int64,
-	compiled []*filters.CompiledFilter, fn func(r arrow.Record, ts *roaring.Bitmap)) {
+	compiled []*filters.CompiledFilter, columns []int, fn func(r arrow.Record, ts *roaring.Bitmap)) {
 	granules := lsm.primary.FindGranules(lsm.resource, start, end)
 	for _, granule := range granules {
-		part := lsm.loadPart(ctx, granule)
+		part := lsm.loadPart(ctx, granule, columns)
 		if part != nil {
 			lsm.processPart(part, start, end, compiled, fn)
 		}
 	}
 }
 
-func (lsm *Tree[T]) loadPart(ctx context.Context, id string) index.Part {
-	v, ok := lsm.cache.Get(id)
-	if !ok {
-		return v.(index.Part)
-	}
-	part, err := lsm.store.LoadPart(ctx, id)
-	if err != nil {
-		lsm.log.Error("Failed loading granule to memory", "id", id, "err", err)
-		return nil
-	}
-	lsm.cache.Set(id, part, int64(part.Size()))
-	return part
-}
+func (lsm *Tree[T]) loadPart(ctx context.Context, id string, columns []int) index.Part {
+	idx := lsm.loadIndex(ctx, id)
+	if idx == nil {
+		return nil
+	}
+	r := lsm.loadRecord(ctx, id, int64(idx.NumRows()), columns)
+	if r == nil {
+		return nil
+	}
+	return &db.Part{
+		FileIndex: idx,
+		Data:      r,
+	}
+}
+
+func (lsm *Tree[T]) loadIndex(ctx context.Context, id string) *index.FileIndex {
+	h := new(xxhash.Digest)
+	h.WriteString(id)
+	key := h.Sum64()
+	v, ok := lsm.cache.Get(key)
+	if ok {
+		return v.(*index.FileIndex)
+	}
+	idx, err := lsm.store.LoadIndex(ctx, id)
+	if err != nil {
+		lsm.log.Error("Failed loading granule index to memory", "id", id, "err", err)
+		return nil
+	}
+	lsm.cache.Set(key, idx, int64(idx.Size()))
+	return idx
+}
+
+func (lsm *Tree[T]) loadRecord(ctx context.Context, id string, numRows int64, columns []int) arrow.Record {
+	h := new(xxhash.Digest)
+	h.WriteString(id)
+	var a [8]byte
+	binary.BigEndian.PutUint64(a[:], uint64(numRows))
+	h.Write(a[:])
+	for _, c := range columns {
+		// Hash each projected column index so different projections of
+		// the same granule get distinct cache keys.
+		binary.BigEndian.PutUint32(a[:4], uint32(c))
+		h.Write(a[:4])
+	}
+	key := h.Sum64()
+	v, ok := lsm.cache.Get(key)
+	if ok {
+		return v.(arrow.Record)
+	}
-	part, err := lsm.store.LoadPart(ctx, id)
+	rec, err := lsm.store.LoadRecord(ctx, id, numRows, columns)
 	if err != nil {
-		lsm.log.Error("Failed loading granule to memory", "id", id, "err", err)
+		lsm.log.Error("Failed loading granule record to memory", "id", id, "err", err)
 		return nil
 	}
-	lsm.cache.Set(id, part, int64(part.Size()))
-	return part
+	lsm.cache.Set(key, rec, util.TotalRecordSize(rec))
+	return rec
 }
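
Note the record cache key hashes the granule id together with numRows and every projected column index, while the index key hashes only the id; two scans with different projections therefore cache distinct records instead of aliasing one another, and the eviction hooks above release only arrow.Record values, since the cached *index.FileIndex carries no Arrow buffers. A hypothetical cold scan over two projected columns — compiled filters and column indices assumed to be prepared by the caller:

lsm.ScanCold(ctx, start, end, compiled, []int{0, 2},
	func(r arrow.Record, ts *roaring.Bitmap) {
		// r carries only columns 0 and 2; ts marks the rows that
		// matched the compiled filters within this granule.
		fmt.Println(r.NumRows(), ts.GetCardinality())
	})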

