Skip to content

Commit

Permalink
fix(dal): new edge in identical vector clock corner cases
Browse files Browse the repository at this point in the history
  • Loading branch information
zacharyhamm committed Jun 25, 2024
1 parent 4f1c203 commit 5059dfd
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 46 deletions.
49 changes: 49 additions & 0 deletions lib/dal/examples/rebase/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use std::{env, fs::File, io::prelude::*};

use si_layer_cache::db::serialize;

use dal::WorkspaceSnapshotGraph;

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + 'static>>;

const USAGE: &str = "usage: cargo run --example rebase <TO_REBASE_FILE_PATH> <ONTO_FILE_PATH>";

fn load_snapshot_graph(path: &str) -> Result<WorkspaceSnapshotGraph> {
let mut file = File::open(path)?;
let mut bytes = vec![];
file.read_to_end(&mut bytes)?;

Ok(serialize::from_bytes(&bytes)?)
}

fn main() -> Result<()> {
let args: Vec<String> = env::args().take(3).map(Into::into).collect();
let to_rebase_path = args.get(1).expect(USAGE);
let onto_path = args.get(2).expect(USAGE);

let mut to_rebase_graph = load_snapshot_graph(&to_rebase_path)?;
let onto_graph = load_snapshot_graph(&onto_path)?;

let to_rebase_vector_clock_id = to_rebase_graph
.max_recently_seen_clock_id(None)
.expect("Unable to find a vector clock id in to_rebase");
let onto_vector_clock_id = onto_graph
.max_recently_seen_clock_id(None)
.expect("Unable to find a vector clock id in onto");

let conflicts_and_updates = to_rebase_graph.detect_conflicts_and_updates(
to_rebase_vector_clock_id,
&onto_graph,
onto_vector_clock_id,
)?;

dbg!(&conflicts_and_updates);

to_rebase_graph.perform_updates(
to_rebase_vector_clock_id,
&onto_graph,
&conflicts_and_updates.updates,
)?;

Ok(())
}
17 changes: 15 additions & 2 deletions lib/dal/src/workspace_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

pub mod conflict;
pub mod content_address;
pub mod edge_info;
pub mod edge_weight;
pub mod graph;
pub mod lamport_clock;
Expand Down Expand Up @@ -392,7 +393,7 @@ impl WorkspaceSnapshot {
Ok(self.working_copy().await.root())
}

#[instrument(name = "workspace_snapshot.working_copy", level = "debug", skip_all)]
#[instrument(name = "workspace_snapshot.working_copy", level = "trace", skip_all)]
async fn working_copy(&self) -> SnapshotReadGuard<'_> {
SnapshotReadGuard {
read_only_graph: self.read_only_graph.clone(),
Expand All @@ -402,7 +403,7 @@ impl WorkspaceSnapshot {

#[instrument(
name = "workspace_snapshot.working_copy_mut",
level = "debug",
level = "trace",
skip_all
)]
async fn working_copy_mut(&self) -> SnapshotWriteGuard<'_> {
Expand All @@ -416,6 +417,14 @@ impl WorkspaceSnapshot {
}
}

/// Discard all changes in the working copy and return the graph to the
/// version fetched from the layer db
pub async fn revert(&self) {
if self.working_copy.read().await.is_some() {
*self.working_copy.write().await = None
}
}

pub async fn serialized(&self) -> WorkspaceSnapshotResult<Vec<u8>> {
Ok(si_layer_cache::db::serialize::to_vec(
&self.working_copy().await.clone(),
Expand Down Expand Up @@ -751,6 +760,10 @@ impl WorkspaceSnapshot {
}
}

pub async fn write_to_disk(&self, file_suffix: &str) {
self.working_copy().await.write_to_disk(file_suffix);
}

#[instrument(
name = "workspace_snapshot.get_node_index_by_id",
level = "debug",
Expand Down
26 changes: 26 additions & 0 deletions lib/dal/src/workspace_snapshot/edge_info.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use petgraph::prelude::{EdgeIndex, NodeIndex};

use crate::{EdgeWeightKindDiscriminants, WorkspaceSnapshotGraph};

#[derive(Debug, Copy, Clone)]
pub struct EdgeInfo {
pub source_node_index: NodeIndex,
pub target_node_index: NodeIndex,
pub edge_kind: EdgeWeightKindDiscriminants,
pub edge_index: EdgeIndex,
}

impl EdgeInfo {
pub fn simple_debug_string(&self, graph: &WorkspaceSnapshotGraph) -> String {
let source = graph.graph().node_weight(self.source_node_index);
let target = graph.graph().node_weight(self.target_node_index);
let edge_weight = graph.graph().edge_weight(self.edge_index);

format!(
"{:?} -- {:?} --> {:?}",
source.map(|s| s.id()),
edge_weight.map(|e| e.kind()),
target.map(|t| t.id())
)
}
}
108 changes: 66 additions & 42 deletions lib/dal/src/workspace_snapshot/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ use petgraph::{algo, prelude::*, visit::DfsEvent};
use serde::{Deserialize, Serialize};
use si_events::merkle_tree_hash::MerkleTreeHash;
use si_events::{ulid::Ulid, ContentHash};
use si_layer_cache::db::serialize;
use thiserror::Error;

use telemetry::prelude::*;
use ulid::Generator;

use crate::workspace_snapshot::content_address::ContentAddressDiscriminants;
use crate::workspace_snapshot::edge_info::EdgeInfo;
use crate::workspace_snapshot::node_weight::category_node_weight::CategoryNodeKind;
use crate::workspace_snapshot::node_weight::{CategoryNodeWeight, NodeWeightDiscriminants};
use crate::workspace_snapshot::vector_clock::{HasVectorClocks, VectorClockId};
Expand Down Expand Up @@ -75,6 +77,8 @@ pub enum WorkspaceSnapshotGraphError {
NodeWithIdNotFound(Ulid),
#[error("No Prop found for NodeIndex {0:?}")]
NoPropFound(NodeIndex),
#[error("Ordering node {0} has id in its order for non-existent node {1}")]
OrderingNodeHasNonexistentNodeInOrder(Ulid, Ulid),
#[error("NodeIndex has too many Ordering children: {0:?}")]
TooManyOrderingForNode(NodeIndex),
#[error("NodeIndex has too many Prop children: {0:?}")]
Expand Down Expand Up @@ -1162,7 +1166,21 @@ impl WorkspaceSnapshotGraph {
})
}

#[allow(dead_code, clippy::disallowed_methods)]
/// Write the graph to disk. This *MAY PANIC*. Use only for debugging.
#[allow(clippy::disallowed_methods)]
pub fn write_to_disk(&self, file_suffix: &str) {
let serialized = serialize::to_vec(self).expect("unable to serialize");
let filename = format!("{}-{}", Ulid::new(), file_suffix);

let home_env = std::env::var("HOME").expect("No HOME environment variable set");
let home = std::path::Path::new(&home_env);
let mut file = File::create(home.join(&filename)).expect("could not create file");
file.write_all(&serialized).expect("could not write file");

println!("Wrote graph to {}", home.join(&filename).display());
}

#[allow(clippy::disallowed_methods)]
pub fn tiny_dot_to_file(&self, suffix: Option<&str>) {
let suffix = suffix.unwrap_or("dot");
// NOTE(nick): copy the output and execute this on macOS. It will create a file in the
Expand Down Expand Up @@ -1339,14 +1357,6 @@ impl WorkspaceSnapshotGraph {
pub target_lineage: Ulid,
}

#[derive(Debug, Copy, Clone)]
struct EdgeInfo {
pub source_node_index: NodeIndex,
pub target_node_index: NodeIndex,
pub edge_kind: EdgeWeightKindDiscriminants,
pub edge_index: EdgeIndex,
}

let mut updates = Vec::new();
let mut conflicts = Vec::new();

Expand Down Expand Up @@ -1419,11 +1429,11 @@ impl WorkspaceSnapshotGraph {
let to_rebase_root_node = self.get_node_weight(self.root())?;
let onto_root_node = onto.get_node_weight(onto.root())?;

let to_rebase_last_saw_onto = to_rebase_root_node
let to_rebase_last_seen_by_onto_vector_clock_id = to_rebase_root_node
.vector_clock_recently_seen()
.entry_for(onto_vector_clock_id);

let onto_last_saw_to_rebase = onto_root_node
let onto_last_seen_by_to_rebase_vector_clock_id = onto_root_node
.vector_clock_recently_seen()
.entry_for(to_rebase_vector_clock_id);

Expand All @@ -1450,9 +1460,13 @@ impl WorkspaceSnapshotGraph {
.entry_for(to_rebase_vector_clock_id)
{
let maybe_seen_by_onto_at =
if let Some(onto_last_saw_to_rebase) = onto_last_saw_to_rebase {
if edge_first_seen_by_to_rebase <= onto_last_saw_to_rebase {
Some(onto_last_saw_to_rebase)
if let Some(onto_last_seen_by_to_rebase_vector_clock_id) =
onto_last_seen_by_to_rebase_vector_clock_id
{
if edge_first_seen_by_to_rebase
<= onto_last_seen_by_to_rebase_vector_clock_id
{
Some(onto_last_seen_by_to_rebase_vector_clock_id)
} else {
None
}
Expand Down Expand Up @@ -1526,9 +1540,13 @@ impl WorkspaceSnapshotGraph {
.entry_for(onto_vector_clock_id)
{
let maybe_seen_by_to_rebase_at =
if let Some(to_rebase_last_saw_onto) = to_rebase_last_saw_onto {
if edge_weight_first_seen_by_onto <= to_rebase_last_saw_onto {
Some(to_rebase_last_saw_onto)
if let Some(to_rebase_last_seen_by_onto_vector_clock_id) =
to_rebase_last_seen_by_onto_vector_clock_id
{
if edge_weight_first_seen_by_onto
<= to_rebase_last_seen_by_onto_vector_clock_id
{
Some(to_rebase_last_seen_by_onto_vector_clock_id)
} else {
None
}
Expand Down Expand Up @@ -1568,28 +1586,34 @@ impl WorkspaceSnapshotGraph {
container: container_node_information,
removed_item: removed_item_node_information,
});
// If the vector clock ids are identical, the seen
// timestamps become a little less informative. To
// determine whether this is a new edge or one that
// should stay removed in to_rebase, we simply determine
// whether to_rebase or onto was more recently "seen"
// overall
} else if to_rebase_vector_clock_id == onto_vector_clock_id
&& to_rebase_last_seen_by_onto_vector_clock_id
< onto_last_seen_by_to_rebase_vector_clock_id
{
updates.push(Update::new_edge(
self,
onto,
to_rebase_container_index,
only_onto_edge_info,
onto_edge_weight.to_owned(),
)?);
}
}
None => {
let source_node_weight = self.get_node_weight(to_rebase_container_index)?;
let destination_node_weight =
onto.get_node_weight(only_onto_edge_info.target_node_index)?;
let source_node_information = NodeInformation {
index: to_rebase_container_index,
id: source_node_weight.id().into(),
node_weight_kind: source_node_weight.into(),
};
let destination_node_information = NodeInformation {
index: only_onto_edge_info.target_node_index,
id: destination_node_weight.id().into(),
node_weight_kind: destination_node_weight.into(),
};

updates.push(Update::NewEdge {
source: source_node_information,
destination: destination_node_information,
edge_weight: onto_edge_weight.clone(),
})
// This edge has never been seen by to_rebase
updates.push(Update::new_edge(
self,
onto,
to_rebase_container_index,
only_onto_edge_info,
onto_edge_weight.to_owned(),
)?);
}
}
}
Expand Down Expand Up @@ -1813,12 +1837,12 @@ impl WorkspaceSnapshotGraph {
self.get_node_weight(container_ordering_index)?
{
for ordered_id in ordering_weight.order() {
ordered_child_indexes.push(
*self
.node_index_by_id
.get(ordered_id)
.ok_or(WorkspaceSnapshotGraphError::NodeWithIdNotFound(*ordered_id))?,
);
ordered_child_indexes.push(*self.node_index_by_id.get(ordered_id).ok_or(
WorkspaceSnapshotGraphError::OrderingNodeHasNonexistentNodeInOrder(
ordering_weight.id(),
*ordered_id,
),
)?);
}
}
} else {
Expand Down
41 changes: 39 additions & 2 deletions lib/dal/src/workspace_snapshot/update.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
use petgraph::prelude::NodeIndex;
use si_events::ulid::Ulid;

use serde::{Deserialize, Serialize};
use strum::EnumDiscriminants;

use super::edge_weight::{EdgeWeight, EdgeWeightKindDiscriminants};
use crate::workspace_snapshot::NodeInformation;
use super::{
edge_info::EdgeInfo,
edge_weight::{EdgeWeight, EdgeWeightKindDiscriminants},
graph::WorkspaceSnapshotGraphResult,
};
use crate::{workspace_snapshot::NodeInformation, WorkspaceSnapshotGraph};

#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, EnumDiscriminants)]
pub enum Update {
Expand Down Expand Up @@ -34,3 +39,35 @@ pub enum Update {
onto_category_id: Ulid,
},
}

impl Update {
/// Produce a NewEdge update from an edge that exists only in the "onto" graph
pub fn new_edge(
to_rebase_graph: &WorkspaceSnapshotGraph,
onto_graph: &WorkspaceSnapshotGraph,
to_rebase_source_index: NodeIndex,
only_onto_edge_info: &EdgeInfo,
only_onto_edge_weight: EdgeWeight,
) -> WorkspaceSnapshotGraphResult<Self> {
let source_node_weight = to_rebase_graph.get_node_weight(to_rebase_source_index)?;
let target_node_weight =
onto_graph.get_node_weight(only_onto_edge_info.target_node_index)?;

let source = NodeInformation {
index: to_rebase_source_index,
id: source_node_weight.id().into(),
node_weight_kind: source_node_weight.into(),
};
let destination = NodeInformation {
index: only_onto_edge_info.target_node_index,
id: target_node_weight.id().into(),
node_weight_kind: target_node_weight.into(),
};

Ok(Update::NewEdge {
source,
destination,
edge_weight: only_onto_edge_weight,
})
}
}

0 comments on commit 5059dfd

Please sign in to comment.