Skip to content

Commit

Permalink
fix(workflows): fix invalid event history graph
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterPtato committed Jul 19, 2024
1 parent 881762b commit 033253f
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 43 deletions.
13 changes: 9 additions & 4 deletions lib/chirp-workflow/core/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,6 @@ impl WorkflowCtx {
}
}
Err(err) => {
tracing::warn!(name=%self.name, id=%self.workflow_id, ?err, "workflow error");

// Retry the workflow if its recoverable
let deadline = if err.is_recoverable() {
Some(rivet_util::timestamp::now() + RETRY_TIMEOUT.as_millis() as i64)
Expand All @@ -209,10 +207,16 @@ impl WorkflowCtx {
// be retried when a signal is published
let wake_signals = err.signals();

// This sub workflow come from a `wait_for_workflow` call on a workflow that did not
// This sub workflow comes from a `wait_for_workflow` call on a workflow that did not
// finish. This workflow will be retried when the sub workflow completes
let wake_sub_workflow = err.sub_workflow();

if deadline.is_some() || !wake_signals.is_empty() || wake_sub_workflow.is_some() {
tracing::info!(name=%self.name, id=%self.workflow_id, ?err, "workflow sleeping");
} else {
tracing::error!(name=%self.name, id=%self.workflow_id, ?err, "workflow error");
}

let err_str = err.to_string();

let mut retries = 0;
Expand Down Expand Up @@ -392,6 +396,7 @@ impl WorkflowCtx {
tracing::debug!(
name=%self.name,
id=%self.workflow_id,
sub_workflow_name=%sub_workflow.name,
sub_workflow_id=%sub_workflow.sub_workflow_id,
"replaying workflow dispatch"
);
Expand Down Expand Up @@ -620,7 +625,7 @@ impl WorkflowCtx {
// Activity succeeded
if let Some(output) = activity.parse_output().map_err(GlobalError::raw)? {
tracing::debug!(id=%self.workflow_id, activity_name=%I::Activity::NAME, "replaying activity");

output
}
// Activity failed, retry
Expand Down
86 changes: 47 additions & 39 deletions lib/chirp-workflow/core/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,16 @@ pub async fn sleep_until_ts(ts: i64) {
}
}

/// Takes activity, signal, and sub workflow events (each with their own location) and combines them via enum
/// into a hashmap of the following structure:
/// Takes all workflow events (each with their own location) and combines them via enum into a hashmap of the
/// following structure:
///
/// Given the location [1, 2, 3], 3 is the index and [1, 2] is the root location
///
/// HashMap {
/// [1, 2]: [
/// example signal event,
/// example activity event,
/// example sub workflow event,
/// example activity event (this is [1, 2, 3])
/// ],
/// }
Expand All @@ -74,6 +75,7 @@ pub fn combine_events(
msg_send_events: Vec<MessageSendEventRow>,
sub_workflow_events: Vec<SubWorkflowEventRow>,
) -> WorkflowResult<Vec<PulledWorkflow>> {
// Map workflow rows by workflow id
let mut workflows_by_id = workflow_rows
.into_iter()
.map(|row| {
Expand All @@ -89,8 +91,6 @@ pub fn combine_events(
.expect("unreachable, workflow for event not found");
let (root_location, idx) = split_location(&event.location);

insert_placeholder(events_by_location, &event.location, idx);

events_by_location
.entry(root_location)
.or_default()
Expand All @@ -103,8 +103,6 @@ pub fn combine_events(
.expect("unreachable, workflow for event not found");
let (root_location, idx) = split_location(&event.location);

insert_placeholder(events_by_location, &event.location, idx);

events_by_location
.entry(root_location)
.or_default()
Expand All @@ -117,8 +115,6 @@ pub fn combine_events(
.expect("unreachable, workflow for event not found");
let (root_location, idx) = split_location(&event.location);

insert_placeholder(events_by_location, &event.location, idx);

events_by_location
.entry(root_location)
.or_default()
Expand All @@ -131,8 +127,6 @@ pub fn combine_events(
.expect("unreachable, workflow for event not found");
let (root_location, idx) = split_location(&event.location);

insert_placeholder(events_by_location, &event.location, idx);

events_by_location
.entry(root_location)
.or_default()
Expand All @@ -145,8 +139,6 @@ pub fn combine_events(
.expect("unreachable, workflow for event not found");
let (root_location, idx) = split_location(&event.location);

insert_placeholder(events_by_location, &event.location, idx);

events_by_location
.entry(root_location)
.or_default()
Expand All @@ -156,19 +148,35 @@ pub fn combine_events(
let workflows = workflows_by_id
.into_values()
.map(|(row, mut events_by_location)| {
// TODO(RVT-3754): This involves inserting, sorting, then recollecting into lists and recollecting into a
// hashmap. Could be improved by iterating over both lists simultaneously and sorting each item at a
// time before inserting
// Sort all of the events because we are inserting from two different lists. Both lists are already
// sorted themselves so this should be fairly cheap
// TODO(RVT-3754): This involves inserting, sorting, then recollecting into lists and recollecting
// into a hashmap
// Sort all of the events because we are inserting from two different lists. Both lists are
// already sorted themselves so this should be fairly cheap
for (_, list) in events_by_location.iter_mut() {
list.sort_by_key(|(idx, _)| *idx);
}

// Remove idx from lists
let event_history = events_by_location
.into_iter()
.map(|(k, v)| (k, v.into_iter().map(|(_, v)| v).collect()))
.map(|(k, events)| {
let mut expected_idx = 0;

// Check for missing indexes and insert a `Branch` placeholder event for each missing spot
let events = events
.into_iter()
.flat_map(|(idx, v)| {
let offset = (idx - expected_idx) as usize;
expected_idx = idx + 1;

std::iter::repeat_with(|| Event::Branch)
.take(offset)
.chain(std::iter::once(v))
})
.collect();

(k, events)
})
.collect();

PulledWorkflow {
Expand Down Expand Up @@ -197,27 +205,27 @@ fn split_location(location: &[i64]) -> (Location, i64) {
)
}

// Insert placeholder record into parent location list (ex. for [4, 0] insert into the [] list at
// the 4th index)
fn insert_placeholder(
events_by_location: &mut HashMap<Location, Vec<(i64, Event)>>,
location: &[i64],
idx: i64,
) {
if idx == 0 && location.len() > 1 {
let parent_location = location
.iter()
.take(location.len().saturating_sub(2))
.map(|x| *x as usize)
.collect::<Location>();
let parent_idx = *location.get(location.len().saturating_sub(2)).unwrap();

events_by_location
.entry(parent_location)
.or_default()
.push((parent_idx, Event::Branch));
}
}
// // Insert placeholder record into parent location list (ex. for the location [4, 0], insert placeholder into
// // the [] list at the 4th index)
// fn insert_placeholder(
// events_by_location: &mut HashMap<Location, Vec<(i64, Event)>>,
// location: &[i64],
// idx: i64,
// ) {
// if idx == 0 && location.len() > 1 {
// let parent_location = location
// .iter()
// .take(location.len().saturating_sub(2))
// .map(|x| *x as usize)
// .collect::<Location>();
// let parent_idx = *location.get(location.len().saturating_sub(2)).unwrap();

// events_by_location
// .entry(parent_location)
// .or_default()
// .push((parent_idx, Event::Branch));
// }
// }

pub fn inject_fault() -> GlobalResult<()> {
if rand::thread_rng().gen_range(0..100) < FAULT_RATE {
Expand Down

0 comments on commit 033253f

Please sign in to comment.