
Shuffle queues #122

Merged
merged 10 commits into main from shuffle-queues on Jul 15, 2024
Conversation

@nickstenning (Member) commented on Jul 12, 2024

This PR implements read/write/length support for shuffle-sharded multi-tenant queues built on top of a collection of Redis streams. It is intended to become the library used by all of our clients that interact with these data structures (api, autoscaler, director).

Writes

We calculate the possible streams (based on a shard key) in Go, and then pass those into a Lua script in Redis which does the following (see the Go sketch after this list):

  1. Checks whether it can update the queue "meta" key to reflect the current total number of streams.

    If the new value is greater than the current one, it can do this immediately. If the new value is less than the current one, it checks the lengths of the streams beyond the new, smaller set, and only updates the meta key if all of those streams are empty.

    For example: if we were previously writing to 16 streams (with indexes 0 to 15) and new requests arrive saying that we're writing to 8 streams (0 to 7), then we'll check that streams 8 to 15 are empty before updating the meta key.

  2. Finds the shortest stream out of those provided, or the first empty stream.

  3. XADDs the provided message to the selected stream.

  4. XADDs a short message to a notifications stream, which can be used to await new work when queues are not busy.

  5. Sets a timeout/expiry on all the keys of the queue.

  6. Returns the enqueued message ID.
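
The real write path runs these six steps inside a single Lua script, so they happen atomically. Purely as an illustration, here is the same sequence sketched in Go against a go-redis v9 client; the function name `writeSketch`, the meta-key layout, the notification payload, and the 24-hour expiry are assumptions, not the actual implementation.

```go
// writeSketch illustrates the six steps above. The real implementation runs
// them inside one Lua script (so they are atomic); this Go version is only a
// sketch, and key names and the meta-key layout are hypothetical.
package queue

import (
	"context"
	"time"

	"github.com/redis/go-redis/v9"
)

func writeSketch(ctx context.Context, rdb *redis.Client, prefix string, streams []string, totalStreams int, payload map[string]interface{}) (string, error) {
	metaKey := prefix + ":meta"

	// 1. Record the current total number of streams in the meta key.
	//    (Shrinking the set would also require checking that the streams
	//    beyond the new set are empty; omitted here for brevity.)
	if err := rdb.HSet(ctx, metaKey, "streams", totalStreams).Err(); err != nil {
		return "", err
	}

	// 2. Pick the first empty stream if there is one, otherwise the shortest.
	target, shortest := "", int64(-1)
	for _, s := range streams {
		n, err := rdb.XLen(ctx, s).Result()
		if err != nil {
			return "", err
		}
		if n == 0 {
			target = s
			break
		}
		if shortest < 0 || n < shortest {
			target, shortest = s, n
		}
	}

	// 3. XADD the message to the selected stream.
	id, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: target, Values: payload}).Result()
	if err != nil {
		return "", err
	}

	// 4. XADD a tiny message to the notifications stream so blocked readers wake up.
	notif := prefix + ":notifications"
	if err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: notif, MaxLen: 1, Values: map[string]interface{}{"n": "1"}}).Err(); err != nil {
		return "", err
	}

	// 5. Refresh the expiry on all keys belonging to the queue.
	for _, key := range append([]string{metaKey, notif}, streams...) {
		rdb.Expire(ctx, key, 24*time.Hour)
	}

	// 6. Return the enqueued message ID.
	return id, nil
}
```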

Reads

The read side of the sharded queue implementation is quite fiddly, primarily because we cannot easily block on a write to one of N streams while also guaranteeing that we never pick up more than one message at a time.

Infuriatingly, when the queue is "empty", then

XREADGROUP ... COUNT 1 BLOCK 1000 STREAMS key1 key2 [...] > > [...]

will do the right thing. It will return immediately after the first XADD on any of the monitored streams. The problem is that if there are items immediately available in more than one of the specified streams, one per stream will be claimed and returned.

This implementation works around this by adding yet another stream -- a "notifications" stream -- the sole purpose of which is to accelerate the next read by the client when the stream is mostly empty.

Every time we write to the queue we also put a tiny message on the notifications stream, which can then be used by clients to block on activity on the queue as a whole.

An individual blocking read, then, has three steps:

  1. a non-blocking pass through all the queues
  2. if nothing is found, block on the notifications queue
  3. if we get a message, do another non-blocking pass through all the queues

This is inherently racy, but it should work well enough for our purposes and doesn't involve hammering Redis with repeated scans for messages.
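
A minimal Go sketch of that three-step flow, assuming a go-redis v9 client. The real non-blocking pass is a Lua script that walks the streams from a coordinated offset; here it is approximated with a simple per-stream loop, and the names `tryReadOnce` and `blockingRead` (and the way the notification consumer group is handled) are hypothetical. A nil message with a nil error means "nothing available".

```go
package queue

import (
	"context"
	"time"

	"github.com/redis/go-redis/v9"
)

// tryReadOnce stands in for the real single, non-blocking pass over the streams.
func tryReadOnce(ctx context.Context, rdb *redis.Client, group, consumer string, streams []string) (*redis.XMessage, error) {
	for _, s := range streams {
		res, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    group,
			Consumer: consumer,
			Streams:  []string{s, ">"},
			Count:    1,
			Block:    -1, // negative: omit BLOCK, i.e. a non-blocking read
		}).Result()
		if err == redis.Nil {
			continue // this stream has nothing new
		}
		if err != nil {
			return nil, err
		}
		if len(res) > 0 && len(res[0].Messages) > 0 {
			return &res[0].Messages[0], nil
		}
	}
	return nil, nil
}

func blockingRead(ctx context.Context, rdb *redis.Client, group, consumer, notif string, streams []string, block time.Duration) (*redis.XMessage, error) {
	// 1. A non-blocking pass through all the queue's streams.
	if msg, err := tryReadOnce(ctx, rdb, group, consumer, streams); msg != nil || err != nil {
		return msg, err
	}

	// 2. Nothing found: block on the notifications stream until something is
	//    written anywhere in the queue, or the timeout expires.
	_, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
		Group:    group,
		Consumer: consumer,
		Streams:  []string{notif, ">"},
		Count:    1,
		Block:    block,
	}).Result()
	if err == redis.Nil {
		return nil, nil // timed out: the queue really is quiet
	}
	if err != nil {
		return nil, err
	}

	// 3. We were notified: do another non-blocking pass.
	return tryReadOnce(ctx, rdb, group, consumer, streams)
}
```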

The read code also handles a number of other aspects of this problem:

  • for new queues, the creation of the requested consumer group is handled automatically (see the sketch after this list)
  • we will automatically read from the "default" stream if it exists and has items in it -- this will allow us to migrate everything to sharded queues without downtime
  • the consumer group provided is used when reading from the new streams, but when reading from the default stream we use the stream name as a consumer group name -- this will allow us to migrate away from having different consumer group names for each queue
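
For the first bullet, a sketch of what the automatic group creation might look like with go-redis v9; the function name `ensureGroup` and the error handling are illustrative, but the key details are that XGROUP CREATE with MKSTREAM creates both the stream and the group, and a BUSYGROUP reply simply means the group already exists.

```go
package queue

import (
	"context"
	"strings"

	"github.com/redis/go-redis/v9"
)

// ensureGroup creates the consumer group (and the stream, via MKSTREAM) if
// it does not exist yet, and treats "already exists" as success.
func ensureGroup(ctx context.Context, rdb *redis.Client, stream, group string) error {
	err := rdb.XGroupCreateMkStream(ctx, stream, group, "0").Err()
	if err != nil && !strings.Contains(err.Error(), "BUSYGROUP") {
		return err
	}
	return nil
}
```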

Len

We also expose a Len method which adds up the lengths of all the streams.
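
The PR does this in a single round trip with a Lua script (queue/len.lua); as a sketch only, the Go-level equivalent is just a sum of XLEN calls over the queue's streams (the helper name `queueLen` is hypothetical).

```go
package queue

import (
	"context"

	"github.com/redis/go-redis/v9"
)

// queueLen sums the lengths of all the streams that make up the queue.
func queueLen(ctx context.Context, rdb *redis.Client, streams []string) (int64, error) {
	var total int64
	for _, s := range streams {
		n, err := rdb.XLen(ctx, s).Result()
		if err != nil {
			return 0, err
		}
		total += n
	}
	return total, nil
}
```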

Implements one half of a shuffle-sharded queue implementation.
Specifically: the write half.

This wraps up the logic for writing to a shuffle-sharded queue so that
clients can simply provide the relevant parameters for the queue, and
the details of which stream is written to are hidden.

This tests that the notification stream is working as intended.
@nickstenning marked this pull request as ready for review on July 15, 2024 at 09:38
@nickstenning force-pushed the shuffle-queues branch 3 times, most recently from 9ac2910 to 26c4a17, on July 15, 2024 at 10:31
@evilstreak (Member) left a comment:

Amazing! ✨

A couple of notes we discussed online:

  • There's a (very unlikely) race condition with notifications having a max length of 1: two directors can simultaneously fail a queue read and then do a blocking read on the notifications stream, and in the time between the two queue reads failing and the two notification reads making it to Redis, we enqueue two (or more) predictions into the queues. Now only one director will pick up the work, and the other will wait the whole blocking time (or until another prediction is enqueued). This seems super unlikely to happen, would be mitigated by increasing the notification stream length slightly, and is something we can probably instrument to find out whether it's actually a problem in reality (I predict not).
  • Starting at a random queue position with each read doesn't guarantee fair weighting. As a pathological case, if a bulk user is assigned the shard {1,2,3,4,5,6} and a realtime user is assigned the shard {7,8,9,10,11,12} (and the remaining 52 streams are empty), then 6/64 times a prediction from the realtime user gets processed, and 58/64 times a prediction from the bulk user gets processed. We discussed addressing this by keeping a counter with the metadata and incrementing it before every read, so that all reads from all consumers share a global state which is guaranteed to round-robin all queues, rather than starting at a random position.

@nickstenning (Member, Author) commented:

Thanks for spotting the second issue, as that is rather fatal to the fairness goal. I've pushed bc43b14 which I think addresses the issue.

@philandstuff (Contributor) commented:

@evilstreak for the first issue, we could potentially fix it by:

  • drop the cap on the notifications stream (so it can grow)
  • in the read Lua script, if we find the streams are empty, we return the current max ID of the notification stream
  • when we do XREADGROUP on the notification stream, we specify a min ID to guarantee we get only those notifications that arrived after we checked and found the streams empty

Discussed on gather with @nickstenning, we might not need to do this but it should fix things if we do need to.

@nickstenning force-pushed the shuffle-queues branch 2 times, most recently from d943e93 to 433ac39, on July 15, 2024 at 12:46
@nickstenning (Member, Author) commented:

> Discussed on gather with @nickstenning, we might not need to do this but it should fix things if we do need to.

We agreed that we'd save this as something we can experiment with if the race condition in question turns out to be something we're seeing for real. I've added `queue.pickup_delay_ms` telemetry to help us with this.

@philandstuff (Contributor) left a comment:

This looks great, thank you! A few minor comments. I can't say I fully understand the algorithm, but we can chat about that.

This adds a third script which calculates the total queue length by
summing the lengths of all the streams.
By analogy with most of the Redis client commands, this switches the
queue package to return a sentinel error (queue.Empty) when no messages
are available, rather than the somewhat cryptic "nil, nil".
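
A sketch of that convention, mirroring how go-redis exposes redis.Nil; only the name queue.Empty comes from the commit, and the error text and usage shown here are illustrative.

```go
package queue

import "errors"

// Empty is returned when no messages are currently available, by analogy
// with redis.Nil.
var Empty = errors.New("queue: no messages available")
```

Callers can then write `if errors.Is(err, queue.Empty)` instead of checking for a nil message alongside a nil error.
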
Dom pointed out that using a random (or, as in our case, "random")
offset for choosing where to start reading doesn't actually result in
fair reads.

Consider the contrived but quite possible case of a 64-stream queue in
which tenant A is allocated streams [0,4), tenant B is allocated streams
[4,8), and no other tenants are using the queue.

Any read which starts at offsets in the range [4,8) will fetch a message
for tenant B, but a read starting at any other offset (all 60 of them)
will fetch a message for tenant A.

This updates the read code to use a globally coordinated offset stored
in the "meta" key to ensure fairness across reads, still without
requiring consumers to maintain state. Any time a message is found, the
offset is updated to point to the *next* stream in the queue, and then
reads start at the recorded offset. This should ensure that the first
read round-robins through all queues that have messages in them.
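
Sketched in Go for clarity (in the PR the equivalent logic lives inside the read Lua script), with a hypothetical "offset" field on the meta hash and a readOne callback standing in for the per-stream read:

```go
package queue

import (
	"context"

	"github.com/redis/go-redis/v9"
)

// readFair sketches the coordinated offset: reads start at the offset stored
// in the meta key, and whenever a message is found the offset is advanced to
// the next stream, so consumers collectively round-robin the queues.
func readFair(ctx context.Context, rdb *redis.Client, metaKey string, streams []string,
	readOne func(stream string) (found bool, err error)) error {

	// Start at the globally recorded offset (a missing field parses as 0).
	start, _ := rdb.HGet(ctx, metaKey, "offset").Int64()

	n := int64(len(streams))
	for i := int64(0); i < n; i++ {
		idx := (start + i) % n
		found, err := readOne(streams[idx])
		if err != nil {
			return err
		}
		if found {
			// Point the shared offset at the *next* stream for the next reader.
			return rdb.HSet(ctx, metaKey, "offset", (idx+1)%n).Err()
		}
	}
	return nil // nothing found anywhere
}
```
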
This records, at pickup, the time the message has spent in the queue on the current span as `queue.pickup_delay_ms`.
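
A sketch of how that attribute might be recorded with OpenTelemetry, exploiting the fact that Redis stream IDs begin with a millisecond timestamp; the function name `recordPickupDelay` is hypothetical and the PR's actual instrumentation may differ.

```go
package queue

import (
	"context"
	"strconv"
	"strings"
	"time"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

// recordPickupDelay derives the enqueue time from the stream message ID
// (e.g. "1721048112345-0") and records the delay on the current span.
func recordPickupDelay(ctx context.Context, messageID string) {
	ms, err := strconv.ParseInt(strings.SplitN(messageID, "-", 2)[0], 10, 64)
	if err != nil {
		return
	}
	delay := time.Since(time.UnixMilli(ms)).Milliseconds()
	trace.SpanFromContext(ctx).SetAttributes(
		attribute.Int64("queue.pickup_delay_ms", delay),
	)
}
```
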
@nickstenning merged commit 462aa88 into main on Jul 15, 2024
2 checks passed
@nickstenning deleted the shuffle-queues branch on July 15, 2024 at 13:21