Skip to content

Commit

Permalink
fix column indexing
Browse files Browse the repository at this point in the history
  • Loading branch information
gernest committed Feb 10, 2024
1 parent 2dda1c9 commit 63b2a6a
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 77 deletions.
12 changes: 3 additions & 9 deletions index/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,12 @@ func (c *ColumnImpl) indexString(d *array.Dictionary, a *array.String) {
if !ok {
b = new(roaring.Bitmap)
c.mapping[name] = b
c.keys = append(c.keys, name)
c.values = append(c.values, b)
}
b.Add(uint32(i))
}
sort.Strings(c.keys)
}

func NewColIdx() *ColumnImpl {
Expand Down Expand Up @@ -222,15 +225,6 @@ func (c *ColumnImpl) Build(name string) (*FullColumn, error) {
if len(c.mapping) == 0 {
return &FullColumn{}, nil
}
c.keys = slices.Grow(c.keys, len(c.mapping))
c.values = slices.Grow(c.values, len(c.mapping))
for k := range c.mapping {
c.keys = append(c.keys, k)
}
sort.Strings(c.keys)
for i := range c.keys {
c.values = append(c.values, c.mapping[c.keys[i]])
}
for i := range c.keys {
err := c.build.Insert([]byte(c.keys[i]), uint64(i))
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion lsm/lsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,6 @@ func (lsm *Tree[T]) Start(ctx context.Context) {
tick := time.NewTicker(interval)
defer func() {
tick.Stop()
lsm.Compact(true)
lsm.log.Info("exiting compaction loop")
}()

Expand Down Expand Up @@ -303,6 +302,7 @@ func (lsm *Tree[T]) Compact(persist ...bool) {
lsm.size.Add(-oldSizes)
if len(persist) > 0 {
// force saving after compacting
lsm.log.Debug("Saving compacted record to permanent storage")
idx, err := lsm.index.Index(r)
if err != nil {
lsm.log.Error("Failed building index to persist", "err", err)
Expand All @@ -314,6 +314,7 @@ func (lsm *Tree[T]) Compact(persist ...bool) {
return
}
lsm.primary.Add(lsm.resource, granule)
lsm.log.Debug("Saved record to disc", "size", units.BytesSize(float64(granule.Size)))
return
}
if oldSizes >= lsm.opts.compactSize {
Expand Down
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ func app() *cli.Command {
}()
<-ctx.Done()
svr.Shutdown(context.Background())
sess.Close()
return err
},
}
Expand Down
4 changes: 4 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ func (s *Session) Scan(ctx context.Context, start, end int64, fs *v1.Filters) (a
return s.tree.Scan(ctx, start, end, fs)
}

// Close compacts the session's tree, passing true as the persist argument.
// NOTE(review): presumably s.tree is the lsm tree, whose Compact treats any
// persist argument as a request to index the compacted record and save it to
// permanent storage — confirm against the tree type.
// NOTE(review): unlike Flush, Close does not acquire s.mu; verify that
// Compact may be called without holding it.
func (s *Session) Close() {
s.tree.Compact(true)
}

func (s *Session) Flush() {
s.mu.Lock()
r := s.build.NewRecord()
Expand Down
85 changes: 18 additions & 67 deletions staples/index.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package staples

import (
"reflect"
"sort"
"sync"
"unsafe"

"github.com/RoaringBitmap/roaring"
Expand All @@ -12,91 +10,44 @@ import (
"github.com/vinceanalytics/vince/camel"
"github.com/vinceanalytics/vince/db"
"github.com/vinceanalytics/vince/filters"
v1 "github.com/vinceanalytics/vince/gen/go/staples/v1"
"github.com/vinceanalytics/vince/index"
"github.com/vinceanalytics/vince/logger"
)

type Index struct {
Browser *index.ColumnImpl
BrowserVersion *index.ColumnImpl
City *index.ColumnImpl
Country *index.ColumnImpl
Domain *index.ColumnImpl
EntryPage *index.ColumnImpl
ExitPage *index.ColumnImpl
Host *index.ColumnImpl
Event *index.ColumnImpl
Os *index.ColumnImpl
OsVersion *index.ColumnImpl
Path *index.ColumnImpl
Referrer *index.ColumnImpl
ReferrerSource *index.ColumnImpl
Region *index.ColumnImpl
Screen *index.ColumnImpl
UtmCampaign *index.ColumnImpl
UtmContent *index.ColumnImpl
UtmMedium *index.ColumnImpl
UtmSource *index.ColumnImpl
UtmTerm *index.ColumnImpl
mapping map[string]*index.ColumnImpl
mu sync.Mutex
}
// Index implements index.Index. It has no fields; the Index method creates
// a new column indexer with index.NewColIdx on every call.
type Index struct{}

func NewIndex() *Index {
idx := &Index{
Browser: index.NewColIdx(),
BrowserVersion: index.NewColIdx(),
City: index.NewColIdx(),
Country: index.NewColIdx(),
Domain: index.NewColIdx(),
EntryPage: index.NewColIdx(),
ExitPage: index.NewColIdx(),
Host: index.NewColIdx(),
Event: index.NewColIdx(),
Os: index.NewColIdx(),
OsVersion: index.NewColIdx(),
Path: index.NewColIdx(),
Referrer: index.NewColIdx(),
ReferrerSource: index.NewColIdx(),
Region: index.NewColIdx(),
Screen: index.NewColIdx(),
UtmCampaign: index.NewColIdx(),
UtmContent: index.NewColIdx(),
UtmMedium: index.NewColIdx(),
UtmSource: index.NewColIdx(),
UtmTerm: index.NewColIdx(),
mapping: make(map[string]*index.ColumnImpl),
}
r := reflect.ValueOf(idx).Elem()
typ := r.Type()
for i := 0; i < r.NumField(); i++ {
f := typ.Field(i)
if !f.IsExported() {
continue
}
idx.mapping[camel.Case(f.Name)] = r.Field(i).Interface().(*index.ColumnImpl)
}
return idx
return new(Index)
}

var _ index.Index = (*Index)(nil)

// skip is the set of column names that Index passes over without building a
// column index. Each key is camel.Case applied to the String() name of one of
// the Timestamp, ID, Session, Bounce and Duration filter properties.
// NOTE(review): Index type-asserts every column it does not skip to
// *array.Dictionary, so presumably these are the columns that are not
// dictionary encoded — confirm against the record schema.
var skip = map[string]bool{
camel.Case(v1.Filters_Timestamp.String()): true,
camel.Case(v1.Filters_ID.String()): true,
camel.Case(v1.Filters_Session.String()): true,
camel.Case(v1.Filters_Bounce.String()): true,
camel.Case(v1.Filters_Duration.String()): true,
}

func (idx *Index) Index(r arrow.Record) (index.Full, error) {
idx.mu.Lock()
defer idx.mu.Unlock()
cIdx := index.NewColIdx()
defer cIdx.Release()

o := make(map[string]*index.FullColumn)
for i := 0; i < int(r.NumCols()); i++ {
name := r.ColumnName(i)
x, ok := idx.mapping[name]
if !ok {
if skip[name] {
continue
}
x.Index(r.Column(i).(*array.Dictionary))
n, err := x.Build(name)
cIdx.Index(r.Column(i).(*array.Dictionary))
n, err := cIdx.Build(name)
if err != nil {
return nil, err
}
o[name] = n
cIdx.Reset()
}
lo, hi := db.Timestamps(r)
return NewFullIdx(o, uint64(lo), uint64(hi)), nil
Expand Down

0 comments on commit 63b2a6a

Please sign in to comment.