Skip to content

Commit

Permalink
add evaluation watcher and enable atomic enqueues (#3998)
Browse files Browse the repository at this point in the history
  • Loading branch information
wdbaruni committed May 22, 2024
1 parent e984099 commit 5c482d4
Show file tree
Hide file tree
Showing 14 changed files with 688 additions and 211 deletions.
46 changes: 14 additions & 32 deletions pkg/jobstore/boltdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ package boltjobstore
import (
"bytes"
"context"
"encoding/json"
"fmt"
"sort"
"strings"
"sync"
"time"

"github.com/benbjohnson/clock"
Expand Down Expand Up @@ -42,11 +40,10 @@ const (
var SpecKey = []byte("spec")

type BoltJobStore struct {
database *bolt.DB
clock clock.Clock
marshaller marshaller.Marshaller
watchers []*jobstore.Watcher
watcherLock sync.Mutex
database *bolt.DB
clock clock.Clock
marshaller marshaller.Marshaller
watchersManager *jobstore.WatchersManager

inProgressIndex *Index
namespacesIndex *Index
Expand Down Expand Up @@ -91,10 +88,10 @@ func NewBoltJobStore(dbPath string, options ...Option) (*BoltJobStore, error) {
}

store := &BoltJobStore{
database: db,
clock: clock.New(),
marshaller: marshaller.NewJSONMarshaller(),
watchers: make([]*jobstore.Watcher, 0), //nolint:gomnd
database: db,
clock: clock.New(),
marshaller: marshaller.NewJSONMarshaller(),
watchersManager: jobstore.NewWatchersManager(),
}

for _, opt := range options {
Expand Down Expand Up @@ -147,26 +144,13 @@ func (b *BoltJobStore) BeginTx(ctx context.Context) (jobstore.TxContext, error)

func (b *BoltJobStore) Watch(ctx context.Context,
types jobstore.StoreWatcherType,
events jobstore.StoreEventType) chan jobstore.WatchEvent {
w := jobstore.NewWatcher(types, events)

b.watcherLock.Lock() // keep the watchers lock as narrow as possible
b.watchers = append(b.watchers, w)
b.watcherLock.Unlock()

return w.Channel()
events jobstore.StoreEventType,
options ...jobstore.WatcherOption) *jobstore.Watcher {
return b.watchersManager.NewWatcher(ctx, types, events, options...)
}

func (b *BoltJobStore) triggerEvent(t jobstore.StoreWatcherType, e jobstore.StoreEventType, object interface{}) {
data, _ := json.Marshal(object)

for _, w := range b.watchers {
if !w.IsWatchingEvent(e) || !w.IsWatchingType(t) {
return
}

_ = w.WriteEvent(t, e, data, false) // Do not block
}
func (b *BoltJobStore) triggerEvent(t jobstore.StoreWatcherType, e jobstore.StoreEventType, object any) {
b.watchersManager.Write(t, e, object)
}

// GetJob retrieves the Job identified by the id string. If the job isn't found it will
Expand Down Expand Up @@ -1274,9 +1258,7 @@ func (b *BoltJobStore) deleteEvaluation(tx *bolt.Tx, id string) error {
}

func (b *BoltJobStore) Close(ctx context.Context) error {
for _, w := range b.watchers {
w.Close()
}
b.watchersManager.Close()

log.Ctx(ctx).Debug().Msg("closing bolt-backed job store")
return b.database.Close()
Expand Down
27 changes: 12 additions & 15 deletions pkg/jobstore/boltdb/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package boltjobstore

import (
"context"
"encoding/json"
"os"
"path/filepath"
"testing"
Expand Down Expand Up @@ -579,7 +578,7 @@ func (s *BoltJobstoreTestSuite) TestShortIDs() {
}

func (s *BoltJobstoreTestSuite) TestEvents() {
ch := s.store.Watch(s.ctx,
watcher := s.store.Watch(s.ctx,
jobstore.JobWatcher|jobstore.ExecutionWatcher,
jobstore.CreateEvent|jobstore.UpdateEvent|jobstore.DeleteEvent,
)
Expand All @@ -596,14 +595,13 @@ func (s *BoltJobstoreTestSuite) TestEvents() {
s.Require().NoError(err)

// Read an event, it should be a jobcreate
ev := <-ch
ev := <-watcher.Channel()
s.Require().Equal(ev.Event, jobstore.CreateEvent)
s.Require().Equal(ev.Kind, jobstore.JobWatcher)

var decodedJob models.Job
err = json.Unmarshal(ev.Object, &decodedJob)
s.Require().NoError(err)
s.Require().Equal(decodedJob.ID, job.ID)
expectedJob, ok := ev.Object.(models.Job)
s.Require().True(ok, "expected object to be a job")
s.Require().Equal(expectedJob.ID, job.ID)
})

s.Run("execution create event", func() {
Expand All @@ -615,7 +613,7 @@ func (s *BoltJobstoreTestSuite) TestEvents() {
s.Require().NoError(err)

// Read an event, it should be a ExecutionForJob Create
ev := <-ch
ev := <-watcher.Channel()
s.Require().Equal(ev.Event, jobstore.CreateEvent)
s.Require().Equal(ev.Kind, jobstore.ExecutionWatcher)
})
Expand All @@ -630,7 +628,7 @@ func (s *BoltJobstoreTestSuite) TestEvents() {
Event: models.Event{Message: "event test"},
}
_ = s.store.UpdateJobState(s.ctx, request)
ev := <-ch
ev := <-watcher.Channel()
s.Require().Equal(ev.Event, jobstore.UpdateEvent)
s.Require().Equal(ev.Kind, jobstore.JobWatcher)
})
Expand All @@ -646,19 +644,18 @@ func (s *BoltJobstoreTestSuite) TestEvents() {
NewValues: execution,
Event: models.Event{Message: "event test"},
})
ev := <-ch
ev := <-watcher.Channel()
s.Require().Equal(ev.Event, jobstore.UpdateEvent)
s.Require().Equal(ev.Kind, jobstore.ExecutionWatcher)

var decodedExecution models.Execution
err := json.Unmarshal(ev.Object, &decodedExecution)
s.Require().NoError(err)
s.Require().Equal(decodedExecution.ID, execution.ID)
expectedExec, ok := ev.Object.(models.Execution)
s.Require().True(ok, "expected object to be an execution")
s.Require().Equal(expectedExec.ID, execution.ID)
})

s.Run("delete job event", func() {
_ = s.store.DeleteJob(s.ctx, job.ID)
ev := <-ch
ev := <-watcher.Channel()
s.Require().Equal(ev.Event, jobstore.DeleteEvent)
s.Require().Equal(ev.Kind, jobstore.JobWatcher)
})
Expand Down
15 changes: 10 additions & 5 deletions pkg/jobstore/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/jobstore/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type Store interface {
// will contain a timestamp, but also the StoreWatcherType and
// StoreEventType that triggered the event. A json encoded `[]byte`
// of the related object will also be included in the [WatchEvent].
Watch(ctx context.Context, types StoreWatcherType, events StoreEventType) chan WatchEvent
Watch(ctx context.Context, types StoreWatcherType, events StoreEventType, options ...WatcherOption) *Watcher

// GetJob returns a job, identified by the id parameter, or an error if
// it does not exist.
Expand Down
Loading

0 comments on commit 5c482d4

Please sign in to comment.