From 41d934f088bc09103e2c077c450128e6ccd5a360 Mon Sep 17 00:00:00 2001 From: mihir Date: Tue, 2 Jul 2024 20:19:31 +0530 Subject: [PATCH] addressed review comments --- enterprise/trackedusers/users_reporter.go | 36 ++- .../trackedusers/users_reporter_test.go | 258 ++++++++++++------ 2 files changed, 188 insertions(+), 106 deletions(-) diff --git a/enterprise/trackedusers/users_reporter.go b/enterprise/trackedusers/users_reporter.go index fe89293387..ac02284117 100644 --- a/enterprise/trackedusers/users_reporter.go +++ b/enterprise/trackedusers/users_reporter.go @@ -23,20 +23,22 @@ import ( ) const ( - idTypeUserID = "userID" - idTypeAnonymousID = "anonymousID" - idTypeUserIDAnonymousIDCombination = "userIDAnonymousIDCombination" + idTypeUserID = "userID" + idTypeAnonymousID = "anonymousID" + idTypeIdentifiedAnonymousID = "identifiedAnonymousID" // changing this will be non backwards compatible murmurSeed = 123 + + trackUsersTable = "tracked_users_reports" ) type UsersReport struct { WorkspaceID string SourceID string UserIDHll *hll.Hll - AnonymousIDHLL *hll.Hll - IdentifiedAnonymousIDHLL *hll.Hll + AnonymousIDHll *hll.Hll + IdentifiedAnonymousIDHll *hll.Hll } //go:generate mockgen -destination=./mocks/mock_user_reporter.go -package=mockuserreporter github.com/rudderlabs/rudder-server/enterprise/trackedusers UsersReporter @@ -52,6 +54,7 @@ type UniqueUsersReporter struct { log logger.Logger hllSettings *hll.Settings instanceID string + now func() time.Time } func NewUniqueUsersReporter(log logger.Logger, conf *config.Config) (*UniqueUsersReporter, error) { @@ -64,6 +67,9 @@ func NewUniqueUsersReporter(log logger.Logger, conf *config.Config) (*UniqueUser SparseEnabled: true, }, instanceID: config.GetString("INSTANCE_ID", "1"), + now: func() time.Time { + return time.Now() + }, }, nil } @@ -131,7 +137,7 @@ func (u *UniqueUsersReporter) GenerateReportsFromJobs(jobs []*jobsdb.JobT, sourc if userID != "" && anonymousID != "" { combinedUserIDAnonymousID := combineUserIDAnonymousID(userID, anonymousID) - workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID] = u.recordIdentifier(workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID], combinedUserIDAnonymousID, idTypeUserIDAnonymousIDCombination) + workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID] = u.recordIdentifier(workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID], combinedUserIDAnonymousID, idTypeIdentifiedAnonymousID) } } @@ -147,8 +153,8 @@ func (u *UniqueUsersReporter) GenerateReportsFromJobs(jobs []*jobsdb.JobT, sourc WorkspaceID: workspaceID, SourceID: sourceID, UserIDHll: userIdTypeMap[idTypeUserID], - AnonymousIDHLL: userIdTypeMap[idTypeAnonymousID], - IdentifiedAnonymousIDHLL: userIdTypeMap[idTypeUserIDAnonymousIDCombination], + AnonymousIDHll: userIdTypeMap[idTypeAnonymousID], + IdentifiedAnonymousIDHll: userIdTypeMap[idTypeIdentifiedAnonymousID], } })...) } @@ -159,7 +165,7 @@ func (u *UniqueUsersReporter) ReportUsers(ctx context.Context, reports []*UsersR if len(reports) == 0 { return nil } - stmt, err := tx.PrepareContext(ctx, pq.CopyIn("tracked_users_reports", + stmt, err := tx.PrepareContext(ctx, pq.CopyIn(trackUsersTable, "workspace_id", "instance_id", "source_id", @@ -169,7 +175,7 @@ func (u *UniqueUsersReporter) ReportUsers(ctx context.Context, reports []*UsersR "identified_anonymousid_hll", )) if err != nil { - return fmt.Errorf("preparing statement: %v", err) + return fmt.Errorf("preparing statement: %w", err) } defer func() { _ = stmt.Close() }() @@ -177,18 +183,18 @@ func (u *UniqueUsersReporter) ReportUsers(ctx context.Context, reports []*UsersR _, err := stmt.Exec(report.WorkspaceID, u.instanceID, report.SourceID, - time.Now(), + u.now(), hllToString(report.UserIDHll), - hllToString(report.AnonymousIDHLL), - hllToString(report.IdentifiedAnonymousIDHLL), + hllToString(report.AnonymousIDHll), + hllToString(report.IdentifiedAnonymousIDHll), ) if err != nil { - return fmt.Errorf("executing statement: %v", err) + return fmt.Errorf("executing statement: %w", err) } } if _, err = stmt.ExecContext(ctx); err != nil { - return fmt.Errorf("executing final statement: %v", err) + return fmt.Errorf("executing final statement: %w", err) } return nil } diff --git a/enterprise/trackedusers/users_reporter_test.go b/enterprise/trackedusers/users_reporter_test.go index 2edb34af8c..684e2614a5 100644 --- a/enterprise/trackedusers/users_reporter_test.go +++ b/enterprise/trackedusers/users_reporter_test.go @@ -5,6 +5,7 @@ import ( "encoding/hex" "fmt" "testing" + "time" "github.com/rudderlabs/rudder-go-kit/config" @@ -31,46 +32,11 @@ var ( sampleWorkspaceID2 = "workspaceID2" sampleSourceID = "sourceID" sampleSourceToFilter = "filtered-source-id" - sampleTestJob1 = &jobsdb.JobT{ - Parameters: []byte(`{"batch_id":1,"source_id":"sourceID","source_job_run_id":""}`), - EventPayload: []byte(`{"receivedAt":"2021-06-06T20:26:39.598+05:30","writeKey":"writeKey","requestIP":"[::1]", "batch": [{"anonymousId":"anon_id","channel":"android-sdk","context":{"app":{"build":"1","name":"RudderAndroidClient","namespace":"com.rudderlabs.android.sdk","version":"1.0"},"device":{"id":"49e4bdd1c280bc00","manufacturer":"Google","model":"Android SDK built for x86","name":"generic_x86"},"library":{"name":"com.rudderstack.android.sdk.core"},"locale":"en-US","network":{"carrier":"Android"},"screen":{"density":420,"height":1794,"width":1080},"traits":{"anonymousId":"49e4bdd1c280bc00"},"user_agent":"Dalvik/2.1.0 (Linux; U; Android 9; Android SDK built for x86 Build/PSR1.180720.075)"},"event":"Demo Track","integrations":{"All":true},"messageId":"b96f3d8a-7c26-4329-9671-4e3202f42f15","originalTimestamp":"2019-08-12T05:08:30.909Z","properties":{"category":"Demo Category","floatVal":4.501,"label":"Demo Label","testArray":[{"id":"elem1","value":"e1"},{"id":"elem2","value":"e2"}],"testMap":{"t1":"a","t2":4},"value":5},"rudderId":"a-292e-4e79-9880-f8009e0ae4a3","sentAt":"2019-08-12T05:08:30.909Z","type":"track"}]}`), - UserID: "a-292e-4e79-9880-f8009e0ae4a3", - UUID: uuid.New(), - CustomVal: "GW", - WorkspaceId: "workspaceID", - } - - sampleTestJob2 = &jobsdb.JobT{ - Parameters: []byte(`{"batch_id":1,"source_id":"sourceID","source_job_run_id":""}`), - EventPayload: []byte(`{"receivedAt":"2021-06-06T20:26:39.598+05:30","writeKey":"writeKey","requestIP":"[::1]", "batch": [{"userId":"user_id","channel":"android-sdk","context":{"app":{"build":"1","name":"RudderAndroidClient","namespace":"com.rudderlabs.android.sdk","version":"1.0"},"device":{"id":"49e4bdd1c280bc00","manufacturer":"Google","model":"Android SDK built for x86","name":"generic_x86"},"library":{"name":"com.rudderstack.android.sdk.core"},"locale":"en-US","network":{"carrier":"Android"},"screen":{"density":420,"height":1794,"width":1080},"traits":{"anonymousId":"49e4bdd1c280bc00"},"user_agent":"Dalvik/2.1.0 (Linux; U; Android 9; Android SDK built for x86 Build/PSR1.180720.075)"},"event":"Demo Track","integrations":{"All":true},"messageId":"b96f3d8a-7c26-4329-9671-4e3202f42f15","originalTimestamp":"2019-08-12T05:08:30.909Z","properties":{"category":"Demo Category","floatVal":4.501,"label":"Demo Label","testArray":[{"id":"elem1","value":"e1"},{"id":"elem2","value":"e2"}],"testMap":{"t1":"a","t2":4},"value":5},"rudderId":"a-292e-4e79-9880-f8009e0ae4a3","sentAt":"2019-08-12T05:08:30.909Z","type":"track"}]}`), - UserID: "a-292e-4e79-9880-f8009e0ae4a3", - UUID: uuid.New(), - CustomVal: "GW", - WorkspaceId: "workspaceID", - } - - sampleTestJob3 = &jobsdb.JobT{ - Parameters: []byte(`{"batch_id":1,"source_id":"sourceID","source_job_run_id":""}`), - EventPayload: []byte(`{"receivedAt":"2021-06-06T20:26:39.598+05:30","writeKey":"writeKey","requestIP":"[::1]", "batch": [{"userId":"user_id","anonymousId":"anon_id","channel":"android-sdk","context":{"app":{"build":"1","name":"RudderAndroidClient","namespace":"com.rudderlabs.android.sdk","version":"1.0"},"device":{"id":"49e4bdd1c280bc00","manufacturer":"Google","model":"Android SDK built for x86","name":"generic_x86"},"library":{"name":"com.rudderstack.android.sdk.core"},"locale":"en-US","network":{"carrier":"Android"},"screen":{"density":420,"height":1794,"width":1080},"traits":{"anonymousId":"49e4bdd1c280bc00"},"user_agent":"Dalvik/2.1.0 (Linux; U; Android 9; Android SDK built for x86 Build/PSR1.180720.075)"},"event":"Demo Track","integrations":{"All":true},"messageId":"b96f3d8a-7c26-4329-9671-4e3202f42f15","originalTimestamp":"2019-08-12T05:08:30.909Z","properties":{"category":"Demo Category","floatVal":4.501,"label":"Demo Label","testArray":[{"id":"elem1","value":"e1"},{"id":"elem2","value":"e2"}],"testMap":{"t1":"a","t2":4},"value":5},"rudderId":"a-292e-4e79-9880-f8009e0ae4a3","sentAt":"2019-08-12T05:08:30.909Z","type":"track"}]}`), - UserID: "a-292e-4e79-9880-f8009e0ae4a3", - UUID: uuid.New(), - CustomVal: "GW", - WorkspaceId: "workspaceID", - } - - sampleTestJob4 = &jobsdb.JobT{ - Parameters: []byte(`{"batch_id":1,"source_id":"sourceID","source_job_run_id":""}`), - EventPayload: []byte(`{"receivedAt":"2021-06-06T20:26:39.598+05:30","writeKey":"writeKey","requestIP":"[::1]", "batch": [{"userId":"user_id_1","anonymousId_1":"anon_id_1","channel":"android-sdk","context":{"app":{"build":"1","name":"RudderAndroidClient","namespace":"com.rudderlabs.android.sdk","version":"1.0"},"device":{"id":"49e4bdd1c280bc00","manufacturer":"Google","model":"Android SDK built for x86","name":"generic_x86"},"library":{"name":"com.rudderstack.android.sdk.core"},"locale":"en-US","network":{"carrier":"Android"},"screen":{"density":420,"height":1794,"width":1080},"traits":{"anonymousId":"49e4bdd1c280bc00"},"user_agent":"Dalvik/2.1.0 (Linux; U; Android 9; Android SDK built for x86 Build/PSR1.180720.075)"},"event":"Demo Track","integrations":{"All":true},"messageId":"b96f3d8a-7c26-4329-9671-4e3202f42f15","originalTimestamp":"2019-08-12T05:08:30.909Z","properties":{"category":"Demo Category","floatVal":4.501,"label":"Demo Label","testArray":[{"id":"elem1","value":"e1"},{"id":"elem2","value":"e2"}],"testMap":{"t1":"a","t2":4},"value":5},"rudderId":"a-292e-4e79-9880-f8009e0ae4a3","sentAt":"2019-08-12T05:08:30.909Z","type":"track"}]}`), - UserID: "a-292e-4e79-9880-f8009e0ae4a3", - UUID: uuid.New(), - CustomVal: "GW", - WorkspaceId: "workspaceID", - } prepareJob = func(sourceID, userID, annID, workspaceID string) *jobsdb.JobT { return &jobsdb.JobT{ - Parameters: []byte(fmt.Sprintf(`{"batch_id":1,"source_id":%q,"source_job_run_id":""}`, sourceID)), - EventPayload: []byte(fmt.Sprintf(`{"receivedAt":"2021-06-06T20:26:39.598+05:30","writeKey":"writeKey","requestIP":"[::1]", "batch": [{"userId":%q,"anonymousId_1":%q,"channel":"android-sdk","context":{"app":{"build":"1","name":"RudderAndroidClient","namespace":"com.rudderlabs.android.sdk","version":"1.0"},"device":{"id":"49e4bdd1c280bc00","manufacturer":"Google","model":"Android SDK built for x86","name":"generic_x86"},"library":{"name":"com.rudderstack.android.sdk.core"},"locale":"en-US","network":{"carrier":"Android"},"screen":{"density":420,"height":1794,"width":1080},"traits":{"anonymousId":"49e4bdd1c280bc00"},"user_agent":"Dalvik/2.1.0 (Linux; U; Android 9; Android SDK built for x86 Build/PSR1.180720.075)"},"event":"Demo Track","integrations":{"All":true},"messageId":"b96f3d8a-7c26-4329-9671-4e3202f42f15","originalTimestamp":"2019-08-12T05:08:30.909Z","properties":{"category":"Demo Category","floatVal":4.501,"label":"Demo Label","testArray":[{"id":"elem1","value":"e1"},{"id":"elem2","value":"e2"}],"testMap":{"t1":"a","t2":4},"value":5},"rudderId":"a-292e-4e79-9880-f8009e0ae4a3","sentAt":"2019-08-12T05:08:30.909Z","type":"track"}]}`, userID, annID)), + Parameters: []byte(fmt.Sprintf(`{"source_id":%q}`, sourceID)), + EventPayload: []byte(fmt.Sprintf(`{"batch": [{"anonymousId":%q,"userId":%q,"type":"track"}]}`, annID, userID)), UserID: uuid.NewString(), UUID: uuid.New(), CustomVal: "GW", @@ -94,8 +60,8 @@ var ( WorkspaceID: workspaceID, SourceID: sourceID, UserIDHll: &userIDHll, - AnonymousIDHLL: &annIDHll, - IdentifiedAnonymousIDHLL: &identifiedAnnIDHll, + AnonymousIDHll: &annIDHll, + IdentifiedAnonymousIDHll: &identifiedAnnIDHll, } } ) @@ -109,48 +75,120 @@ func TestUniqueUsersReporter(t *testing.T) { name string jobs []*jobsdb.JobT sourceIDtoFilter map[string]bool - trackedUsers map[string]map[string]int + trackedUsers []*UsersReport }{ { name: "happy case", jobs: []*jobsdb.JobT{ - sampleTestJob1, - sampleTestJob2, - sampleTestJob3, - sampleTestJob4, + prepareJob(sampleSourceID, "", "anon_id", sampleWorkspaceID), + prepareJob(sampleSourceID, "user_id", "", sampleWorkspaceID), + prepareJob(sampleSourceID, "user_id", "anon_id", sampleWorkspaceID), + prepareJob(sampleSourceID, "user_id_1", "anon_id_1", sampleWorkspaceID), }, - trackedUsers: map[string]map[string]int{ - "workspaceID": { - "sourceID": 2, + trackedUsers: []*UsersReport{ + { + WorkspaceID: sampleWorkspaceID, + SourceID: sampleSourceID, + UserIDHll: func() *hll.Hll { + resHll, err := hll.NewHll(hllSettings) + require.NoError(t, err) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user_id"), murmurSeed)) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user_id_1"), murmurSeed)) + return &resHll + }(), + AnonymousIDHll: func() *hll.Hll { + resHll, err := hll.NewHll(hllSettings) + require.NoError(t, err) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("anon_id"), murmurSeed)) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("anon_id_1"), murmurSeed)) + return &resHll + }(), + IdentifiedAnonymousIDHll: func() *hll.Hll { + resHll, err := hll.NewHll(hllSettings) + require.NoError(t, err) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte( + combineUserIDAnonymousID("user_id", "anon_id")), murmurSeed)) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte( + combineUserIDAnonymousID("user_id_1", "anon_id_1")), murmurSeed)) + return &resHll + }(), }, }, }, { name: "happy case 2", jobs: []*jobsdb.JobT{ - sampleTestJob1, - sampleTestJob2, - sampleTestJob3, - sampleTestJob4, + prepareJob(sampleSourceID, "", "anon_id", sampleWorkspaceID), + prepareJob(sampleSourceID, "user_id", "", sampleWorkspaceID), + prepareJob(sampleSourceID, "user_id", "anon_id", sampleWorkspaceID), + prepareJob(sampleSourceID, "user_id_1", "anon_id_1", sampleWorkspaceID), prepareJob(sampleSourceID, "user", "ann", sampleWorkspaceID), prepareJob(sampleSourceID, "user", "ann", sampleWorkspaceID2), }, - trackedUsers: map[string]map[string]int{ - sampleWorkspaceID: { - sampleSourceID: 3, + trackedUsers: []*UsersReport{ + { + WorkspaceID: sampleWorkspaceID, + SourceID: sampleSourceID, + UserIDHll: func() *hll.Hll { + resHll, err := hll.NewHll(hllSettings) + require.NoError(t, err) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user_id"), murmurSeed)) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user"), murmurSeed)) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user_id_1"), murmurSeed)) + return &resHll + }(), + AnonymousIDHll: func() *hll.Hll { + resHll, err := hll.NewHll(hllSettings) + require.NoError(t, err) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("ann"), murmurSeed)) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("anon_id"), murmurSeed)) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("anon_id_1"), murmurSeed)) + return &resHll + }(), + IdentifiedAnonymousIDHll: func() *hll.Hll { + resHll, err := hll.NewHll(hllSettings) + require.NoError(t, err) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte( + combineUserIDAnonymousID("user_id", "anon_id")), murmurSeed)) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte( + combineUserIDAnonymousID("user_id_1", "anon_id_1")), murmurSeed)) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte( + combineUserIDAnonymousID("user", "ann")), murmurSeed)) + return &resHll + }(), }, - sampleWorkspaceID2: { - sampleSourceID: 1, + { + WorkspaceID: sampleWorkspaceID2, + SourceID: sampleSourceID, + UserIDHll: func() *hll.Hll { + resHll, err := hll.NewHll(hllSettings) + require.NoError(t, err) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user"), murmurSeed)) + return &resHll + }(), + AnonymousIDHll: func() *hll.Hll { + resHll, err := hll.NewHll(hllSettings) + require.NoError(t, err) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("ann"), murmurSeed)) + return &resHll + }(), + IdentifiedAnonymousIDHll: func() *hll.Hll { + resHll, err := hll.NewHll(hllSettings) + require.NoError(t, err) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte( + combineUserIDAnonymousID("user", "ann")), murmurSeed)) + return &resHll + }(), }, }, }, { name: "filter non event stream sources", jobs: []*jobsdb.JobT{ - sampleTestJob1, - sampleTestJob2, - sampleTestJob3, - sampleTestJob4, + prepareJob(sampleSourceID, "", "anon_id", sampleWorkspaceID), + prepareJob(sampleSourceID, "user_id", "", sampleWorkspaceID), + prepareJob(sampleSourceID, "user_id", "anon_id", sampleWorkspaceID), + prepareJob(sampleSourceID, "user_id_1", "anon_id_1", sampleWorkspaceID), prepareJob(sampleSourceID, "user", "ann", sampleWorkspaceID), prepareJob(sampleSourceID, "user", "ann", sampleWorkspaceID2), prepareJob(sampleSourceToFilter, uuid.NewString(), uuid.NewString(), sampleWorkspaceID2), @@ -158,12 +196,60 @@ func TestUniqueUsersReporter(t *testing.T) { sourceIDtoFilter: map[string]bool{ sampleSourceToFilter: true, }, - trackedUsers: map[string]map[string]int{ - sampleWorkspaceID: { - sampleSourceID: 3, + trackedUsers: []*UsersReport{ + { + WorkspaceID: sampleWorkspaceID, + SourceID: sampleSourceID, + UserIDHll: func() *hll.Hll { + resHll, err := hll.NewHll(hllSettings) + require.NoError(t, err) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user_id"), murmurSeed)) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user"), murmurSeed)) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user_id_1"), murmurSeed)) + return &resHll + }(), + AnonymousIDHll: func() *hll.Hll { + resHll, err := hll.NewHll(hllSettings) + require.NoError(t, err) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("ann"), murmurSeed)) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("anon_id"), murmurSeed)) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("anon_id_1"), murmurSeed)) + return &resHll + }(), + IdentifiedAnonymousIDHll: func() *hll.Hll { + resHll, err := hll.NewHll(hllSettings) + require.NoError(t, err) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte( + combineUserIDAnonymousID("user_id", "anon_id")), murmurSeed)) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte( + combineUserIDAnonymousID("user_id_1", "anon_id_1")), murmurSeed)) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte( + combineUserIDAnonymousID("user", "ann")), murmurSeed)) + return &resHll + }(), }, - sampleWorkspaceID2: { - sampleSourceID: 1, + { + WorkspaceID: sampleWorkspaceID2, + SourceID: sampleSourceID, + UserIDHll: func() *hll.Hll { + resHll, err := hll.NewHll(hllSettings) + require.NoError(t, err) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user"), murmurSeed)) + return &resHll + }(), + AnonymousIDHll: func() *hll.Hll { + resHll, err := hll.NewHll(hllSettings) + require.NoError(t, err) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("ann"), murmurSeed)) + return &resHll + }(), + IdentifiedAnonymousIDHll: func() *hll.Hll { + resHll, err := hll.NewHll(hllSettings) + require.NoError(t, err) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte( + combineUserIDAnonymousID("user", "ann")), murmurSeed)) + return &resHll + }(), }, }, }, @@ -173,7 +259,7 @@ func TestUniqueUsersReporter(t *testing.T) { prepareJob("", "user", "ann", sampleWorkspaceID), prepareJob("", "user", "ann", sampleWorkspaceID), }, - trackedUsers: map[string]map[string]int{}, + trackedUsers: nil, }, { name: "no event tracked in reports", @@ -182,7 +268,7 @@ func TestUniqueUsersReporter(t *testing.T) { prepareJob("", "user", "ann", sampleWorkspaceID), prepareJob("", "user", "ann", sampleWorkspaceID), }, - trackedUsers: map[string]map[string]int{}, + trackedUsers: nil, }, { name: "no workspace id in job", @@ -190,7 +276,7 @@ func TestUniqueUsersReporter(t *testing.T) { prepareJob("source", "user", "ann", ""), prepareJob("source", "user", "ann", ""), }, - trackedUsers: map[string]map[string]int{}, + trackedUsers: nil, }, } @@ -198,23 +284,7 @@ func TestUniqueUsersReporter(t *testing.T) { t.Run(tc.name, func(t *testing.T) { collector := &UniqueUsersReporter{log: logger.NOP, hllSettings: &hllSettings} reports := collector.GenerateReportsFromJobs(tc.jobs, tc.sourceIDtoFilter) - - result := make(map[string]map[string]int) - for _, e := range reports { - if result[e.WorkspaceID] == nil { - result[e.WorkspaceID] = make(map[string]int) - } - if e.UserIDHll != nil { - result[e.WorkspaceID][e.SourceID] += int(e.UserIDHll.Cardinality()) - } - if e.AnonymousIDHLL != nil { - result[e.WorkspaceID][e.SourceID] += int(e.AnonymousIDHLL.Cardinality()) - } - if e.IdentifiedAnonymousIDHLL != nil { - result[e.WorkspaceID][e.SourceID] -= int(e.IdentifiedAnonymousIDHLL.Cardinality()) - } - } - require.Equal(t, tc.trackedUsers, result) + require.ElementsMatch(t, tc.trackedUsers, reports) }) } }) @@ -251,6 +321,10 @@ func TestUniqueUsersReporter(t *testing.T) { sqlTx, err := postgresContainer.DB.Begin() require.NoError(t, err) tx := &txn.Tx{Tx: sqlTx} + fixedTime := time.Date(2021, 6, 6, 20, 26, 39, 598000000, time.UTC) + collector.now = func() time.Time { + return fixedTime + } err = collector.ReportUsers(context.Background(), tc.reports, tx) if tc.shouldFail { @@ -261,7 +335,7 @@ func TestUniqueUsersReporter(t *testing.T) { require.NoError(t, err) require.NoError(t, tx.Commit()) - rows, err := postgresContainer.DB.Query("SELECT workspace_id, source_id, userid_hll, anonymousid_hll, identified_anonymousid_hll FROM tracked_users_reports") + rows, err := postgresContainer.DB.Query("SELECT workspace_id, source_id, userid_hll, anonymousid_hll, identified_anonymousid_hll, reported_at FROM tracked_users_reports") require.NoError(t, err) require.NoError(t, rows.Err()) defer func() { _ = rows.Close() }() @@ -269,7 +343,8 @@ func TestUniqueUsersReporter(t *testing.T) { entries := make([]UsersReport, 0) for rows.Next() { var userIDHllStr, annIDHllStr, combHllStr string - err = rows.Scan(&entry.WorkspaceID, &entry.SourceID, &userIDHllStr, &annIDHllStr, &combHllStr) + var reportedTime time.Time + err = rows.Scan(&entry.WorkspaceID, &entry.SourceID, &userIDHllStr, &annIDHllStr, &combHllStr, &reportedTime) require.NoError(t, err) userHllBytes, err := hex.DecodeString(userIDHllStr) require.NoError(t, err) @@ -280,13 +355,14 @@ func TestUniqueUsersReporter(t *testing.T) { require.NoError(t, err) annHll, err := hll.FromBytes(annIDHllBytes) require.NoError(t, err) - entry.AnonymousIDHLL = &annHll + entry.AnonymousIDHll = &annHll combineHllBytes, err := hex.DecodeString(combHllStr) require.NoError(t, err) combHll, err := hll.FromBytes(combineHllBytes) require.NoError(t, err) - entry.IdentifiedAnonymousIDHLL = &combHll + entry.IdentifiedAnonymousIDHll = &combHll entries = append(entries, entry) + require.Equal(t, fixedTime, reportedTime) } result := make(map[string]map[string]int) for _, e := range entries { @@ -294,8 +370,8 @@ func TestUniqueUsersReporter(t *testing.T) { result[e.WorkspaceID] = make(map[string]int) } result[e.WorkspaceID][e.SourceID] += int(e.UserIDHll.Cardinality()) - result[e.WorkspaceID][e.SourceID] += int(e.AnonymousIDHLL.Cardinality()) - result[e.WorkspaceID][e.SourceID] -= int(e.IdentifiedAnonymousIDHLL.Cardinality()) + result[e.WorkspaceID][e.SourceID] += int(e.AnonymousIDHll.Cardinality()) + result[e.WorkspaceID][e.SourceID] -= int(e.IdentifiedAnonymousIDHll.Cardinality()) } require.Equal(t, tc.trackedUsers, result) })