Skip to content

Commit

Permalink
multi tenant sessions
Browse files Browse the repository at this point in the history
  • Loading branch information
gernest committed Feb 20, 2024
1 parent 96ff5af commit a25eb3b
Show file tree
Hide file tree
Showing 9 changed files with 80 additions and 18 deletions.
2 changes: 1 addition & 1 deletion internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,6 @@ func SendEvent(w http.ResponseWriter, r *http.Request) {
request.Write(ctx, w, &v1.SendEventResponse{Dropped: true})
return
}
session.Get(ctx).Queue(ctx, &req)
session.Get(ctx).Queue(ctx, tenant.Default, &req)
request.Write(ctx, w, &v1.SendEventResponse{Dropped: false})
}
3 changes: 2 additions & 1 deletion internal/api/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/vinceanalytics/vince/internal/guard"
"github.com/vinceanalytics/vince/internal/request"
"github.com/vinceanalytics/vince/internal/session"
"github.com/vinceanalytics/vince/internal/tenant"
)

func ReceiveEvent(w http.ResponseWriter, r *http.Request) {
Expand All @@ -30,7 +31,7 @@ func ReceiveEvent(w http.ResponseWriter, r *http.Request) {
}
ev.Ip = remoteIP(r)
ev.Ua = r.UserAgent()
session.Get(ctx).Queue(ctx, ev)
session.Get(ctx).Queue(ctx, tenant.Default, ev)
w.WriteHeader(http.StatusOK)
w.Write([]byte("ok"))
}
Expand Down
2 changes: 1 addition & 1 deletion internal/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func App() *cli.Command {
return err
}
idx := staples.NewIndex()
sess := session.New(alloc, "staples", store, idx, pidx,
sess := session.New(alloc, tenants, store, idx, pidx,
lsm.WithTTL(
base.RetentionPeriod.AsDuration(),
),
Expand Down
74 changes: 62 additions & 12 deletions internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package session

import (
"context"
"errors"
"log/slog"
"sync"
"time"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/vinceanalytics/vince/internal/logger"
"github.com/vinceanalytics/vince/internal/lsm"
"github.com/vinceanalytics/vince/internal/staples"
"github.com/vinceanalytics/vince/internal/tenant"
)

const (
Expand All @@ -23,19 +25,45 @@ const (
DefaultFlushInterval = time.Minute
)

type Session struct {
build *staples.Arrow[staples.Event]
events chan *v1.Event
mu sync.Mutex
cache *ristretto.Cache
var ErrResourceNotFound = errors.New("session: Resource not found")

tree *lsm.Tree[staples.Event]
log *slog.Logger
type Tenants struct {
cache *ristretto.Cache
sessions map[string]*Session
}

func (t *Tenants) Start(ctx context.Context) {
for _, s := range t.sessions {
s.Start(ctx)
}
}

func New(mem memory.Allocator, resource string, storage db.Storage,
func (t *Tenants) Queue(ctx context.Context, resource string, req *v1.Event) {
r, ok := t.sessions[resource]
if !ok {
return
}
r.Queue(ctx, req)
}

func (t *Tenants) Scan(ctx context.Context, resource string, start int64, end int64, fs *v1.Filters) (arrow.Record, error) {
r, ok := t.sessions[resource]
if !ok {
return nil, ErrResourceNotFound
}
return r.Scan(ctx, start, end, fs)
}

func (t *Tenants) Close() {
for _, s := range t.sessions {
s.Close()
}
}

func New(mem memory.Allocator, tenants *tenant.Tenants, storage db.Storage,
indexer index.Index,
primary index.Primary, opts ...lsm.Option) *Session {
primary index.Primary, opts ...lsm.Option) *Tenants {
o := &Tenants{sessions: map[string]*Session{}}
cache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: 1e7,
MaxCost: 100 << 20, // 100MiB
Expand All @@ -52,6 +80,28 @@ func New(mem memory.Allocator, resource string, storage db.Storage,
if err != nil {
logger.Fail("Failed initializing cache", "err", err)
}
for _, v := range tenants.All() {
o.sessions[v.Id] = newSession(
mem, v.Id, cache, storage, indexer, primary, opts...,
)
}
return o
}

type Session struct {
build *staples.Arrow[staples.Event]
events chan *v1.Event
mu sync.Mutex
cache *ristretto.Cache

tree *lsm.Tree[staples.Event]
log *slog.Logger
}

func newSession(mem memory.Allocator, resource string, cache *ristretto.Cache, storage db.Storage,
indexer index.Index,
primary index.Primary, opts ...lsm.Option) *Session {

return &Session{
build: staples.NewArrow[staples.Event](mem),
cache: cache,
Expand Down Expand Up @@ -154,10 +204,10 @@ func (s *Session) doFlush(ctx context.Context) {

type sessionKey struct{}

func With(ctx context.Context, s *Session) context.Context {
func With(ctx context.Context, s *Tenants) context.Context {
return context.WithValue(ctx, sessionKey{}, s)
}

func Get(ctx context.Context) *Session {
return ctx.Value(sessionKey{}).(*Session)
func Get(ctx context.Context) *Tenants {
return ctx.Value(sessionKey{}).(*Tenants)
}
3 changes: 2 additions & 1 deletion internal/stats/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/vinceanalytics/vince/internal/logger"
"github.com/vinceanalytics/vince/internal/request"
"github.com/vinceanalytics/vince/internal/session"
"github.com/vinceanalytics/vince/internal/tenant"
)

func Aggregate(w http.ResponseWriter, r *http.Request) {
Expand All @@ -35,7 +36,7 @@ func Aggregate(w http.ResponseWriter, r *http.Request) {
slices.Sort(metrics)
metricsToProjection(filters, metrics)
from, to := PeriodToRange(ctx, time.Now, req.Period, r.URL.Query())
resultRecord, err := session.Get(ctx).Scan(ctx, from.UnixMilli(), to.UnixMilli(), filters)
resultRecord, err := session.Get(ctx).Scan(ctx, tenant.Default, from.UnixMilli(), to.UnixMilli(), filters)
if err != nil {
logger.Get(ctx).Error("Failed scanning", "err", err)
request.Internal(ctx, w)
Expand Down
3 changes: 2 additions & 1 deletion internal/stats/breakdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/vinceanalytics/vince/internal/logger"
"github.com/vinceanalytics/vince/internal/request"
"github.com/vinceanalytics/vince/internal/session"
"github.com/vinceanalytics/vince/internal/tenant"
)

func BreakDown(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -49,7 +50,7 @@ func BreakDown(w http.ResponseWriter, r *http.Request) {
slices.Sort(req.Property)
selectedColumns := metricsToProjection(filter, req.Metrics, req.Property...)
from, to := PeriodToRange(ctx, time.Now, period, r.URL.Query())
scannedRecord, err := session.Get(ctx).Scan(ctx, from.UnixMilli(), to.UnixMilli(), filter)
scannedRecord, err := session.Get(ctx).Scan(ctx, tenant.Default, from.UnixMilli(), to.UnixMilli(), filter)
if err != nil {
logger.Get(ctx).Error("Failed scanning", "err", err)
request.Internal(ctx, w)
Expand Down
2 changes: 2 additions & 0 deletions internal/stats/current_visitors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/vinceanalytics/vince/internal/logger"
"github.com/vinceanalytics/vince/internal/request"
"github.com/vinceanalytics/vince/internal/session"
"github.com/vinceanalytics/vince/internal/tenant"
)

func Realtime(w http.ResponseWriter, r *http.Request) {
Expand All @@ -22,6 +23,7 @@ func Realtime(w http.ResponseWriter, r *http.Request) {
now := time.Now().UTC()
firstTime := now.Add(-5 * time.Minute)
result, err := session.Get(ctx).Scan(ctx,
tenant.Default,
firstTime.UnixMilli(),
now.UnixMilli(),
&v1.Filters{
Expand Down
3 changes: 2 additions & 1 deletion internal/stats/timeseries.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/vinceanalytics/vince/internal/logger"
"github.com/vinceanalytics/vince/internal/request"
"github.com/vinceanalytics/vince/internal/session"
"github.com/vinceanalytics/vince/internal/tenant"
"github.com/vinceanalytics/vince/internal/timeutil"
)

Expand Down Expand Up @@ -43,7 +44,7 @@ func TimeSeries(w http.ResponseWriter, r *http.Request) {
slices.Sort(metrics)
metricsToProjection(filters, metrics)
from, to := PeriodToRange(ctx, time.Now, req.Period, r.URL.Query())
scanRecord, err := session.Get(ctx).Scan(ctx, from.UnixMilli(), to.UnixMilli(), filters)
scanRecord, err := session.Get(ctx).Scan(ctx, tenant.Default, from.UnixMilli(), to.UnixMilli(), filters)
if err != nil {
logger.Get(ctx).Error("Failed scanning", "err", err)
request.Internal(ctx, w)
Expand Down
6 changes: 6 additions & 0 deletions internal/tenant/tenants.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ func Config(o *v1.Config, domains []string) *v1.Config {
type Tenants struct {
domains map[string]*v1.Tenant
id map[string]*v1.Tenant
all []*v1.Tenant
}

func NewTenants(o *v1.Config) *Tenants {
t := &Tenants{
domains: make(map[string]*v1.Tenant),
id: make(map[string]*v1.Tenant),
all: o.Tenants,
}
for _, v := range o.Tenants {
t.id[v.Id] = v
Expand Down Expand Up @@ -57,3 +59,7 @@ func (t *Tenants) AllDomains() (o []*v1.Domain) {
}
return
}

func (t *Tenants) All() []*v1.Tenant {
return t.all
}

0 comments on commit a25eb3b

Please sign in to comment.