Skip to content

Commit

Permalink
addressed review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mihir20 committed Jul 2, 2024
1 parent dd8965d commit 41d934f
Show file tree
Hide file tree
Showing 2 changed files with 188 additions and 106 deletions.
36 changes: 21 additions & 15 deletions enterprise/trackedusers/users_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,22 @@ import (
)

const (
idTypeUserID = "userID"
idTypeAnonymousID = "anonymousID"
idTypeUserIDAnonymousIDCombination = "userIDAnonymousIDCombination"
idTypeUserID = "userID"
idTypeAnonymousID = "anonymousID"
idTypeIdentifiedAnonymousID = "identifiedAnonymousID"

// changing this will be non backwards compatible
murmurSeed = 123

trackUsersTable = "tracked_users_reports"
)

type UsersReport struct {
WorkspaceID string
SourceID string
UserIDHll *hll.Hll
AnonymousIDHLL *hll.Hll
IdentifiedAnonymousIDHLL *hll.Hll
AnonymousIDHll *hll.Hll
IdentifiedAnonymousIDHll *hll.Hll
}

//go:generate mockgen -destination=./mocks/mock_user_reporter.go -package=mockuserreporter github.com/rudderlabs/rudder-server/enterprise/trackedusers UsersReporter
Expand All @@ -52,6 +54,7 @@ type UniqueUsersReporter struct {
log logger.Logger
hllSettings *hll.Settings
instanceID string
now func() time.Time
}

func NewUniqueUsersReporter(log logger.Logger, conf *config.Config) (*UniqueUsersReporter, error) {
Expand All @@ -64,6 +67,9 @@ func NewUniqueUsersReporter(log logger.Logger, conf *config.Config) (*UniqueUser
SparseEnabled: true,
},
instanceID: config.GetString("INSTANCE_ID", "1"),
now: func() time.Time {
return time.Now()
},
}, nil
}

Expand Down Expand Up @@ -131,7 +137,7 @@ func (u *UniqueUsersReporter) GenerateReportsFromJobs(jobs []*jobsdb.JobT, sourc

if userID != "" && anonymousID != "" {
combinedUserIDAnonymousID := combineUserIDAnonymousID(userID, anonymousID)
workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID] = u.recordIdentifier(workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID], combinedUserIDAnonymousID, idTypeUserIDAnonymousIDCombination)
workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID] = u.recordIdentifier(workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID], combinedUserIDAnonymousID, idTypeIdentifiedAnonymousID)
}
}

Expand All @@ -147,8 +153,8 @@ func (u *UniqueUsersReporter) GenerateReportsFromJobs(jobs []*jobsdb.JobT, sourc
WorkspaceID: workspaceID,
SourceID: sourceID,
UserIDHll: userIdTypeMap[idTypeUserID],
AnonymousIDHLL: userIdTypeMap[idTypeAnonymousID],
IdentifiedAnonymousIDHLL: userIdTypeMap[idTypeUserIDAnonymousIDCombination],
AnonymousIDHll: userIdTypeMap[idTypeAnonymousID],
IdentifiedAnonymousIDHll: userIdTypeMap[idTypeIdentifiedAnonymousID],
}
})...)
}
Expand All @@ -159,7 +165,7 @@ func (u *UniqueUsersReporter) ReportUsers(ctx context.Context, reports []*UsersR
if len(reports) == 0 {
return nil
}
stmt, err := tx.PrepareContext(ctx, pq.CopyIn("tracked_users_reports",
stmt, err := tx.PrepareContext(ctx, pq.CopyIn(trackUsersTable,
"workspace_id",
"instance_id",
"source_id",
Expand All @@ -169,26 +175,26 @@ func (u *UniqueUsersReporter) ReportUsers(ctx context.Context, reports []*UsersR
"identified_anonymousid_hll",
))
if err != nil {
return fmt.Errorf("preparing statement: %v", err)
return fmt.Errorf("preparing statement: %w", err)
}
defer func() { _ = stmt.Close() }()

for _, report := range reports {
_, err := stmt.Exec(report.WorkspaceID,
u.instanceID,
report.SourceID,
time.Now(),
u.now(),
hllToString(report.UserIDHll),
hllToString(report.AnonymousIDHLL),
hllToString(report.IdentifiedAnonymousIDHLL),
hllToString(report.AnonymousIDHll),
hllToString(report.IdentifiedAnonymousIDHll),
)
if err != nil {
return fmt.Errorf("executing statement: %v", err)
return fmt.Errorf("executing statement: %w", err)
}

}
if _, err = stmt.ExecContext(ctx); err != nil {
return fmt.Errorf("executing final statement: %v", err)
return fmt.Errorf("executing final statement: %w", err)
}
return nil
}
Expand Down
Loading

0 comments on commit 41d934f

Please sign in to comment.