Skip to content

Commit

Permalink
lsm: separate part store from lsm
Browse files Browse the repository at this point in the history
  • Loading branch information
gernest committed Feb 24, 2024
1 parent 9717e6c commit 0e094ff
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 67 deletions.
83 changes: 16 additions & 67 deletions internal/lsm/lsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ func NewPart(r arrow.Record, idx index.Full) *RecordPart {
type RecordNode = Node[index.Part]

type Tree struct {
tree *RecordNode
size atomic.Uint64
ps *PartStore
index index.Index
mem memory.Allocator
merger *staples.Merger
Expand Down Expand Up @@ -164,7 +163,7 @@ func NewTree(mem memory.Allocator, resource string, storage db.Storage, indexer
logger.Fail("Failed creating parts cache", "err", err)
}
return &Tree{
tree: &RecordNode{},
ps: NewPartStore(mem),
index: indexer,
mem: mem,
merger: m,
Expand Down Expand Up @@ -194,25 +193,13 @@ func (lsm *Tree) Add(r arrow.Record) error {
}

part := NewPart(r, idx)
lsm.size.Add(part.size)
lsm.tree.Prepend(part)
lsm.ps.Add(part)
lsm.log.Debug("Added new part", "size", units.BytesSize(float64(part.size)))
lsm.Metrics.NodeSize.Update(float64(part.size))
lsm.Metrics.TreeSize.Update(float64(lsm.Size()))
return nil
}

// findNode returns the predecessor of node, i.e. the list entry whose
// next pointer currently references node, or nil when node has no
// predecessor in the tree's list.
func (lsm *Tree) findNode(node *RecordNode) (list *RecordNode) {
	lsm.tree.Iterate(func(n *RecordNode) bool {
		if n.next.Load() != node {
			return true // keep walking
		}
		list = n
		return false // found the predecessor; stop iterating
	})
	return list
}

func (lsm *Tree) Scan(ctx context.Context, start, end int64, fs *v1.Filters) (arrow.Record, error) {
ctx = compute.WithAllocator(ctx, lsm.mem)
compiled, err := filters.CompileFilters(fs)
Expand All @@ -239,15 +226,9 @@ func (lsm *Tree) Scan(ctx context.Context, start, end int64, fs *v1.Filters) (ar
lsm.ScanCold(ctx, start, end, compiled, project, func(r arrow.Record, ts *roaring.Bitmap) {
tk(r, project, ts.ToArray())
})

lsm.tree.Iterate(func(n *RecordNode) bool {
if n.part == nil {
return true
}
return index.AcceptWith(int64(n.part.Min()), int64(n.part.Max()), start, end, func() {
lsm.processPart(n.part, start, end, compiled, func(r arrow.Record, ts *roaring.Bitmap) {
tk(r, project, ts.ToArray())
})
lsm.ps.Scan(start, end, func(p index.Part) {
lsm.processPart(p, start, end, compiled, func(r arrow.Record, ts *roaring.Bitmap) {
tk(r, project, ts.ToArray())
})
})
return tr.NewRecord(), nil
Expand Down Expand Up @@ -369,61 +350,29 @@ func (lsm *Tree) Start(ctx context.Context) {
//
// Cold data is still scanned by the lsm tree, but its size is not accounted for.
// Size reports the total bytes of hot (in-memory) data held by the tree.
// The counter is owned by the part store since parts were split out of the
// lsm tree itself.
func (lsm *Tree) Size() uint64 {
	// The scraped diff showed both the pre-change body (lsm.size.Load())
	// and this delegation; the PartStore now owns the size counter.
	return lsm.ps.Size()
}

func (lsm *Tree) Compact(persist ...bool) {
lsm.log.Debug("Start compaction")
var oldSizes uint64
start := time.Now()
defer func() {
nodes := len(lsm.nodes)
for _, r := range lsm.nodes {
r.part.Release()
}
clear(lsm.nodes)
clear(lsm.records)
lsm.nodes = lsm.nodes[:0]
lsm.records = lsm.records[:0]
if oldSizes != 0 {
lsm.Metrics.CompactionDuration.UpdateDuration(time.Now())
lsm.Metrics.CompactionCounter.Inc()
lsm.Metrics.NodesPerCompaction.Update(float64(nodes))
}
}()

lsm.tree.Iterate(func(n *RecordNode) bool {
if n.part == nil || !n.part.CanIndex() {
return true
}
lsm.nodes = append(lsm.nodes, n)
lsm.records = append(lsm.records, n.part.Record())
oldSizes += n.part.Size()
return true
})
if oldSizes == 0 {
lsm.log.Debug("Skipping compaction, there is nothing in lsm tree")
return
}
lsm.log.Debug("Compacting", "nodes", len(lsm.nodes), "size", oldSizes)
r := lsm.merger.Merge(lsm.records...)
r, stats := lsm.ps.Compact()
defer r.Release()
node := lsm.findNode(lsm.nodes[0])
x := &RecordNode{}
for !node.next.CompareAndSwap(lsm.nodes[0], x) {
node = lsm.findNode(lsm.nodes[0])
}
lsm.size.Add(-oldSizes)
if oldSizes >= lsm.opts.compactSize || len(persist) > 0 {
lsm.persist(r)
if r.NumRows() == 0 {
lsm.log.Debug("Skipping compaction, there is nothing in lsm tree")
return
}

lsm.log.Debug("Compacted", "nodes", stats.CompactedNodesCount, "size", stats.OldSize)
err := lsm.Add(r)
if err != nil {
lsm.log.Error("Failed adding compacted record to lsm", "err", err)
return
}
lsm.log.Debug("Completed compaction", "elapsed", time.Since(start).String())
lsm.log.Debug("Completed compaction", "elapsed", stats.Elapsed.String())
lsm.Metrics.CompactionDuration.UpdateDuration(start)
lsm.Metrics.CompactionCounter.Inc()
lsm.Metrics.NodesPerCompaction.Update(float64(stats.CompactedNodesCount))
}

func (lsm *Tree) persist(r arrow.Record) {
Expand Down
104 changes: 104 additions & 0 deletions internal/lsm/part.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package lsm

import (
"sync/atomic"
"time"

"github.com/apache/arrow/go/v15/arrow"
"github.com/apache/arrow/go/v15/arrow/memory"
"github.com/vinceanalytics/vince/internal/closter/events"
"github.com/vinceanalytics/vince/internal/index"
"github.com/vinceanalytics/vince/internal/staples"
)

// PartStore is in memory storage of parts
type PartStore struct {
	size   atomic.Uint64   // total bytes across all live parts; grown by Add, shrunk by Compact
	tree   *RecordNode     // head of an atomic singly linked list of parts (newest prepended first)
	merger *staples.Merger // accumulates part records during Compact into one merged record
	nodes  []*RecordNode   // scratch list of nodes gathered by Compact; reused between calls
}

// NewPartStore creates an empty PartStore whose merged records are
// allocated from mem and laid out according to events.Schema.
func NewPartStore(mem memory.Allocator) *PartStore {
	ps := new(PartStore)
	ps.tree = &RecordNode{}
	ps.merger = staples.NewMerger(mem, events.Schema)
	return ps
}

// Size reports the total in-memory size, in bytes, of all stored parts.
func (p *PartStore) Size() (total uint64) {
	total = p.size.Load()
	return
}

// Add inserts r at the head of the part list and grows the size
// counter by the part's byte size.
func (p *PartStore) Add(r *RecordPart) {
	sz := r.Size()
	p.size.Add(sz)
	p.tree.Prepend(r)
}

// Scan visits, in list order, every live part whose [Min, Max] range is
// accepted against [start, end] and hands it to f.
func (p *PartStore) Scan(start, end int64, f func(index.Part)) {
	p.tree.Iterate(func(n *RecordNode) bool {
		part := n.part
		if part == nil {
			// Sentinel/detached node with no data; keep walking.
			return true
		}
		lo := int64(part.Min())
		hi := int64(part.Max())
		return index.AcceptWith(lo, hi, start, end, func() {
			f(part)
		})
	})
}

// CompactStats describes the outcome of a single PartStore.Compact call.
type CompactStats struct {
	OldSize             uint64        // combined byte size of the parts that were merged
	CompactedNodesCount int           // number of list nodes folded into the merged record
	Elapsed             time.Duration // wall-clock duration of the compaction
}

// Compact merges every part currently in the store into a single
// arrow.Record, unlinks the merged nodes from the list, and subtracts
// their bytes from the size counter. The caller owns the returned record.
// When the store holds no data the record is empty and the list is left
// untouched. stats reports merged size, node count, and elapsed time.
func (p *PartStore) Compact() (r arrow.Record, stats CompactStats) {
	start := time.Now()
	defer func() {
		// NOTE(review): this loop's r (*RecordNode) shadows the named
		// return r (arrow.Record); the merged record is unaffected.
		for _, r := range p.nodes {
			r.part.Release()
			r.part = nil
		}
		// clear zeroes elements but keeps length, so len(p.nodes) below
		// still counts how many nodes were merged this call.
		clear(p.nodes)
		stats.CompactedNodesCount = len(p.nodes)
		p.nodes = p.nodes[:0]
		stats.Elapsed = time.Since(start)
	}()
	// Gather every live part into the merger, remembering each node so the
	// deferred cleanup can release its record after the merge.
	p.tree.Iterate(func(n *RecordNode) bool {
		if n.part == nil {
			return true
		}
		p.merger.Add(n.part.Record())
		stats.OldSize += n.part.Size()
		p.nodes = append(p.nodes, n)
		return true
	})
	if stats.OldSize == 0 {
		// Nothing was collected; NewRecord yields an empty record.
		r = p.merger.NewRecord()
		return
	}
	// Detach the list from the first merged node onward: CAS the
	// predecessor's next pointer to a fresh empty node, retrying when a
	// concurrent Prepend changes the predecessor under us.
	node := p.findNode(p.nodes[0])
	x := &RecordNode{}
	for !node.next.CompareAndSwap(p.nodes[0], x) {
		node = p.findNode(p.nodes[0])
	}
	// Unsigned negation wraps, so Add effectively subtracts OldSize.
	p.size.Add(-stats.OldSize)
	r = p.merger.NewRecord()
	return
}

// findNode returns node's predecessor — the entry whose next pointer
// currently references node — or nil when no such entry exists.
func (p *PartStore) findNode(node *RecordNode) *RecordNode {
	var prev *RecordNode
	p.tree.Iterate(func(n *RecordNode) bool {
		if n.next.Load() != node {
			return true // not the predecessor; continue the walk
		}
		prev = n
		return false // found it; stop iterating
	})
	return prev
}
8 changes: 8 additions & 0 deletions internal/staples/arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,14 @@ func (m *Merger) Merge(records ...arrow.Record) arrow.Record {
return m.b.NewRecord()
}

// Add folds record into the merger's pending state via the internal merge
// step; call NewRecord to retrieve the accumulated result.
func (m *Merger) Add(record arrow.Record) {
	m.merge(record)
}

// NewRecord finalizes and returns the record accumulated so far.
// NOTE(review): arrow's RecordBuilder.NewRecord resets the builder, which
// would leave the merger ready for reuse — confirm against the arrow docs.
func (m *Merger) NewRecord() arrow.Record {
	return m.b.NewRecord()
}

func NewMerger(mem memory.Allocator, as *arrow.Schema) *Merger {
b := array.NewRecordBuilder(mem, as)
fields := make([]func(arrow.Array), len(b.Fields()))
Expand Down

0 comments on commit 0e094ff

Please sign in to comment.