Temporal Parallel Child Workflows
Temporal provides helpful primitives called Workflows and Activities for orchestrating processes. A common pattern I’ve found useful is the ability to run multiple “child workflows” in parallel from a single “parent” workflow.
Let’s say we have the following activity and workflow (imports omitted for brevity).
Activity code
```python
@dataclass
class MyGoodActivityArgs:
    arg1: str
    arg2: str


@dataclass
class MyGoodActivityResult:
    arg1: str
    arg2: str
    random_val: float


@activity.defn
async def my_good_activity(args: MyGoodActivityArgs) -> MyGoodActivityResult:
    activity.logger.info("Running my good activity")
    return MyGoodActivityResult(
        arg1=args.arg1,
        arg2=args.arg2,
        random_val=random.random(),
    )
```
Workflow code
```python
@dataclass
class MyGoodWorkflowArgs:
    arg1: str
    arg2: str


@dataclass
class MyGoodWorkflowResult:
    result: MyGoodActivityResult


@workflow.defn
class MyGoodWorkflow:
    @workflow.run
    async def run(self, args: MyGoodWorkflowArgs) -> MyGoodWorkflowResult:
        result: MyGoodActivityResult = await workflow.execute_activity(
            my_good_activity,
            MyGoodActivityArgs(
                arg1=f"activity arg1: {args.arg1}",
                arg2=f"activity arg2: {args.arg2}",
            ),
            schedule_to_close_timeout=timedelta(seconds=60),
            task_queue=TASK_QUEUE,
        )
        return MyGoodWorkflowResult(result=result)
```
We can now write another Temporal workflow that starts multiple instances of `MyGoodWorkflow`.
```python
@dataclass
class BatchWorkflowArgs:
    inputs: List[MyGoodWorkflowArgs]


@dataclass
class BatchWorkflowResult:
    results: List[MyGoodWorkflowResult]


@workflow.defn
class MyBatchWorkflow:
    @workflow.run
    async def run(self, args: BatchWorkflowArgs) -> BatchWorkflowResult:
        # Create a list to store the workflow futures
        workflow_futures = []

        # Create child workflow stubs for each set of args
        for i, workflow_args in enumerate(args.inputs):
            future = await workflow.start_child_workflow(
                MyGoodWorkflow,
                workflow_args,
                id=f"my_good_workflow_{i}",
                task_queue=TASK_QUEUE,
                retry_policy=RetryPolicy(maximum_attempts=3),
            )
            workflow_futures.append(future)

        # Wait for all workflows to complete and collect results
        results: List[MyGoodWorkflowResult] = await asyncio.gather(*workflow_futures)

        workflow.logger.info(
            f"Completed {len(workflow_futures)} MyGoodWorkflow workflows"
        )

        return BatchWorkflowResult(results)
```
The main part to focus on is `await workflow.start_child_workflow`, which starts each child workflow and returns a future that we can `await` to get the workflow result. Instead of awaiting each future individually, we use `await asyncio.gather(*workflow_futures)`, which waits for all the children and collects their results together.
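The `asyncio.gather` behavior here is the standard one from the Python standard library; a minimal sketch with plain coroutines (no Temporal required) shows that gather awaits all futures concurrently and returns results in the same order the futures were passed in:

```python
import asyncio


async def child(i: int) -> int:
    # Stand-in for a child workflow that eventually produces a result
    await asyncio.sleep(0)
    return i * 10


async def main() -> list[int]:
    # Start all "children" first, then await them together;
    # gather preserves the order of its inputs in the results list.
    futures = [asyncio.ensure_future(child(i)) for i in range(3)]
    results = await asyncio.gather(*futures)
    return results


print(asyncio.run(main()))  # [0, 10, 20]
```

This ordering guarantee is why the batch workflow can safely zip results back up with the original inputs if needed.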
In this example, we use `dataclass`es as the inputs and outputs to all activities and workflows, which lets us evolve the inputs and outputs without breaking the data contract between the workflow components[1].
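As a concrete sketch of that point (class and field names here are hypothetical, not from the example above), adding a new field with a default value to a `dataclass` keeps older call sites working unchanged:

```python
from dataclasses import dataclass


@dataclass
class ActivityResult:
    value: str
    # Field added later: the default keeps existing callers working
    attempt: int = 1


# Code written before `attempt` existed still constructs fine
old_style = ActivityResult(value="ok")
new_style = ActivityResult(value="ok", attempt=2)
print(old_style.attempt, new_style.attempt)  # 1 2
```

Had the activity returned a bare `str` instead, adding a second piece of data would have forced a breaking change to the return type.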
Finally, we can run `MyBatchWorkflow`.
```python
async def main() -> BatchWorkflowResult:
    client = await Client.connect("localhost:7233")

    batch_args = BatchWorkflowArgs(
        inputs=[
            MyGoodWorkflowArgs(arg1="workflow arg1", arg2="workflow arg2"),
            MyGoodWorkflowArgs(arg1="workflow arg3", arg2="workflow arg4"),
        ]
    )

    result: BatchWorkflowResult = await client.execute_workflow(
        MyBatchWorkflow.run,
        batch_args,
        id=str(uuid.uuid4()),
        task_queue=TASK_QUEUE,
    )

    print(f"Batch workflow completed with results: {result}")
    return result


if __name__ == "__main__":
    asyncio.run(main())
```
When we run this script, we get:
```
python -m run_workflow
Batch workflow completed with results: BatchWorkflowResult(results=[MyGoodWorkflowResult(result=MyGoodActivityResult(arg1='activity arg1: workflow arg1', arg2='activity arg2: workflow arg2', random_val=0.8471340083778467)), MyGoodWorkflowResult(result=MyGoodActivityResult(arg1='activity arg1: workflow arg3', arg2='activity arg2: workflow arg4', random_val=0.21755659662944782))])
```
In this specific example, we collect the results of all the child workflows after they complete. Keep in mind that this could lead to a large workflow history if run on a very large list of inputs with big payloads. If you hit workflow history limits with this approach, one workaround is to have each child workflow write its results to a blob store rather than return them, which keeps the payloads out of the workflow history.
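That workaround can be sketched as returning a small reference instead of the payload itself. Here's a minimal, hedged sketch using an in-memory dict as a stand-in for a real blob store (`BLOB_STORE`, `store_result`, and the key scheme are all hypothetical names for illustration):

```python
import uuid

# Stand-in for a real blob store (e.g. S3 or GCS) -- hypothetical
BLOB_STORE: dict[str, bytes] = {}


def store_result(payload: bytes) -> str:
    """Write the large payload to the blob store; return only a small key."""
    key = f"results/{uuid.uuid4()}"
    BLOB_STORE[key] = payload
    return key


def child_workflow_result(big_payload: bytes) -> str:
    # The child returns just the reference, so the large payload
    # never enters the Temporal workflow history
    return store_result(big_payload)


ref = child_workflow_result(b"x" * 1_000_000)
print(len(BLOB_STORE[ref]))  # 1000000
```

The parent workflow (or a downstream consumer) would then fetch payloads by key only when it actually needs them.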
You can find working code for this toy example here.
Footnotes
1. For example, if an activity returned `str`, we would struggle to add an additional return parameter without changing the return type. We can get ahead of this issue by always returning classes. ↩