Skip to content

Commit

Permalink
Revert "only index filter fields"
Browse files Browse the repository at this point in the history
This reverts commit ccfbdf9.
  • Loading branch information
gernest committed Feb 10, 2024
1 parent ccfbdf9 commit 28b4a1d
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 28 deletions.
8 changes: 5 additions & 3 deletions segment/cold/indes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

"github.com/apache/arrow/go/v15/arrow/array"
"github.com/apache/arrow/go/v15/arrow/memory"
"github.com/stretchr/testify/require"
"github.com/docker/go-units"
"github.com/vinceanalytics/vince/staples"
)

Expand All @@ -24,6 +24,8 @@ func TestIndex(t *testing.T) {
defer r.Release()

seg, err := New(r)
require.NoError(t, err)
_ = seg
if err != nil {
t.Fatal(err)
}
t.Error(units.BytesSize(float64(seg.Size())))
}
16 changes: 10 additions & 6 deletions segment/cold/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@ import (
)

var skipFields = map[string]struct{}{
camel.Case(v1.Filters_Timestamp.String()): {},
camel.Case(v1.Filters_ID.String()): {},
camel.Case(v1.Filters_Session.String()): {},
camel.Case(v1.Filters_Bounce.String()): {},
camel.Case(v1.Filters_Duration.String()): {},
camel.Case(v1.Filters_ID.String()): {},
camel.Case(v1.Filters_Session.String()): {},
camel.Case(v1.Filters_Bounce.String()): {},
camel.Case(v1.Filters_Duration.String()): {},
}

type Index struct {
Expand All @@ -22,7 +21,12 @@ type Index struct {
}

func New(r arrow.Record) (*Index, error) {
seg, size, err := ice.New([]segment.Document{&record{r: r}}, func(s string, i int) float32 { return 0 })
document := NewRecord(r, func(r arrow.Record, i int) bool {
_, ok := skipFields[r.ColumnName(i)]
return !ok
})
defer document.Release()
seg, size, err := ice.New([]segment.Document{document}, func(s string, i int) float32 { return 0 })
if err != nil {
return nil, err
}
Expand Down
94 changes: 94 additions & 0 deletions segment/cold/int64_field.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package cold

import (
"encoding/binary"
"sync"

"github.com/RoaringBitmap/roaring"
"github.com/apache/arrow/go/v15/arrow/array"
"github.com/vinceanalytics/vince/segment"
)

// Int64 is a segment.Field backed by an arrow int64 array. It groups the row
// positions of the array by distinct value so that every distinct value can be
// visited as a term (see EachTerm).
type Int64 struct {
base
array *array.Int64

// This field can be quite large. Use bitmaps that we can reuse to avoid
// excessive allocation.
//
// Keys are the distinct values found in array; each bitmap holds the row
// indexes at which that value occurs.
mapping map[int64]*roaring.Bitmap

// tern is scratch space reused for every term handed out by EachTerm.
// NOTE(review): the name looks like a typo of "term".
tern numericTerm
}

var _ segment.Field = (*Int64)(nil)

// NewInt64 builds an Int64 field called name from a.
//
// Every row index of a is added to the bitmap of the value stored at that row,
// with bitmaps obtained from the shared pool via get. The base length is set
// to the number of distinct values, not the number of rows. Call Release when
// the field is no longer needed so the bitmaps go back to the pool.
//
// NOTE(review): validity is not consulted here; confirm whether rows that are
// null in a should be indexed under whatever value their slot holds.
func NewInt64(name string, a *array.Int64) *Int64 {
	values := a.Int64Values()
	rowsByValue := make(map[int64]*roaring.Bitmap)
	for row := range values {
		key := values[row]
		rows, seen := rowsByValue[key]
		if !seen {
			rows = get()
			rowsByValue[key] = rows
		}
		rows.Add(uint32(row))
	}

	field := &Int64{
		array:   a,
		mapping: rowsByValue,
	}
	field.base = base{
		name: name,
		len:  len(rowsByValue),
	}
	return field
}

// EachTerm calls vt once for every distinct value in the field.
//
// The same numericTerm is overwritten and passed on every call, so vt must not
// hold on to the term after it returns. Values are visited in Go map iteration
// order, which is unspecified.
func (i *Int64) EachTerm(vt segment.VisitTerm) {
	scratch := &i.tern
	for value, rows := range i.mapping {
		term := scratch.newTerm(value, rows)
		vt(term)
	}
}

// Release hands every bitmap owned by the field back to the pool and empties
// the value mapping. The bitmaps must not be used after this call.
func (i *Int64) Release() {
	for _, rows := range i.mapping {
		release(rows)
	}
	// Drop the now-recycled bitmaps so they cannot be reached through the field.
	clear(i.mapping)
}

// roaringPool recycles roaring bitmaps across fields. Use get to take a bitmap
// and release to give it back; a fresh empty bitmap is allocated when the pool
// has none to offer.
var roaringPool = &sync.Pool{New: func() any { return new(roaring.Bitmap) }}

// get takes a bitmap from roaringPool. Bitmaps are cleared by release before
// they are pooled, and the pool allocates an empty one when it has none.
func get() *roaring.Bitmap {
	pooled := roaringPool.Get()
	return pooled.(*roaring.Bitmap)
}

// release clears b and puts it back into roaringPool. The caller must not use
// b afterwards.
func release(b *roaring.Bitmap) {
// Clear before pooling so the next get starts from an empty bitmap.
b.Clear()
roaringPool.Put(b)
}

// numericTerm is a segment.FieldTerm for one int64 value. It is filled in by
// newTerm and is meant to be overwritten and reused between terms.
type numericTerm struct {
// term holds the varint encoding of the value; only the first n bytes
// are meaningful.
term [binary.MaxVarintLen64]byte
// n is the number of bytes of term written by the last newTerm call.
n int
// bitmap holds the positions at which the value occurs.
bitmap *roaring.Bitmap
}

// newTerm overwrites n in place with the varint encoding of value and the
// positions in r, then returns n itself. No new term is allocated, so any
// previously returned pointer now describes the new value.
func (n *numericTerm) newTerm(value int64, r *roaring.Bitmap) *numericTerm {
	n.bitmap = r
	written := binary.PutVarint(n.term[:], value)
	n.n = written
	return n
}

var _ segment.FieldTerm = (*numericTerm)(nil)

// Term returns the varint-encoded bytes of the current value. The slice
// aliases the term's internal buffer and is overwritten by the next newTerm.
func (t *numericTerm) Term() []byte {
	encoded := t.term[:t.n]
	return encoded
}

// Frequency reports how many positions are set in the term's bitmap.
func (t *numericTerm) Frequency() int {
	cardinality := t.bitmap.GetCardinality()
	return int(cardinality)
}

// EachLocation calls vl for every position stored in the term's bitmap.
//
// One Location value is reused for all calls and only its pos is updated, so
// vl must not retain the pointer it receives.
func (t *numericTerm) EachLocation(vl segment.VisitLocation) {
	loc := &Location{}
	for it := t.bitmap.Iterator(); it.HasNext(); {
		loc.pos = int(it.Next())
		vl(loc)
	}
}
54 changes: 35 additions & 19 deletions segment/cold/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,47 @@ import (
"github.com/vinceanalytics/vince/segment"
)

type record struct {
r arrow.Record
// Record adapts an arrow.Record to segment.Document, exposing its columns as
// fields.
type Record struct {
// record is the wrapped arrow record.
record arrow.Record
// accept reports whether the column at idx of r should be visited by
// EachField.
accept func(r arrow.Record, idx int) bool
}

var _ segment.Document = (*record)(nil)
// NewRecord wraps r as a segment.Document.
//
// A reference on r is retained; call Release on the returned Record to drop
// it. accept chooses which columns are visited, and a nil accept means every
// column is accepted.
func NewRecord(r arrow.Record, accept func(arrow.Record, int) bool) *Record {
	r.Retain()
	doc := &Record{record: r, accept: accept}
	if doc.accept == nil {
		doc.accept = func(arrow.Record, int) bool { return true }
	}
	return doc
}

var _ segment.Document = (*Record)(nil)

func (r *record) Analyze() {}
func (r *Record) Analyze() {}

func (r *record) EachField(vf segment.VisitField) {
for i := 0; i < int(r.r.NumCols()); i++ {
_, ok := skipFields[r.r.ColumnName(i)]
if ok {
continue
// Release drops the reference on the underlying arrow record that NewRecord
// retained.
func (r *Record) Release() {
r.record.Release()
}

func (r *Record) EachField(vf segment.VisitField) {
for i := 0; i < int(r.record.NumCols()); i++ {
if r.accept(r.record, i) {
visitArray(r.record.Column(i), r.record.ColumnName(i), vf)
}
visitArray(r.r.Column(i).(*array.Dictionary), r.r.ColumnName(i), vf)
}
}

func visitArray(a *array.Dictionary, name string, cb segment.VisitField) {
cb(&DictionaryField{
base: base{
name: name,
len: a.Len(),
},
array: a,
data: a.Dictionary().(*array.Binary),
})
// visitArray wraps column a as a field called name and passes it to cb.
//
// Int64 arrays become an Int64 field that is released as soon as cb returns.
// Dictionary arrays are passed on only when their dictionary is a binary
// array. Any other array type is ignored without calling cb.
//
// NOTE(review): the Int64 field's bitmaps go back to the pool right after cb
// returns; confirm that cb never keeps the field for later use.
func visitArray(a arrow.Array, name string, cb segment.VisitField) {
	if ints, ok := a.(*array.Int64); ok {
		field := NewInt64(name, ints)
		cb(field)
		field.Release()
		return
	}

	dict, ok := a.(*array.Dictionary)
	if !ok {
		return
	}
	if values, ok := dict.Dictionary().(*array.Binary); ok {
		cb(NewDict(name, dict, values))
	}
}

0 comments on commit 28b4a1d

Please sign in to comment.