Skip to content

Commit

Permalink
Always use contexts (PR #187)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattayes committed Jun 15, 2023
1 parent 14aad2c commit bda6dbf
Show file tree
Hide file tree
Showing 11 changed files with 193 additions and 189 deletions.
2 changes: 1 addition & 1 deletion bench/clientserver_roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func BenchmarkClientServerRoundTripBigMapBigStrMsg(b *testing.B) {
func benchRunner(b *testing.B, client *grid.Client, evtMsg *Event) {
for n := 0; n < b.N; n++ {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
response, err := client.RequestC(ctx, mailboxName, evtMsg)
response, err := client.Request(ctx, mailboxName, evtMsg)
cancel()
if err != nil {
b.Fatal(err)
Expand Down
34 changes: 4 additions & 30 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,20 +151,8 @@ func (c *Client) Close() error {
return err
}

// Request a response for the given message.
func (c *Client) Request(timeout time.Duration, receiver string, msg interface{}) (interface{}, error) {
if c == nil {
return nil, ErrNilClient
}

timeoutC, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return c.RequestC(timeoutC, receiver, msg)
}

// RequestC (request) a response for the given message. The context can be
// used to control cancelation or timeouts.
func (c *Client) RequestC(ctx context.Context, receiver string, msg interface{}) (interface{}, error) {
// Request a response for the given message. The context can be used to control cancelation or timeouts.
func (c *Client) Request(ctx context.Context, receiver string, msg interface{}) (interface{}, error) {
if c == nil {
return nil, ErrNilClient
}
Expand Down Expand Up @@ -401,23 +389,9 @@ func (c *Client) logf(format string, v ...interface{}) {
}
}

// Broadcast a message to all members in a Group
func (c *Client) Broadcast(timeout time.Duration, g *Group, msg interface{}) (BroadcastResult, error) {
if c == nil {
return nil, ErrNilClient
}
if g == nil {
return nil, ErrNilGroup
}

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return c.broadcast(ctx, cancel, g, msg)
}

// BroadcastC (broadcast) a message to all members in a Group. The context can be used to control
// cancellations or timeouts
func (c *Client) BroadcastC(ctx context.Context, g *Group, msg interface{}) (BroadcastResult, error) {
func (c *Client) Broadcast(ctx context.Context, g *Group, msg interface{}) (BroadcastResult, error) {
if c == nil {
return nil, ErrNilClient
}
Expand All @@ -442,7 +416,7 @@ func (c *Client) broadcast(ctx context.Context, cancel context.CancelFunc, g *Gr
wg.Add(1)
go func(receiver string) {
defer wg.Done()
resp, err := c.RequestC(ctx, receiver, msg)
resp, err := c.Request(ctx, receiver, msg)
if err != nil {
mu.Lock()
broadcastErr = ErrIncompleteBroadcast
Expand Down
78 changes: 57 additions & 21 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ func TestClientClose(t *testing.T) {

func TestClientRequestWithUnregisteredMailbox(t *testing.T) {
t.Parallel()
const timeout = 3 * time.Second

// Bootstrap.
_, _, client := bootstrapClientTest(t)
Expand All @@ -139,7 +138,9 @@ func TestClientRequestWithUnregisteredMailbox(t *testing.T) {
client.cs = newClientStats()

// Send a request to some random name.
res, err := client.Request(timeout, "mock", NewActorStart("mock"))
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)
res, err := client.Request(ctx, "mock", NewActorStart("mock"))
if err != ErrUnregisteredMailbox {
t.Fatal(err)
}
Expand All @@ -165,15 +166,17 @@ func TestClientRequestWithUnknownMailbox(t *testing.T) {

// Place a bogus entry in etcd with
// a matching name.
timeoutC, cancel := context.WithTimeout(context.Background(), 3*time.Second)
err := server.registry.Register(timeoutC, client.cfg.Namespace+".mailbox.mock")
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
err := server.registry.Register(ctx, client.cfg.Namespace+".mailbox.mock")
cancel()
if err != nil {
t.Fatal(err)
}

// Send a request to some random name.
res, err := client.Request(timeout, "mock", NewActorStart("mock"))
ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
res, err := client.Request(ctx, "mock", NewActorStart("mock"))
cancel()
if err == nil {
t.Fatal("expected error")
}
Expand All @@ -192,13 +195,16 @@ func TestClientRequestWithUnknownMailbox(t *testing.T) {

func TestClientBroadcast(t *testing.T) {
t.Parallel()
ctx := context.Background()
const timeout = 3 * time.Second
_, server, client := bootstrapClientTest(t)

a := &echoActor{t: t, ready: make(chan bool), server: server}
server.RegisterDef("echo", func([]byte) (Actor, error) { return a, nil })

peers, err := client.Query(timeout, Peers)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
peers, err := client.Query(ctx, Peers)
if err != nil {
t.Fatalf("failed to query peers: %v", err)
} else if len(peers) != 1 {
Expand All @@ -210,7 +216,9 @@ func TestClientBroadcast(t *testing.T) {
startEchoActor := func(name string) {
actor := NewActorStart(name)
actor.Type = echoType
res, err := client.Request(timeout, peer, actor)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
res, err := client.Request(ctx, peer, actor)
if err != nil {
t.Fatalf("failed to start echo actor: %v", err)
} else if res == nil {
Expand All @@ -228,7 +236,9 @@ func TestClientBroadcast(t *testing.T) {
msg := &EchoMsg{Msg: "lol"}
t.Run("broadcast-all", func(t *testing.T) {
g := NewListGroup("echo-0", "echo-1")
res, err := client.Broadcast(timeout, g, msg)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
res, err := client.Broadcast(ctx, g, msg)
if err != nil {
t.Fatalf("failed to broadcast message: %v", err)
} else if len(res) != numActors {
Expand All @@ -249,7 +259,9 @@ func TestClientBroadcast(t *testing.T) {

t.Run("broadcast-fastest", func(t *testing.T) {
g := NewListGroup("echo-0", "echo-1")
res, err := client.Broadcast(timeout, g.Fastest(), msg)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
res, err := client.Broadcast(ctx, g.Fastest(), msg)
if err != nil {
t.Fatalf("failed to broadcast message: %v", err)
} else if len(res) != numActors {
Expand Down Expand Up @@ -279,7 +291,9 @@ func TestClientBroadcast(t *testing.T) {
g := NewListGroup("echo-0", "echo-1", "echo-2")

resultSet := make(BroadcastResult)
tmpSet, err := client.Broadcast(timeout, g.ExceptSuccesses(resultSet), msg)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
tmpSet, err := client.Broadcast(ctx, g.ExceptSuccesses(resultSet), msg)
if err != ErrIncompleteBroadcast {
t.Fatal("expected a broadcast-error")
} else if tmpSet["echo-2"].Err != ErrUnregisteredMailbox {
Expand All @@ -290,7 +304,9 @@ func TestClientBroadcast(t *testing.T) {
// start the missing actor
startEchoActor("echo-2")

tmpSet, err = client.Broadcast(timeout, g.ExceptSuccesses(resultSet), msg)
ctx, cancel = context.WithTimeout(context.Background(), timeout)
defer cancel()
tmpSet, err = client.Broadcast(ctx, g.ExceptSuccesses(resultSet), msg)
if err != nil {
t.Fatalf("expected nil error, got %v", err)
}
Expand Down Expand Up @@ -329,7 +345,9 @@ func TestClientWithRunningReceiver(t *testing.T) {
a.server = server

// Discover some peers.
peers, err := client.Query(timeout, Peers)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
peers, err := client.Query(ctx, Peers)
if err != nil {
t.Fatal(err)
}
Expand All @@ -338,7 +356,9 @@ func TestClientWithRunningReceiver(t *testing.T) {
}

// Start the echo actor on the first peer.
res, err := client.Request(timeout, peers[0].Name(), NewActorStart("echo"))
ctx, cancel = context.WithTimeout(context.Background(), timeout)
defer cancel()
res, err := client.Request(ctx, peers[0].Name(), NewActorStart("echo"))
if err != nil {
t.Fatal(err)
}
Expand All @@ -350,7 +370,9 @@ func TestClientWithRunningReceiver(t *testing.T) {
<-a.ready

// Make a request to echo actor.
res, err = client.Request(timeout, "echo", expected)
ctx, cancel = context.WithTimeout(context.Background(), timeout)
res, err = client.Request(ctx, "echo", expected)
cancel()
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -394,7 +416,9 @@ func TestClientWithErrConnectionIsUnregistered(t *testing.T) {
a.server = server

// Discover some peers.
peers, err := client.Query(timeout, Peers)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
peers, err := client.Query(ctx, Peers)
if err != nil {
t.Fatal(err)
}
Expand All @@ -403,7 +427,9 @@ func TestClientWithErrConnectionIsUnregistered(t *testing.T) {
}

// Start the echo actor on the first peer.
res, err := client.Request(timeout, peers[0].Name(), NewActorStart("echo"))
ctx, cancel = context.WithTimeout(context.Background(), timeout)
defer cancel()
res, err := client.Request(ctx, peers[0].Name(), NewActorStart("echo"))
if err != nil {
t.Fatal(err)
}
Expand All @@ -415,7 +441,9 @@ func TestClientWithErrConnectionIsUnregistered(t *testing.T) {
<-a.ready

// Make a request to echo actor.
res, err = client.Request(timeout, "echo", expected)
ctx, cancel = context.WithTimeout(context.Background(), timeout)
res, err = client.Request(ctx, "echo", expected)
cancel()
if err != nil {
t.Fatal(err)
}
Expand All @@ -430,7 +458,9 @@ func TestClientWithErrConnectionIsUnregistered(t *testing.T) {
time.Sleep(timeout)

// Make the request again.
res, err = client.Request(timeout, "echo", expected)
ctx, cancel = context.WithTimeout(context.Background(), timeout)
res, err = client.Request(ctx, "echo", expected)
cancel()
if err == nil {
t.Fatal("expected error")
}
Expand Down Expand Up @@ -466,7 +496,9 @@ func TestClientWithBusyReceiver(t *testing.T) {
a.server = server

// Discover some peers.
peers, err := client.Query(timeout, Peers)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
peers, err := client.Query(ctx, Peers)
if err != nil {
t.Fatal(err)
}
Expand All @@ -475,7 +507,9 @@ func TestClientWithBusyReceiver(t *testing.T) {
}

// Start the busy actor on the first peer.
res, err := client.Request(timeout, peers[0].Name(), NewActorStart("busy"))
ctx, cancel = context.WithTimeout(context.Background(), timeout)
res, err := client.Request(ctx, peers[0].Name(), NewActorStart("busy"))
cancel()
if err != nil {
t.Fatal(err)
}
Expand All @@ -487,7 +521,9 @@ func TestClientWithBusyReceiver(t *testing.T) {
<-a.ready

// Make a request to busy actor.
res, err = client.Request(timeout, "busy", expected)
ctx, cancel = context.WithTimeout(context.Background(), timeout)
res, err = client.Request(ctx, "busy", expected)
cancel()
if err == nil {
t.Fatal(err)
}
Expand Down
12 changes: 8 additions & 4 deletions examples/hello/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,20 @@ type LeaderActor struct {

// Act checks for peers, ie: other processes running this code,
// in the same namespace and start the worker actor on one of them.
func (a *LeaderActor) Act(c context.Context) {
func (a *LeaderActor) Act(ctx context.Context) {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()

existing := make(map[string]bool)
for {
select {
case <-c.Done():
case <-ctx.Done():
return
case <-ticker.C:
// Ask for current peers.
peers, err := a.client.Query(timeout, grid.Peers)
ctx, cancel := context.WithTimeout(ctx, timeout)
peers, err := a.client.Query(ctx, grid.Peers)
cancel()
successOrDie(err)

// Check for new peers.
Expand All @@ -50,7 +52,9 @@ func (a *LeaderActor) Act(c context.Context) {
start.Type = "worker"

// On new peers start the worker.
_, err := a.client.Request(timeout, peer.Name(), start)
ctx, cancel := context.WithTimeout(ctx, timeout)
_, err := a.client.Request(ctx, peer.Name(), start)
cancel()
successOrDie(err)
}
}
Expand Down
26 changes: 17 additions & 9 deletions examples/requestreply/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,35 +36,39 @@ type LeaderActor struct {

// Act checks for peers, ie: other processes running this code,
// in the same namespace and start the worker actor on one of them.
func (a *LeaderActor) Act(c context.Context) {
func (a *LeaderActor) Act(ctx context.Context) {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()

fmt.Println("Starting Leader Actor")

existing := make(map[string]bool)
existing := make(map[string]struct{})
for {
select {
case <-c.Done():
case <-ctx.Done():
return
case <-ticker.C:
// Ask for current peers.
peers, err := a.client.Query(timeout, grid.Peers)
ctx, cancel := context.WithTimeout(ctx, timeout)
peers, err := a.client.Query(ctx, grid.Peers)
cancel()
successOrDie(err)

// Check for new peers.
for _, peer := range peers {
if existing[peer.Name()] {
if _, ok := existing[peer.Name()]; ok {
continue
}

// Define a worker.
existing[peer.Name()] = true
existing[peer.Name()] = struct{}{}
start := grid.NewActorStart("worker-%d", len(existing))
start.Type = "worker"

// On new peers start the worker.
_, err := a.client.Request(timeout, peer.Name(), start)
ctx, cancel := context.WithTimeout(ctx, timeout)
_, err := a.client.Request(ctx, peer.Name(), start)
cancel()
successOrDie(err)
}
}
Expand Down Expand Up @@ -210,7 +214,9 @@ func (m *apiServer) loadWorkers() {
return
}
// Ask for current peers.
peers, err := m.c.Query(timeout, grid.Peers)
ctx, cancel := context.WithTimeout(m.ctx, timeout)
peers, err := m.c.Query(ctx, grid.Peers)
cancel()
successOrDie(err)
existing := make(map[string]bool)
m.mu.Lock()
Expand Down Expand Up @@ -259,7 +265,9 @@ func (m *apiServer) Run() {
worker = m.ConsistentWorker(user)
}

res, err := m.c.Request(timeout, worker, &Event{User: user})
ctx, cancel := context.WithTimeout(r.Context(), timeout)
defer cancel()
res, err := m.c.Request(ctx, worker, &Event{User: user})
fmt.Printf("request user: %q response: %#v err=%v\n", user, res, err)
if er, ok := res.(*EventResponse); ok {
fmt.Fprintf(w, "Response %s\n\n", er.Id)
Expand Down
Loading

0 comments on commit bda6dbf

Please sign in to comment.