Skip to content

Commit

Permalink
save index
Browse files Browse the repository at this point in the history
  • Loading branch information
gernest committed Feb 10, 2024
1 parent 40598a5 commit 505d9a9
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 83 deletions.
21 changes: 21 additions & 0 deletions buffers/buffers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package buffers

import (
"bytes"
"sync"
)

type BytesBuffer struct {
bytes.Buffer
}

func Bytes() *BytesBuffer {
return bytesPool.Get().(*BytesBuffer)
}

func (b *BytesBuffer) Release() {
b.Reset()
bytesPool.Put(b)
}

var bytesPool = &sync.Pool{New: func() any { return new(BytesBuffer) }}
107 changes: 25 additions & 82 deletions db/persistance.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,15 @@ import (
"log/slog"
"path/filepath"
"strings"
"sync"
"time"

"github.com/RoaringBitmap/roaring"
"github.com/apache/arrow/go/v15/arrow"
"github.com/apache/arrow/go/v15/arrow/array"
"github.com/apache/arrow/go/v15/arrow/ipc"
"github.com/apache/arrow/go/v15/arrow/memory"
"github.com/dgraph-io/badger/v4"
"github.com/oklog/ulid/v2"
"github.com/vinceanalytics/vince/buffers"
v1 "github.com/vinceanalytics/vince/gen/go/staples/v1"
"github.com/vinceanalytics/vince/index"
)
Expand Down Expand Up @@ -58,41 +57,36 @@ func (s *Store) Save(r arrow.Record, idx index.Full) (*v1.Granule, error) {
defer s.db.GC()

id := ulid.Make().String()
var key bytes.Buffer
key.WriteString(s.resource)
key.Write(slash)
key.WriteString(id)
base := bytes.Clone(key.Bytes())
size, err := s.SaveRecord(&key, base, r)
buf := buffers.Bytes()
defer buf.Release()

buf.WriteString(s.resource)
buf.Write(slash)
buf.WriteString(id)
base := bytes.Clone(buf.Bytes())
size, err := s.SaveRecord(buf, base, r)
if err != nil {
return nil, err
}
buf.Reset()
err = index.WriteFull(buf, idx, id)
if err != nil {
return nil, err
}
err = s.db.Set(base, buf.Bytes(), s.ttl)
if err != nil {
return nil, err
}
var kb bytes.Buffer
idx.Columns(func(column index.Column) error {
if column.Empty() {
// skip empty indexes.
return nil
}
kb.Reset()
kb.Write(slash)
kb.WriteString(column.Name())
return s.SaveIndex(&key, base, kb.Bytes(), column)
})
lo, hi := Timestamps(r)
return &v1.Granule{
Min: lo,
Max: hi,
Min: int64(idx.Min()),
Max: int64(idx.Max()),
Size: size + idx.Size(),
Id: id,
Rows: uint64(r.NumRows()),
}, nil
}

func (s *Store) SaveRecord(
buf *bytes.Buffer,
base []byte,
r arrow.Record,
) (n uint64, err error) {
func (s *Store) SaveRecord(buf *buffers.BytesBuffer, base []byte, r arrow.Record) (n uint64, err error) {
schema := r.Schema()
var x uint64
for i := 0; i < int(r.NumCols()); i++ {
Expand All @@ -105,24 +99,17 @@ func (s *Store) SaveRecord(
return
}

func (s *Store) SaveColumn(
buf *bytes.Buffer,
base []byte,
key string,
field arrow.Field,
a arrow.Array,
) (n uint64, err error) {
func (s *Store) SaveColumn(buf *buffers.BytesBuffer, base []byte, key string, field arrow.Field, a arrow.Array) (n uint64, err error) {
r := array.NewRecord(
arrow.NewSchema([]arrow.Field{field}, nil),
[]arrow.Array{a},
int64(a.Len()),
)
defer r.Release()
b := persistBuffer.Get().(*bytes.Buffer)
b := buffers.Bytes()
defer func() {
n = uint64(b.Len())
b.Reset()
persistBuffer.Put(b)
b.Release()
}()
w := ipc.NewWriter(b,
ipc.WithSchema(r.Schema()),
Expand All @@ -141,59 +128,15 @@ func (s *Store) SaveColumn(
buf.Reset()
buf.Write(base)
buf.Write(slash)
buf.Write(recordBytes)
buf.Write(slash)
buf.WriteString(key)
err = s.db.Set(buf.Bytes(), b.Bytes(), s.ttl)
return
}

func (s *Store) SaveIndex(
buf *bytes.Buffer,
base []byte,
key []byte,
idx index.Column,
) error {
buf.Reset()
buf.Write(base)
buf.Write(slash)
buf.Write(fstBytes)
buf.Write(key) // [resource/id/fst/key]
err := s.db.Set(buf.Bytes(), idx.Fst(), s.ttl)
if err != nil {
return err
}
buf.Reset()
buf.Write(base)
buf.Write(slash)
buf.Write(bitmapBytes)

buf.Write([]byte(key))
buf.Write(slash)

// [resource/id/bitmaps/key]
base = bytes.Clone(buf.Bytes())
return idx.Bitmaps(func(i int, b *roaring.Bitmap) error {
buf.Reset()
buf.Write(base)
fmt.Fprint(buf, i) // [resource/id/bitmaps/key/row]
data, err := b.MarshalBinary()
if err != nil {
return err
}
return s.db.Set(buf.Bytes(), data, s.ttl)
})
}

var (
slash = []byte("/")
fstBytes = []byte("fst")
bitmapBytes = []byte("bitmap")
recordBytes = []byte("record")
slash = []byte("/")
)

var persistBuffer = &sync.Pool{New: func() any { return new(bytes.Buffer) }}

type KV struct {
db *badger.DB
}
Expand Down
2 changes: 1 addition & 1 deletion index/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func chuckFromRaw(raw []byte, chunk *v1.Metadata_Chunk) []byte {
return raw[chunk.Offset : chunk.Offset+chunk.Size]
}

func writeFull(w io.Writer, full Full, id string) error {
func WriteFull(w io.Writer, full Full, id string) error {
b := new(bytes.Buffer)
meta := &v1.Metadata{
Id: id,
Expand Down

0 comments on commit 505d9a9

Please sign in to comment.