4.3.3. Custom Tasks¶
Warning
This feature is currently in alpha
In addition to the predefined tasks, Ngenea Worker plugins allow custom tasks to be defined.
Once created, custom tasks can be included in Custom Workflows.
Custom tasks must accept the same standard arguments, and return a payload in the same format, as predefined tasks do.
4.3.3.1. The Structure of a Task¶
4.3.3.1.1. Arguments¶
Tasks must accept the following keyword arguments:
name | description | example | default
---|---|---|---
jobid | ID of the job the task is associated with. | 100 | None
paths | List of paths (may be files, folders or symlinks) to be processed. Each path is given as a dict with a “path” key, and can also have other keys such as “size” and “message”. | [{"path": "/mmfs1/data/my-fileset/file1"}] | None
To pass specific keyword arguments to a task, include these in the workflow definition as hardcoded values or Runtime fields.
In addition, tasks should accept arbitrary other keyword arguments via a **kwargs parameter, but do not need to process them.
Here is an example function signature, from the definition of the predefined task dynamo.tasks.migrate:
def migrate(self, *args, jobid=None, paths=None, skip_modified_during_migrate=False, **kwargs)
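For illustration, a custom task signature might look like the sketch below. This is a hedged example: the task name my_custom_task and the extra keyword overwrite are hypothetical; only jobid, paths and **kwargs are required by the convention described above.

def my_custom_task(self, *args, jobid=None, paths=None, overwrite=False, **kwargs):
    # jobid and paths are supplied by Ngenea Hub.
    # overwrite is a hypothetical task-specific keyword that would be set in the
    # workflow definition as a hardcoded value or a Runtime field.
    # Unrecognised keyword arguments are absorbed by **kwargs and ignored.
    ...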
4.3.3.1.2. Return Payload¶
Ngenea Hub expects task results to be returned in a specific format.
The worker code provides a class, FileActionStatus, as a convenience for constructing the return payload. It may be used as an alternative to constructing the return payload from scratch.
Using the FileActionStatus class
The class is imported like this:
from arcapix.dynamo.server.status import FileActionStatus
A status object is instantiated like this:
status = FileActionStatus(taskname, input_paths, jobid, queue_name)
where input_paths is the list of path objects as passed to the task in the paths parameter, and queue_name is "<site>-custom" for worker plugins.
For each path operated on by the task, call the .add(key, path) method on the status object, where key is the state of the file:
status.add("processed", "/mmfs1/data/my-fileset/file1")
The status object keeps track of files as they are added. At the completion of the task, return the results like this:
return status.dump()
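Putting these pieces together, a complete custom task using FileActionStatus might look like the following sketch. The task name, the queue name placeholder and the per-file operation process_one are assumptions for illustration; the FileActionStatus calls follow the pattern shown above.

from arcapix.dynamo.server.status import FileActionStatus

def my_custom_task(self, *args, jobid=None, paths=None, **kwargs):
    paths = paths or []
    # "<site>-custom" is a placeholder for the site-specific queue name
    status = FileActionStatus("my_custom_task", paths, jobid, "<site>-custom")
    for path_obj in paths:
        try:
            process_one(path_obj["path"])  # hypothetical per-file operation
            status.add("processed", path_obj["path"])
        except Exception:
            status.add("failures", path_obj["path"])
    return status.dump()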
Constructing a return payload from scratch
Tasks return a dict containing at least these 3 keys:
{
"jobid": <job_id>,
"paths": <list of path objects that were processed or skipped>,
"status": {
"task": <task_name>,
"input_paths": <list of path objects input to the task>,
"input_total": <number of paths input to the task>,
"summary": <dict giving number of files keyed by state>,
"details": <dict giving list of file objects keyed by state>,
"started": <timezone-aware datetime for when the task started>
}
}
The paths key lists the paths to be passed to the next task in the workflow.
Here is an example payload:
{
"jobid": 100,
"paths": [{"path": "/mmfs1/data/my-fileset/file1"}],
"status": {
"task": "my-plugin",
"input_paths": [{"path": "/mmfs1/data/my-fileset/file1"}, {"path": "/mmfs1/data/my-fileset/file2"}],
"input_total": 2,
"summary": {
"failures": 1,
"processed": 1,
"skipped": 0
},
"details": {
"failures": [{"path": "/mmfs1/data/my-fileset/file2", "message": ["Reason xyz for the failure"]}],
"processed": [{"path": "/mmfs1/data/my-fileset/file1"}],
"skipped": []
},
"started": "2023-04-25T14:56:31.253043",
}
}
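As a hedged sketch, a task building this payload by hand might look like the following. The variable names and the per-file handling are illustrative; the points to note are the timezone-aware started timestamp captured when the task begins, the summary derived from details, and forwarding the processed and skipped paths to the next task (see the supported file states below).

from datetime import datetime, timezone

def my_custom_task(self, *args, jobid=None, paths=None, **kwargs):
    started = datetime.now(timezone.utc)  # timezone-aware start time
    paths = paths or []
    details = {"processed": [], "skipped": [], "failures": []}
    for path_obj in paths:
        # populate details according to the outcome for each path; here every
        # path is marked as processed purely for illustration
        details["processed"].append(path_obj)
    return {
        "jobid": jobid,
        # paths the next task should operate on
        "paths": details["processed"] + details["skipped"],
        "status": {
            "task": "my-plugin",
            "input_paths": paths,
            "input_total": len(paths),
            "summary": {state: len(items) for state, items in details.items()},
            "details": details,
            "started": started,
        },
    }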
Supported file states
These are the supported file states for tasks that perform operations on files:
“processed”: file was successfully processed
“skipped”: file was not processed because it was already in the desired state
“aborted”: file was not processed and should not be processed by subsequent tasks in the workflow
“failures”: file could not be processed because of an error
“inprogress”: file is still being processed (this state is rarely used)
Generally, the files that should be passed to the next task are those that were processed or skipped.
4.3.3.2. Support for Streaming Paths from API¶
This applies to delete and move handlers in a snapdiff workflow.
To fetch the streaming paths, use the expand_paths function from the arcapix.dynamo.server.utils.inputs module:
from arcapix.dynamo.server.utils.inputs import expand_paths
Warning
expand_paths is a generator that fetches pages of paths on demand. Developers must iterate over it lazily rather than collecting all of its values into memory at once; holding a large number of paths in memory concurrently risks starving Ngenea Hub of memory.
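A hedged sketch of lazy consumption is shown below. The exact signature of expand_paths is not documented here, so the call shown (passing the task's paths argument) is an assumption, as are the handler name and per-file operation; the point illustrated is iterating over the generator one path at a time rather than materialising it into a list.

from arcapix.dynamo.server.utils.inputs import expand_paths

def my_delete_handler(self, *args, jobid=None, paths=None, **kwargs):
    # Iterate lazily: each page of paths is fetched from the API on demand.
    # Avoid list(expand_paths(...)), which would hold every path in memory at once.
    for path_obj in expand_paths(paths):  # call signature assumed for illustration
        handle_delete(path_obj["path"])  # hypothetical per-file operation
    ...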