2.6. Advanced Topics

This section covers advanced plugin patterns including streaming results, dynamic task spawning, sub-DAG construction, and queue management.

2.6.1. Streaming Task Results

Delete and move handlers in snapdiff workflows must stream results back to Ngenea Hub as they are produced, rather than returning them all at the end of execution. When Ngenea Hub expects streaming, it passes stream_results=True as a keyword argument to the task.

Important

If your task will be used as a move or delete handler in a snapdiff workflow, it must accept stream_results either as an explicit parameter or via **kwargs.

Danger

Snapdiff workflows can discover millions of paths. If your task is used as a move or delete handler without implementing StreamTaskResults, all results will accumulate in memory, eventually causing the Worker process to be killed by the OOM killer. This will abort the job mid-execution and may leave the filesystem in an inconsistent state (partially moved or deleted files). Always check for stream_results=True and use StreamTaskResults accordingly.

2.6.1.1. Using StreamTaskResults

A StreamTaskResults class is provided to facilitate streaming:

from arcapix.dynamo.server.streamstatus import StreamTaskResults
from arcapix.dynamo.server.status import FileActionStatus
from arcapix.dynamo.server.queue import get_current_task_queue_name

@app.task(bind=True, name="dynamo.custom.my_snapdiff_handler")
def my_snapdiff_handler(self, *args, paths=None, jobid=None, stream_results=False, **kwargs):
    queue_name = get_current_task_queue_name()

    if stream_results:
        status = StreamTaskResults(self.name, paths, jobid, queue_name, self.request.id)
    else:
        status = FileActionStatus(self.name, paths, jobid, queue_name)

    for item in paths:
        status.add("processed", item["path"])

    return status.dump()

The StreamTaskResults class behaves the same as FileActionStatus, but sends results to Ngenea Hub in batches in the background. Calling status.dump() flushes any outstanding results.

Compared to FileActionStatus, StreamTaskResults takes one additional parameter, taskid: the Celery task ID, retrieved via self.request.id.
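The batch-and-flush behaviour can be illustrated with a minimal pure-Python sketch. The BatchingStatus class below is a stand-in for illustration only; the real StreamTaskResults manages the transport to Ngenea Hub and its batch sizing internally:

```python
# Illustrative sketch only -- not the real StreamTaskResults.
class BatchingStatus:
    def __init__(self, batch_size=3):
        self.batch_size = batch_size
        self.buffer = []   # results not yet sent
        self.sent = []     # results already "sent" (stands in for the Hub)

    def add(self, state, path):
        self.buffer.append((state, path))
        if len(self.buffer) >= self.batch_size:
            self._flush()

    def _flush(self):
        # In the real class this would send the batch to Ngenea Hub
        # in the background.
        self.sent.extend(self.buffer)
        self.buffer.clear()

    def dump(self):
        # dump() flushes any outstanding results, mirroring the
        # documented behaviour of StreamTaskResults.dump().
        self._flush()
        return {"sent": len(self.sent)}


status = BatchingStatus()
for i in range(7):
    status.add("processed", f"/fs/file{i}")
print(status.dump())  # {'sent': 7}
```

At no point does the full result set need to sit in a single in-memory structure longer than one batch, which is what keeps Worker memory flat on large jobs.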

2.6.1.2. Streaming Path Input

For snapdiff workflows, paths may be streamed to your task rather than passed all at once. Use expand_paths to consume them:

from arcapix.dynamo.server.utils.inputs import expand_paths

Danger

Never materialize the expand_paths generator into a list (e.g., list(expand_paths(...)) or storing all values in a collection). Snapdiff workflows may stream millions of paths. Collecting them all into memory will exhaust Worker RAM, crash the process, and leave the job in an incomplete state with potential data inconsistency. Process paths one at a time or in small batches using the generator directly.
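One safe pattern is to consume the generator in small, fixed-size batches. The batched helper below is a generic illustration (it is not part of arcapix), and the exact signature and yield shape of expand_paths are assumed here for the sake of the example:

```python
from itertools import islice

def batched(iterable, size):
    """Yield lists of at most `size` items without materializing the stream."""
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch

# Simulated path stream, standing in for expand_paths(paths):
stream = ({"path": f"/fs/file{i}"} for i in range(10))

sizes = [len(batch) for batch in batched(stream, 4)]
print(sizes)  # [4, 4, 2]
```

Memory usage is bounded by the batch size rather than the total path count, regardless of how many millions of paths the snapdiff discovers.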

2.6.2. Dynamic Children

Some tasks need to spawn child tasks at runtime — for example, to batch paths for load balancing or to submit different tasks based on processing results.

2.6.2.1. Using DynamicSubmitter

The DynamicSubmitter class handles dynamic child task spawning and automatically manages the will_spawn_children flag, which Ngenea Hub uses to track job completion:

from arcapix.dynamo.server.dags.status import DynamicSubmitter
from arcapix.dynamo.server.status import FileActionStatus
from arcapix.dynamo.server.queue import get_current_task_queue_name

@app.task(bind=True, name="dynamo.custom.dynamic_plugin")
def dynamic_plugin(self, *args, paths=None, jobid=None, **kwargs):
    status = FileActionStatus(self.name, paths, jobid, get_current_task_queue_name())

    submitter = DynamicSubmitter(
        status=status,
        settings={},
        task_id=self.request.id,
        chunk_size=40,
        chunk_gb=1,
        auto_chunk_size=False,
    )

    for path in paths:
        submitter.add("files", path["path"])

    return submitter.dump()

2.6.2.2. Why will_spawn_children Matters

The will_spawn_children flag tells Ngenea Hub whether your task has spawned dynamic children. Without proper tracking:

  • A job may report as “ONGOING” after all tasks have completed.

  • A job may report as completed while child tasks are still in flight.

Danger

Dynamic child spawning has no built-in limit on the number of tasks created. Setting chunk_size to a very small value (e.g., 1) on a large job, or implementing recursive child spawning without a termination condition, can flood the Celery broker with millions of tasks. This will stall all Workers across all sites — not just the current job — and may require manual broker intervention to recover. Always set reasonable chunk sizes and verify that your spawning logic has a well-defined termination condition.
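The scale of the risk is simple arithmetic. Under the assumed model that each child task receives up to chunk_size paths (a back-of-envelope approximation, not taken from the DynamicSubmitter source):

```python
import math

def child_task_count(n_paths: int, chunk_size: int) -> int:
    # Each child handles up to chunk_size paths, so a job over n_paths
    # spawns roughly ceil(n_paths / chunk_size) children.
    return math.ceil(n_paths / chunk_size)

# A 10-million-path job with chunk_size=40 (as in the example above):
print(child_task_count(10_000_000, 40))  # 250000 child tasks

# The same job with chunk_size=1:
print(child_task_count(10_000_000, 1))   # 10000000 child tasks
```

A factor-of-40 difference in broker load from a single parameter is why chunk sizes deserve explicit review before a plugin runs against production-scale path counts.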

Warning

If will_spawn_children incorrectly returns False when children have been spawned, Ngenea Hub will mark the job as complete while child tasks are still executing. Downstream automation (policies, notifications, cleanup) may trigger prematurely, acting on incomplete data. Always use DynamicSubmitter rather than implementing child spawning manually.

The DynamicSubmitter handles this automatically. You can override the will_spawn_children() method to implement custom logic:

class MyCustomSubmitter(DynamicSubmitter):
    def will_spawn_children(self) -> bool:
        return self._spawned_children

2.6.3. Sub-DAGs

Sub-DAGs allow you to dynamically define and submit workflows at runtime. A sub-DAG can consist of a single task or a chain of multiple tasks. Once submitted, it is automatically integrated into the parent job for reporting and job control.

2.6.3.1. Single-Task Sub-DAG

Use task_template to define a single-task sub-DAG:

from arcapix.dynamo.server.dags.util import task_template, submit_subdag

def submit_stub_task(self, paths, source_site, dest_site, **kwargs):
    stub_template = task_template(
        "dynamo.tasks.reverse_stub",
        kwargs={
            "hydrate": False,
            "overwrite": True,
        },
        site=dest_site,
    )

    submit_subdag(
        paths,
        self.request.id,
        "dynamocore.tasks.dag.run_subdag",
        subgraph_template=stub_template,
    )

2.6.3.2. Multi-Task Sub-DAG (Chains)

For complex sub-DAGs with chained tasks, use the Graph and add_task helpers from arcapix.dynamo.server.dag.helper:

from arcapix.dynamo.server.dag.helper import Graph, add_task
from arcapix.dynamo.server.dags.util import submit_subdag

def submit_chain(self, paths, jobid, source_site, dest_site, endpoint, **kwargs):
    graph = Graph()

    xattr = graph.add_task(
        "dynamo.tasks.remove_location_xattrs_for_moved",
        kwargs={"jobid": jobid},
        site=source_site,
    )

    recall = graph.add_task(
        "dynamo.tasks.reverse_stub",
        parents=[xattr.id],
        kwargs={
            "jobid": jobid,
            "site": dest_site,
            "endpoint": endpoint,
            "hydrate": True,
            "overwrite": True,
        },
        site=dest_site,
    )

    submit_subdag(
        [{"path": p} for p in paths],
        self.request.id,
        "dynamocore.tasks.dag.run_subdag",
        subgraph_template=graph.to_dict(),
    )

The parents parameter establishes dependencies between tasks in the sub-DAG. In the example above, recall will only run after xattr completes.

Warning

If the parents parameter is omitted or incorrect, sub-DAG tasks will execute in parallel rather than sequentially. For operations that depend on ordering (e.g., removing xattrs before recalling a file), parallel execution can cause data corruption or silent failures. Always verify the dependency chain by confirming each task’s parents list references the correct predecessor task IDs.
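A pre-submission sanity check along these lines can catch a broken chain early. The dict layout used below ({"tasks": [{"id": ..., "parents": [...]}]}) is a hypothetical illustration, not the documented output of Graph.to_dict():

```python
def check_parents(graph):
    """Return a list of error strings for parents that reference unknown task IDs."""
    known = {task["id"] for task in graph["tasks"]}
    errors = []
    for task in graph["tasks"]:
        for parent in task.get("parents", []):
            if parent not in known:
                errors.append(f"{task['id']}: unknown parent {parent!r}")
    return errors

# Mirrors the xattr -> recall chain from the example above:
graph = {"tasks": [
    {"id": "xattr", "parents": []},
    {"id": "recall", "parents": ["xattr"]},
]}
print(check_parents(graph))  # []
```

A dangling parent ID would surface here as an error string instead of silently degrading into parallel execution at runtime.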

2.6.4. Queue Utilities

Several utility functions are available for working with Celery queues in your tasks.

from arcapix.dynamo.server.queue import (
    get_current_task_queue_name,
    get_current_site,
    get_formatted_queue_name,
    ParsedQueueName,
)

get_current_task_queue_name()
    Returns the name of the Celery queue the current task is running on.

get_current_site()
    Returns the name of the site the current task is running on.

get_formatted_queue_name(site, queue, function)
    Returns the Celery queue name for a given site, queue, and function.

ParsedQueueName
    A class representing a parsed Celery queue name with site, queue, and function attributes.

Example:

parsed_queue = ParsedQueueName.current_task_queue()

print(parsed_queue.site)      # london
print(parsed_queue.queue)     # highpriority
print(parsed_queue.function)  # custom

print(str(parsed_queue))      # london.highpriority#custom

Important

The format of Celery queue names is subject to change. Always use the above functions rather than manually parsing or formatting queue names.