5.3.3. Custom Tasks
In addition to the predefined tasks, Ngenea Worker plugins allow custom tasks to be defined.
Once created, custom tasks can be included in Custom Workflows.
Custom tasks must accept the same standard arguments, and return a payload in the same format, as predefined tasks do.
5.3.3.1. The Structure of a Task
5.3.3.1.1. Arguments
Tasks must accept the following keyword arguments:
| name | description | example | default |
|---|---|---|---|
| jobid | ID of the job the task is associated with. | 100 | |
| paths | List of paths (may be files, folders or symlinks) to be processed. Each path is given as a dict with a “path” key, and can also have other keys such as “size” and “message”. | [{"path": "/mmfs1/data/my-fileset/file1"}] | |
To pass specific keyword arguments to a task, include these in the workflow definition as hardcoded values or Runtime fields.
In addition, tasks should accept arbitrary other keyword arguments via a **kwargs parameter, but do not need to process them.
Here is an example function signature, from the definition of the predefined task dynamo.tasks.migrate:
def migrate(self, *args, jobid=None, paths=None, skip_modified_during_migrate=False, **kwargs):
    ...
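The calling convention can be shown as a minimal sketch. This is a plain Python function rather than a bound Celery task, so it has no self parameter. The function name my_task, the task-specific option compress, and the extra argument some_other_field are all hypothetical. The function takes the standard jobid and paths keyword arguments plus one option, and any other keyword arguments are collected in **kwargs and ignored.

```python
def my_task(*args, jobid=None, paths=None, compress=False, **kwargs):
    # Hypothetical task body: it returns what it received, for illustration only.
    # A real task returns the payload described under Return Payload.
    paths = paths or []
    return {
        "jobid": jobid,
        "paths": [p["path"] for p in paths],  # every path object has a "path" key
        "compress": compress,                 # task-specific option from the workflow definition
        "ignored": sorted(kwargs),            # extra keyword arguments, accepted but not processed
    }


result = my_task(
    jobid=100,
    paths=[{"path": "/mmfs1/data/my-fileset/file1", "size": 1024}],
    compress=True,
    some_other_field="value",
)
```

Because of **kwargs, the task keeps working if the workflow passes arguments it does not know about.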
5.3.3.1.2. Return Payload
Ngenea Hub expects task results to be returned in a specific format.
The worker code uses a convenience class, FileActionStatus, to construct the return payload. Custom tasks can use it too, as an alternative to constructing the payload from scratch.
Using the FileActionStatus class
The class is imported like this:
from arcapix.dynamo.server.status import FileActionStatus
A status object is instantiated like this:
status = FileActionStatus(taskname, input_paths, jobid, queue_name)
- taskname is the name of the current task
- input_paths is the list of path objects as passed to the task in the paths parameter
- jobid is the job’s numerical ID. This is provided as an input to the task.
- queue_name is the queue the task is currently running on. This can be retrieved with get_current_task_queue_name() from arcapix.dynamo.server.queue.
For each path operated on by the task, call the .add(key, path) method on the status object, where key is the state of the file:
status.add("processed", "/mmfs1/data/my-fileset/file1")
The status object keeps track of files as they are added. At the completion of the task, return the results like this:
return status.dump()
Constructing a return payload from scratch
Tasks return a dict containing at least these 3 keys:
{
    "jobid": <job_id>,
    "paths": <list of path objects that were processed or skipped>,
    "status": {
        "task": <task_name>,
        "input_paths": <list of path objects input to the task>,
        "input_total": <number of paths input to the task>,
        "summary": <dict giving number of files keyed by state>,
        "details": <dict giving list of file objects keyed by state>,
        "started": <timezone-aware datetime for when the task started>
    }
}
The paths key lists the paths to be passed to the next task in the workflow.
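The schema above can also be written as TypedDict definitions for static type checking. This is a sketch only. The names PathObject, TaskStatus and TaskPayload are hypothetical and not part of the Ngenea Worker API. It needs Python 3.8 or later. A payload may carry extra keys, so treat these types as documentation of the minimum required shape.

```python
from datetime import datetime
from typing import Any, Dict, List, TypedDict

# A path object: a dict with a "path" key and optional extras such as "size" or "message".
PathObject = Dict[str, Any]


# Hypothetical type for the "status" entry of a task result payload.
class TaskStatus(TypedDict):
    task: str
    input_paths: List[PathObject]
    input_total: int
    summary: Dict[str, int]               # number of files keyed by state
    details: Dict[str, List[PathObject]]  # file objects keyed by state
    started: datetime                     # timezone-aware start time


# Hypothetical type for the required top-level keys of a task result payload.
class TaskPayload(TypedDict):
    jobid: int
    paths: List[PathObject]  # paths to pass to the next task in the workflow
    status: TaskStatus
```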
Here is an example payload:
{
    "jobid": 100,
    "paths": [{"path": "/mmfs1/data/my-fileset/file1"}],
    "status": {
        "task": "my-plugin",
        "input_paths": [{"path": "/mmfs1/data/my-fileset/file1"}, {"path": "/mmfs1/data/my-fileset/file2"}],
        "input_total": 2,
        "summary": {
            "failures": 1,
            "processed": 1,
            "skipped": 0
        },
        "details": {
            "failures": [{"path": "/mmfs1/data/my-fileset/file2", "message": ["Reason xyz for the failure"]}],
            "processed": [{"path": "/mmfs1/data/my-fileset/file1"}],
            "skipped": []
        },
        "started": "2023-04-25T14:56:31.253043"
    }
}
Supported file states
These are the supported file states for tasks that perform operations on files:
- “processed”: file was successfully processed 
- “skipped”: file was not processed because it was already in the desired state
- “aborted”: file was not processed and should not be processed by subsequent tasks in the workflow 
- “failures”: file could not be processed because of an error 
- “inprogress”: file is still being processed (this state is rarely used) 
Generally, the files that should be passed to the next task are those that were processed or skipped.
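Here is a minimal sketch of building the payload from scratch. It follows the general rule above: processed and skipped files are passed on to the next task. The names build_payload, SUPPORTED_STATES and PASS_ON_STATES are hypothetical. The started time is passed as a timezone-aware datetime object. How the worker serializes it for Ngenea Hub is not covered here. The usage call reproduces the example payload shown earlier, apart from the started time.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List

PathObject = Dict[str, Any]

SUPPORTED_STATES = {"processed", "skipped", "aborted", "failures", "inprogress"}
# States whose files are generally passed on to the next task.
PASS_ON_STATES = ("processed", "skipped")


def build_payload(task: str, jobid: int, input_paths: List[PathObject],
                  details: Dict[str, List[PathObject]], started: datetime) -> Dict[str, Any]:
    """Assemble a task result payload from file objects keyed by state."""
    unknown = set(details) - SUPPORTED_STATES
    if unknown:
        raise ValueError(f"unsupported file states: {sorted(unknown)}")
    return {
        "jobid": jobid,
        # Files that were processed or skipped go on to the next task.
        "paths": [p for state in PASS_ON_STATES for p in details.get(state, [])],
        "status": {
            "task": task,
            "input_paths": input_paths,
            "input_total": len(input_paths),
            "summary": {state: len(files) for state, files in details.items()},
            "details": details,
            "started": started,
        },
    }


file1 = {"path": "/mmfs1/data/my-fileset/file1"}
file2 = {"path": "/mmfs1/data/my-fileset/file2"}
payload = build_payload(
    task="my-plugin",
    jobid=100,
    input_paths=[file1, file2],
    details={
        "failures": [{**file2, "message": ["Reason xyz for the failure"]}],
        "processed": [file1],
        "skipped": [],
    },
    started=datetime.now(timezone.utc),
)
```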
Streaming task results
This applies to delete and move handlers in a snapdiff workflow.
Rather than being returned, results from delete and move handlers in snapdiff workflows must be streamed back to Ngenea Hub.
In this case, Ngenea Hub passes the keyword argument stream_results=True to the task to indicate that it expects streamed results.
Important
If your task will be used as a move or delete handler in a snapdiff workflow, it must accept stream_results, either as an explicit parameter or via **kwargs.
A StreamTaskResults class is provided to facilitate this.
from arcapix.dynamo.server.streamstatus import StreamTaskResults
The class behaves the same way as FileActionStatus, so a task can choose between the two depending on stream_results:
if stream_results:
    status = StreamTaskResults(taskname, input_paths, jobid, queue_name, taskid)
else:
    status = FileActionStatus(taskname, input_paths, jobid, queue_name)
The parameters are the same as for FileActionStatus, described above, with the addition of taskid.
The task must be defined with bind=True; the taskid can then be retrieved with self.request.id.
For each path operated on by the task, call the .add(key, path) method on the status object, where key is the state of the file:
status.add("processed", "/mmfs1/data/my-fileset/file1")
The status object keeps track of files as they are added, and in the background sends them to Ngenea Hub in batches.
At the completion of the task, return the results like this:
return status.dump()
Calling status.dump() also flushes any outstanding files to Ngenea Hub.
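The batching behaviour can be illustrated with a small stand-in class. This is not StreamTaskResults. The class name BatchingStatus, the send callback and the batch_size parameter are hypothetical. The real class sends batches to Ngenea Hub in the background, whereas this sketch sends them inline. The sketch shows the two documented points: files are buffered as they are added, and dump() flushes whatever is still outstanding.

```python
from typing import Callable, Dict, List, Tuple

Batch = List[Tuple[str, str]]


class BatchingStatus:
    """Hypothetical stand-in illustrating batched streaming of file results."""

    def __init__(self, send: Callable[[Batch], None], batch_size: int = 100):
        self._send = send              # stands in for the call that delivers a batch to Hub
        self._batch_size = batch_size
        self._buffer: Batch = []
        self.summary: Dict[str, int] = {}

    def add(self, key: str, path: str) -> None:
        # Record the file under its state, and send a batch once the buffer is full.
        self._buffer.append((key, path))
        self.summary[key] = self.summary.get(key, 0) + 1
        if len(self._buffer) >= self._batch_size:
            self._flush()

    def _flush(self) -> None:
        if self._buffer:
            self._send(self._buffer)
            self._buffer = []          # start a new list so sent batches are not mutated

    def dump(self) -> Dict[str, Dict[str, int]]:
        # Flush outstanding files before returning the final result.
        self._flush()
        return {"summary": dict(self.summary)}


batches: List[Batch] = []
status = BatchingStatus(batches.append, batch_size=2)
for name in ("file1", "file2", "file3"):
    status.add("processed", f"/mmfs1/data/my-fileset/{name}")
result = status.dump()
```

With batch_size=2, the first two files are sent as soon as the second is added. The third is sent only when dump() is called.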
5.3.3.2. Support for Streaming Paths from API
This applies to delete and move handlers in a snapdiff workflow.
To fetch the streaming paths, use the expand_paths function from the arcapix.dynamo.server.utils.inputs module:
from arcapix.dynamo.server.utils.inputs import expand_paths
Warning
expand_paths returns a generator that fetches pages of paths on demand. To avoid starving Hub of memory, consume it lazily, processing each path as it is yielded, rather than storing large numbers of paths in memory at once (for example, by converting the generator to a list).
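Lazy consumption can be sketched with a stand-in generator. The source does not show how expand_paths is called, so fake_expand_paths is hypothetical. Its total and page_size parameters and the pages_fetched log are invented purely to make page fetching visible. The key point is that iterating the generator directly requests each page only when iteration reaches it.

```python
from typing import Dict, Iterator, List


def fake_expand_paths(total: int, page_size: int, pages_fetched: List[int]) -> Iterator[Dict[str, str]]:
    """Hypothetical stand-in for expand_paths: yields path objects page by page."""
    for start in range(0, total, page_size):
        pages_fetched.append(start)  # records each simulated page request
        for i in range(start, min(start + page_size, total)):
            yield {"path": f"/mmfs1/data/my-fileset/file{i}"}


def count_paths(path_iter: Iterator[Dict[str, str]]) -> int:
    """Consume paths one at a time instead of building a list of all of them."""
    processed = 0
    for path_obj in path_iter:
        # Per-path work would go here, e.g. status.add("processed", path_obj["path"]).
        processed += 1
    return processed


pages: List[int] = []
stream = fake_expand_paths(total=2500, page_size=1000, pages_fetched=pages)
first = next(stream)          # pages is now [0]: only the first page has been requested
remaining = count_paths(stream)  # later pages are requested as iteration reaches them
```

Calling list(stream) instead would force every page into memory at once, which is what the warning above advises against.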
5.3.3.3. Queues
Several functions are available for getting information about the celery queue that a custom task is running on.
Note
The way queues are handled, and the way celery queue names are formatted, changed in Ngenea Hub 2.4.0.
For more information on Ngenea Hub queues, see Queues.
from arcapix.dynamo.server.queue import ...
- get_current_task_queue_name() - returns the name of the celery queue the current task is running on
- get_current_site() - returns the name of the site the current task is running on
- get_formatted_queue_name(site, queue, function) - returns the celery queue name for a given site, queue, and function
- ParsedQueueName - a class representing a celery queue name, parsed into site, queue, and function
parsed_queue = ParsedQueueName.current_task_queue()
print(parsed_queue.site)  # london
print(parsed_queue.queue)  # highpriority
print(parsed_queue.function)  # custom
print(str(parsed_queue))  # london.highpriority#custom
Important
The format of celery queue names is subject to change. You should always use the above functions, rather than manually parsing and formatting queue names.
5.3.3.4. Dynamic Children and Job Completion
When creating plugins that may spawn dynamic child tasks, it’s important to understand how the hub tracks job completion.
5.3.3.4.1. The will_spawn_children Field
The will_spawn_children field is automatically managed by the DynamicSubmitter class when you use it to spawn dynamic child tasks. This field indicates whether your plugin has spawned any dynamic children (processing or traversal tasks) and is crucial for the hub’s job completion tracking.
5.3.3.4.2. Using DynamicSubmitter (Recommended)
For plugins that spawn dynamic children, use the DynamicSubmitter class which automatically handles the will_spawn_children field:
from typing import List
from arcapix.dynamo.server.dags.status import DynamicSubmitter
from arcapix.dynamo.server.status import FileActionStatus
from arcapix.dynamo.server.queue import get_current_task_queue_name
# `app` is the worker's Celery application instance (its import is not shown here)
@app.task(bind=True, name="dynamo.custom.dynamic_plugin")
def dynamic_plugin(self, *args, paths: List = None, jobid: int = None, **kwargs):
    status = FileActionStatus(self.name, paths, jobid, get_current_task_queue_name())
    # Create a submitter to handle dynamic child task spawning
    submitter = DynamicSubmitter(
        status=status,
        settings={},  # Your dynamic template settings here
        task_id=self.request.id,
        chunk_size=40,
        chunk_gb=1,
        auto_chunk_size=False,
    )
    # Process paths and potentially spawn children
    for path in paths:
        submitter.add("files", path["path"])
    # The submitter automatically handles will_spawn_children flag
    # based on whether any children were actually spawned
    return submitter.dump()
5.3.3.4.3. Customizing will_spawn_children Logic
You can override the will_spawn_children() method in your submitter to implement custom logic:
from typing import List
from arcapix.dynamo.server.dags.status import DynamicSubmitter
from arcapix.dynamo.server.status import FileActionStatus
from arcapix.dynamo.server.queue import get_current_task_queue_name
class MyCustomSubmitter(DynamicSubmitter):
    def will_spawn_children(self) -> bool:
        """
        Override to implement custom logic for determining if children will be spawned.
        Default behavior: returns True if any dynamic children were submitted.
        Customize this method to implement your own business logic.
        """
        # Example: only count certain types of children
        # return len(self._chunks) > 0  # Only count processing children
        # Example: add thresholds
        # return self._spawned_children and len(self._status.details["files"]) > 100
        # Default: return True if any children were spawned
        return self._spawned_children
@app.task(bind=True, name="dynamo.custom.custom_plugin")
def custom_plugin(self, *args, paths: List = None, jobid: int = None, **kwargs):
    status = FileActionStatus(self.name, paths, jobid, get_current_task_queue_name())
    submitter = MyCustomSubmitter(
        status=status,
        settings={},
        task_id=self.request.id,
        chunk_size=40,
        chunk_gb=1,
        auto_chunk_size=False,
    )
    # Your plugin logic here...
    return submitter.dump()
5.3.3.4.4. Simple Plugins (No Dynamic Children)
For plugins that don’t spawn dynamic children, simply return status.dump() without any submitter:
from typing import List
from arcapix.dynamo.server.status import FileActionStatus
from arcapix.dynamo.server.queue import get_current_task_queue_name
@app.task(bind=True, name="dynamo.custom.simple_plugin")
def simple_plugin(self, *args, paths: List = None, jobid: int = None, **kwargs):
    status = FileActionStatus(self.name, paths, jobid, get_current_task_queue_name())
    # Process all paths without spawning children
    for path in paths:
        status.add("processed", path["path"])
    return status.dump()
5.3.3.4.5. Why This Matters
The will_spawn_children field helps the hub track when your plugin has finished spawning dynamic children. Without proper tracking, a job may not report the correct status, such as reporting “ONGOING” after all tasks have completed, or reporting as completed while tasks are still in flight.
5.3.3.4.6. Backward Compatibility
The DynamicSubmitter automatically handles the will_spawn_children field, so you don’t need to manually set it. This ensures consistent behavior across all plugins that spawn dynamic children.