Workflow basics - Rust SDK
How to develop a Workflow
Workflows are the fundamental unit of a Temporal Application, and it all starts with the development of a Workflow Definition.
In the Temporal Rust SDK programming model, a Workflow Definition comprises a Workflow struct and associated methods decorated with macros.
A Workflow is defined by:
- A struct that holds the Workflow state
- An
#[init]method that initializes the Workflow (optional) - A
#[run]method that contains the main Workflow logic - Optional
#[signal],#[query], and#[update]methods for external interaction
use temporalio_macros::{workflow, workflow_methods};
use temporalio_sdk::{WorkflowResult, workflow::WorkflowContextView};
#[workflow]
pub struct GreetingWorkflow {
name: String,
}
#[workflow_methods]
impl GreetingWorkflow {
#[init]
fn new(_ctx: &WorkflowContextView, name: String) -> Self {
Self { name }
}
#[run]
async fn run(ctx: &mut workflow::WorkflowContext<Self>) -> WorkflowResult<String> {
let name = ctx.state(|s| s.name.clone());
Ok(format!("Hello, {}!", name))
}
}
The #[workflow] macro marks the struct as a Workflow.
The #[workflow_methods] macro is applied to the impl block containing the Workflow methods.
Workflow struct
The Workflow struct holds the state of your Workflow Execution. This state is persisted and recovered during replays. All fields in a Workflow struct should be serializable.
Workflow initialization
The #[init] method is optional and is called when the Workflow first starts. It receives the initial Workflow input parameters and initializes the Workflow struct:
#[init]
fn new(_ctx: &WorkflowContextView, name: String, age: u32) -> Self {
Self {
name,
age,
started_at: Instant::now(),
}
}
The #[init] method receives a WorkflowContextView, which provides read-only access to Workflow execution information.
Run method
The #[run] method is required and contains the main Workflow logic. It:
- Must be
async - Receives a mutable
WorkflowContext<Self> - Returns
WorkflowResult<T>where T is the Workflow return type - Executes exactly once per Workflow Execution (with determinism)
#[run]
async fn run(ctx: &mut workflow::WorkflowContext<Self>) -> WorkflowResult<String> {
// Execute activities, timers, child workflows, etc.
let result = ctx.activity(
MyActivities::process,
input,
ActivityOptions::default(),
).await?;
Ok(result)
}
Define Workflow parameters
How to define Workflow parameters using the Temporal Rust SDK.
Temporal Workflows may have any number of custom parameters. However, we strongly recommend that objects are used as parameters, so that the object's individual fields may be altered without breaking the signature of the Workflow. All Workflow Definition parameters must be serializable.
A method annotated with #[init] can have any number of parameters. We recommend passing a single struct that contains all the input fields:
use serde::{Serialize, Deserialize};
#[derive(Serialize, Deserialize)]
pub struct ProcessingInput {
pub data: Vec<String>,
pub timeout_seconds: u32,
}
#[workflow]
pub struct ProcessingWorkflow {
data: Vec<String>,
timeout_seconds: u32,
}
#[workflow_methods]
impl ProcessingWorkflow {
#[init]
fn new(_ctx: &WorkflowContextView, input: ProcessingInput) -> Self {
Self {
data: input.data,
timeout_seconds: input.timeout_seconds,
}
}
#[run]
async fn run(ctx: &mut workflow::WorkflowContext<Self>) -> WorkflowResult<String> {
// Use the initialized state
Ok("Processing complete".to_string())
}
}
All Workflow input should be serializable by serde.
Define Workflow return parameters
How to define Workflow return parameters using the Temporal Rust SDK.
Workflow return values must also be serializable. Returning results, returning errors, or throwing exceptions is fairly idiomatic in each language that is supported. However, Temporal APIs that must be used to get the result of a Workflow Execution will only ever receive one of either the result or the error.
The return type of a Workflow is WorkflowResult<T> where T implements Serialize. Success is represented by Ok(value) and failure by Err(...):
#[run]
async fn run(ctx: &mut workflow::WorkflowContext<Self>) -> WorkflowResult<ProcessingResult> {
// Can return a complex result type
let result = ProcessingResult {
status: "completed".to_string(),
records_processed: 100,
};
Ok(result)
}
Customize your Workflow Type
How to customize your Workflow Type using the Temporal Rust SDK.
Workflows have a Type that is referred to as the Workflow name. By default, the Workflow type is the name of the Workflow struct. You can customize it by providing a name parameter to the #[workflow] macro:
#[workflow(name = "my-custom-workflow")]
pub struct GreetingWorkflow {
name: String,
}
The Workflow Type defaults to the struct name if not specified. For example, this Workflow would have the type GreetingWorkflow:
#[workflow]
pub struct GreetingWorkflow {
// ...
}
Workflow logic requirements
Workflow logic is constrained by deterministic execution requirements.
Workflow code must be deterministic because the Temporal Server may replay your Workflow to reconstruct its state. This means:
Don't use nondeterministic functions
- No direct system time access - use
ctx.workflow_time()instead ofSystemTime::now() - No random number generation - use
ctx.random()instead - No external I/O (network, filesystem, etc.) - perform these in Activities instead
- No UUID generation via random means - the SDK doesn't have a direct UUID function, but you can use Activities for non-deterministic operations
Don't use nondeterministic concurrency
- Don't use
tokio::spawndirectly - it introduces nondeterministic scheduling - Don't use
tokio::select!- usectx.select!()instead - Don't use
tokio::time::sleep- usectx.timer()instead
Use Workflow-safe primitives
The Rust SDK provides:
ctx.timer()- Wait for a durationctx.wait_condition(closure)- Wait until a condition is trueworkflows::select!- Deterministic select statementctx.activity()- Execute Activitiesctx.local_activity()- Execute local Activitiesctx.child_workflow()- Execute child Workflowsctx.cancelled()- Check if Workflow is cancelled
use std::time::Duration;
#[run]
async fn run(ctx: &mut workflow::WorkflowContext<Self>) -> WorkflowResult<String> {
// Good - deterministic timer
ctx.timer(Duration::from_secs(10)).await;
// Good - deterministic wait for condition
ctx.wait_condition(|s| s.values.len() >= 3).await;
// Bad - nondeterministic sleep
// tokio::time::sleep(Duration::from_secs(10)).await;
// Bad - nondeterministic time
// SystemTime::now()
Ok("Done".to_string())
}
Access Workflow State
Use ctx.state() for read-only access and ctx.state_mut() for mutable access to your Workflow state:
#[run]
async fn run(ctx: &mut workflow::WorkflowContext<Self>) -> WorkflowResult<String> {
// Read-only access
let name = ctx.state(|s| s.name.clone());
// Mutable access (for signal handlers or update handlers)
// Available in sync methods
Ok(name)
}
In sync signal and update handlers, you can mutate state directly via &mut self.
Workflow return types
The #[run] method must return WorkflowResult<T>. This is a type alias for Result<T, WorkflowExecution Error>.
For errors, use a WorkflowExecutionError:
#[run]
async fn run(ctx: &mut workflow::WorkflowContext<Self>) -> WorkflowResult<String> {
if some_validation_fails {
return Err(WorkflowExecutionError::new("validation_failed", "Input is invalid"));
}
Ok("Success".to_string())
}
Workflow errors will cause the Workflow Execution to fail and the error details will be available to clients.