5.3.3. Custom Tasks
Warning
This feature is currently in alpha.
In addition to the predefined tasks, Ngenea Worker plugins allow custom tasks to be defined.
Once created, custom tasks can be included in Custom Workflows.
Custom tasks must accept the same standard arguments, and return a payload in the same format, as predefined tasks do.
5.3.3.1. The Structure of a Task
5.3.3.1.1. Arguments
Tasks must accept the following keyword arguments:
| name | description | example | default |
|---|---|---|---|
| jobid | ID of the job the task is associated with. | 100 | None |
| paths | List of paths (may be files, folders or symlinks) to be processed. Each path is given as a dict with a “path” key, and can also have other keys such as “size” and “message”. | [{"path": "/mmfs1/data/my-fileset/file1"}] | None |
To pass specific keyword arguments to a task, include these in the workflow definition as hardcoded values or Runtime fields.
In addition, tasks should accept arbitrary other keyword arguments via a **kwargs parameter, but do not need to process them.
Here is an example function signature, from the definition of the predefined task dynamo.tasks.migrate:
def migrate(self, *args, jobid=None, paths=None, skip_modified_during_migrate=False, **kwargs)
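The argument rules above can be sketched with a plain Python function. This is only an illustration under stated assumptions: `append_suffix` and its `suffix` argument are hypothetical, the function is not registered with the worker, and it returns a simplified result rather than the full return payload described in the next section.

```python
def append_suffix(self, *args, jobid=None, paths=None, suffix=".bak", **kwargs):
    """Hypothetical task body: report the name each path would get with a suffix.

    `suffix` is an illustrative task-specific argument. Anything else the
    workflow passes lands in **kwargs and is ignored.
    """
    paths = paths or []  # tolerate the default of None
    renamed = []
    for item in paths:
        # Each path object is a dict with a "path" key and optional extra keys.
        renamed.append({**item, "path": item["path"] + suffix})
    # Simplified result for illustration only, not the full payload format.
    return {"jobid": jobid, "paths": renamed}


# The first argument stands in for the bound task instance; None is enough here.
result = append_suffix(
    None,
    jobid=100,
    paths=[{"path": "/mmfs1/data/my-fileset/file1", "size": 10}],
    unexpected_option=True,  # swallowed by **kwargs
)
print(result)
```

Accepting `**kwargs` means the call still succeeds when the workflow supplies arguments the task does not know about, such as `unexpected_option` here.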
5.3.3.1.2. Return Payload
Ngenea Hub expects task results to be returned in a specific format.
There is a class FileActionStatus that the worker code uses as a convenience for constructing the return payload. This may be used as an alternative to constructing the return payload from scratch.
Using the FileActionStatus class
The class is imported like this:
from arcapix.dynamo.server.status import FileActionStatus
A status object is instantiated like this:
status = FileActionStatus(taskname, input_paths, jobid, queue_name)
- taskname is the name of the current task
- input_paths is the list of path objects as passed to the task in the paths parameter
- jobid is the job’s numerical id. This is provided as an input to the task.
- queue_name is the queue the task is currently running on. This can be retrieved with get_current_task_queue_name() from arcapix.dynamo.server.queue
For each path operated on by the task, call the .add(key, path) method on the status object, where key is the state of the file:
status.add("processed", "/mmfs1/data/my-fileset/file1")
The status object keeps track of files as they are added. At the completion of the task, return the results like this:
return status.dump()
Constructing a return payload from scratch
Tasks return a dict containing at least these 3 keys:
{
    "jobid": <job_id>,
    "paths": <list of path objects that were processed or skipped>,
    "status": {
        "task": <task_name>,
        "input_paths": <list of path objects input to the task>,
        "input_total": <number of paths input to the task>,
        "summary": <dict giving number of files keyed by state>,
        "details": <dict giving list of file objects keyed by state>,
        "started": <timezone-aware datetime for when the task started>
    }
}
The paths key lists the paths to be passed to the next task in the workflow.
Here is an example payload:
{
    "jobid": 100,
    "paths": [{"path": "/mmfs1/data/my-fileset/file1"}],
    "status": {
        "task": "my-plugin",
        "input_paths": [{"path": "/mmfs1/data/my-fileset/file1"}, {"path": "/mmfs1/data/my-fileset/file2"}],
        "input_total": 2,
        "summary": {
            "failures": 1,
            "processed": 1,
            "skipped": 0
        },
        "details": {
            "failures": [{"path": "/mmfs1/data/my-fileset/file2", "message": ["Reason xyz for the failure"]}],
            "processed": [{"path": "/mmfs1/data/my-fileset/file1"}],
            "skipped": []
        },
        "started": "2023-04-25T14:56:31.253043",
    }
}
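A payload of this shape could be assembled by hand along the following lines. This is a minimal sketch, not the worker's own code: `build_payload` is a hypothetical helper, and it assumes that summary counts are simply the lengths of the per-state lists, that processed and skipped paths are the ones passed on, and that the start time is serialised as an ISO 8601 string.

```python
from datetime import datetime, timezone


def build_payload(task_name, jobid, input_paths, details, started=None):
    """Assemble a return payload from per-state lists of path objects."""
    started = started or datetime.now(timezone.utc)  # timezone-aware
    return {
        "jobid": jobid,
        # Assumption: processed and skipped paths go on to the next task.
        "paths": details.get("processed", []) + details.get("skipped", []),
        "status": {
            "task": task_name,
            "input_paths": input_paths,
            "input_total": len(input_paths),
            # Assumption: each count is the length of the matching list.
            "summary": {state: len(items) for state, items in details.items()},
            "details": details,
            # Assumption: serialised as an ISO 8601 string.
            "started": started.isoformat(),
        },
    }


file1 = {"path": "/mmfs1/data/my-fileset/file1"}
file2 = {"path": "/mmfs1/data/my-fileset/file2"}
payload = build_payload(
    "my-plugin",
    100,
    [file1, file2],
    {
        "failures": [{**file2, "message": ["Reason xyz for the failure"]}],
        "processed": [file1],
        "skipped": [],
    },
)
print(payload["status"]["summary"])
```

Deriving `summary` and `input_total` from the lists, rather than counting separately, keeps the numbers consistent with `details` and `input_paths`.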
Supported file states
These are the supported file states for tasks that perform operations on files:
- “processed”: file was successfully processed 
- “skipped”: file was not processed because it was already in the desired state
- “aborted”: file was not processed and should not be processed by subsequent tasks in the workflow 
- “failures”: file could not be processed because of an error 
- “inprogress”: file is still being processed (this state is rarely used) 
Generally, the files that should be passed to the next task are those that were processed or skipped.
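To illustrate how a task might sort files into these states, here is a small sketch. The operation (emptying a file) and the function name `truncate_files` are hypothetical, chosen only because the three outcomes are easy to tell apart; it returns the per-state lists and the paths for the next task rather than a complete payload.

```python
import os


def truncate_files(paths):
    """Empty each file and sort the path objects into state lists."""
    details = {"processed": [], "skipped": [], "failures": []}
    for item in paths:
        try:
            if os.path.getsize(item["path"]) == 0:
                # Already in the desired state, so there is nothing to do.
                details["skipped"].append(item)
            else:
                with open(item["path"], "w"):
                    pass  # opening in "w" mode truncates to zero bytes
                details["processed"].append(item)
        except OSError as error:
            # Record the reason alongside the path, as a list of messages.
            details["failures"].append({**item, "message": [str(error)]})
    # Only processed and skipped paths are handed to the next task.
    next_paths = details["processed"] + details["skipped"]
    return details, next_paths
```

A missing file raises `FileNotFoundError`, a subclass of `OSError`, so it ends up under “failures” with the error text as its message.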
Streaming task results
This applies to delete and move handlers in a snapdiff workflow.
Results for delete and move handlers in snapdiff workflows must be streamed back to Ngenea Hub rather than returned.
In this case, Ngenea Hub passes the keyword argument stream_results=True to the task to indicate that it expects streamed results.
Important
If your task will be used as a move or delete handler in a snapdiff workflow,
it must accept stream_results, either as an explicit parameter or via **kwargs.
A StreamTaskResults class is provided to facilitate this.
from arcapix.dynamo.server.streamstatus import StreamTaskResults
The class behaves the same as FileActionStatus:
if stream_results:
    status = StreamTaskResults(taskname, input_paths, jobid, queue_name, taskid)
else:
    status = FileActionStatus(taskname, input_paths, jobid, queue_name)
The parameters are the same as for FileActionStatus, described above, with the addition of taskid.
The task must be defined with bind=True. The taskid can then be retrieved with self.request.id.
For each path operated on by the task, call the .add(key, path) method on the status object, where key is the state of the file:
status.add("processed", "/mmfs1/data/my-fileset/file1")
The status object keeps track of files as they are added, and in the background sends them to Ngenea Hub in batches.
At the completion of the task, return the results like this:
return status.dump()
Calling status.dump() also flushes any outstanding files to Ngenea Hub.
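The two ways of accepting stream_results mentioned in the note above might look like this. Both handlers are hypothetical and return a plain string so the sketch runs without the worker; in a real task, this is the point where the code would choose between StreamTaskResults and FileActionStatus. Defaulting to False when the argument is absent is an assumption.

```python
def delete_handler(self, *args, jobid=None, paths=None, stream_results=False, **kwargs):
    """Hypothetical handler that names stream_results as an explicit parameter."""
    return "stream" if stream_results else "return"


def move_handler(self, *args, jobid=None, paths=None, **kwargs):
    """Hypothetical handler that picks stream_results out of **kwargs."""
    # Assumption: treat a missing argument as "do not stream".
    stream_results = kwargs.get("stream_results", False)
    return "stream" if stream_results else "return"
```

Either form lets the same task serve as a snapdiff handler and as an ordinary workflow step.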
5.3.3.2. Support for Streaming Paths from API
This applies to delete and move handlers in a snapdiff workflow.
To fetch the streamed paths, use the function expand_paths from the module arcapix.dynamo.server.utils.inputs:
from arcapix.dynamo.server.utils.inputs import expand_paths
Warning
expand_paths provides a generator function that fetches pages of paths on demand. To avoid memory starvation
of Hub, developers must consume it as a generator and avoid holding large numbers of paths in memory at once.
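The general pattern for consuming a generator without holding everything in memory can be sketched as follows. Note the assumptions: `fake_expand_paths` is a hypothetical stand-in written for this example and does not reflect the real signature or behaviour of expand_paths, and `process_lazily` is an illustrative helper.

```python
from itertools import islice


def fake_expand_paths(total, page_size=3):
    """Hypothetical stand-in for a paged path source; NOT the real expand_paths."""
    for start in range(0, total, page_size):
        # One "page" is built only when the consumer asks for more paths.
        page = [
            {"path": f"/mmfs1/data/my-fileset/file{n}"}
            for n in range(start, min(start + page_size, total))
        ]
        yield from page


def process_lazily(path_iter, batch_size=2):
    """Handle paths in small batches without ever building the full list."""
    path_iter = iter(path_iter)
    handled = 0
    while True:
        batch = list(islice(path_iter, batch_size))  # at most batch_size items
        if not batch:
            break
        handled += len(batch)  # real per-path work would go here
    return handled


print(process_lazily(fake_expand_paths(7)))  # 7
```

The thing to avoid is a call such as `list(...)` on the whole generator, which would load every page at once and defeat the on-demand fetching.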
5.3.3.3. Queues
There are various functions available to get information about the celery queue a custom task is running on.
Note
The way queues are handled, and the way celery queue names are formatted, changed in Ngenea Hub 2.4.0.
For more information on Ngenea Hub queues, see Queues.
from arcapix.dynamo.server.queue import ...
- get_current_task_queue_name() - returns the name of the celery queue the current task is running on
- get_current_site() - returns the name of the site the current task is running on
- get_formatted_queue_name(site, queue, function) - returns the celery queue name for a given site, queue, and function
- ParsedQueueName - a class representing a celery queue name, parsed into site, queue, and function
parsed_queue = ParsedQueueName.current_task_queue()
print(parsed_queue.site)  # london
print(parsed_queue.queue)  # highpriority
print(parsed_queue.function)  # custom
print(str(parsed_queue))  # london.highpriority#custom
Important
The format of celery queue names is subject to change. You should always use the above functions, rather than manually parsing and formatting queue names.