diff --git a/app/app.go b/app/app.go index 0b02c77a9e6..c41fc7b902c 100644 --- a/app/app.go +++ b/app/app.go @@ -10,6 +10,8 @@ import ( "runtime/pprof" "strings" + "github.com/rudderlabs/rudder-server/enterprise/trackedusers" + "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" @@ -92,6 +94,9 @@ func (a *app) initFeatures() { EnterpriseToken: a.options.EnterpriseToken, Log: enterpriseLogger.Child("config-env"), }, + TrackedUsers: &trackedusers.Factory{ + Log: enterpriseLogger.Child("tracked-users"), + }, } } diff --git a/app/apphandlers/embeddedAppHandler.go b/app/apphandlers/embeddedAppHandler.go index b77e3814631..86899978223 100644 --- a/app/apphandlers/embeddedAppHandler.go +++ b/app/apphandlers/embeddedAppHandler.go @@ -6,6 +6,8 @@ import ( "net/http" "time" + "github.com/rudderlabs/rudder-server/enterprise/trackedusers" + "github.com/rudderlabs/rudder-schemas/go/stream" "golang.org/x/sync/errgroup" @@ -53,10 +55,12 @@ type embeddedApp struct { routerDSLimit config.ValueLoader[int] batchRouterDSLimit config.ValueLoader[int] gatewayDSLimit config.ValueLoader[int] + enableTrackedUsers bool } } func (a *embeddedApp) Setup() error { + a.config.enableTrackedUsers = config.GetBool("TrackedUsers.enabled", false) a.config.enableReplay = config.GetBoolVar(types.DefaultReplayEnabled, "Replay.enabled") a.config.processorDSLimit = config.GetReloadableIntVar(0, 1, "Processor.jobsDB.dsLimit", "JobsDB.dsLimit") a.config.gatewayDSLimit = config.GetReloadableIntVar(0, 1, "Gateway.jobsDB.dsLimit", "JobsDB.dsLimit") @@ -236,6 +240,14 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) } }() + var trackedUsersCollector trackedusers.DataCollector + if a.config.enableTrackedUsers { + trackedUsersCollector, err = a.app.Features().TrackedUsers.Setup(misc.GetConnectionString(config, "tracked_users")) + if err != nil { + return fmt.Errorf("could not setup tracked users: %w", err) + } + } + proc := processor.New( ctx, &options.ClearDB, @@ -254,6 +266,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) destinationHandle, transformationhandle, enrichers, + trackedUsersCollector, processor.WithAdaptiveLimit(adaptiveLimit), ) throttlerFactory, err := rtThrottler.NewFactory(config, stats.Default) diff --git a/app/apphandlers/processorAppHandler.go b/app/apphandlers/processorAppHandler.go index 91056d86312..2d8dd8477e5 100644 --- a/app/apphandlers/processorAppHandler.go +++ b/app/apphandlers/processorAppHandler.go @@ -7,6 +7,8 @@ import ( "strconv" "time" + "github.com/rudderlabs/rudder-server/enterprise/trackedusers" + "github.com/go-chi/chi/v5" "golang.org/x/sync/errgroup" @@ -48,6 +50,7 @@ type processorApp struct { versionHandler func(w http.ResponseWriter, r *http.Request) log logger.Logger config struct { + enableTrackedUsers bool processorDSLimit config.ValueLoader[int] routerDSLimit config.ValueLoader[int] batchRouterDSLimit config.ValueLoader[int] @@ -64,6 +67,7 @@ type processorApp struct { } func (a *processorApp) Setup() error { + a.config.enableTrackedUsers = config.GetBool("TrackedUsers.enabled", false) a.config.http.ReadTimeout = config.GetDurationVar(0, time.Second, []string{"ReadTimeout", "ReadTimeOutInSec"}...) a.config.http.ReadHeaderTimeout = config.GetDurationVar(0, time.Second, []string{"ReadHeaderTimeout", "ReadHeaderTimeoutInSec"}...) a.config.http.WriteTimeout = config.GetDurationVar(10, time.Second, []string{"WriteTimeout", "WriteTimeOutInSec"}...) @@ -238,6 +242,14 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options return drainConfigManager.CleanupRoutine(ctx) })) + var trackedUsersCollector trackedusers.DataCollector + if a.config.enableTrackedUsers { + trackedUsersCollector, err = a.app.Features().TrackedUsers.Setup(misc.GetConnectionString(config, "tracked_users")) + if err != nil { + return fmt.Errorf("could not setup tracked users: %w", err) + } + } + p := proc.New( ctx, &options.ClearDB, @@ -256,6 +268,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options destinationHandle, transformationhandle, enrichers, + trackedUsersCollector, proc.WithAdaptiveLimit(adaptiveLimit), ) throttlerFactory, err := throttler.NewFactory(config, stats.Default) diff --git a/app/cluster/integration_test.go b/app/cluster/integration_test.go index c3f668bd260..f6f4191d5b6 100644 --- a/app/cluster/integration_test.go +++ b/app/cluster/integration_test.go @@ -12,6 +12,8 @@ import ( "testing" "time" + "github.com/rudderlabs/rudder-server/enterprise/trackedusers" + "github.com/golang/mock/gomock" "github.com/google/uuid" "github.com/ory/dockertest/v3" @@ -215,6 +217,7 @@ func TestDynamicClusterManager(t *testing.T) { destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService(), []enricher.PipelineEnricher{}, + trackedusers.NewNoopDataCollector(), ) processor.BackendConfig = mockBackendConfig processor.Transformer = mockTransformer diff --git a/app/features.go b/app/features.go index 839407c8823..4fe06d61e7e 100644 --- a/app/features.go +++ b/app/features.go @@ -5,6 +5,8 @@ package app import ( "context" + "github.com/rudderlabs/rudder-server/enterprise/trackedusers" + "github.com/rudderlabs/rudder-go-kit/config" backendconfig "github.com/rudderlabs/rudder-server/backend-config" "github.com/rudderlabs/rudder-server/enterprise/replay" @@ -53,4 +55,9 @@ type Features struct { ConfigEnv ConfigEnvFeature Reporting ReportingFeature Replay ReplayFeature + TrackedUsers TrackedUsersFeature +} + +type TrackedUsersFeature interface { + Setup(dbConn string) (trackedusers.DataCollector, error) } diff --git a/enterprise/trackedusers/factory.go b/enterprise/trackedusers/factory.go new file mode 100644 index 00000000000..811047d5528 --- /dev/null +++ b/enterprise/trackedusers/factory.go @@ -0,0 +1,11 @@ +package trackedusers + +import "github.com/rudderlabs/rudder-go-kit/logger" + +type Factory struct { + Log logger.Logger +} + +func (f *Factory) Setup(dbConn string) (DataCollector, error) { + return NewUniqueUsersCollector(f.Log, dbConn) +} diff --git a/enterprise/trackedusers/mocks/mock_data_collector.go b/enterprise/trackedusers/mocks/mock_data_collector.go new file mode 100644 index 00000000000..f337fecf31c --- /dev/null +++ b/enterprise/trackedusers/mocks/mock_data_collector.go @@ -0,0 +1,51 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/rudderlabs/rudder-server/enterprise/trackedusers (interfaces: DataCollector) + +// Package mockdatacollector is a generated GoMock package. +package mockdatacollector + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + jobsdb "github.com/rudderlabs/rudder-server/jobsdb" + tx "github.com/rudderlabs/rudder-server/utils/tx" +) + +// MockDataCollector is a mock of DataCollector interface. +type MockDataCollector struct { + ctrl *gomock.Controller + recorder *MockDataCollectorMockRecorder +} + +// MockDataCollectorMockRecorder is the mock recorder for MockDataCollector. +type MockDataCollectorMockRecorder struct { + mock *MockDataCollector +} + +// NewMockDataCollector creates a new mock instance. +func NewMockDataCollector(ctrl *gomock.Controller) *MockDataCollector { + mock := &MockDataCollector{ctrl: ctrl} + mock.recorder = &MockDataCollectorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockDataCollector) EXPECT() *MockDataCollectorMockRecorder { + return m.recorder +} + +// CollectData mocks base method. +func (m *MockDataCollector) CollectData(arg0 context.Context, arg1 []*jobsdb.JobT, arg2 *tx.Tx) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CollectData", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// CollectData indicates an expected call of CollectData. +func (mr *MockDataCollectorMockRecorder) CollectData(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CollectData", reflect.TypeOf((*MockDataCollector)(nil).CollectData), arg0, arg1, arg2) +} diff --git a/enterprise/trackedusers/noop.go b/enterprise/trackedusers/noop.go new file mode 100644 index 00000000000..981773f2abf --- /dev/null +++ b/enterprise/trackedusers/noop.go @@ -0,0 +1,20 @@ +package trackedusers + +import ( + "context" + + txn "github.com/rudderlabs/rudder-server/utils/tx" + + "github.com/rudderlabs/rudder-server/jobsdb" +) + +type NoopDataCollector struct { +} + +func NewNoopDataCollector() *NoopDataCollector { + return &NoopDataCollector{} +} + +func (n *NoopDataCollector) CollectData(context.Context, []*jobsdb.JobT, *txn.Tx) error { + return nil +} diff --git a/processor/manager.go b/processor/manager.go index 9a7e8c63278..8d17110f51d 100644 --- a/processor/manager.go +++ b/processor/manager.go @@ -4,6 +4,8 @@ import ( "context" "sync" + "github.com/rudderlabs/rudder-server/enterprise/trackedusers" + "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats" @@ -43,6 +45,7 @@ type LifecycleManager struct { destDebugger destinationdebugger.DestinationDebugger transDebugger transformationdebugger.TransformationDebugger enrichers []enricher.PipelineEnricher + trackedUsersDataCollector trackedusers.DataCollector } // Start starts a processor, this is not a blocking call. @@ -70,6 +73,7 @@ func (proc *LifecycleManager) Start() error { proc.destDebugger, proc.transDebugger, proc.enrichers, + proc.trackedUsersDataCollector, ) currentCtx, cancel := context.WithCancel(context.Background()) @@ -110,6 +114,7 @@ func New( destDebugger destinationdebugger.DestinationDebugger, transDebugger transformationdebugger.TransformationDebugger, enrichers []enricher.PipelineEnricher, + trackedUsersDataCollector trackedusers.DataCollector, opts ...Opts, ) *LifecycleManager { proc := &LifecycleManager{ @@ -139,6 +144,7 @@ func New( destDebugger: destDebugger, transDebugger: transDebugger, enrichers: enrichers, + trackedUsersDataCollector: trackedUsersDataCollector, } for _, opt := range opts { opt(proc) diff --git a/processor/manager_test.go b/processor/manager_test.go index d6ec9f4a18d..ae8e9a0ee12 100644 --- a/processor/manager_test.go +++ b/processor/manager_test.go @@ -12,6 +12,8 @@ import ( "testing" "time" + "github.com/rudderlabs/rudder-server/enterprise/trackedusers" + "github.com/golang/mock/gomock" "github.com/google/uuid" . "github.com/onsi/gomega" @@ -234,6 +236,7 @@ func TestProcessorManager(t *testing.T) { destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService(), []enricher.PipelineEnricher{}, + trackedusers.NewNoopDataCollector(), WithStats(statsStore), func(m *LifecycleManager) { m.Handle.config.enablePipelining = false diff --git a/processor/processor.go b/processor/processor.go index 5e8f6dc36c1..1d3fd41f59f 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -12,6 +12,8 @@ import ( "sync" "time" + "github.com/rudderlabs/rudder-server/enterprise/trackedusers" + "golang.org/x/sync/errgroup" "github.com/rudderlabs/rudder-go-kit/stringify" @@ -157,7 +159,9 @@ type Handle struct { adaptiveLimit func(int64) int64 storePlocker kitsync.PartitionLocker - sourceObservers []sourceObserver + sourceObservers []sourceObserver + trackedUsersDataCollector trackedusers.DataCollector + trackedUsersCollectionEnabled bool } type processorStats struct { statGatewayDBR func(partition string) stats.Measurement @@ -368,11 +372,13 @@ func (proc *Handle) Setup( destDebugger destinationdebugger.DestinationDebugger, transDebugger transformationdebugger.TransformationDebugger, enrichers []enricher.PipelineEnricher, + trackedUsersDataCollector trackedusers.DataCollector, ) { proc.reporting = reporting proc.destDebugger = destDebugger proc.transDebugger = transDebugger proc.reportingEnabled = config.GetBoolVar(types.DefaultReportingEnabled, "Reporting.enabled") + proc.trackedUsersCollectionEnabled = config.GetBool("TrackedUsers.enabled", false) if proc.conf == nil { proc.conf = config.Default } @@ -403,6 +409,8 @@ func (proc *Handle) Setup( proc.namespace = config.GetKubeNamespace() proc.instanceID = misc.GetInstanceID() + proc.trackedUsersDataCollector = trackedUsersDataCollector + // Stats if proc.statsFactory == nil { proc.statsFactory = stats.Default @@ -2212,7 +2220,7 @@ func (proc *Handle) sendQueryRetryStats(attempt int) { stats.Default.NewTaggedStat("jobsdb_query_timeout", stats.CountType, stats.Tags{"attempt": fmt.Sprint(attempt), "module": "processor"}).Count(1) } -func (proc *Handle) Store(partition string, in *storeMessage) { +func (proc *Handle) Store(partition string, in *storeMessage, rawJobs []*jobsdb.JobT) { spans := make([]stats.TraceSpan, 0, len(in.traces)) defer func() { for _, span := range spans { @@ -2360,6 +2368,13 @@ func (proc *Handle) Store(partition string, in *storeMessage) { } } + if proc.isTrackedUsersCollectionEnabled() { + err = proc.trackedUsersDataCollector.CollectData(ctx, rawJobs, tx.Tx()) + if err != nil { + return fmt.Errorf("storing tracked users: %w", err) + } + } + err = in.rsourcesStats.Publish(ctx, tx.SqlTx()) if err != nil { return fmt.Errorf("publishing rsources stats: %w", err) @@ -3068,6 +3083,7 @@ func (proc *Handle) handlePendingGatewayJobs(partition string) bool { rsourcesStats: rsourcesStats, }), ), + unprocessedList.Jobs, ) proc.stats.statLoopTime(partition).Since(s) @@ -3119,6 +3135,10 @@ func (proc *Handle) isReportingEnabled() bool { return proc.reporting != nil && proc.reportingEnabled } +func (proc *Handle) isTrackedUsersCollectionEnabled() bool { + return proc.trackedUsersDataCollector != nil && proc.trackedUsersCollectionEnabled +} + func (proc *Handle) updateRudderSourcesStats(ctx context.Context, tx jobsdb.StoreSafeTx, jobs []*jobsdb.JobT) error { rsourcesStats := rsources.NewStatsCollector(proc.rsourcesService) rsourcesStats.JobsStored(jobs) diff --git a/processor/processor_test.go b/processor/processor_test.go index bc04cfc8b02..a3fc0483209 100644 --- a/processor/processor_test.go +++ b/processor/processor_test.go @@ -14,6 +14,10 @@ import ( "testing" "time" + mockdatacollector "github.com/rudderlabs/rudder-server/enterprise/trackedusers/mocks" + + "github.com/rudderlabs/rudder-server/enterprise/trackedusers" + "github.com/golang/mock/gomock" "github.com/google/uuid" . "github.com/onsi/ginkgo/v2" @@ -67,19 +71,20 @@ func (m *mockObserver) ObserveSourceEvents(source *backendconfig.SourceT, events } type testContext struct { - mockCtrl *gomock.Controller - mockBackendConfig *mocksBackendConfig.MockBackendConfig - mockGatewayJobsDB *mocksJobsDB.MockJobsDB - mockRouterJobsDB *mocksJobsDB.MockJobsDB - mockBatchRouterJobsDB *mocksJobsDB.MockJobsDB - mockReadProcErrorsDB *mocksJobsDB.MockJobsDB - mockWriteProcErrorsDB *mocksJobsDB.MockJobsDB - mockEventSchemasDB *mocksJobsDB.MockJobsDB - mockArchivalDB *mocksJobsDB.MockJobsDB - MockReportingI *mockReportingTypes.MockReporting - MockDedup *mockDedup.MockDedup - MockObserver *mockObserver - MockRsourcesService *rsources.MockJobService + mockCtrl *gomock.Controller + mockBackendConfig *mocksBackendConfig.MockBackendConfig + mockGatewayJobsDB *mocksJobsDB.MockJobsDB + mockRouterJobsDB *mocksJobsDB.MockJobsDB + mockBatchRouterJobsDB *mocksJobsDB.MockJobsDB + mockReadProcErrorsDB *mocksJobsDB.MockJobsDB + mockWriteProcErrorsDB *mocksJobsDB.MockJobsDB + mockEventSchemasDB *mocksJobsDB.MockJobsDB + mockArchivalDB *mocksJobsDB.MockJobsDB + MockReportingI *mockReportingTypes.MockReporting + MockDedup *mockDedup.MockDedup + MockObserver *mockObserver + MockRsourcesService *rsources.MockJobService + mockTrackedUsersCollector *mockdatacollector.MockDataCollector } func (c *testContext) Setup() { @@ -109,6 +114,7 @@ func (c *testContext) Setup() { c.MockReportingI = mockReportingTypes.NewMockReporting(c.mockCtrl) c.MockDedup = mockDedup.NewMockDedup(c.mockCtrl) c.MockObserver = &mockObserver{} + c.mockTrackedUsersCollector = mockdatacollector.NewMockDataCollector(c.mockCtrl) } func (c *testContext) Finish() { @@ -1419,6 +1425,309 @@ var _ = Describe("Processor with ArchivalV2 enabled", Ordered, func() { }) }) +var _ = Describe("Processor with trackedUsers feature enabled", Ordered, func() { + initProcessor() + + var c *testContext + transformerServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte(`{"routerTransform": {}}`)) + w.WriteHeader(http.StatusOK) + })) + + prepareHandle := func(proc *Handle) *Handle { + proc.config.transformerURL = transformerServer.URL + isolationStrategy, err := isolation.GetStrategy(isolation.ModeNone) + Expect(err).To(BeNil()) + proc.isolationStrategy = isolationStrategy + return proc + } + BeforeEach(func() { + c = &testContext{} + c.Setup() + }) + + AfterEach(func() { + c.Finish() + }) + + AfterAll(func() { + transformerServer.Close() + }) + + Context("trackedUsers", func() { + BeforeEach(func() { + // crash recovery check + c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1) + }) + + It("should track Users from unprocessed jobs ", func() { + messages := map[string]mockEventData{ + // this message should be delivered only to destination A + "message-1": { + id: "1", + jobid: 1010, + originalTimestamp: "2000-01-02T01:23:45", + expectedOriginalTimestamp: "2000-01-02T01:23:45.000Z", + sentAt: "2000-01-02 01:23", + expectedSentAt: "2000-01-02T01:23:00.000Z", + expectedReceivedAt: "2001-01-02T02:23:45.000Z", + integrations: map[string]bool{"All": false, "enabled-destination-a-definition-display-name": true}, + params: map[string]string{"source_id": "enabled-source-no-ut"}, + }, + // this message should not be delivered to destination A + "message-2": { + id: "2", + jobid: 1010, + originalTimestamp: "2000-02-02T01:23:45", + expectedOriginalTimestamp: "2000-02-02T01:23:45.000Z", + expectedReceivedAt: "2001-01-02T02:23:45.000Z", + integrations: map[string]bool{"All": true, "enabled-destination-a-definition-display-name": false}, + params: map[string]string{"source_id": "enabled-source-no-ut"}, + }, + // this message should be delivered to all destinations + "message-3": { + id: "3", + jobid: 2010, + originalTimestamp: "malformed timestamp", + sentAt: "2000-03-02T01:23:15", + expectedSentAt: "2000-03-02T01:23:15.000Z", + expectedReceivedAt: "2002-01-02T02:23:45.000Z", + integrations: map[string]bool{"All": true}, + params: map[string]string{"source_id": "enabled-source-no-ut", "source_job_run_id": "job_run_id_1", "source_task_run_id": "task_run_id_1"}, + }, + // this message should be delivered to all destinations (default All value) + "message-4": { + id: "4", + jobid: 2010, + originalTimestamp: "2000-04-02T02:23:15.000Z", // missing sentAt + expectedOriginalTimestamp: "2000-04-02T02:23:15.000Z", + expectedReceivedAt: "2002-01-02T02:23:45.000Z", + integrations: map[string]bool{}, + params: map[string]string{"source_id": "enabled-source-no-ut", "source_job_run_id": "job_run_id_1", "source_task_run_id": "task_run_id_1"}, + }, + // this message should not be delivered to any destination + "message-5": { + id: "5", + jobid: 2010, + expectedReceivedAt: "2002-01-02T02:23:45.000Z", + integrations: map[string]bool{"All": false}, + params: map[string]string{"source_id": "enabled-source-no-ut", "source_job_run_id": "job_run_id_1", "source_task_run_id": "task_run_id_1"}, + }, + } + + unprocessedJobsList := []*jobsdb.JobT{ + { + UUID: uuid.New(), + JobID: 1002, + CreatedAt: time.Date(2020, 0o4, 28, 23, 27, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 23, 27, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: nil, + EventCount: 1, + LastJobStatus: jobsdb.JobStatusT{}, + Parameters: createBatchParameters(SourceIDEnabled), + }, + { + UUID: uuid.New(), + JobID: 1010, + CreatedAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabledNoUT, + "2001-01-02T02:23:45.000Z", + []mockEventData{ + messages["message-1"], + messages["message-2"], + }, createMessagePayloadWithoutSources, + ), + EventCount: 2, + LastJobStatus: jobsdb.JobStatusT{}, + Parameters: createBatchParameters(SourceIDEnabledNoUT), + }, + { + UUID: uuid.New(), + JobID: 2002, + CreatedAt: time.Date(2020, 0o4, 28, 13, 27, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 13, 27, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: nil, + EventCount: 1, + LastJobStatus: jobsdb.JobStatusT{}, + Parameters: createBatchParameters(SourceIDEnabled), + }, + { + UUID: uuid.New(), + JobID: 2003, + CreatedAt: time.Date(2020, 0o4, 28, 13, 28, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 13, 28, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: nil, + EventCount: 1, + LastJobStatus: jobsdb.JobStatusT{}, + Parameters: createBatchParameters(SourceIDEnabled), + }, + { + UUID: uuid.New(), + JobID: 2010, + CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: createBatchPayload( + WriteKeyEnabledNoUT, + "2002-01-02T02:23:45.000Z", + []mockEventData{ + messages["message-3"], + messages["message-4"], + messages["message-5"], + }, + createMessagePayloadWithoutSources, + ), + EventCount: 3, + Parameters: createBatchParametersWithSources(SourceIDEnabledNoUT), + }, + } + mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl) + + processor := prepareHandle(NewHandle(config.Default, mockTransformer)) + processor.trackedUsersCollectionEnabled = true + processor.trackedUsersDataCollector = c.mockTrackedUsersCollector + + callUnprocessed := c.mockGatewayJobsDB.EXPECT().GetUnprocessed( + gomock.Any(), + jobsdb.GetQueryParams{ + CustomValFilters: gatewayCustomVal, + JobsLimit: processor.config.maxEventsToProcess.Load(), + EventsLimit: processor.config.maxEventsToProcess.Load(), + PayloadSizeLimit: processor.payloadLimit.Load(), + }).Return(jobsdb.JobsResult{Jobs: unprocessedJobsList}, nil).Times(1) + + transformExpectations := map[string]transformExpectation{ + DestinationIDEnabledA: { + events: 3, + messageIds: "message-1,message-3,message-4", + receiveMetadata: true, + destinationDefinitionName: "enabled-destination-a-definition-name", + }, + } + + // We expect one transform call to destination A, after callUnprocessed. + mockTransformer.EXPECT().Transform( + gomock.Any(), + gomock.Any(), + gomock.Any(), + ).Times(1).After(callUnprocessed). + DoAndReturn(assertDestinationTransform( + messages, + SourceIDEnabledNoUT, + DestinationIDEnabledA, + transformExpectations[DestinationIDEnabledA], + )) + + assertStoreJob := func(job *jobsdb.JobT, i int, destination string) { + Expect(job.UUID.String()).To(testutils.BeValidUUID()) + Expect(job.JobID).To(Equal(int64(0))) + Expect(job.CreatedAt).To(BeTemporally("~", time.Now(), 200*time.Millisecond)) + Expect(job.ExpireAt).To(BeTemporally("~", time.Now(), 200*time.Millisecond)) + Expect(string(job.EventPayload)).To(Equal(fmt.Sprintf(`{"int-value":%d,"string-value":%q}`, i, destination))) + Expect(len(job.LastJobStatus.JobState)).To(Equal(0)) + require.JSONEq(GinkgoT(), fmt.Sprintf(`{ + "source_id":"source-from-transformer", + "source_name": "%s", + "destination_id":"destination-from-transformer", + "received_at":"", + "transform_at":"processor", + "message_id":"", + "gateway_job_id":0, + "source_task_run_id":"", + "source_job_id":"", + "source_job_run_id":"", + "event_name":"", + "event_type":"", + "source_definition_id":"", + "destination_definition_id":"", + "source_category":"", + "record_id":null, + "workspaceId":"", + "traceparent":"" + }`, sourceIDToName[SourceIDEnabledNoUT]), string(job.Parameters)) + } + // One Store call is expected for all events + c.mockRouterJobsDB.EXPECT().WithStoreSafeTx(gomock.Any(), gomock.Any()).Times(1).Do(func(ctx context.Context, f func(tx jobsdb.StoreSafeTx) error) { + _ = f(jobsdb.EmptyStoreSafeTx()) + }).Return(nil) + + callStoreRouter := c.mockRouterJobsDB.EXPECT().StoreInTx(gomock.Any(), gomock.Any(), gomock.Any()).Times(1). + Do(func(ctx context.Context, tx jobsdb.StoreSafeTx, jobs []*jobsdb.JobT) { + Expect(jobs).To(HaveLen(2)) + for i, job := range jobs { + assertStoreJob(job, i, "value-enabled-destination-a") + } + }) + + c.MockRsourcesService.EXPECT(). + IncrementStats( + gomock.Any(), + gomock.Any(), + "job_run_id_1", + rsources.JobTargetKey{ + TaskRunID: "task_run_id_1", + SourceID: "enabled-source-no-ut", + }, + rsources.Stats{In: 2, Failed: 2}, + ).Times(1).Return(nil) + + c.MockRsourcesService.EXPECT(). + IncrementStats( + gomock.Any(), + gomock.Any(), + "job_run_id_1", + rsources.JobTargetKey{ + TaskRunID: "task_run_id_1", + SourceID: "enabled-source-no-ut", + }, + rsources.Stats{Out: 1}, + ).Times(1).Return(nil) + + c.mockArchivalDB.EXPECT(). + WithStoreSafeTx( + gomock.Any(), + gomock.Any(), + ).Times(1). + Do(func(ctx context.Context, f func(tx jobsdb.StoreSafeTx) error) { + _ = f(jobsdb.EmptyStoreSafeTx()) + }).Return(nil) + c.mockArchivalDB.EXPECT(). + StoreInTx(gomock.Any(), gomock.Any(), gomock.Any()). + Times(1). + Do(func(ctx context.Context, tx jobsdb.StoreSafeTx, jobs []*jobsdb.JobT) { + Expect(jobs).To(HaveLen(2)) + }) + + c.mockGatewayJobsDB.EXPECT().WithUpdateSafeTx(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, f func(tx jobsdb.UpdateSafeTx) error) { + _ = f(jobsdb.EmptyUpdateSafeTx()) + }).Return(nil).Times(1) + c.mockGatewayJobsDB.EXPECT().UpdateJobStatusInTx(gomock.Any(), gomock.Any(), gomock.Len(len(unprocessedJobsList)), gatewayCustomVal, nil).Times(1).After(callStoreRouter). + Do(func(ctx context.Context, txn jobsdb.UpdateSafeTx, statuses []*jobsdb.JobStatusT, _, _ interface{}) { + // jobs should be sorted by jobid, so order of statuses is different from order of jobs + for i := range unprocessedJobsList { + assertJobStatus(unprocessedJobsList[i], statuses[i], jobsdb.Succeeded.State) + } + }) + + c.mockTrackedUsersCollector.EXPECT().CollectData(gomock.Any(), unprocessedJobsList, gomock.Any()).Times(1) + + Setup(processor, c, false, false) + processor.trackedUsersCollectionEnabled = true + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil()) + GinkgoT().Log("Processor setup and init done") + handlePendingGatewayJobs(processor) + }) + }) +}) + var _ = Describe("Processor", Ordered, func() { initProcessor() @@ -1474,6 +1783,7 @@ var _ = Describe("Processor", Ordered, func() { destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService(), []enricher.PipelineEnricher{}, + trackedusers.NewNoopDataCollector(), ) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -1504,6 +1814,7 @@ var _ = Describe("Processor", Ordered, func() { destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService(), []enricher.PipelineEnricher{}, + trackedusers.NewNoopDataCollector(), ) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -1539,6 +1850,7 @@ var _ = Describe("Processor", Ordered, func() { destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService(), []enricher.PipelineEnricher{}, + trackedusers.NewNoopDataCollector(), ) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -2641,6 +2953,7 @@ var _ = Describe("Processor", Ordered, func() { destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService(), []enricher.PipelineEnricher{}, + trackedusers.NewNoopDataCollector(), ) setMainLoopTimeout(processor, 1*time.Second) @@ -2699,6 +3012,7 @@ var _ = Describe("Processor", Ordered, func() { destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService(), []enricher.PipelineEnricher{}, + trackedusers.NewNoopDataCollector(), ) defer processor.Shutdown() @@ -4706,6 +5020,7 @@ func Setup(processor *Handle, c *testContext, enableDedup, enableReporting bool) destinationdebugger.NewNoOpService(), transformationdebugger.NewNoOpService(), []enricher.PipelineEnricher{}, + c.mockTrackedUsersCollector, ) processor.reportingEnabled = enableReporting processor.sourceObservers = []sourceObserver{c.MockObserver} diff --git a/processor/worker.go b/processor/worker.go index 6192eb663fc..4dcc5e837f7 100644 --- a/processor/worker.go +++ b/processor/worker.go @@ -97,7 +97,7 @@ func (w *worker) start() { for subJob := range w.channel.store { if firstSubJob && !subJob.hasMore { - w.handle.Store(w.partition, subJob) + w.handle.Store(w.partition, subJob, nil) continue } @@ -114,7 +114,7 @@ func (w *worker) start() { mergedJob.merge(subJob) if !subJob.hasMore { - w.handle.Store(w.partition, mergedJob) + w.handle.Store(w.partition, mergedJob, nil) firstSubJob = true } } diff --git a/processor/worker_handle.go b/processor/worker_handle.go index a1df2f8397a..48015fdd7a6 100644 --- a/processor/worker_handle.go +++ b/processor/worker_handle.go @@ -25,7 +25,7 @@ type workerHandle interface { jobSplitter(jobs []*jobsdb.JobT, rsourcesStats rsources.StatsCollector) []subJob processJobsForDest(partition string, subJobs subJob) *transformationMessage transformations(partition string, in *transformationMessage) *storeMessage - Store(partition string, in *storeMessage) + Store(partition string, in *storeMessage, rawJobs []*jobsdb.JobT) } // workerHandleConfig is a struct containing the processor.Handle configuration relevant for workers diff --git a/processor/worker_test.go b/processor/worker_test.go index 345ca3de84a..76a32029860 100644 --- a/processor/worker_test.go +++ b/processor/worker_test.go @@ -202,11 +202,9 @@ func (m *mockWorkerHandle) handlePendingGatewayJobs(partition string) bool { } rsourcesStats := rsources.NewStatsCollector(m.rsourcesService(), rsources.IgnoreDestinationID()) for _, subJob := range m.jobSplitter(jobs.Jobs, rsourcesStats) { - m.Store(partition, - m.transformations(partition, - m.processJobsForDest(partition, subJob), - ), - ) + m.Store(partition, m.transformations(partition, + m.processJobsForDest(partition, subJob), + ), nil) } return len(jobs.Jobs) > 0 } @@ -303,7 +301,7 @@ func (m *mockWorkerHandle) transformations(partition string, in *transformationM } } -func (m *mockWorkerHandle) Store(partition string, in *storeMessage) { +func (m *mockWorkerHandle) Store(partition string, in *storeMessage, unprocessedJobs []*jobsdb.JobT) { if m.limiters.store != nil { defer m.limiters.store.Begin(partition)() }