Skip to content

Commit

Permalink
move stats handlers to plain http
Browse files Browse the repository at this point in the history
  • Loading branch information
gernest committed Feb 7, 2024
1 parent 5ba4c40 commit 106ef56
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 80 deletions.
18 changes: 16 additions & 2 deletions request/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,17 @@ const (
maxBodySize = 1 << 20
)

func Read(w http.ResponseWriter, r *http.Request, o proto.Message, valid *protovalidate.Validator) bool {
type validatorKey struct{}

func With(ctx context.Context, v *protovalidate.Validator) context.Context {
return context.WithValue(ctx, validatorKey{}, v)
}

func Get(ctx context.Context) *protovalidate.Validator {
return ctx.Value(validatorKey{}).(*protovalidate.Validator)
}

func Read(w http.ResponseWriter, r *http.Request, o proto.Message) bool {
ctx := r.Context()
if r.ContentLength == 0 || r.ContentLength > maxBodySize {
logger.Get(ctx).Error("Invalid content length", "contentLength", r.ContentLength)
Expand All @@ -37,7 +47,7 @@ func Read(w http.ResponseWriter, r *http.Request, o proto.Message, valid *protov
Error(ctx, w, http.StatusBadRequest, http.StatusText(http.StatusBadRequest))
return false
}
if err := valid.Validate(o); err != nil {
if err := Get(ctx).Validate(o); err != nil {
logger.Get(ctx).Error("Failed validating request body", "err", err)
Error(ctx, w, http.StatusBadRequest, err.Error())
return false
Expand Down Expand Up @@ -67,6 +77,10 @@ func Error(ctx context.Context, w http.ResponseWriter, code int, reason string)
}
}

func Internal(ctx context.Context, w http.ResponseWriter) {
Error(ctx, w, http.StatusInternalServerError, "Something went wrong")
}

type errResult struct {
Reason string `json:"reason"`
}
45 changes: 27 additions & 18 deletions stats/aggregate.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package stats

import (
"context"
"net/http"
"slices"
"time"

Expand All @@ -10,10 +10,16 @@ import (
"github.com/apache/arrow/go/v15/arrow/math"
v1 "github.com/vinceanalytics/vince/gen/go/staples/v1"
"github.com/vinceanalytics/vince/logger"
"github.com/vinceanalytics/vince/request"
"github.com/vinceanalytics/vince/session"
)

func Aggregate(ctx context.Context, req *v1.Aggregate_Request) (*v1.Aggregate_Response, error) {
func Aggregate(w http.ResponseWriter, r *http.Request) {
var req v1.Aggregate_Request
if !request.Read(w, r, &req) {
return
}
ctx := r.Context()
filters := &v1.Filters{
List: append(req.Filters, &v1.Filter{
Property: v1.Property_domain,
Expand All @@ -25,15 +31,16 @@ func Aggregate(ctx context.Context, req *v1.Aggregate_Request) (*v1.Aggregate_Re
slices.Sort(metrics)
metricsToProjection(filters, metrics)
from, to := PeriodToRange(time.Now, req.Period)
r, err := session.Get(ctx).Scan(ctx, from.UnixMilli(), to.UnixMilli(), filters)
resultRecord, err := session.Get(ctx).Scan(ctx, from.UnixMilli(), to.UnixMilli(), filters)
if err != nil {
logger.Get(ctx).Error("Failed scanning", "err", err)
return nil, InternalError
request.Internal(ctx, w)
return
}
defer r.Release()
defer resultRecord.Release()
mapping := map[string]int{}
for i := 0; i < int(r.NumCols()); i++ {
mapping[r.ColumnName(i)] = i
for i := 0; i < int(resultRecord.NumCols()); i++ {
mapping[resultRecord.ColumnName(i)] = i
}
var result []*v1.Value
var visits *float64
Expand All @@ -42,21 +49,22 @@ func Aggregate(ctx context.Context, req *v1.Aggregate_Request) (*v1.Aggregate_Re
var value float64
switch metric {
case v1.Metric_pageviews:
a := r.Column(mapping[v1.Filters_Event.String()])
a := resultRecord.Column(mapping[v1.Filters_Event.String()])
count := calcPageViews(a)
view = &count
value = count
case v1.Metric_visitors:
a := r.Column(mapping[v1.Filters_ID.String()])
a := resultRecord.Column(mapping[v1.Filters_ID.String()])
u, err := compute.Unique(ctx, compute.NewDatumWithoutOwning(a))
if err != nil {
logger.Get(ctx).Error("Failed calculating visitors", "err", err)
return nil, InternalError
request.Internal(ctx, w)
return
}
value = float64(u.Len())
u.Release()
case v1.Metric_visits:
a := r.Column(mapping[v1.Filters_Session.String()]).(*array.Int64)
a := resultRecord.Column(mapping[v1.Filters_Session.String()]).(*array.Int64)
sum := float64(math.Int64.Sum(a))
visits = &sum
value = sum
Expand All @@ -65,17 +73,17 @@ func Aggregate(ctx context.Context, req *v1.Aggregate_Request) (*v1.Aggregate_Re
if visits != nil {
vis = *visits
} else {
a := r.Column(mapping[v1.Filters_Session.String()]).(*array.Int64)
a := resultRecord.Column(mapping[v1.Filters_Session.String()]).(*array.Int64)
vis = float64(math.Int64.Sum(a))
}
a := r.Column(mapping[v1.Filters_Bounce.String()]).(*array.Int64)
a := resultRecord.Column(mapping[v1.Filters_Bounce.String()]).(*array.Int64)
sum := float64(math.Int64.Sum(a))
if vis != 0 {
sum /= vis
}
value = sum
case v1.Metric_visit_duration:
a := r.Column(mapping[v1.Filters_Duration.String()]).(*array.Float64)
a := resultRecord.Column(mapping[v1.Filters_Duration.String()]).(*array.Float64)
sum := math.Float64.Sum(a)
count := float64(a.Len())
var avg float64
Expand All @@ -88,28 +96,29 @@ func Aggregate(ctx context.Context, req *v1.Aggregate_Request) (*v1.Aggregate_Re
if visits != nil {
vis = *visits
} else {
a := r.Column(mapping[v1.Filters_Session.String()]).(*array.Int64)
a := resultRecord.Column(mapping[v1.Filters_Session.String()]).(*array.Int64)
vis = float64(math.Int64.Sum(a))
}
var vw float64
if view != nil {
vw = *view
} else {
a := r.Column(mapping[v1.Filters_Event.String()])
a := resultRecord.Column(mapping[v1.Filters_Event.String()])
vw = calcPageViews(a)
}
if vis != 0 {
vw /= vis
}
value = vw
case v1.Metric_events:
a := r.Column(mapping[v1.Filters_Event.String()])
a := resultRecord.Column(mapping[v1.Filters_Event.String()])
value = float64(a.Len())
}
result = append(result, &v1.Value{
Metric: metric,
Value: value,
})
}
return &v1.Aggregate_Response{Results: result}, nil
request.Write(ctx, w, &v1.Aggregate_Response{Results: result})
return
}
86 changes: 50 additions & 36 deletions stats/breakdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stats

import (
"context"
"net/http"
"slices"
"time"

Expand All @@ -13,10 +14,17 @@ import (
"github.com/vinceanalytics/vince/filters"
v1 "github.com/vinceanalytics/vince/gen/go/staples/v1"
"github.com/vinceanalytics/vince/logger"
"github.com/vinceanalytics/vince/request"
"github.com/vinceanalytics/vince/session"
)

func BreakDown(ctx context.Context, req *v1.BreakDown_Request) (*v1.BreakDown_Response, error) {
func BreakDown(w http.ResponseWriter, r *http.Request) {

var req v1.BreakDown_Request
if !request.Read(w, r, &req) {
return
}
ctx := r.Context()
period := req.Period
if period == nil {
period = &v1.TimePeriod{
Expand All @@ -36,15 +44,16 @@ func BreakDown(ctx context.Context, req *v1.BreakDown_Request) (*v1.BreakDown_Re
slices.Sort(req.Property)
metricsToProjection(filter, req.Metrics, req.Property...)
from, to := PeriodToRange(time.Now, period)
r, err := session.Get(ctx).Scan(ctx, from.UnixMilli(), to.UnixMilli(), filter)
scannedRecord, err := session.Get(ctx).Scan(ctx, from.UnixMilli(), to.UnixMilli(), filter)
if err != nil {
logger.Get(ctx).Error("Failed scanning", "err", err)
return nil, InternalError
request.Internal(ctx, w)
return
}
defer r.Release()
defer scannedRecord.Release()
mapping := map[string]arrow.Array{}
for i := 0; i < int(r.NumCols()); i++ {
mapping[r.ColumnName(i)] = r.Column(i)
for i := 0; i < int(scannedRecord.NumCols()); i++ {
mapping[scannedRecord.ColumnName(i)] = scannedRecord.Column(i)
}
defer clear(mapping)
// build key mapping
Expand All @@ -64,32 +73,32 @@ func BreakDown(ctx context.Context, req *v1.BreakDown_Request) (*v1.BreakDown_Re
var value float64
switch metric {
case v1.Metric_pageviews:
a, err := take(ctx, metric, v1.Filters_Event, mapping, idx)
if err != nil {
return nil, err
}
a := mapping[v1.Filters_Event.String()]
count := calcPageViews(a)
a.Release()
view = &count
value = count
case v1.Metric_visitors:
a, err := take(ctx, metric, v1.Filters_ID, mapping, idx)
if err != nil {
return nil, err
a, ok := take(ctx, metric, v1.Filters_ID, mapping, idx)
if !ok {
request.Internal(ctx, w)
return
}
u, err := compute.Unique(ctx, compute.NewDatumWithoutOwning(a))
if err != nil {
a.Release()
logger.Get(ctx).Error("Failed calculating visitors", "err", err)
return nil, InternalError
request.Internal(ctx, w)
return
}
a.Release()
value = float64(u.Len())
u.Release()
case v1.Metric_visits:
a, err := take(ctx, metric, v1.Filters_Session, mapping, idx)
if err != nil {
return nil, err
a, ok := take(ctx, metric, v1.Filters_Session, mapping, idx)
if !ok {
request.Internal(ctx, w)
return
}
sum := float64(math.Int64.Sum(a.(*array.Int64)))
a.Release()
Expand All @@ -100,16 +109,18 @@ func BreakDown(ctx context.Context, req *v1.BreakDown_Request) (*v1.BreakDown_Re
if visits != nil {
vis = *visits
} else {
a, err := take(ctx, metric, v1.Filters_Session, mapping, idx)
if err != nil {
return nil, err
a, ok := take(ctx, metric, v1.Filters_Session, mapping, idx)
if !ok {
request.Internal(ctx, w)
return
}
vis = float64(math.Int64.Sum(a.(*array.Int64)))
a.Release()
}
a, err := take(ctx, metric, v1.Filters_Bounce, mapping, idx)
if err != nil {
return nil, err
a, ok := take(ctx, metric, v1.Filters_Bounce, mapping, idx)
if !ok {
request.Internal(ctx, w)
return
}
sum := float64(math.Int64.Sum(a.(*array.Int64)))
a.Release()
Expand All @@ -118,9 +129,10 @@ func BreakDown(ctx context.Context, req *v1.BreakDown_Request) (*v1.BreakDown_Re
}
value = sum
case v1.Metric_visit_duration:
a, err := take(ctx, metric, v1.Filters_Duration, mapping, idx)
if err != nil {
return nil, err
a, ok := take(ctx, metric, v1.Filters_Duration, mapping, idx)
if !ok {
request.Internal(ctx, w)
return
}
sum := math.Float64.Sum(a.(*array.Float64))
a.Release()
Expand All @@ -135,9 +147,10 @@ func BreakDown(ctx context.Context, req *v1.BreakDown_Request) (*v1.BreakDown_Re
if visits != nil {
vis = *visits
} else {
a, err := take(ctx, metric, v1.Filters_Session, mapping, idx)
if err != nil {
return nil, err
a, ok := take(ctx, metric, v1.Filters_Session, mapping, idx)
if !ok {
request.Internal(ctx, w)
return
}
vis = float64(math.Int64.Sum(a.(*array.Int64)))
a.Release()
Expand All @@ -146,9 +159,10 @@ func BreakDown(ctx context.Context, req *v1.BreakDown_Request) (*v1.BreakDown_Re
if view != nil {
vw = *view
} else {
a, err := take(ctx, metric, v1.Filters_Event, mapping, idx)
if err != nil {
return nil, err
a, ok := take(ctx, metric, v1.Filters_Event, mapping, idx)
if !ok {
request.Internal(ctx, w)
return
}
vw = calcPageViews(a)
a.Release()
Expand Down Expand Up @@ -176,20 +190,20 @@ func BreakDown(ctx context.Context, req *v1.BreakDown_Request) (*v1.BreakDown_Re
Groups: groups,
})
}
return &v1.BreakDown_Response{Results: result}, nil
request.Write(ctx, w, &v1.BreakDown_Response{Results: result})
}

func take(ctx context.Context, metric v1.Metric, f v1.Filters_Projection, mapping map[string]arrow.Array, idx *array.Uint32) (arrow.Array, error) {
func take(ctx context.Context, metric v1.Metric, f v1.Filters_Projection, mapping map[string]arrow.Array, idx *array.Uint32) (arrow.Array, bool) {
a, err := compute.TakeArray(ctx,
mapping[f.String()], idx,
)
if err != nil {
idx.Release()
logger.Get(ctx).Error("Failed taking array values",
"err", err, "metric", metric, "projection", f)
return nil, InternalError
return nil, false
}
return a, nil
return a, true
}
func hashProp(a arrow.Array) map[string]*roaring.Bitmap {
o := make(map[string]*roaring.Bitmap)
Expand Down
Loading

0 comments on commit 106ef56

Please sign in to comment.