Skip to content

Commit

Permalink
buffer events processing
Browse files Browse the repository at this point in the history
  • Loading branch information
gernest committed Feb 8, 2024
1 parent 05a0c7b commit be45172
Showing 1 changed file with 31 additions and 8 deletions.
39 changes: 31 additions & 8 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ const (
)

type Session struct {
build *staples.Arrow[staples.Event]
mu sync.Mutex
cache *ristretto.Cache
build *staples.Arrow[staples.Event]
events chan *v1.Event
mu sync.Mutex
cache *ristretto.Cache

tree *lsm.Tree[staples.Event]
log *slog.Logger
Expand All @@ -52,8 +53,9 @@ func New(mem memory.Allocator, resource string, storage db.Storage,
logger.Fail("Failed initializing cache", "err", err)
}
return &Session{
build: staples.NewArrow[staples.Event](mem),
cache: cache,
build: staples.NewArrow[staples.Event](mem),
cache: cache,
events: make(chan *v1.Event, 4<<10),
tree: lsm.NewTree[staples.Event](
mem, resource, storage, indexer, primary, opts...,
),
Expand All @@ -62,21 +64,41 @@ func New(mem memory.Allocator, resource string, storage db.Storage,
}

func (s *Session) Queue(ctx context.Context, req *v1.Event) {
s.events <- req
}

func (s *Session) doProcess(ctx context.Context) {
s.log.Info("Starting events processing loop")
for {
select {
case <-ctx.Done():
return
case e := <-s.events:
s.process(ctx, e)
}
}
}

func (s *Session) process(ctx context.Context, req *v1.Event) {
e := staples.Parse(ctx, req)
if e == nil {
return
}
e.Hit()
if o, ok := s.cache.Get(e.ID); ok {
cached := o.(*staples.Event)
cached.Update(e)
defer e.Release()
} else {
s.cache.SetWithTTL(e.ID, e, 1, DefaultSession)
s.mu.Lock()
// cached can be accessed concurrently. Protect it together with build.
cached.Update(e)
s.build.Append(e)
s.mu.Unlock()
return
}
s.mu.Lock()
s.build.Append(e)
s.mu.Unlock()
s.cache.SetWithTTL(e.ID, e, 1, DefaultSession)
}

func (s *Session) Scan(ctx context.Context, start, end int64, fs *v1.Filters) (arrow.Record, error) {
Expand All @@ -101,6 +123,7 @@ func (s *Session) Flush() {
func (s *Session) Start(ctx context.Context) {
go s.tree.Start(ctx)
go s.doFlush(ctx)
go s.doProcess(ctx)
}

func (s *Session) doFlush(ctx context.Context) {
Expand Down

0 comments on commit be45172

Please sign in to comment.