Message passing - Rust SDK
This page shows how to implement Signals, Queries, and Updates in your Workflows using the Rust SDK. These are the primary ways to communicate with running Workflows.
Signals
A Signal is a one-way, asynchronous message sent to a running Workflow that can modify its state or behavior. The sender does not receive a return value.
Define a Signal Handler
Use the #[signal] macro on a method to create a Signal handler:
#[workflow]
pub struct WorkflowWithSignals {
    name: String,
    count: i32,
}

#[workflow_methods]
impl WorkflowWithSignals {
    #[signal]
    fn set_name(&mut self, new_name: String) {
        self.name = new_name;
    }

    #[signal]
    fn increment(&mut self, amount: i32) {
        self.count += amount;
    }

    #[run]
    async fn run(ctx: &mut workflow::WorkflowContext<Self>) -> WorkflowResult<String> {
        // Wait for signal
        ctx.wait_condition(|s| s.count > 0).await;
        let name = ctx.state(|s| s.name.clone());
        let count = ctx.state(|s| s.count);
        Ok(format!("{}: {}", name, count))
    }
}
Signal handlers can be either async or synchronous. Synchronous signal handlers receive &mut self, which lets them mutate Workflow state directly.
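To make the interplay between signal handlers and wait_condition concrete, here is a minimal plain-Rust model that does not use the SDK. Counter, Signal, and apply_until are hypothetical names invented for this sketch. It assumes only that signals are applied one at a time and that the waiting predicate is re-checked after each one.

```rust
// Plain-Rust model, not SDK code: every name here is hypothetical.
#[derive(Debug, Default)]
struct Counter {
    name: String,
    count: i32,
}

// One-way messages: a signal carries data in and returns nothing.
enum Signal {
    SetName(String),
    Increment(i32),
}

impl Counter {
    // Mirrors a synchronous handler: direct mutation through `&mut self`.
    fn handle(&mut self, signal: Signal) {
        match signal {
            Signal::SetName(new_name) => self.name = new_name,
            Signal::Increment(amount) => self.count += amount,
        }
    }
}

// Applies signals in order and stops as soon as `condition` holds.
// Returns how many signals were consumed, or None if it never held.
fn apply_until(
    state: &mut Counter,
    signals: Vec<Signal>,
    condition: impl Fn(&Counter) -> bool,
) -> Option<usize> {
    for (index, signal) in signals.into_iter().enumerate() {
        state.handle(signal);
        if condition(state) {
            return Some(index + 1);
        }
    }
    None
}
```

In this model, a condition such as count > 0 is not satisfied by set_name or by an increment of zero. Only a signal that actually changes the observed field unblocks the waiter.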
Send a Signal
Use the client to send a signal to a running workflow:
let handle = client.get_workflow_handle::<WorkflowWithSignals>("workflow-id");

// Send a signal
handle.signal(
    WorkflowWithSignals::set_name,
    "new-name".to_string(),
    SignalOptions::default(),
).await?;
Queries
A Query is a read-only request to a Workflow that returns information about its current state. Query handlers must be synchronous and cannot modify Workflow state.
Define a Query Handler
Use the #[query] macro on a synchronous method to create a Query handler:
#[workflow]
pub struct WorkflowWithQueries {
    items: Vec<String>,
    status: String,
}

#[workflow_methods]
impl WorkflowWithQueries {
    #[query]
    fn get_items(&self) -> Vec<String> {
        self.items.clone()
    }

    #[query]
    fn get_status(&self) -> String {
        self.status.clone()
    }

    #[query(name = "item_count")]
    fn count_items(&self) -> i32 {
        self.items.len() as i32
    }

    #[run]
    async fn run(ctx: &mut workflow::WorkflowContext<Self>) -> WorkflowResult<String> {
        // Workflow logic
        Ok("done".to_string())
    }
}
Query handlers:
- Must be synchronous (not async)
- Must not modify state (receive &self)
- Can use custom names via the name parameter
- Return their result immediately
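The read-only rule follows from ordinary Rust borrowing, which the sketch below illustrates without the SDK. Inventory is a hypothetical type. The point is that a method taking &self cannot mutate fields, and that returning an owned clone keeps callers away from the internal state.

```rust
// Plain-Rust illustration, not SDK code: `Inventory` is a hypothetical type.
#[derive(Debug, Clone, PartialEq)]
struct Inventory {
    items: Vec<String>,
}

impl Inventory {
    // Taking `&self` means the compiler rejects mutation in this body;
    // for example, `self.items.push(..)` would fail to compile here.
    fn get_items(&self) -> Vec<String> {
        // Return an owned copy so the caller cannot reach internal state.
        self.items.clone()
    }

    // A second read-only accessor, analogous to a renamed query.
    fn item_count(&self) -> i32 {
        self.items.len() as i32
    }
}
```

Changing the returned Vec afterwards affects only the caller's copy, so repeated queries cannot influence what the Workflow sees.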
Execute a Query
Use the client to query a running workflow:
let handle = client.get_workflow_handle::<WorkflowWithQueries>("workflow-id");

// Execute a query
let items = handle.query(
    WorkflowWithQueries::get_items,
    (),
    QueryOptions::default(),
).await?;
println!("Items: {:?}", items);
Updates
An Update is a synchronous message to a Workflow that can modify its state and return a result. Unlike a Signal, which is one-way, an Update lets the caller wait for the handler to run and receive its result.
Define an Update Handler
Use the #[update] macro on a method to create an Update handler:
#[workflow]
pub struct WorkflowWithUpdates {
    queue: Vec<String>,
}

#[workflow_methods]
impl WorkflowWithUpdates {
    #[update]
    async fn add_to_queue(&mut self, item: String) -> Vec<String> {
        self.queue.push(item);
        self.queue.clone()
    }

    #[update]
    fn clear_queue(&mut self) -> i32 {
        let count = self.queue.len() as i32;
        self.queue.clear();
        count
    }

    #[run]
    async fn run(ctx: &mut workflow::WorkflowContext<Self>) -> WorkflowResult<String> {
        // Workflow logic
        Ok("done".to_string())
    }
}
Update handlers:
- Can be synchronous (receive &mut self)
- Can be asynchronous (receive &mut self or use the context inside)
- Can modify state
- Must return a result
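The practical difference from a Signal is visible in the method signatures. The following plain-Rust model, which does not use the SDK, puts a signal-style and two update-style methods side by side. WorkQueue and push_signal are hypothetical names chosen for the sketch.

```rust
// Plain-Rust model, not SDK code: `WorkQueue` is a hypothetical type.
#[derive(Default)]
struct WorkQueue {
    queue: Vec<String>,
}

impl WorkQueue {
    // Signal-style: mutates state, and the caller learns nothing back.
    fn push_signal(&mut self, item: String) {
        self.queue.push(item);
    }

    // Update-style: mutates state and returns the resulting queue,
    // so the caller sees the effect without a follow-up query.
    fn add_to_queue(&mut self, item: String) -> Vec<String> {
        self.queue.push(item);
        self.queue.clone()
    }

    // Update-style: returns how many entries were removed.
    fn clear_queue(&mut self) -> i32 {
        let count = self.queue.len() as i32;
        self.queue.clear();
        count
    }
}
```

With the signal-style method, a caller would need a separate query to confirm the new state. The update-style methods hand that confirmation back as their return value.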
Execute an Update
Use the client to execute an update on a running workflow:
let handle = client.get_workflow_handle::<WorkflowWithUpdates>("workflow-id");

// Execute an update synchronously
let result = handle.execute_update(
    WorkflowWithUpdates::add_to_queue,
    "new-item".to_string(),
    UpdateOptions::default(),
).await?;
println!("Queue now has: {:?}", result);
Asynchronous Updates
You can also start an Update, continue with other work, and wait for its result later:
// Start an update
let update_handle = handle.start_update(
    WorkflowWithUpdates::add_to_queue,
    "item".to_string(),
    StartUpdateOptions::builder()
        .wait_for_stage(WorkflowUpdateWaitStage::Accepted)
        .build(),
).await?;

// Wait for result later
let result = update_handle.get_result().await?;
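The two-step pattern above can be modeled with a worker thread and a channel from the standard library. This is a sketch, not SDK code: UpdateHandle and start_update here are hypothetical stand-ins. It assumes that waiting for the Accepted stage means the call returns before the handler has finished.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

// Plain-Rust model, not SDK code: these names are hypothetical stand-ins.
struct UpdateHandle {
    result: Receiver<Vec<String>>,
}

impl UpdateHandle {
    // Blocks until the handler has finished and sent its result.
    fn get_result(self) -> Vec<String> {
        self.result.recv().expect("update worker dropped the result")
    }
}

// Returns as soon as the update has been handed off ("accepted");
// the handler runs on a worker thread and reports back via the channel.
fn start_update(mut queue: Vec<String>, item: String) -> UpdateHandle {
    let (sender, receiver) = mpsc::channel();
    thread::spawn(move || {
        queue.push(item);
        // Ignore the send error if the caller already dropped the handle.
        let _ = sender.send(queue);
    });
    UpdateHandle { result: receiver }
}
```

The caller is free to do other work between start_update and get_result. Only the second call blocks on the handler's completion.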
Complete Message Passing Example
Here's a complete example showing all three types of message passing:
use std::time::Duration;

#[workflow]
pub struct ProcessingWorkflow {
    items: Vec<String>,
    paused: bool,
    processed_count: i32,
}

#[workflow_methods]
impl ProcessingWorkflow {
    #[init]
    fn new(_ctx: &WorkflowContextView) -> Self {
        Self {
            items: Vec::new(),
            paused: false,
            processed_count: 0,
        }
    }

    // Signal: add items to process
    #[signal]
    fn add_item(&mut self, item: String) {
        self.items.push(item);
    }

    // Signal: pause/resume processing
    #[signal]
    fn set_paused(&mut self, paused: bool) {
        self.paused = paused;
    }

    // Query: get current status
    #[query]
    fn get_status(&self) -> String {
        format!(
            "Items: {}, Processed: {}, Paused: {}",
            self.items.len(),
            self.processed_count,
            self.paused
        )
    }

    // Update: process all items and return result
    #[update]
    async fn process_all(
        &mut self,
        ctx: &mut WorkflowContext<Self>,
    ) -> i32 {
        let mut count = 0;
        while !self.items.is_empty() {
            // Block while paused until a set_paused(false) signal arrives
            if self.paused {
                ctx.wait_condition(|s| !s.paused).await;
            }
            let item = self.items.remove(0);
            // Process item; .ok() discards any activity error,
            // so a failed item still counts as processed
            ctx.activity(
                ProcessingActivities::process,
                item,
                ActivityOptions {
                    start_to_close_timeout: Some(Duration::from_secs(60)),
                    ..Default::default()
                },
            ).await.ok();
            count += 1;
            self.processed_count = count;
        }
        count
    }

    #[run]
    async fn run(ctx: &mut workflow::WorkflowContext<Self>) -> WorkflowResult<String> {
        // Wait for update to be called
        ctx.wait_condition(|s| s.processed_count > 0).await;
        Ok(format!("Processed {} items", ctx.state(|s| s.processed_count)))
    }
}
Best Practices
- Use Signals for notifications: When you just need to inform the Workflow of something
- Use Queries for status: To check Workflow progress without modifying it
- Use Updates for side effects: When you need to modify Workflow state and get a result
- Keep handlers fast: Don't do time-consuming work in message handlers
- Use custom names: With the name parameter for clarity in logs and monitoring
- Handle concurrent messages: Signals, Queries, and Updates can arrive concurrently
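One way to keep handlers fast is to have the handler record the request and leave the slow work to the main loop. The plain-Rust model below sketches that split without the SDK. Processor, add_item, and process_next are hypothetical names, and to_uppercase stands in for real processing.

```rust
use std::collections::VecDeque;

// Plain-Rust model, not SDK code: every name here is hypothetical.
#[derive(Default)]
struct Processor {
    pending: VecDeque<String>,
    processed: Vec<String>,
}

impl Processor {
    // Handler: constant-time bookkeeping only, no slow work.
    fn add_item(&mut self, item: String) {
        self.pending.push_back(item);
    }

    // Main-loop step: the slow work happens here, one item at a time,
    // so newly arrived messages are picked up between steps.
    // Returns false once nothing is left to process.
    fn process_next(&mut self) -> bool {
        match self.pending.pop_front() {
            Some(item) => {
                self.processed.push(item.to_uppercase());
                true
            }
            None => false,
        }
    }
}
```

Because each step handles a single item, a message that arrives halfway through a batch is queued behind the existing work and processed in arrival order.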
For more information, see the Workflow Concepts documentation.