Skip to content

Commit

Permalink
refactor lsm.Part to an interface
Browse files Browse the repository at this point in the history
  • Loading branch information
gernest committed Feb 10, 2024
1 parent 6879184 commit 6a9fc79
Showing 1 changed file with 47 additions and 26 deletions.
73 changes: 47 additions & 26 deletions lsm/lsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,45 @@ import (
"github.com/vinceanalytics/vince/staples"
)

type Part struct {
ID ulid.ULID
Record arrow.Record
Index index.Full
Size uint64
Min int64
Max int64
type RecordPart struct {
id string
record arrow.Record
index.Full
size uint64
}

func NewPart(r arrow.Record, idx index.Full) *Part {
var _ Part = (*RecordPart)(nil)

func (r *RecordPart) Record() arrow.Record {
return r.record
}

func (r *RecordPart) Size() uint64 {
return r.size
}

func (r *RecordPart) ID() string {
return r.id
}

func (r *RecordPart) Release() {
r.Release()
}

type Part interface {
index.Full
ID() string
Record() arrow.Record
Release()
}

func NewPart(r arrow.Record, idx index.Full) *RecordPart {
r.Retain()
return &Part{
ID: ulid.Make(),
Record: r,
Index: idx,
Size: uint64(util.TotalRecordSize(r)) + idx.Size(),
Min: int64(idx.Min()),
Max: int64(idx.Max()),
return &RecordPart{
id: ulid.Make().String(),
record: r,
Full: idx,
size: uint64(util.TotalRecordSize(r)) + idx.Size(),
}
}

Expand Down Expand Up @@ -138,9 +159,9 @@ func (lsm *Tree[T]) Add(r arrow.Record) error {
}

part := NewPart(r, idx)
lsm.size.Add(part.Size)
lsm.size.Add(part.size)
lsm.tree.Prepend(part)
lsm.log.Debug("Added new part", "size", units.BytesSize(float64(part.Size)))
lsm.log.Debug("Added new part", "size", units.BytesSize(float64(part.size)))
return nil
}

Expand Down Expand Up @@ -191,13 +212,13 @@ func (lsm *Tree[T]) Scan(
if n.part == nil {
return true
}
if n.part.Min <= end {
if start <= n.part.Max {
r := n.part.Record
if n.part.Min() <= uint64(end) {
if uint64(start) <= n.part.Max() {
r := n.part.record
r.Retain()
defer r.Release()
ts := ScanTimestamp(r, lsm.mapping[v1.Filters_Timestamp.String()], start, end)
n.part.Index.Match(ts, compiled)
n.part.Match(ts, compiled)
if ts.IsEmpty() {
return true
}
Expand Down Expand Up @@ -248,7 +269,7 @@ func (lsm *Tree[T]) Compact() {
start := time.Now()
defer func() {
for _, r := range lsm.nodes {
r.part.Record.Release()
r.part.record.Release()
}
clear(lsm.nodes)
clear(lsm.records)
Expand All @@ -262,8 +283,8 @@ func (lsm *Tree[T]) Compact() {
return true
}
lsm.nodes = append(lsm.nodes, n)
lsm.records = append(lsm.records, n.part.Record)
oldSizes += n.part.Size
lsm.records = append(lsm.records, n.part.record)
oldSizes += n.part.size
return true
})
if oldSizes == 0 {
Expand Down Expand Up @@ -316,7 +337,7 @@ func (lsm *Tree[T]) persist(r arrow.Record) {

type Node struct {
next atomic.Pointer[Node]
part *Part
part *RecordPart
}

func (n *Node) Iterate(f func(*Node) bool) {
Expand All @@ -335,7 +356,7 @@ func (n *Node) Iterate(f func(*Node) bool) {
}
}

func (n *Node) Prepend(part *Part) *Node {
func (n *Node) Prepend(part *RecordPart) *Node {
return n.prepend(&Node{part: part})
}

Expand Down

0 comments on commit 6a9fc79

Please sign in to comment.