4.3.5. SubDAG Helpers
Warning
This feature is currently in alpha
Typically, workflows have a fixed definition. However, certain use cases require modifying the workflow at runtime. For instance, a task might need to dynamically batch paths for child tasks to ensure load balancing, or submit different child tasks based on the results of the current task.
Sub-DAGs enable the dynamic definition of workflows. A sub-DAG can consist of a single task or a chain/graph of multiple tasks. Once submitted, the sub-DAG is automatically integrated into the parent job for reporting and job control.
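The batching use case mentioned above can be illustrated with a small, library-independent sketch. The helper name batch_paths and the batch size are hypothetical and are not part of the worker repository; the sketch only shows one simple way a task might split its paths into evenly sized groups before submitting child work for each group.

```python
from typing import Iterator, List, Sequence


def batch_paths(paths: Sequence[str], batch_size: int) -> Iterator[List[str]]:
    """Yield consecutive slices of `paths`, each holding at most `batch_size` items.

    Hypothetical helper for illustration only; not part of arcapix.dynamo.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(paths), batch_size):
        yield list(paths[start:start + batch_size])


# Five paths split into batches of two; the last batch holds the remainder.
batches = list(batch_paths(["/a", "/b", "/c", "/d", "/e"], batch_size=2))
```

Each resulting batch could then be passed as the paths of a separate sub-DAG submission, so that no single child task receives a disproportionate share of the work.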
The worker repository provides a helper function called task_template, which defines a single-task graph for use with dynamic DAGs. The function is defined in the arcapix.dynamo.server.dags.util module. An example of defining a task using task_template is shown below:
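from arcapix.dynamo.server.dags.util import task_template, submit_subdag


def submit_stub_task(self, paths, source_site, dest_site, **kwargs):
    # Describe a single-task sub-DAG that runs reverse_stub at the destination site.
    stub_template = task_template(
        "dynamo.tasks.reverse_stub",
        kwargs={
            "hydrate": False,
            "overwrite": True,
        },
        site=dest_site,
    )

    # Submit the sub-DAG under the current task so it is integrated into the parent job.
    # The runner task name is passed positionally, so it must come before the keyword
    # arguments; its position in submit_subdag's signature is assumed here.
    submit_subdag(
        "dynamocore.tasks.dag.run_subdag",
        paths=paths,
        task_id=self.request.id,
        subgraph_template=stub_template,
    )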
For complex sub-DAGs, the worker repository provides a helper module, arcapix.dynamo.server.dags.helper. This module allows a chain of tasks to be defined for use with dynamic DAGs, using helper functions such as add_task and map.
An example of defining a chain of tasks using the DAG helper module is shown below:
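from arcapix.dynamo.server.dags.helper import Graph, add_task
from arcapix.dynamo.server.dags.util import submit_subdag


def submit_chain_of_dag_tasks(self, paths, source_site, dest_site, **kwargs):
    # jobid, endpoint and REVERSE_STUB_BATCH_SIZE are not defined in this snippet;
    # they are expected to be provided by the surrounding task module.
    graph = Graph()

    # First task in the chain: runs at the source site.
    xattr = graph.add_task(
        "dynamo.tasks.remove_location_xattrs_for_moved",
        kwargs={"jobid": jobid},
        site=source_site,
    )

    # Second task: listed as a child of the first via parents, runs at the destination site.
    recall = graph.add_task(
        "dynamo.tasks.reverse_stub",
        parents=[xattr.id],
        kwargs={
            "jobid": jobid,
            "site": dest_site,
            "endpoint": endpoint,
            "hydrate": True,
            "overwrite": True,
            "skip_hash": True,
            "extra_flags": ["--skip-check-uuid"],
            "batch_size": REVERSE_STUB_BATCH_SIZE,
        },
        site=dest_site,
    )

    # The runner task name is passed positionally, so it must come before the keyword
    # arguments; its position in submit_subdag's signature is assumed here.
    submit_subdag(
        "dynamocore.tasks.dag.run_subdag",
        paths=[{"path": i} for i in paths],
        task_id=self.request.id,
        subgraph_template=graph.to_dict(),
    )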
This example demonstrates how to use the Graph and add_task helpers to create a chain of tasks and submit them as a sub-DAG. The submit_subdag function submits the sub-DAG with the specified paths and task ID, using the dynamically generated task graph template.
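To make the idea of chaining tasks through parents more concrete, the following is a toy model of a graph builder. It is not the arcapix.dynamo implementation: ToyGraph, ToyTask, the task names, and the dictionary layout returned by to_dict are all assumptions made for illustration. The sketch shows one plausible way that task IDs and parent references could be recorded and serialized into a template.

```python
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class ToyTask:
    """Hypothetical task node; the real helper's task object may differ."""
    name: str
    kwargs: Dict[str, Any] = field(default_factory=dict)
    site: Optional[str] = None
    parents: List[str] = field(default_factory=list)
    id: str = field(default_factory=lambda: uuid.uuid4().hex)


class ToyGraph:
    """Hypothetical stand-in for a task graph builder (illustration only)."""

    def __init__(self) -> None:
        self._tasks: Dict[str, ToyTask] = {}

    def add_task(self, name, parents=None, kwargs=None, site=None) -> ToyTask:
        parents = list(parents or [])
        # Parents must already exist, so the graph stays acyclic by construction.
        missing = [p for p in parents if p not in self._tasks]
        if missing:
            raise ValueError(f"unknown parent task ids: {missing}")
        task = ToyTask(name=name, kwargs=dict(kwargs or {}), site=site, parents=parents)
        self._tasks[task.id] = task
        return task

    def to_dict(self) -> Dict[str, Dict[str, Any]]:
        # Assumed layout: task id -> task description, in insertion order.
        return {
            task_id: {
                "name": task.name,
                "kwargs": task.kwargs,
                "site": task.site,
                "parents": task.parents,
            }
            for task_id, task in self._tasks.items()
        }


# Build a two-task chain: the second task names the first as its parent.
graph = ToyGraph()
first = graph.add_task("example.tasks.first", site="site_a")
second = graph.add_task("example.tasks.second", parents=[first.id], site="site_b")
template = graph.to_dict()
```

Because every parent must be added before its children, the insertion order of the toy template is also a valid execution order. Whether the real Graph enforces this is not stated in this section.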