Skip to content

Commit

Permalink
Remove obsolete bounce routines from manager package.
Browse files Browse the repository at this point in the history
  • Loading branch information
knadh committed Sep 26, 2021
1 parent 4056187 commit 3ffd88f
Showing 1 changed file with 1 addition and 51 deletions.
52 changes: 1 addition & 51 deletions internal/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ const (
// BaseTPL is the name of the base template.
BaseTPL = "base"

BounceTypeBlocklist = "blocklist"
BounceTypeDelete = "delete"

// ContentTpl is the name of the compiled message.
ContentTpl = "content"

Expand All @@ -38,11 +35,6 @@ type Store interface {
GetCampaign(campID int) (*models.Campaign, error)
UpdateCampaignStatus(campID int, status string) error
CreateLink(url string) (string, error)

// RecordBounce records an external bounce event identified by
// a user's UUID/e-mail and a campaign UUID.
RecordBounce(b models.Bounce) (int64, int, error)

BlocklistSubscriber(id int64) error
DeleteSubscriber(id int64) error
}
Expand Down Expand Up @@ -72,7 +64,6 @@ type Manager struct {
campMsgErrorQueue chan msgError
campMsgErrorCounts map[int]int
msgQueue chan Message
bounceQueue chan models.Bounce

// Sliding window keeps track of the total number of messages sent in a period
// and on reaching the specified limit, waits until the window is over before
Expand Down Expand Up @@ -124,8 +115,6 @@ type Config struct {
MessageURL string
ViewTrackURL string
UnsubHeader bool
BounceCount int
BounceAction string
}

type msgError struct {
Expand Down Expand Up @@ -159,7 +148,6 @@ func New(cfg Config, store Store, notifCB models.AdminNotifCallback, i *i18n.I18
subFetchQueue: make(chan *models.Campaign, cfg.Concurrency),
campMsgQueue: make(chan CampaignMessage, cfg.Concurrency*2),
msgQueue: make(chan Message, cfg.Concurrency),
bounceQueue: make(chan models.Bounce, cfg.Concurrency),
campMsgErrorQueue: make(chan msgError, cfg.MaxSendErrors),
campMsgErrorCounts: make(map[int]int),
slidingWindowStart: time.Now(),
Expand Down Expand Up @@ -240,20 +228,6 @@ func (m *Manager) HasRunningCampaigns() bool {
return len(m.camps) > 0
}

// PushBounce records a bounce event.
func (m *Manager) PushBounce(b models.Bounce) error {
t := time.NewTicker(pushTimeout)
defer t.Stop()

select {
case m.bounceQueue <- b:
case <-t.C:
m.logger.Printf("bounce pushed timed out: %s / %s", b.SubscriberUUID, b.Email)
return errors.New("bounce push timed out")
}
return nil
}

// Run is a blocking function (that should be invoked as a goroutine)
// that scans the data source at regular intervals for pending campaigns,
// and queues them for processing. The process queue fetches batches of
Expand Down Expand Up @@ -292,7 +266,7 @@ func (m *Manager) Run(tick time.Duration) {
}
}

// worker is a blocking function that perpetually listents to events (message, bounce) on different
// worker is a blocking function that perpetually listents to events (message) on different
// queues and processes them.
func (m *Manager) worker() {
// Counter to keep track of the message / sec rate limit.
Expand Down Expand Up @@ -365,30 +339,6 @@ func (m *Manager) worker() {
if err != nil {
m.logger.Printf("error sending message '%s': %v", msg.Subject, err)
}

// Bounce event.
case b, ok := <-m.bounceQueue:
if !ok {
return
}

subID, count, err := m.store.RecordBounce(b)
if err != nil {
m.logger.Printf("error recording bounce %s / %s", b.SubscriberUUID, b.Email)
}

if count >= m.cfg.BounceCount {
switch m.cfg.BounceAction {
case BounceTypeBlocklist:
err = m.store.BlocklistSubscriber(subID)
case BounceTypeDelete:
err = m.store.DeleteSubscriber(subID)
}

if err != nil {
m.logger.Printf("error executing bounce for subscriber: %s", b.SubscriberUUID)
}
}
}
}
}
Expand Down

0 comments on commit 3ffd88f

Please sign in to comment.