Skip to content

Commit

Permalink
feat: push events data from processor to unique users data collector
Browse files Browse the repository at this point in the history
  • Loading branch information
mihir20 committed Jul 2, 2024
1 parent 41d934f commit 5c7583e
Show file tree
Hide file tree
Showing 15 changed files with 489 additions and 24 deletions.
5 changes: 5 additions & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"runtime/pprof"
"strings"

"github.com/rudderlabs/rudder-server/enterprise/trackedusers"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"

Expand Down Expand Up @@ -92,6 +94,9 @@ func (a *app) initFeatures() {
EnterpriseToken: a.options.EnterpriseToken,
Log: enterpriseLogger.Child("config-env"),
},
TrackedUsers: &trackedusers.Factory{
Log: enterpriseLogger.Child("tracked-users"),
},
}
}

Expand Down
13 changes: 13 additions & 0 deletions app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"net/http"
"time"

"github.com/rudderlabs/rudder-server/enterprise/trackedusers"

"github.com/rudderlabs/rudder-schemas/go/stream"

"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -53,10 +55,12 @@ type embeddedApp struct {
routerDSLimit config.ValueLoader[int]
batchRouterDSLimit config.ValueLoader[int]
gatewayDSLimit config.ValueLoader[int]
enableTrackedUsers bool
}
}

func (a *embeddedApp) Setup() error {
a.config.enableTrackedUsers = config.GetBool("TrackedUsers.enabled", false)
a.config.enableReplay = config.GetBoolVar(types.DefaultReplayEnabled, "Replay.enabled")
a.config.processorDSLimit = config.GetReloadableIntVar(0, 1, "Processor.jobsDB.dsLimit", "JobsDB.dsLimit")
a.config.gatewayDSLimit = config.GetReloadableIntVar(0, 1, "Gateway.jobsDB.dsLimit", "JobsDB.dsLimit")
Expand Down Expand Up @@ -236,6 +240,14 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
}
}()

var trackedUsersCollector trackedusers.DataCollector
if a.config.enableTrackedUsers {
trackedUsersCollector, err = a.app.Features().TrackedUsers.Setup(misc.GetConnectionString(config, "tracked_users"))
if err != nil {
return fmt.Errorf("could not setup tracked users: %w", err)
}
}

proc := processor.New(
ctx,
&options.ClearDB,
Expand All @@ -254,6 +266,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
destinationHandle,
transformationhandle,
enrichers,
trackedUsersCollector,
processor.WithAdaptiveLimit(adaptiveLimit),
)
throttlerFactory, err := rtThrottler.NewFactory(config, stats.Default)
Expand Down
13 changes: 13 additions & 0 deletions app/apphandlers/processorAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"strconv"
"time"

"github.com/rudderlabs/rudder-server/enterprise/trackedusers"

"github.com/go-chi/chi/v5"

"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -48,6 +50,7 @@ type processorApp struct {
versionHandler func(w http.ResponseWriter, r *http.Request)
log logger.Logger
config struct {
enableTrackedUsers bool
processorDSLimit config.ValueLoader[int]
routerDSLimit config.ValueLoader[int]
batchRouterDSLimit config.ValueLoader[int]
Expand All @@ -64,6 +67,7 @@ type processorApp struct {
}

func (a *processorApp) Setup() error {
a.config.enableTrackedUsers = config.GetBool("TrackedUsers.enabled", false)
a.config.http.ReadTimeout = config.GetDurationVar(0, time.Second, []string{"ReadTimeout", "ReadTimeOutInSec"}...)
a.config.http.ReadHeaderTimeout = config.GetDurationVar(0, time.Second, []string{"ReadHeaderTimeout", "ReadHeaderTimeoutInSec"}...)
a.config.http.WriteTimeout = config.GetDurationVar(10, time.Second, []string{"WriteTimeout", "WriteTimeOutInSec"}...)
Expand Down Expand Up @@ -238,6 +242,14 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
return drainConfigManager.CleanupRoutine(ctx)
}))

var trackedUsersCollector trackedusers.DataCollector
if a.config.enableTrackedUsers {
trackedUsersCollector, err = a.app.Features().TrackedUsers.Setup(misc.GetConnectionString(config, "tracked_users"))
if err != nil {
return fmt.Errorf("could not setup tracked users: %w", err)
}
}

p := proc.New(
ctx,
&options.ClearDB,
Expand All @@ -256,6 +268,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
destinationHandle,
transformationhandle,
enrichers,
trackedUsersCollector,
proc.WithAdaptiveLimit(adaptiveLimit),
)
throttlerFactory, err := throttler.NewFactory(config, stats.Default)
Expand Down
3 changes: 3 additions & 0 deletions app/cluster/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"testing"
"time"

"github.com/rudderlabs/rudder-server/enterprise/trackedusers"

"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/ory/dockertest/v3"
Expand Down Expand Up @@ -215,6 +217,7 @@ func TestDynamicClusterManager(t *testing.T) {
destinationdebugger.NewNoOpService(),
transformationdebugger.NewNoOpService(),
[]enricher.PipelineEnricher{},
trackedusers.NewNoopDataCollector(),
)
processor.BackendConfig = mockBackendConfig
processor.Transformer = mockTransformer
Expand Down
7 changes: 7 additions & 0 deletions app/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package app
import (
"context"

"github.com/rudderlabs/rudder-server/enterprise/trackedusers"

"github.com/rudderlabs/rudder-go-kit/config"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/enterprise/replay"
Expand Down Expand Up @@ -53,4 +55,9 @@ type Features struct {
ConfigEnv ConfigEnvFeature
Reporting ReportingFeature
Replay ReplayFeature
TrackedUsers TrackedUsersFeature
}

type TrackedUsersFeature interface {
Setup(dbConn string) (trackedusers.DataCollector, error)
}
11 changes: 11 additions & 0 deletions enterprise/trackedusers/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package trackedusers

import "github.com/rudderlabs/rudder-go-kit/logger"

type Factory struct {
Log logger.Logger
}

func (f *Factory) Setup(dbConn string) (DataCollector, error) {

Check failure on line 9 in enterprise/trackedusers/factory.go

View workflow job for this annotation

GitHub Actions / Warehouse Integration (mssql)

undefined: DataCollector
return NewUniqueUsersCollector(f.Log, dbConn)

Check failure on line 10 in enterprise/trackedusers/factory.go

View workflow job for this annotation

GitHub Actions / Warehouse Integration (mssql)

undefined: NewUniqueUsersCollector
}
51 changes: 51 additions & 0 deletions enterprise/trackedusers/mocks/mock_data_collector.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions enterprise/trackedusers/noop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package trackedusers

import (
"context"

txn "github.com/rudderlabs/rudder-server/utils/tx"

"github.com/rudderlabs/rudder-server/jobsdb"
)

type NoopDataCollector struct {
}

func NewNoopDataCollector() *NoopDataCollector {
return &NoopDataCollector{}
}

func (n *NoopDataCollector) CollectData(context.Context, []*jobsdb.JobT, *txn.Tx) error {
return nil
}
6 changes: 6 additions & 0 deletions processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"sync"

"github.com/rudderlabs/rudder-server/enterprise/trackedusers"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
Expand Down Expand Up @@ -43,6 +45,7 @@ type LifecycleManager struct {
destDebugger destinationdebugger.DestinationDebugger
transDebugger transformationdebugger.TransformationDebugger
enrichers []enricher.PipelineEnricher
trackedUsersDataCollector trackedusers.DataCollector
}

// Start starts a processor, this is not a blocking call.
Expand Down Expand Up @@ -70,6 +73,7 @@ func (proc *LifecycleManager) Start() error {
proc.destDebugger,
proc.transDebugger,
proc.enrichers,
proc.trackedUsersDataCollector,
)

currentCtx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -110,6 +114,7 @@ func New(
destDebugger destinationdebugger.DestinationDebugger,
transDebugger transformationdebugger.TransformationDebugger,
enrichers []enricher.PipelineEnricher,
trackedUsersDataCollector trackedusers.DataCollector,
opts ...Opts,
) *LifecycleManager {
proc := &LifecycleManager{
Expand Down Expand Up @@ -139,6 +144,7 @@ func New(
destDebugger: destDebugger,
transDebugger: transDebugger,
enrichers: enrichers,
trackedUsersDataCollector: trackedUsersDataCollector,
}
for _, opt := range opts {
opt(proc)
Expand Down
3 changes: 3 additions & 0 deletions processor/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"testing"
"time"

"github.com/rudderlabs/rudder-server/enterprise/trackedusers"

"github.com/golang/mock/gomock"
"github.com/google/uuid"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -234,6 +236,7 @@ func TestProcessorManager(t *testing.T) {
destinationdebugger.NewNoOpService(),
transformationdebugger.NewNoOpService(),
[]enricher.PipelineEnricher{},
trackedusers.NewNoopDataCollector(),
WithStats(statsStore),
func(m *LifecycleManager) {
m.Handle.config.enablePipelining = false
Expand Down
24 changes: 22 additions & 2 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"sync"
"time"

"github.com/rudderlabs/rudder-server/enterprise/trackedusers"

"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/stringify"
Expand Down Expand Up @@ -157,7 +159,9 @@ type Handle struct {
adaptiveLimit func(int64) int64
storePlocker kitsync.PartitionLocker

sourceObservers []sourceObserver
sourceObservers []sourceObserver
trackedUsersDataCollector trackedusers.DataCollector
trackedUsersCollectionEnabled bool
}
type processorStats struct {
statGatewayDBR func(partition string) stats.Measurement
Expand Down Expand Up @@ -368,11 +372,13 @@ func (proc *Handle) Setup(
destDebugger destinationdebugger.DestinationDebugger,
transDebugger transformationdebugger.TransformationDebugger,
enrichers []enricher.PipelineEnricher,
trackedUsersDataCollector trackedusers.DataCollector,
) {
proc.reporting = reporting
proc.destDebugger = destDebugger
proc.transDebugger = transDebugger
proc.reportingEnabled = config.GetBoolVar(types.DefaultReportingEnabled, "Reporting.enabled")
proc.trackedUsersCollectionEnabled = config.GetBool("TrackedUsers.enabled", false)
if proc.conf == nil {
proc.conf = config.Default
}
Expand Down Expand Up @@ -403,6 +409,8 @@ func (proc *Handle) Setup(
proc.namespace = config.GetKubeNamespace()
proc.instanceID = misc.GetInstanceID()

proc.trackedUsersDataCollector = trackedUsersDataCollector

// Stats
if proc.statsFactory == nil {
proc.statsFactory = stats.Default
Expand Down Expand Up @@ -2212,7 +2220,7 @@ func (proc *Handle) sendQueryRetryStats(attempt int) {
stats.Default.NewTaggedStat("jobsdb_query_timeout", stats.CountType, stats.Tags{"attempt": fmt.Sprint(attempt), "module": "processor"}).Count(1)
}

func (proc *Handle) Store(partition string, in *storeMessage) {
func (proc *Handle) Store(partition string, in *storeMessage, rawJobs []*jobsdb.JobT) {
spans := make([]stats.TraceSpan, 0, len(in.traces))
defer func() {
for _, span := range spans {
Expand Down Expand Up @@ -2360,6 +2368,13 @@ func (proc *Handle) Store(partition string, in *storeMessage) {
}
}

if proc.isTrackedUsersCollectionEnabled() {
err = proc.trackedUsersDataCollector.CollectData(ctx, rawJobs, tx.Tx())
if err != nil {
return fmt.Errorf("storing tracked users: %w", err)
}
}

err = in.rsourcesStats.Publish(ctx, tx.SqlTx())
if err != nil {
return fmt.Errorf("publishing rsources stats: %w", err)
Expand Down Expand Up @@ -3068,6 +3083,7 @@ func (proc *Handle) handlePendingGatewayJobs(partition string) bool {
rsourcesStats: rsourcesStats,
}),
),
unprocessedList.Jobs,
)
proc.stats.statLoopTime(partition).Since(s)

Expand Down Expand Up @@ -3119,6 +3135,10 @@ func (proc *Handle) isReportingEnabled() bool {
return proc.reporting != nil && proc.reportingEnabled
}

func (proc *Handle) isTrackedUsersCollectionEnabled() bool {
return proc.trackedUsersDataCollector != nil && proc.trackedUsersCollectionEnabled
}

func (proc *Handle) updateRudderSourcesStats(ctx context.Context, tx jobsdb.StoreSafeTx, jobs []*jobsdb.JobT) error {
rsourcesStats := rsources.NewStatsCollector(proc.rsourcesService)
rsourcesStats.JobsStored(jobs)
Expand Down
Loading

0 comments on commit 5c7583e

Please sign in to comment.