Skip to content

Commit

Permalink
addressed review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mihir20 committed Jul 2, 2024
1 parent 6fa21ae commit 7e031e6
Show file tree
Hide file tree
Showing 2 changed files with 188 additions and 106 deletions.
36 changes: 21 additions & 15 deletions enterprise/trackedusers/users_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,22 @@ import (
)

const (
idTypeUserID = "userID"
idTypeAnonymousID = "anonymousID"
idTypeUserIDAnonymousIDCombination = "userIDAnonymousIDCombination"
idTypeUserID = "userID"
idTypeAnonymousID = "anonymousID"
idTypeIdentifiedAnonymousID = "identifiedAnonymousID"

// changing this will be non backwards compatible
murmurSeed = 123

trackUsersTable = "tracked_users_reports"
)

type UsersReport struct {
WorkspaceID string
SourceID string
UserIDHll *hll.Hll
AnonymousIDHLL *hll.Hll
IdentifiedAnonymousIDHLL *hll.Hll
AnonymousIDHll *hll.Hll
IdentifiedAnonymousIDHll *hll.Hll
}

//go:generate mockgen -destination=./mocks/mock_user_reporter.go -package=mockuserreporter github.com/rudderlabs/rudder-server/enterprise/trackedusers UsersReporter
Expand All @@ -52,6 +54,7 @@ type UniqueUsersReporter struct {
log logger.Logger
hllSettings *hll.Settings
instanceID string
now func() time.Time
}

func NewUniqueUsersReporter(log logger.Logger, conf *config.Config) (*UniqueUsersReporter, error) {
Expand All @@ -64,6 +67,9 @@ func NewUniqueUsersReporter(log logger.Logger, conf *config.Config) (*UniqueUser
SparseEnabled: true,
},
instanceID: config.GetString("INSTANCE_ID", "1"),
now: func() time.Time {
return time.Now()
},

Check warning on line 72 in enterprise/trackedusers/users_reporter.go

View check run for this annotation

Codecov / codecov/patch

enterprise/trackedusers/users_reporter.go#L71-L72

Added lines #L71 - L72 were not covered by tests
}, nil
}

Expand Down Expand Up @@ -131,7 +137,7 @@ func (u *UniqueUsersReporter) GenerateReportsFromJobs(jobs []*jobsdb.JobT, sourc

if userID != "" && anonymousID != "" {
combinedUserIDAnonymousID := combineUserIDAnonymousID(userID, anonymousID)
workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID] = u.recordIdentifier(workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID], combinedUserIDAnonymousID, idTypeUserIDAnonymousIDCombination)
workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID] = u.recordIdentifier(workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID], combinedUserIDAnonymousID, idTypeIdentifiedAnonymousID)
}
}

Expand All @@ -147,8 +153,8 @@ func (u *UniqueUsersReporter) GenerateReportsFromJobs(jobs []*jobsdb.JobT, sourc
WorkspaceID: workspaceID,
SourceID: sourceID,
UserIDHll: userIdTypeMap[idTypeUserID],
AnonymousIDHLL: userIdTypeMap[idTypeAnonymousID],
IdentifiedAnonymousIDHLL: userIdTypeMap[idTypeUserIDAnonymousIDCombination],
AnonymousIDHll: userIdTypeMap[idTypeAnonymousID],
IdentifiedAnonymousIDHll: userIdTypeMap[idTypeIdentifiedAnonymousID],
}
})...)
}
Expand All @@ -159,7 +165,7 @@ func (u *UniqueUsersReporter) ReportUsers(ctx context.Context, reports []*UsersR
if len(reports) == 0 {
return nil

Check warning on line 166 in enterprise/trackedusers/users_reporter.go

View check run for this annotation

Codecov / codecov/patch

enterprise/trackedusers/users_reporter.go#L166

Added line #L166 was not covered by tests
}
stmt, err := tx.PrepareContext(ctx, pq.CopyIn("tracked_users_reports",
stmt, err := tx.PrepareContext(ctx, pq.CopyIn(trackUsersTable,
"workspace_id",
"instance_id",
"source_id",
Expand All @@ -169,26 +175,26 @@ func (u *UniqueUsersReporter) ReportUsers(ctx context.Context, reports []*UsersR
"identified_anonymousid_hll",
))
if err != nil {
return fmt.Errorf("preparing statement: %v", err)
return fmt.Errorf("preparing statement: %w", err)

Check warning on line 178 in enterprise/trackedusers/users_reporter.go

View check run for this annotation

Codecov / codecov/patch

enterprise/trackedusers/users_reporter.go#L178

Added line #L178 was not covered by tests
}
defer func() { _ = stmt.Close() }()

for _, report := range reports {
_, err := stmt.Exec(report.WorkspaceID,
u.instanceID,
report.SourceID,
time.Now(),
u.now(),
hllToString(report.UserIDHll),
hllToString(report.AnonymousIDHLL),
hllToString(report.IdentifiedAnonymousIDHLL),
hllToString(report.AnonymousIDHll),
hllToString(report.IdentifiedAnonymousIDHll),
)
if err != nil {
return fmt.Errorf("executing statement: %v", err)
return fmt.Errorf("executing statement: %w", err)

Check warning on line 192 in enterprise/trackedusers/users_reporter.go

View check run for this annotation

Codecov / codecov/patch

enterprise/trackedusers/users_reporter.go#L192

Added line #L192 was not covered by tests
}

}
if _, err = stmt.ExecContext(ctx); err != nil {
return fmt.Errorf("executing final statement: %v", err)
return fmt.Errorf("executing final statement: %w", err)

Check warning on line 197 in enterprise/trackedusers/users_reporter.go

View check run for this annotation

Codecov / codecov/patch

enterprise/trackedusers/users_reporter.go#L197

Added line #L197 was not covered by tests
}
return nil
}
Expand Down
Loading

0 comments on commit 7e031e6

Please sign in to comment.