Skip to content

Commit

Permalink
feat(dal): collapse vector clock ids for all clocks on rebase
Browse files Browse the repository at this point in the history
Co-Authored-By: Jacob Helwig <[email protected]>
  • Loading branch information
zacharyhamm and jhelwig committed Jul 9, 2024
1 parent a3e82c2 commit 4f21b25
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 55 deletions.
29 changes: 29 additions & 0 deletions lib/dal/src/change_set.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
//! The sequel to [`ChangeSets`](crate::ChangeSet). Coming to an SI instance near you!

use std::collections::HashSet;
use std::str::FromStr;

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use si_events::VectorClockChangeSetId;
Expand All @@ -23,6 +26,8 @@ pub mod event;
pub mod status;
pub mod view;

const FIND_ANCESTORS_QUERY: &str = include_str!("queries/change_set/find_ancestors.sql");

/// The primary error type for this module.
#[remain::sorted]
#[derive(Debug, Error)]
Expand Down Expand Up @@ -57,6 +62,8 @@ pub enum ChangeSetError {
SerdeJson(#[from] serde_json::Error),
#[error("transactions error: {0}")]
Transactions(#[from] TransactionsError),
#[error("ulid decode error: {0}")]
UlidDecode(#[from] ulid::DecodeError),
#[error("found an unexpected number of open change sets matching default change set (should be one, found {0:?})")]
UnexpectedNumberOfOpenChangeSetsMatchingDefaultChangeSet(Vec<ChangeSetId>),
#[error("user error: {0}")]
Expand Down Expand Up @@ -646,6 +653,28 @@ impl ChangeSet {
Ok(false)
}
}

/// Walk the graph of change sets up to the change set that has no "base
/// change set id" and return the set.
pub async fn ancestors(
    ctx: &DalContext,
    change_set_id: ChangeSetId,
) -> ChangeSetResult<HashSet<ChangeSetId>> {
    // The recursive SQL query walks the base_change_set_id chain from the
    // given change set all the way to the root.
    let rows = ctx
        .txns()
        .await?
        .pg()
        .query(FIND_ANCESTORS_QUERY, &[&change_set_id])
        .await?;

    // Decode each returned ulid string into a ChangeSetId, short-circuiting
    // on the first decode failure, and collect the ids into a set.
    rows.iter()
        .map(|row| {
            let id: String = row.get("id");
            ChangeSetId::from_str(&id).map_err(Into::into)
        })
        .collect()
}
}

impl std::fmt::Debug for ChangeSet {
Expand Down
7 changes: 7 additions & 0 deletions lib/dal/src/queries/change_set/find_ancestors.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Recursively walk the chain of base_change_set_id links starting from the
-- change set given as $1, returning the id of that change set and of every
-- ancestor up to the root (the change set whose base_change_set_id is NULL).
WITH RECURSIVE change_set_recursive AS (
SELECT id, base_change_set_id FROM change_set_pointers WHERE id=$1
UNION ALL
SELECT change_set_pointers.id, change_set_pointers.base_change_set_id FROM change_set_pointers
INNER JOIN change_set_recursive ON change_set_recursive.base_change_set_id = change_set_pointers.id
)
SELECT id FROM change_set_recursive;
26 changes: 23 additions & 3 deletions lib/dal/src/workspace_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ use crate::workspace_snapshot::node_weight::category_node_weight::CategoryNodeKi
use crate::workspace_snapshot::node_weight::NodeWeight;
use crate::workspace_snapshot::update::Update;
use crate::workspace_snapshot::vector_clock::VectorClockId;
use crate::{pk, Component, ComponentError, ComponentId, Workspace, WorkspaceError};
use crate::{pk, ChangeSet, Component, ComponentError, ComponentId, Workspace, WorkspaceError};
use crate::{
workspace_snapshot::{graph::WorkspaceSnapshotGraphError, node_weight::NodeWeightError},
DalContext, TransactionsError, WorkspaceSnapshotGraph,
Expand All @@ -88,8 +88,8 @@ pub enum WorkspaceSnapshotError {
AttributePrototypeArgument(#[from] Box<AttributePrototypeArgumentError>),
#[error("could not find category node of kind: {0:?}")]
CategoryNodeNotFound(CategoryNodeKind),
// #[error("change set error: {0}")]
// ChangeSet(#[from] ChangeSetError),
#[error("change set error: {0}")]
ChangeSet(#[from] ChangeSetError),
#[error("change set {0} has no workspace snapshot address")]
ChangeSetMissingWorkspaceSnapshotAddress(ChangeSetId),
#[error("Component error: {0}")]
Expand Down Expand Up @@ -2117,4 +2117,24 @@ impl WorkspaceSnapshot {

Ok(value_ids)
}

/// Prune and collapse vector clock entries for this snapshot, based on the
/// ancestry of its change set. This method assumes that the DalContext has
/// the correct change set id for this workspace snapshot.
pub async fn collapse_vector_clocks(&self, ctx: &DalContext) -> WorkspaceSnapshotResult<()> {
    let change_set_id = ctx.change_set_id();

    // Collapsed entries are folded into a clock id derived from the current
    // change set, paired with a zero ulid.
    let collapse_id = VectorClockId::new(change_set_id.into_inner(), ulid::Ulid(0));

    // Every ancestor change set of the current one is allowed to keep its
    // vector clock entries; everything else gets collapsed.
    let ancestors = ChangeSet::ancestors(ctx, change_set_id)
        .await?
        .into_iter()
        .map(|ancestor_id| ancestor_id.into_inner().into())
        .collect();

    self.working_copy_mut()
        .await
        .collapse_vector_clock_entries(ancestors, collapse_id);

    Ok(())
}
}
18 changes: 14 additions & 4 deletions lib/dal/src/workspace_snapshot/edge_weight.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
//! Edges

use std::collections::HashSet;

use serde::{Deserialize, Serialize};
use si_events::VectorClockChangeSetId;
use strum::EnumDiscriminants;
use thiserror::Error;

Expand Down Expand Up @@ -102,10 +105,17 @@ impl EdgeWeight {

/// Remove stale vector clock entries. `allow_list` should always include
/// the current editing clock id...
pub fn remove_vector_clock_entries(&mut self, allow_list: &[VectorClockId]) {
self.vector_clock_first_seen.remove_entries(allow_list);
self.vector_clock_recently_seen.remove_entries(allow_list);
self.vector_clock_write.remove_entries(allow_list);
pub fn collapse_vector_clock_entries(
&mut self,
allow_list: &HashSet<VectorClockChangeSetId>,
collapse_id: VectorClockId,
) {
self.vector_clock_first_seen
.collapse_entries(allow_list, collapse_id);
self.vector_clock_recently_seen
.collapse_entries(allow_list, collapse_id);
self.vector_clock_write
.collapse_entries(allow_list, collapse_id);
}

pub fn new(vector_clock_id: VectorClockId, kind: EdgeWeightKind) -> EdgeWeightResult<Self> {
Expand Down
11 changes: 8 additions & 3 deletions lib/dal/src/workspace_snapshot/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub use petgraph::Direction;
use petgraph::{algo, prelude::*};
use serde::{Deserialize, Serialize};
use si_events::merkle_tree_hash::MerkleTreeHash;
use si_events::VectorClockChangeSetId;
use si_events::{ulid::Ulid, ContentHash};
use si_layer_cache::db::serialize;
use thiserror::Error;
Expand Down Expand Up @@ -605,12 +606,16 @@ impl WorkspaceSnapshotGraph {
Ok(source)
}

pub fn remove_vector_clock_entries(&mut self, allow_list: &[VectorClockId]) {
pub fn collapse_vector_clock_entries(
&mut self,
allow_list: HashSet<VectorClockChangeSetId>,
collapse_id: VectorClockId,
) {
for edge in self.graph.edge_weights_mut() {
edge.remove_vector_clock_entries(allow_list);
edge.collapse_vector_clock_entries(&allow_list, collapse_id);
}
for node in self.graph.node_weights_mut() {
node.remove_vector_clock_entries(allow_list);
node.collapse_vector_clock_entries(&allow_list, collapse_id);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ impl<'a, 'b> DetectConflictsAndUpdates<'a, 'b> {
.vector_clock_recently_seen()
.entry_for(self.onto_vector_clock_id);

let onto_last_seen_by_to_rebase_vector_clock_id = onto_root_node
let onto_last_seen_by_to_rebase_vector_clock = onto_root_node
.vector_clock_recently_seen()
.entry_for(self.to_rebase_vector_clock_id);

Expand Down Expand Up @@ -416,27 +416,24 @@ impl<'a, 'b> DetectConflictsAndUpdates<'a, 'b> {
.vector_clock_first_seen()
.entry_for(self.to_rebase_vector_clock_id)
{
let maybe_seen_by_onto_at =
if let Some(onto_last_seen_by_to_rebase_vector_clock_id) =
onto_last_seen_by_to_rebase_vector_clock_id
{
if edge_first_seen_by_to_rebase
<= onto_last_seen_by_to_rebase_vector_clock_id
{
Some(onto_last_seen_by_to_rebase_vector_clock_id)
} else {
None
}
let maybe_seen_by_onto_at = if let Some(onto_last_seen_by_to_rebase_vector_clock) =
onto_last_seen_by_to_rebase_vector_clock
{
if edge_first_seen_by_to_rebase <= onto_last_seen_by_to_rebase_vector_clock {
Some(onto_last_seen_by_to_rebase_vector_clock)
} else {
to_rebase_edge_weight
.vector_clock_recently_seen()
.entry_for(self.onto_vector_clock_id)
.or_else(|| {
to_rebase_edge_weight
.vector_clock_first_seen()
.entry_for(self.onto_vector_clock_id)
})
};
None
}
} else {
to_rebase_edge_weight
.vector_clock_recently_seen()
.entry_for(self.onto_vector_clock_id)
.or_else(|| {
to_rebase_edge_weight
.vector_clock_first_seen()
.entry_for(self.onto_vector_clock_id)
})
};

if let Some(seen_by_onto_at) = maybe_seen_by_onto_at {
if to_rebase_item_weight
Expand Down Expand Up @@ -550,23 +547,6 @@ impl<'a, 'b> DetectConflictsAndUpdates<'a, 'b> {
container: container_node_information,
removed_item: removed_item_node_information,
});
// If the vector clock ids are identical, the seen
// timestamps become a little less informative. To
// determine whether this is a new edge or one that
// should stay removed in to_rebase, we simply determine
// whether to_rebase or onto was more recently "seen"
// overall
} else if self.to_rebase_vector_clock_id == self.onto_vector_clock_id
&& to_rebase_last_seen_by_onto_vector_clock_id
< onto_last_seen_by_to_rebase_vector_clock_id
{
updates.push(Update::new_edge(
self.to_rebase_graph,
self.onto_graph,
to_rebase_container_index,
only_onto_edge_info,
onto_edge_weight.to_owned(),
)?);
}
}
None => {
Expand Down
15 changes: 12 additions & 3 deletions lib/dal/src/workspace_snapshot/node_weight.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::collections::HashSet;
use std::num::TryFromIntError;

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use si_events::VectorClockChangeSetId;
use si_events::{merkle_tree_hash::MerkleTreeHash, ulid::Ulid, ContentHash, EncryptedSecretKey};
use strum::EnumDiscriminants;
use thiserror::Error;
Expand Down Expand Up @@ -375,12 +377,19 @@ impl NodeWeight {
.inc_to(vector_clock_id, new_clock_value);
}

pub fn remove_vector_clock_entries(&mut self, allow_list: &[VectorClockId]) {
pub fn collapse_vector_clock_entries(
&mut self,
allow_list: &HashSet<VectorClockChangeSetId>,
collapse_id: VectorClockId,
) {
self.vector_clock_first_seen_mut()
.remove_entries(allow_list);
.collapse_entries(allow_list, collapse_id);

self.vector_clock_recently_seen_mut()
.remove_entries(allow_list);
.collapse_entries(allow_list, collapse_id);

self.vector_clock_write_mut()
.collapse_entries(allow_list, collapse_id);
}

/// Many node kinds need to have complete control of their outgoing edges
Expand Down
27 changes: 23 additions & 4 deletions lib/dal/src/workspace_snapshot/vector_clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,29 @@ impl VectorClock {
.collect()
}

/// Remove all vector clock entries except those in `allow_list`
pub fn remove_entries(&mut self, allow_list: &[VectorClockId]) {
self.entries
.retain(|clock_id, _| allow_list.contains(clock_id));
/// Remove all vector clock entries except those in `allow_list` and
/// collapse them into the collapse_id by choosing the maximum removed entry
pub fn collapse_entries(
&mut self,
allow_list: &HashSet<VectorClockChangeSetId>,
collapse_id: VectorClockId,
) {
let mut max_removed = None;
self.entries.retain(|clock_id, &mut lamport_clock| {
if allow_list.contains(&clock_id.change_set_id()) {
true
} else {
if Some(lamport_clock) > max_removed {
max_removed = Some(lamport_clock.to_owned());
}

false
}
});

if let Some(max_removed) = max_removed {
self.inc_to(collapse_id, max_removed.counter);
}
}
}

Expand Down
4 changes: 4 additions & 0 deletions lib/rebaser-server/src/rebase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ pub async fn perform_rebase(
workspace_pk.into_inner(),
);

to_rebase_workspace_snapshot
.collapse_vector_clocks(ctx)
.await?;

to_rebase_workspace_snapshot
.write(ctx, vector_clock_id)
.await?;
Expand Down

0 comments on commit 4f21b25

Please sign in to comment.