Skip to content

Commit

Permalink
move schema to events
Browse files Browse the repository at this point in the history
  • Loading branch information
gernest committed Feb 22, 2024
1 parent 12ca2b9 commit 07b3570
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 18 deletions.
14 changes: 14 additions & 0 deletions internal/closter/events/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,20 @@ import (
"google.golang.org/protobuf/reflect/protoreflect"
)

var Mapping, Schema = mapping()

func mapping() (map[string]int, *arrow.Schema) {
b := New(memory.NewGoAllocator())
defer b.Release()
r := b.NewRecord()
defer r.Release()
o := make(map[string]int)
for i := 0; i < int(r.NumCols()); i++ {
o[r.ColumnName(i)] = i
}
return o, r.Schema()
}

type Builder struct {
r *array.RecordBuilder
fields map[protoreflect.FieldNumber]buildFunc
Expand Down
22 changes: 4 additions & 18 deletions internal/lsm/lsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ type Tree struct {

primary index.Primary
resource string
mapping map[string]int
schema *arrow.Schema

nodes []*RecordNode
records []arrow.Record
Expand Down Expand Up @@ -140,15 +138,8 @@ func WithTTL(ttl time.Duration) Option {
}

func NewTree(mem memory.Allocator, resource string, storage db.Storage, indexer index.Index, primary index.Primary, opts ...Option) *Tree {
eb := events.New(mem)
defer eb.Release()
schema := eb.NewRecord().Schema()

m := staples.NewMerger(mem, schema)
mapping := make(map[string]int)
for i, f := range schema.Fields() {
mapping[f.Name] = i
}
m := staples.NewMerger(mem, events.Schema)
o := DefaultLSMOptions()
for _, f := range opts {
f(&o)
Expand Down Expand Up @@ -181,8 +172,6 @@ func NewTree(mem memory.Allocator, resource string, storage db.Storage, indexer
primary: primary,
resource: resource,
opts: o,
mapping: mapping,
schema: schema,
nodes: make([]*RecordNode, 0, 64),
records: make([]arrow.Record, 0, 64),
log: slog.Default().With(
Expand Down Expand Up @@ -236,15 +225,12 @@ func (lsm *Tree) Scan(ctx context.Context, start, end int64, fs *v1.Filters) (ar
}
project := make([]int, 0, len(fs.Projection))
for _, name := range fs.Projection {
col, ok := lsm.mapping[name.String()]
if !ok {
return nil, fmt.Errorf("column %s does not exist", name)
}
col := events.Mapping[name.String()]
project = append(project, col)
}
fields := make([]arrow.Field, len(project))
for i := range project {
fields[i] = lsm.schema.Field(project[i])
fields[i] = events.Schema.Field(project[i])
}
schema := arrow.NewSchema(fields, nil)
tr, tk := staples.NewTaker(lsm.mem, schema)
Expand Down Expand Up @@ -338,7 +324,7 @@ func (lsm *Tree) processPart(part index.Part, start, end int64,
r := part.Record()
r.Retain()
defer r.Release()
ts := ScanTimestamp(r, lsm.mapping[columns.Timestamp], start, end)
ts := ScanTimestamp(r, events.Mapping[columns.Timestamp], start, end)
part.Match(ts, compiled)
if ts.IsEmpty() {
return
Expand Down

0 comments on commit 07b3570

Please sign in to comment.