Skip to content

Commit

Permalink
Revert "remove db.Store"
Browse files Browse the repository at this point in the history
This reverts commit e800083.
  • Loading branch information
gernest committed Feb 10, 2024
1 parent 28b4a1d commit b238324
Show file tree
Hide file tree
Showing 2 changed files with 195 additions and 0 deletions.
176 changes: 176 additions & 0 deletions db/persistance.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,25 @@
package db

import (
"bytes"
"context"
"errors"
"fmt"
"log/slog"
"path/filepath"
"strings"
"sync"
"time"

"github.com/RoaringBitmap/roaring"
"github.com/apache/arrow/go/v15/arrow"
"github.com/apache/arrow/go/v15/arrow/array"
"github.com/apache/arrow/go/v15/arrow/ipc"
"github.com/apache/arrow/go/v15/arrow/memory"
"github.com/dgraph-io/badger/v4"
"github.com/oklog/ulid/v2"
v1 "github.com/vinceanalytics/vince/gen/go/staples/v1"
"github.com/vinceanalytics/vince/index"
)

// ErrKeyNotFound is a sentinel error whose message signals a missing key.
// NOTE(review): presumably returned by Storage lookups; the call sites are
// outside this view, so confirm before relying on it with errors.Is.
var ErrKeyNotFound = errors.New("db: key not found")
Expand All @@ -21,6 +31,172 @@ type Storage interface {
Close() error
}

// Store writes arrow records and their column indexes into a Storage backend.
// Every key it produces is prefixed with resource.
type Store struct {
// db is the backend that receives every key/value write and the GC call.
db Storage
// mem is the allocator handed to the arrow IPC writer when serializing columns.
mem memory.Allocator
// ttl is passed unchanged to every db.Set call.
ttl time.Duration
// resource is the first segment of every key written by this store.
resource string
// log is a logger tagged with component=lsm-store and the resource name.
log *slog.Logger
}

// NewStore builds a Store that writes into db, serializes with mem, prefixes
// every key with resource and passes ttl to each write. The store's logger is
// derived from slog.Default and tagged with the component and resource names.
func NewStore(db Storage, mem memory.Allocator, resource string, ttl time.Duration) *Store {
	logger := slog.Default().With(
		slog.String("component", "lsm-store"),
		slog.String("resource", resource),
	)

	st := new(Store)
	st.db = db
	st.mem = mem
	st.ttl = ttl
	st.resource = resource
	st.log = logger
	return st
}

// Save persists r and every non-empty column index of idx under a freshly
// generated ULID and returns a granule describing what was stored.
//
// All keys are rooted at "<resource>/<id>". Record columns are written by
// SaveRecord and index columns by SaveIndex. The returned granule carries the
// bounds reported by Timestamps(r), the row count of r, the generated id and a
// size equal to the serialized column bytes plus idx.Size().
//
// Any failure while writing the record or one of the index columns aborts the
// save and is returned; no granule is returned in that case. Previously the
// error coming out of idx.Columns was dropped, so a granule could be reported
// as saved although some of its index keys were never written.
func (s *Store) Save(r arrow.Record, idx index.Full) (*v1.Granule, error) {
	// We don't call this frequently. So make sure we run GC when we are done. This
	// removes the need for periodic GC calls.
	defer s.db.GC()

	id := ulid.Make().String()
	var key bytes.Buffer
	key.WriteString(s.resource)
	key.Write(slash)
	key.WriteString(id)
	// key is reused as scratch space by the helpers below, so keep a stable
	// copy of the "<resource>/<id>" prefix.
	base := bytes.Clone(key.Bytes())

	size, err := s.SaveRecord(&key, base, r)
	if err != nil {
		return nil, err
	}

	var kb bytes.Buffer
	err = idx.Columns(func(column index.Column) error {
		if column.Empty() {
			// skip empty indexes.
			return nil
		}
		// Build "/<p0>/<p1>/..." from the column path; the leading slash is
		// what separates it from the fst/bitmap segment in SaveIndex.
		kb.Reset()
		for _, p := range column.Path() {
			kb.Write(slash)
			kb.WriteString(p)
		}
		return s.SaveIndex(&key, base, kb.Bytes(), column)
	})
	if err != nil {
		return nil, err
	}

	lo, hi := Timestamps(r)
	return &v1.Granule{
		Min:  lo,
		Max:  hi,
		Size: size + idx.Size(),
		Id:   id,
		Rows: uint64(r.NumRows()),
	}, nil
}

// SaveRecord stores each column of r as its own entry via SaveColumn, using
// buf as scratch space for keys and base as the shared key prefix.
//
// It returns the sum of the sizes reported by SaveColumn. On the first failing
// column it stops and returns that error together with the total accumulated
// from the columns written before it.
func (s *Store) SaveRecord(
	buf *bytes.Buffer,
	base []byte,
	r arrow.Record,
) (n uint64, err error) {
	fields := r.Schema()
	cols := int(r.NumCols())
	for col := 0; col < cols; col++ {
		written, saveErr := s.SaveColumn(buf, base, r.ColumnName(col), fields.Field(col), r.Column(col))
		if saveErr != nil {
			return n, saveErr
		}
		n += written
	}
	return n, nil
}

// SaveColumn serializes a single column as a one-field arrow IPC stream and
// stores it under "<base>/record/<key>" with the store's ttl.
//
// The stream is zstd-compressed with a minimum space saving of 30% and is
// written into a pooled buffer. buf is reset and reused to build the key.
//
// n is always the number of bytes sitting in the pooled buffer when the
// function returns, including on error paths.
func (s *Store) SaveColumn(
	buf *bytes.Buffer,
	base []byte,
	key string,
	field arrow.Field,
	a arrow.Array,
) (n uint64, err error) {
	single := array.NewRecord(
		arrow.NewSchema([]arrow.Field{field}, nil),
		[]arrow.Array{a},
		int64(a.Len()),
	)
	defer single.Release()

	payload := persistBuffer.Get().(*bytes.Buffer)
	defer func() {
		// Capture the size before the buffer is wiped and handed back.
		n = uint64(payload.Len())
		payload.Reset()
		persistBuffer.Put(payload)
	}()

	w := ipc.NewWriter(payload,
		ipc.WithSchema(single.Schema()),
		ipc.WithAllocator(s.mem),
		ipc.WithZstd(),
		ipc.WithMinSpaceSavings(0.3), //at least 30% savings
	)
	if err = w.Write(single); err != nil {
		return
	}
	if err = w.Close(); err != nil {
		return
	}

	// Key layout: <base>/record/<key>
	buf.Reset()
	for _, part := range [][]byte{base, slash, recordBytes, slash} {
		buf.Write(part)
	}
	buf.WriteString(key)

	err = s.db.Set(buf.Bytes(), payload.Bytes(), s.ttl)
	return
}

// SaveIndex stores the index of one column: its FST and then each of its
// bitmaps, all with the store's ttl. buf is scratch space for building keys.
//
// key is appended directly after the "fst" / "bitmap" segment without a
// separator, so it is expected to carry its own leading slash (Save builds it
// as "/<p0>/<p1>/..."). The resulting layout is:
//
//	<base>/fst<key>         -> idx.Fst()
//	<base>/bitmap<key>/<i>  -> marshalled bitmap i
//
// The first failing write or marshal stops the process and is returned.
func (s *Store) SaveIndex(
	buf *bytes.Buffer,
	base []byte,
	key []byte,
	idx index.Column,
) error {
	buf.Reset()
	for _, part := range [][]byte{base, slash, fstBytes, key} {
		buf.Write(part)
	}
	if err := s.db.Set(buf.Bytes(), idx.Fst(), s.ttl); err != nil {
		return err
	}

	buf.Reset()
	for _, part := range [][]byte{base, slash, bitmapBytes, key, slash} {
		buf.Write(part)
	}
	// buf is overwritten for every bitmap, so keep a private copy of the
	// shared "<base>/bitmap<key>/" prefix.
	prefix := bytes.Clone(buf.Bytes())

	return idx.Bitmaps(func(row int, bm *roaring.Bitmap) error {
		buf.Reset()
		buf.Write(prefix)
		fmt.Fprint(buf, row)
		encoded, err := bm.MarshalBinary()
		if err != nil {
			return err
		}
		return s.db.Set(buf.Bytes(), encoded, s.ttl)
	})
}

// Fixed byte segments used when assembling storage keys.
var (
	// slash separates key segments.
	slash = []byte("/")

	// recordBytes is the segment under which serialized columns are stored.
	recordBytes = []byte("record")
	// fstBytes is the segment under which a column's FST is stored.
	fstBytes = []byte("fst")
	// bitmapBytes is the segment under which a column's bitmaps are stored.
	bitmapBytes = []byte("bitmap")
)

// persistBuffer pools the scratch buffers that hold serialized column data,
// handing out an empty *bytes.Buffer when the pool has none to reuse.
var persistBuffer = &sync.Pool{
	New: func() any {
		return &bytes.Buffer{}
	},
}

// KV wraps a badger database handle.
// NOTE(review): presumably the badger-backed Storage implementation; its
// methods are outside this view, so confirm.
type KV struct {
// db is the underlying badger database.
db *badger.DB
}
Expand Down
19 changes: 19 additions & 0 deletions lsm/lsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type Tree[T any] struct {
index index.Index
mem memory.Allocator
merger *staples.Merger
store *db.Store

opts Options
log *slog.Logger
Expand Down Expand Up @@ -113,6 +114,7 @@ func NewTree[T any](mem memory.Allocator, resource string, storage db.Storage, i
index: indexer,
mem: mem,
merger: m,
store: db.NewStore(storage, mem, resource, o.ttl),
primary: primary,
resource: resource,
opts: o,
Expand Down Expand Up @@ -296,6 +298,23 @@ func (lsm *Tree[T]) Compact() {
}

// persist indexes r, saves the record together with its index through the
// store, and registers the resulting granule with the primary index under the
// tree's resource.
//
// Failures are logged and swallowed: if indexing or saving fails, nothing is
// added to the primary index.
func (lsm *Tree[T]) persist(r arrow.Record) {
	full, err := lsm.index.Index(r)
	if err != nil {
		lsm.log.Error("Failed building index for record", "err", err)
		return
	}

	granule, err := lsm.store.Save(r, full)
	if err != nil {
		lsm.log.Error("Failed saving record to permanent storage", "err", err)
		return
	}

	// Min and Max are passed to time.Unix as its nanosecond argument.
	minTS := time.Unix(0, int64(granule.Min))
	maxTS := time.Unix(0, int64(granule.Max))
	lsm.log.Info("Saved indexed record to permanent storage",
		slog.String("id", granule.Id),
		slog.Uint64("after_merge_size", granule.Size),
		slog.Time("min_ts", minTS),
		slog.Time("max_ts", maxTS),
	)
	lsm.primary.Add(lsm.resource, granule)
}

Expand Down

0 comments on commit b238324

Please sign in to comment.