Skip to content

Commit

Permalink
Fix data race-related test failures (PR #171)
Browse files: browse the repository at this point in the history
  • Loading branch information
mattayes committed Nov 11, 2021
1 parent 885fd25 commit 2768bfb
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 28 deletions.
9 changes: 5 additions & 4 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ jobs:
go-version: [1.x.x]
platform: [ubuntu-latest]
runs-on: ${{ matrix.platform }}
name: Go ${{ matrix.go-version }} (${{ matrix.platform }})
steps:
- name: Checkout code
uses: actions/checkout@v2
- name: Install Go
uses: actions/setup-go@v1
uses: actions/setup-go@v2
with:
go-version: ${{ matrix.go-version }}
- name: Checkout code
uses: actions/checkout@v1
- name: Test
run: go test --timeout 360s -v ./...
run: go test -race -timeout 360s -v ./...
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ _testmain.go
*.test
*.prof

# ignore vendor - they are only needed for tests.
# ignore vendor - they are only needed for tests.
vendor/

# ignore bazel things that are local only
bazel-*

# Visual Studio Code
.vscode/*
7 changes: 4 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/lytics/grid/v3/codec"
Expand Down Expand Up @@ -426,7 +427,7 @@ func (c *Client) broadcast(ctx context.Context, cancel context.CancelFunc, g *Gr
receivers := g.Members()

var broadcastErr error
successes := 0
var successes int32
mu := new(sync.Mutex)
wg := new(sync.WaitGroup)
for _, rec := range receivers {
Expand All @@ -442,7 +443,7 @@ func (c *Client) broadcast(ctx context.Context, cancel context.CancelFunc, g *Gr
// if this request was successful and the group is configured to Fastest,
// then cancel the context so other requests are terminated
cancel()
successes++
atomic.AddInt32(&successes, 1)
}

mu.Lock()
Expand All @@ -457,7 +458,7 @@ func (c *Client) broadcast(ctx context.Context, cancel context.CancelFunc, g *Gr

// if the group is configured to Fastest, and we had at least one successful
// request, then don't return an error
if g.fastest && broadcastErr != nil && successes > 0 {
if g.fastest && broadcastErr != nil && atomic.LoadInt32(&successes) > 0 {
broadcastErr = nil
}
return res, broadcastErr
Expand Down
16 changes: 13 additions & 3 deletions context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,32 @@ package grid
import (
"context"
"net"
"sync"
"testing"
"time"

"github.com/lytics/grid/v3/testetcd"
)

// contextActor is a test actor that records the context handed to Act so
// that it can be read back later through Context.
type contextActor struct {
	mu      sync.RWMutex    // guards ctx
	started chan bool       // receives true after Act has stored its context
	ctx     context.Context // context most recently passed to Act
}

// Act stores c while holding the write lock, then sends true on started.
// The send happens after the lock is released, so a receiver woken by it
// can call Context without contending with Act.
func (a *contextActor) Act(c context.Context) {
	func() {
		a.mu.Lock()
		defer a.mu.Unlock()
		a.ctx = c
	}()
	a.started <- true
}

// Context returns the context stored by Act, read under the read lock.
// It returns nil when no context has been stored yet.
func (a *contextActor) Context() context.Context {
	a.mu.RLock()
	recorded := a.ctx
	a.mu.RUnlock()
	return recorded
}

func TestContextError(t *testing.T) {
// Create a context that is not valid to use
// with the grid context methods. The context
Expand Down Expand Up @@ -88,23 +98,23 @@ func TestValidContext(t *testing.T) {
case <-a.started:
server.Stop()

id, err := ContextActorID(a.ctx)
id, err := ContextActorID(a.Context())
if err != nil {
t.Fatal(err)
}
if id == "" {
t.Fatal("expected non-zero value")
}

name, err := ContextActorName(a.ctx)
name, err := ContextActorName(a.Context())
if err != nil {
t.Fatal(err)
}
if name == "" {
t.Fatal("expected non-zero value")
}

namespace, err := ContextActorNamespace(a.ctx)
namespace, err := ContextActorNamespace(a.Context())
if err != nil {
t.Fatal(err)
}
Expand Down
30 changes: 19 additions & 11 deletions registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (we *WatchEvent) String() string {

// Registry for discovery.
type Registry struct {
mu sync.Mutex
mu sync.RWMutex
done chan bool
exited chan bool
kv etcdv3.KV
Expand Down Expand Up @@ -202,6 +202,7 @@ func (rr *Registry) Start(addr net.Addr) error {

// Ensure that we're the owner of the address by taking an etcd lock
tctx, cancel := context.WithTimeout(context.TODO(), rr.LeaseDuration*2) // retry until Lease is up...
defer cancel()
err = rr.waitForAddress(tctx, address)
if err != nil {
return err
Expand Down Expand Up @@ -263,17 +264,24 @@ func (rr *Registry) Start(addr net.Addr) error {

// Address returns the address of this registry in the format <ip>:<port>.
// The field is read under the read lock.
func (rr *Registry) Address() string {
	rr.mu.RLock()
	addr := rr.address
	rr.mu.RUnlock()
	return addr
}

// Registry returns the registry name, which is a human-readable, all-ASCII
// transformation of the network address. The field is read under the read
// lock.
func (rr *Registry) Registry() string {
	rr.mu.RLock()
	name := rr.name
	rr.mu.RUnlock()
	return name
}

// Stop Registry.
func (rr *Registry) Stop() error {
rr.mu.Lock()
defer rr.mu.Unlock()

if rr.leaseID < 0 {
return nil
}
Expand All @@ -296,8 +304,8 @@ func (rr *Registry) Stop() error {

// Watch a prefix in the registry.
func (rr *Registry) Watch(c context.Context, prefix string) ([]*Registration, <-chan *WatchEvent, error) {
rr.mu.Lock()
defer rr.mu.Unlock()
rr.mu.RLock()
defer rr.mu.RUnlock()

getRes, err := rr.kv.Get(c, prefix, etcdv3.WithPrefix())
if err != nil {
Expand Down Expand Up @@ -390,8 +398,8 @@ func (rr *Registry) Watch(c context.Context, prefix string) ([]*Registration, <-

// FindRegistrations associated with the prefix.
func (rr *Registry) FindRegistrations(c context.Context, prefix string) ([]*Registration, error) {
rr.mu.Lock()
defer rr.mu.Unlock()
rr.mu.RLock()
defer rr.mu.RUnlock()

getRes, err := rr.kv.Get(c, prefix, etcdv3.WithPrefix())
if err != nil {
Expand All @@ -411,8 +419,8 @@ func (rr *Registry) FindRegistrations(c context.Context, prefix string) ([]*Regi

// FindRegistration associated with the given key.
func (rr *Registry) FindRegistration(c context.Context, key string) (*Registration, error) {
rr.mu.Lock()
defer rr.mu.Unlock()
rr.mu.RLock()
defer rr.mu.RUnlock()

getRes, err := rr.kv.Get(c, key, etcdv3.WithLimit(1))
if err != nil {
Expand All @@ -434,8 +442,8 @@ func (rr *Registry) FindRegistration(c context.Context, key string) (*Registrati
// Hence, registration can be used for mutual-exclusion.
func (rr *Registry) Register(c context.Context, key string, annotations ...string) error {
sort.Strings(annotations)
rr.mu.Lock()
defer rr.mu.Unlock()
rr.mu.RLock()
defer rr.mu.RUnlock()

if rr.leaseID < 0 {
return ErrNotStarted
Expand Down Expand Up @@ -475,8 +483,8 @@ func (rr *Registry) Register(c context.Context, key string, annotations ...strin

// Deregister under the given key.
func (rr *Registry) Deregister(c context.Context, key string) error {
rr.mu.Lock()
defer rr.mu.Unlock()
rr.mu.RLock()
defer rr.mu.RUnlock()

if rr.leaseID < 0 {
return ErrNotStarted
Expand Down
9 changes: 3 additions & 6 deletions registry/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,21 +156,18 @@ func TestWaitForLeaseThatDoesExpires(t *testing.T) {
}
r1.LeaseDuration = 10 * time.Second

_, err = kv.Put(context.Background(), registryLockKey(address), "")
if err != nil {
if _, err := kv.Put(context.Background(), registryLockKey(address), ""); err != nil {
t.Fatal(err)
}
time.AfterFunc(5*time.Second, func() {
// cleanup lock so that the registry can startup.
_, err = kv.Delete(context.Background(), registryLockKey(address))
if err != nil {
if _, err := kv.Delete(context.Background(), registryLockKey(address)); err != nil {
t.Fatal(err)
}
})

st := time.Now()
err = r1.Start(addr)
if err != nil {
if err := r1.Start(addr); err != nil {
t.Fatalf("unexpected error: err: %v", err)
}
// ensure that we waited 10 seconds...
Expand Down

0 comments on commit 2768bfb

Please sign in to comment.