Skip to content

Commit

Permalink
pubsub: fix a subtle race in catch up logic for barely active subs (#247
Browse files Browse the repository at this point in the history
)

The order of events processing when Corrosion is handling a change is:
  - determine changes
  - send changes to the channel
  - commit changes to sub DB

When Corrosion client is catching up it subscribes to changes sent to
the channel and queries the full sub DB. It's possibly that the query
is run *before* the changes are committed, but after they've been sent
to the channel, so what happens is:

  * A change with ID N is processed by Corrosion and sent to the channel
  * catch up logic starts and subscribes to changes, but doesn't see
    this change
  * sub DB is read and sent to the client, last committed change ID is
    N - 1, so EndOfQuery{change_id: N - 1} is sent to the client
  * corrosion commits the last update so the last change in the DB becomes
    N
  * catch up handler has min_change_id = N - 1 + 1, so when it compares
    last sent change ID (N) with min_change_id (N) it ignores it becase
    the comparison is strictly greater.

min_change_id is what we expect to receive, so the fix is trivial -
comparison should be greater or equal.
  • Loading branch information
pborzenkov committed Jul 30, 2024
1 parent 4ed92e8 commit f93048a
Showing 1 changed file with 2 additions and 2 deletions.
4 changes: 2 additions & 2 deletions crates/corro-agent/src/api/public/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ pub async fn catch_up_sub(
for i in 0..5 {
min_change_id = last_change_id + 1;

if change_id > min_change_id {
if change_id >= min_change_id {
// missed some updates!
info!(sub_id = %matcher.id(), "attempt #{} to catch up subcription from change id: {change_id:?} (last: {last_change_id:?})", i+1);

Expand All @@ -547,7 +547,7 @@ pub async fn catch_up_sub(
// sleep 100 millis
tokio::time::sleep(Duration::from_millis(100)).await;
}
if change_id > min_change_id {
if change_id >= min_change_id {
_ = evt_tx
.send(error_to_query_event_bytes_with_meta(
&mut buf,
Expand Down

0 comments on commit f93048a

Please sign in to comment.