Skip to content

Commit

Permalink
use compute for http api
Browse files Browse the repository at this point in the history
  • Loading branch information
gernest committed Feb 24, 2024
1 parent 45c89ed commit 5e2369b
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 226 deletions.
47 changes: 2 additions & 45 deletions internal/stats/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,12 @@ package stats

import (
"net/http"
"slices"
"time"

"github.com/apache/arrow/go/v15/arrow"
v1 "github.com/vinceanalytics/vince/gen/go/vince/v1"
"github.com/vinceanalytics/vince/internal/compute"
"github.com/vinceanalytics/vince/internal/logger"
"github.com/vinceanalytics/vince/internal/request"
"github.com/vinceanalytics/vince/internal/session"
"github.com/vinceanalytics/vince/internal/tenant"
)

func Aggregate(w http.ResponseWriter, r *http.Request) {
Expand All @@ -23,50 +19,11 @@ func Aggregate(w http.ResponseWriter, r *http.Request) {
Metrics: ParseMetrics(ctx, query),
Filters: ParseFilters(ctx, query),
}
if !request.Validate(ctx, w, &req) {
return
}
filters := &v1.Filters{
List: append(req.Filters, &v1.Filter{
Property: v1.Property_domain,
Op: v1.Filter_equal,
Value: req.SiteId,
}),
}
metrics := slices.Clone(req.Metrics)
slices.Sort(metrics)
compute.MetricsToProjection(filters, metrics)
from, to := PeriodToRange(ctx, time.Now, req.Period, r.URL.Query())
resultRecord, err := session.Get(ctx).Scan(ctx, tenant.Get(ctx), from.UnixMilli(), to.UnixMilli(), filters)
res, err := compute.Aggregate(ctx, session.Get(ctx), &req)
if err != nil {
logger.Get(ctx).Error("Failed scanning", "err", err)
request.Internal(ctx, w)
return
}
defer resultRecord.Release()
mapping := map[string]arrow.Array{}
for i := 0; i < int(resultRecord.NumCols()); i++ {
mapping[resultRecord.ColumnName(i)] = resultRecord.Column(i)
}
result := make(map[string]AggregateValue)
xc := &compute.Compute{Mapping: mapping}
for _, metric := range metrics {
value, err := xc.Metric(ctx, metric)
if err != nil {
logger.Get(ctx).Error("Failed calculating metric", "metric", metric)
request.Internal(ctx, w)
return
}
result[metric.String()] = AggregateValue{Value: value}
}
request.Write(ctx, w, &AggregateResponse{Results: result})
return
}

type AggregateResponse struct {
Results map[string]AggregateValue `json:"results"`
}

type AggregateValue struct {
Value float64 `json:"value"`
request.Write(ctx, w, res)
}
85 changes: 2 additions & 83 deletions internal/stats/breakdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,12 @@ package stats

import (
"net/http"
"slices"
"time"

"github.com/apache/arrow/go/v15/arrow"
"github.com/apache/arrow/go/v15/arrow/array"
"github.com/apache/arrow/go/v15/arrow/memory"
v1 "github.com/vinceanalytics/vince/gen/go/vince/v1"
"github.com/vinceanalytics/vince/internal/compute"
"github.com/vinceanalytics/vince/internal/logger"
"github.com/vinceanalytics/vince/internal/request"
"github.com/vinceanalytics/vince/internal/session"
"github.com/vinceanalytics/vince/internal/tenant"
)

func BreakDown(w http.ResponseWriter, r *http.Request) {
Expand All @@ -26,86 +20,11 @@ func BreakDown(w http.ResponseWriter, r *http.Request) {
Filters: ParseFilters(ctx, query),
Property: ParseProperty(ctx, query),
}
if !request.Validate(ctx, w, &req) {
return
}
period := req.Period
if period == nil {
period = &v1.TimePeriod{
Value: &v1.TimePeriod_Base_{
Base: v1.TimePeriod__30d,
},
}
}
filter := &v1.Filters{
List: append(req.Filters, &v1.Filter{
Property: v1.Property_domain,
Op: v1.Filter_equal,
Value: req.SiteId,
}),
}
slices.Sort(req.Metrics)
slices.Sort(req.Property)
selectedColumns := compute.MetricsToProjection(filter, req.Metrics, req.Property...)
from, to := PeriodToRange(ctx, time.Now, period, r.URL.Query())
scannedRecord, err := session.Get(ctx).Scan(ctx, tenant.Get(ctx), from.UnixMilli(), to.UnixMilli(), filter)
res, err := compute.Breakdown(ctx, session.Get(ctx), &req)
if err != nil {
logger.Get(ctx).Error("Failed scanning", "err", err)
request.Internal(ctx, w)
return
}
defer scannedRecord.Release()
mapping := map[string]arrow.Array{}
for i := 0; i < int(scannedRecord.NumCols()); i++ {
mapping[scannedRecord.ColumnName(i)] = scannedRecord.Column(i)
}
defer clear(mapping)
// build key mapping
b := array.NewUint32Builder(memory.DefaultAllocator)
defer b.Release()
// TODO: run this concurrently
xc := &compute.Compute{
Mapping: make(map[string]arrow.Array),
}
defer xc.Release()

result := make(map[string]map[string]map[string]float64)
for _, prop := range req.Property {
keys := make(map[string]map[string]float64)
for key, bitmap := range compute.HashProp(mapping[prop.String()]) {
b.AppendValues(bitmap.ToArray(), nil)
idx := b.NewUint32Array()

xc.Reset(nil)

for _, name := range selectedColumns {
a, err := compute.Take(ctx, mapping[name], idx)
if err != nil {
logger.Get(ctx).Error("Failed taking array for breaking down", "column", name, "err", err)
request.Internal(ctx, w)
return
}
xc.Mapping[name] = a
}

values := make(map[string]float64)
for _, metric := range req.Metrics {
value, err := xc.Metric(ctx, metric)
if err != nil {
logger.Get(ctx).Error("Failed computing metric", "metric", metric)
request.Internal(ctx, w)
return
}
values[metric.String()] = value
}
keys[key] = values
idx.Release()
}
result[prop.String()] = keys
}
request.Write(ctx, w, &BreakDownResponse{Results: result})
}

type BreakDownResponse struct {
Results map[string]map[string]map[string]float64 `json:"results"`
request.Write(ctx, w, res)
}
31 changes: 2 additions & 29 deletions internal/stats/current_visitors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package stats

import (
"net/http"
"time"

v1 "github.com/vinceanalytics/vince/gen/go/vince/v1"
"github.com/vinceanalytics/vince/internal/compute"
"github.com/vinceanalytics/vince/internal/logger"
"github.com/vinceanalytics/vince/internal/request"
"github.com/vinceanalytics/vince/internal/session"
"github.com/vinceanalytics/vince/internal/tenant"
)

func Realtime(w http.ResponseWriter, r *http.Request) {
Expand All @@ -18,36 +16,11 @@ func Realtime(w http.ResponseWriter, r *http.Request) {
req := v1.Realtime_Request{
SiteId: query.Get("site_id"),
}
if !request.Validate(ctx, w, &req) {
return
}
now := time.Now().UTC()
firstTime := now.Add(-5 * time.Minute)
result, err := session.Get(ctx).Scan(ctx,
tenant.Get(ctx),
firstTime.UnixMilli(),
now.UnixMilli(),
&v1.Filters{
Projection: []v1.Filters_Projection{
v1.Filters_id,
},
List: []*v1.Filter{
{Property: v1.Property_domain, Op: v1.Filter_equal, Value: req.SiteId},
},
},
)
res, err := compute.Realtime(ctx, session.Get(ctx), &req)
if err != nil {
logger.Get(ctx).Error("Failed scanning", "err", err)
request.Internal(ctx, w)
return
}
defer result.Release()
m := compute.NewCompute(result)
visitors, err := m.Visitors(ctx)
if err != nil {
logger.Get(ctx).Error("Failed computing unique user id", "err", err)
request.Internal(ctx, w)
return
}
request.Write(ctx, w, uint64(visitors))
request.Write(ctx, w, res)
}
71 changes: 2 additions & 69 deletions internal/stats/timeseries.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,12 @@ package stats

import (
"net/http"
"slices"
"time"

"github.com/apache/arrow/go/v15/arrow"
"github.com/apache/arrow/go/v15/arrow/array"
v1 "github.com/vinceanalytics/vince/gen/go/vince/v1"
"github.com/vinceanalytics/vince/internal/columns"
"github.com/vinceanalytics/vince/internal/compute"
"github.com/vinceanalytics/vince/internal/logger"
"github.com/vinceanalytics/vince/internal/request"
"github.com/vinceanalytics/vince/internal/session"
"github.com/vinceanalytics/vince/internal/tenant"
"github.com/vinceanalytics/vince/internal/timeutil"
)

func TimeSeries(w http.ResponseWriter, r *http.Request) {
Expand All @@ -27,71 +20,11 @@ func TimeSeries(w http.ResponseWriter, r *http.Request) {
Interval: ParseInterval(ctx, query),
Filters: ParseFilters(ctx, query),
}
if !request.Validate(ctx, w, &req) {
return
}
// make sure we have valid interval
if !ValidByPeriod(req.Period, req.Interval) {
request.Error(ctx, w, http.StatusBadRequest, "Interval out of range")
return
}
filters := &v1.Filters{
List: append(req.Filters, &v1.Filter{
Property: v1.Property_domain,
Op: v1.Filter_equal,
Value: req.SiteId,
}),
}
metrics := slices.Clone(req.Metrics)
slices.Sort(metrics)
compute.MetricsToProjection(filters, metrics)
from, to := PeriodToRange(ctx, time.Now, req.Period, r.URL.Query())
scanRecord, err := session.Get(ctx).Scan(ctx, tenant.Get(ctx), from.UnixMilli(), to.UnixMilli(), filters)
res, err := compute.Timeseries(ctx, session.Get(ctx), &req)
if err != nil {
logger.Get(ctx).Error("Failed scanning", "err", err)
request.Internal(ctx, w)
return
}

mapping := map[string]int{}
for i := 0; i < int(scanRecord.NumCols()); i++ {
mapping[scanRecord.ColumnName(i)] = i
}
tsKey := mapping[columns.Timestamp]
ts := scanRecord.Column(tsKey).(*array.Int64).Int64Values()
var buckets []Bucket
xc := &compute.Compute{Mapping: make(map[string]arrow.Array)}
err = timeutil.TimeBuckets(req.Interval, ts, func(bucket int64, start, end int) error {
n := scanRecord.NewSlice(int64(start), int64(end))
defer n.Release()
buck := Bucket{
Timestamp: time.UnixMilli(bucket),
Values: make(map[string]float64),
}
xc.Reset(n)
for _, x := range metrics {
value, err := xc.Metric(ctx, x)
if err != nil {
return err
}
buck.Values[x.String()] = value
}
buckets = append(buckets, buck)
return nil
})
if err != nil {
logger.Get(ctx).Error("Failed processing buckets", "err", err)
request.Internal(ctx, w)
return
}
request.Write(ctx, w, &Series{Results: buckets})
}

type Bucket struct {
Timestamp time.Time `json:"timestamp"`
Values map[string]float64 `json:"values"`
}

type Series struct {
Results []Bucket `json:"results"`
request.Write(ctx, w, res)
}

0 comments on commit 5e2369b

Please sign in to comment.