Skip to content

Commit

Permalink
fix(workflows): add sql retries, improve history diverged errors
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterPtato committed Jul 19, 2024
1 parent c31f3c2 commit 881762b
Show file tree
Hide file tree
Showing 4 changed files with 385 additions and 221 deletions.
101 changes: 89 additions & 12 deletions lib/chirp-workflow/core/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,8 @@ impl WorkflowCtx {
activity_id: &ActivityId,
create_ts: i64,
) -> WorkflowResult<A::Output> {
tracing::debug!(id=%self.workflow_id, activity_name=%A::NAME, "running activity");

let ctx = ActivityCtx::new(
self.workflow_id,
self.db.clone(),
Expand Down Expand Up @@ -372,11 +374,19 @@ impl WorkflowCtx {
let id = if let Some(event) = event {
// Validate history is consistent
let Event::SubWorkflow(sub_workflow) = event else {
return Err(WorkflowError::HistoryDiverged).map_err(GlobalError::raw);
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found sub workflow {}",
I::Workflow::NAME
)))
.map_err(GlobalError::raw);
};

if sub_workflow.sub_workflow_name != I::Workflow::NAME {
return Err(WorkflowError::HistoryDiverged).map_err(GlobalError::raw);
if sub_workflow.name != I::Workflow::NAME {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found sub_workflow {}",
I::Workflow::NAME
)))
.map_err(GlobalError::raw);
}

tracing::debug!(
Expand Down Expand Up @@ -592,15 +602,25 @@ impl WorkflowCtx {
let output = if let Some(event) = event {
// Validate history is consistent
let Event::Activity(activity) = event else {
return Err(WorkflowError::HistoryDiverged).map_err(GlobalError::raw);
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found activity {}",
activity_id.name
)))
.map_err(GlobalError::raw);
};

if activity.activity_id != activity_id {
return Err(WorkflowError::HistoryDiverged).map_err(GlobalError::raw);
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found activity {}",
activity_id.name
)))
.map_err(GlobalError::raw);
}

// Activity succeeded
if let Some(output) = activity.parse_output().map_err(GlobalError::raw)? {
tracing::debug!(id=%self.workflow_id, activity_name=%I::Activity::NAME, "replaying activity");

output
}
// Activity failed, retry
Expand Down Expand Up @@ -677,9 +697,21 @@ impl WorkflowCtx {
let signal_id = if let Some(event) = event {
// Validate history is consistent
let Event::SignalSend(signal) = event else {
return Err(WorkflowError::HistoryDiverged).map_err(GlobalError::raw);
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found signal send {}",
T::NAME
)))
.map_err(GlobalError::raw);
};

if signal.name != T::NAME {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found signal send {}",
T::NAME
)))
.map_err(GlobalError::raw);
}

tracing::debug!(id=%self.workflow_id, signal_name=%signal.name, signal_id=%signal.signal_id, "replaying signal dispatch");

signal.signal_id
Expand Down Expand Up @@ -728,9 +760,21 @@ impl WorkflowCtx {
let signal_id = if let Some(event) = event {
// Validate history is consistent
let Event::SignalSend(signal) = event else {
return Err(WorkflowError::HistoryDiverged).map_err(GlobalError::raw);
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found signal send {}",
T::NAME
)))
.map_err(GlobalError::raw);
};

if signal.name != T::NAME {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found signal send {}",
T::NAME
)))
.map_err(GlobalError::raw);
}

tracing::debug!(id=%self.workflow_id, signal_name=%signal.name, signal_id=%signal.signal_id, "replaying tagged signal dispatch");

signal.signal_id
Expand Down Expand Up @@ -777,7 +821,10 @@ impl WorkflowCtx {
let signal = if let Some(event) = event {
// Validate history is consistent
let Event::Signal(signal) = event else {
return Err(WorkflowError::HistoryDiverged).map_err(GlobalError::raw);
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found signal"
)))
.map_err(GlobalError::raw);
};

tracing::debug!(name=%self.name, id=%self.workflow_id, signal_name=%signal.name, "replaying signal");
Expand Down Expand Up @@ -827,7 +874,10 @@ impl WorkflowCtx {
let signal = if let Some(event) = event {
// Validate history is consistent
let Event::Signal(signal) = event else {
return Err(WorkflowError::HistoryDiverged).map_err(GlobalError::raw);
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found signal"
)))
.map_err(GlobalError::raw);
};

tracing::debug!(name=%self.name, id=%self.workflow_id, signal_name=%signal.name, "replaying signal");
Expand Down Expand Up @@ -876,7 +926,10 @@ impl WorkflowCtx {

// Validate history is consistent
let Event::Signal(signal) = event else {
return Err(WorkflowError::HistoryDiverged).map_err(GlobalError::raw);
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found signal"
)))
.map_err(GlobalError::raw);
};

Some(T::parse(&signal.name, signal.body.clone()).map_err(GlobalError::raw)?)
Expand Down Expand Up @@ -908,9 +961,21 @@ impl WorkflowCtx {
if let Some(event) = event {
// Validate history is consistent
let Event::MessageSend(msg) = event else {
return Err(WorkflowError::HistoryDiverged).map_err(GlobalError::raw);
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found message send {}",
M::NAME
)))
.map_err(GlobalError::raw);
};

if msg.name != M::NAME {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found message send {}",
M::NAME
)))
.map_err(GlobalError::raw);
}

tracing::debug!(id=%self.workflow_id, msg_name=%msg.name, "replaying message dispatch");
}
// Send message
Expand Down Expand Up @@ -954,9 +1019,21 @@ impl WorkflowCtx {
if let Some(event) = event {
// Validate history is consistent
let Event::MessageSend(msg) = event else {
return Err(WorkflowError::HistoryDiverged).map_err(GlobalError::raw);
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found message send {}",
M::NAME
)))
.map_err(GlobalError::raw);
};

if msg.name != M::NAME {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found message send {}",
M::NAME
)))
.map_err(GlobalError::raw);
}

tracing::debug!(id=%self.workflow_id, msg_name=%msg.name, "replaying message dispatch");
}
// Send message
Expand Down
Loading

0 comments on commit 881762b

Please sign in to comment.