Skip to content

Commit

Permalink
Add accurate sent count, last sent subscriber tracking on campaigns.
Browse files Browse the repository at this point in the history
- Sent count is no longer the batch size fetched from the DB but is
  the actual count of messages sent.
- Pausing and resuming now accurately tracks the last subscriber that
  was processed and resumes from there.
- Fix multiple concurrent campaigns blocking.

Closes #1616. Closes #905. Closes #1496. Closes #1250. Closes #1010.
  • Loading branch information
knadh committed Jan 2, 2024
1 parent 414c5c0 commit 772476c
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 16 deletions.
14 changes: 11 additions & 3 deletions cmd/manager_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ func newManagerStore(q *models.Queries, c *core.Core, m media.Store) *store {
}
}

// NextCampaigns retrieves active campaigns ready to be processed.
func (s *store) NextCampaigns(excludeIDs []int64) ([]*models.Campaign, error) {
// NextCampaigns retrieves active campaigns ready to be processed excluding
// campaigns that are also being processed. Additionally, it takes a map of campaignID:sentCount
// of campaigns that are being processed and updates them in the DB.
func (s *store) NextCampaigns(currentIDs []int64, sentCounts []int64) ([]*models.Campaign, error) {
var out []*models.Campaign
err := s.queries.NextCampaigns.Select(&out, pq.Int64Array(excludeIDs))
err := s.queries.NextCampaigns.Select(&out, pq.Int64Array(currentIDs), pq.Int64Array(sentCounts))
return out, err
}

Expand All @@ -58,6 +60,12 @@ func (s *store) UpdateCampaignStatus(campID int, status string) error {
return err
}

// UpdateCampaignStatus updates a campaign's status.
func (s *store) UpdateCampaignCounts(campID int, toSend int, sent int, lastSubID int) error {
_, err := s.queries.UpdateCampaignCounts.Exec(campID, toSend, sent, lastSubID)
return err
}

// GetAttachment fetches a media attachment blob.
func (s *store) GetAttachment(mediaID int) (models.Attachment, error) {
m, err := s.core.GetMedia(mediaID, "", s.media)
Expand Down
45 changes: 39 additions & 6 deletions internal/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@ const (
// Store represents a data backend, such as a database,
// that provides subscriber and campaign records.
type Store interface {
NextCampaigns(excludeIDs []int64) ([]*models.Campaign, error)
NextCampaigns(currentIDs []int64, sentCounts []int64) ([]*models.Campaign, error)
NextSubscribers(campID, limit int) ([]models.Subscriber, error)
GetCampaign(campID int) (*models.Campaign, error)
GetAttachment(mediaID int) (models.Attachment, error)
UpdateCampaignStatus(campID int, status string) error
UpdateCampaignCounts(campID int, toSend int, sent int, lastSubID int) error
CreateLink(url string) (string, error)
BlocklistSubscriber(id int64) error
DeleteSubscriber(id int64) error
Expand Down Expand Up @@ -165,9 +166,9 @@ func New(cfg Config, store Store, notifCB models.AdminNotifCallback, i *i18n.I18
pipes: make(map[int]*pipe),
tpls: make(map[int]*models.Template),
links: make(map[string]string),
nextPipes: make(chan *pipe, cfg.Concurrency),
campMsgQ: make(chan CampaignMessage, cfg.Concurrency*2),
msgQ: make(chan models.Message, cfg.Concurrency),
nextPipes: make(chan *pipe, 1000),
campMsgQ: make(chan CampaignMessage, cfg.Concurrency*cfg.MessageRate*2),
msgQ: make(chan models.Message, cfg.Concurrency*cfg.MessageRate*2),
slidingStart: time.Now(),
}
m.tplFuncs = m.makeGnericFuncMap()
Expand Down Expand Up @@ -275,7 +276,10 @@ func (m *Manager) Run() {

if has {
// There are more subscribers to fetch. Queue again.
m.nextPipes <- p
select {
case m.nextPipes <- p:
default:
}
} else {
// Mark the pseudo counter that's added in makePipe() that is used
// to force a wait on a pipe.
Expand Down Expand Up @@ -388,7 +392,8 @@ func (m *Manager) scanCampaigns(tick time.Duration) {
select {
// Periodically scan the data source for campaigns to process.
case <-t.C:
campaigns, err := m.store.NextCampaigns(m.getRunningCampaignIDs())
ids, counts := m.getCurrentCampaigns()
campaigns, err := m.store.NextCampaigns(ids, counts)
if err != nil {
m.log.Printf("error fetching campaigns: %v", err)
continue
Expand Down Expand Up @@ -488,7 +493,12 @@ func (m *Manager) worker() {
if err != nil {
msg.pipe.OnError()
} else {
id := uint64(msg.Subscriber.ID)
if id > msg.pipe.lastID.Load() {
msg.pipe.lastID.Store(uint64(msg.Subscriber.ID))
}
msg.pipe.rate.Incr(1)
msg.pipe.sent.Add(1)
}
}

Expand Down Expand Up @@ -518,6 +528,29 @@ func (m *Manager) getRunningCampaignIDs() []int64 {
return ids
}

// getCurrentCampaigns returns the IDs of campaigns currently being processed
// and their sent counts.
func (m *Manager) getCurrentCampaigns() ([]int64, []int64) {
// Needs to return an empty slice in case there are no campaigns.
m.pipesMut.RLock()
defer m.pipesMut.RUnlock()

var (
ids = make([]int64, 0, len(m.pipes))
counts = make([]int64, 0, len(m.pipes))
)
for _, p := range m.pipes {
ids = append(ids, int64(p.camp.ID))

// Get the sent counts for campaigns and reset them to 0
// as in the database, they're stored cumulatively (sent += $newSent).
counts = append(counts, p.sent.Load())
p.sent.Store(0)
}

return ids, counts
}

// isCampaignProcessing checks if the campaign is being processed.
func (m *Manager) isCampaignProcessing(id int) bool {
m.pipesMut.RLock()
Expand Down
9 changes: 8 additions & 1 deletion internal/manager/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ type pipe struct {
camp *models.Campaign
rate *ratecounter.RateCounter
wg *sync.WaitGroup
stopped atomic.Bool
sent atomic.Int64
lastID atomic.Uint64
errors atomic.Uint64
stopped atomic.Bool
withErrors atomic.Bool

m *Manager
Expand Down Expand Up @@ -182,6 +184,11 @@ func (p *pipe) cleanup() {
p.m.pipesMut.Unlock()
}()

// Update campaign's "sent" count.
if err := p.m.store.UpdateCampaignCounts(p.camp.ID, 0, int(p.sent.Load()), int(p.lastID.Load())); err != nil {
p.m.log.Printf("error updating campaign counts (%s): %v", p.camp.Name, err)
}

// The campaign was auto-paused due to errors.
if p.withErrors.Load() {
if err := p.m.store.UpdateCampaignStatus(p.camp.ID, models.CampaignStatusPaused); err != nil {
Expand Down
16 changes: 10 additions & 6 deletions queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ SELECT id, status, to_send, sent, started_at, updated_at
-- that is, the total number of subscribers to be processed across all lists of a campaign.
-- Thus, it has a sideaffect.
-- In addition, it finds the max_subscriber_id, the upper limit across all lists of
-- a campaign. This is used to fetch and slice subscribers for the campaign in next-subscriber-campaigns.
-- a campaign. This is used to fetch and slice subscribers for the campaign in next-campaign-subscribers.
WITH camps AS (
-- Get all running campaigns and their template bodies (if the template's deleted, the default template body instead)
SELECT campaigns.*, COALESCE(templates.body, (SELECT body FROM templates WHERE is_default = true LIMIT 1)) AS template_body
Expand Down Expand Up @@ -666,6 +666,12 @@ counts AS (
)
GROUP BY camps.id
),
updateCounts AS (
WITH uc (campaign_id, sent_count) AS (SELECT * FROM unnest($1::INT[], $2::INT[]))
UPDATE campaigns
SET sent = sent + uc.sent_count
FROM uc WHERE campaigns.id = uc.campaign_id
),
u AS (
-- For each campaign, update the to_send count and set the max_subscriber_id.
UPDATE campaigns AS ca
Expand Down Expand Up @@ -767,9 +773,7 @@ subs AS (
),
u AS (
UPDATE campaigns
SET last_subscriber_id = (SELECT MAX(id) FROM subs),
sent = sent + (SELECT COUNT(id) FROM subs),
updated_at = NOW()
SET last_subscriber_id = (SELECT MAX(id) FROM subs), updated_at = NOW()
WHERE (SELECT COUNT(id) FROM subs) > 0 AND id=$1
)
SELECT * FROM subs;
Expand Down Expand Up @@ -829,8 +833,8 @@ INSERT INTO campaign_lists (campaign_id, list_id, list_name)
-- name: update-campaign-counts
UPDATE campaigns SET
to_send=(CASE WHEN $2 != 0 THEN $2 ELSE to_send END),
sent=(CASE WHEN $3 != 0 THEN $3 ELSE sent END),
last_subscriber_id=(CASE WHEN $4 != 0 THEN $4 ELSE last_subscriber_id END),
sent=sent+$3,
last_subscriber_id=(CASE WHEN $4 > 0 THEN $4 ELSE to_send END),
updated_at=NOW()
WHERE id=$1;

Expand Down

0 comments on commit 772476c

Please sign in to comment.