4.3.3. Custom Tasks

Warning

This feature is currently in alpha.

In addition to the predefined tasks, Ngenea Worker plugins allow custom tasks to be defined.

Once created, custom tasks can be included in Custom Workflows.

Custom tasks must accept the same standard arguments, and return a payload in the same format, as predefined tasks do.

4.3.3.1. The Structure of a Task

4.3.3.1.1. Arguments

Tasks must accept the following keyword arguments:

name: jobid
description: ID of the job the task is associated with.
example: 100
default: None

name: paths
description: List of paths (may be files, folders or symlinks) to be processed. Each path is given as a dict with a "path" key, and can also have other keys such as "size" and "message".
example: [{"path": "/mmfs1/data/my-fileset/file1", "size": 264321}, {"path": "/mmfs1/data/my-fileset/file2", "size": 15}]
default: None

To pass specific keyword arguments to a task, include these in the workflow definition as hardcoded values or Runtime fields.

In addition, tasks should accept arbitrary other keyword arguments via a **kwargs parameter, but do not need to process them.

Here is an example function signature, from the definition of the predefined task dynamo.tasks.migrate:

def migrate(self, *args, jobid=None, paths=None, skip_modified_during_migrate=False, **kwargs)
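As a minimal sketch of how a custom task might handle these arguments, the function below follows the same signature. The name describe_inputs and the label keyword argument are hypothetical, and how the function is registered with a worker plugin is not shown. The value it returns is only a summary for illustration; it is not the return payload format that Ngenea Hub expects.

```python
def describe_inputs(self, *args, jobid=None, paths=None, label="demo", **kwargs):
    """Hypothetical task body that only inspects the standard arguments."""
    # paths defaults to None, so normalise it to an empty list first.
    paths = paths or []
    # "size" is an optional key on each path dict, so fall back to 0.
    total_size = sum(item.get("size", 0) for item in paths)
    # Unrecognised keyword arguments land in kwargs and are simply ignored.
    return {
        "jobid": jobid,
        "label": label,
        "count": len(paths),
        "total_size": total_size,
    }
```

Because of the **kwargs parameter, a call that supplies extra keyword arguments the task does not know about still succeeds.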

4.3.3.1.2. Return Payload

Ngenea Hub expects task results to be returned in a specific format.

The worker code provides a FileActionStatus class as a convenience for constructing the return payload. It can be used instead of constructing the return payload from scratch.

Using the FileActionStatus class

The class is imported like this:

from arcapix.dynamo.server.status import FileActionStatus

A status object is instantiated like this:

status = FileActionStatus(taskname, input_paths, jobid, queue_name)

where input_paths is the list of path objects as passed to the task in the paths parameter, and queue_name is "<site>-custom" for worker plugins.

For each path operated on by the task, call the .add(key, path) method on the status object, where key is the state of the file:

status.add("processed", "/mmfs1/data/my-fileset/file1")

The status object keeps track of files as they are added. At the completion of the task, return the results like this:

return status.dump()
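The steps above can be put together roughly as follows. This is a sketch that relies only on the constructor, add and dump calls shown above. So that it can run outside a worker, the status class is passed in as a parameter; in a real plugin you would pass FileActionStatus. The function name, the "mysite" site name, and the rule that zero-byte files count as skipped are all hypothetical.

```python
def run_with_status(status_cls, jobid, paths, site="mysite"):
    """Record a state for every input path and return the dumped payload."""
    # In a real plugin, status_cls would be FileActionStatus.
    status = status_cls("my-plugin", paths, jobid, f"{site}-custom")
    for item in paths or []:
        # Hypothetical rule for illustration: zero-byte files are skipped.
        key = "skipped" if item.get("size") == 0 else "processed"
        status.add(key, item["path"])
    return status.dump()
```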

Constructing a return payload from scratch

Tasks return a dict containing at least these 3 keys:

{
    "jobid": <job_id>,
    "paths": <list of path objects that were processed or skipped>,
    "status": {
        "task": <task_name>,
        "input_paths": <list of path objects input to the task>,
        "input_total": <number of paths input to the task>,
        "summary": <dict giving number of files keyed by state>,
        "details": <dict giving list of file objects keyed by state>,
        "started": <timezone-aware datetime for when the task started>
    }
}

The paths key lists the paths to be passed to the next task in the workflow.

Here is an example payload:

{
    "jobid": 100,
    "paths": [{"path": "/mmfs1/data/my-fileset/file1"}],
    "status": {
        "task": "my-plugin",
        "input_paths": [{"path": "/mmfs1/data/my-fileset/file1"}, {"path": "/mmfs1/data/my-fileset/file2"}],
        "input_total": 2,
        "summary": {
            "failures": 1,
            "processed": 1,
            "skipped": 0
        },
        "details": {
            "failures": [{"path": "/mmfs1/data/my-fileset/file2", "message": ["Reason xyz for the failure"]}],
            "processed": [{"path": "/mmfs1/data/my-fileset/file1"}],
            "skipped": []
        },
        "started": "2023-04-25T14:56:31.253043"
    }
}
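A payload of this shape could be assembled with a small helper like the one sketched below. The function name build_payload is hypothetical. Two assumptions are made: that the paths passed to the next task are those recorded as processed or skipped, and that an ISO 8601 string is an acceptable form for the started value, as in the example payload.

```python
from datetime import datetime, timezone


def build_payload(task_name, jobid, input_paths, details, started=None):
    """Build a return payload from a dict of file objects keyed by state."""
    # Use a timezone-aware start time; default to "now" in UTC.
    started = started or datetime.now(timezone.utc)
    # The summary holds the number of files recorded for each state.
    summary = {state: len(files) for state, files in details.items()}
    # Assumption: only processed and skipped paths go to the next task.
    next_paths = details.get("processed", []) + details.get("skipped", [])
    return {
        "jobid": jobid,
        "paths": next_paths,
        "status": {
            "task": task_name,
            "input_paths": input_paths,
            "input_total": len(input_paths),
            "summary": summary,
            "details": details,
            # Assumption: serialised as an ISO 8601 string.
            "started": started.isoformat(),
        },
    }
```

Deriving the summary from the details dict keeps the two from drifting apart when a state is added or a file is moved between states.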

Supported file states

These are the supported file states for tasks that perform operations on files:

  • "processed": file was successfully processed

  • "skipped": file was not processed because it was already in the desired state

  • "aborted": file was not processed and should not be processed by subsequent tasks in the workflow

  • "failures": file could not be processed because of an error

  • "inprogress": file is still being processed (this state is rarely used)

Generally, the files that should be passed to the next task are those that were processed or skipped.

4.3.3.2. Support for Streaming Paths from API

This applies to delete and move handlers in a snapdiff workflow.

To fetch the streamed paths, use the expand_paths function from the arcapix.dynamo.server.utils.inputs module:

from arcapix.dynamo.server.utils.inputs import expand_paths

Warning

expand_paths is a generator function that fetches pages of paths on demand. To avoid starving Ngenea Hub of memory, consume the paths lazily as they are yielded, rather than holding a large number of them in memory at once.
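One way to consume a generator of paths without holding them all in memory is to work through it in bounded batches, as sketched below. The signature of expand_paths is not given here, so the sketch uses a stand-in generator, fake_expand_paths, in its place. The helper names in_batches and process_all are also hypothetical.

```python
from itertools import islice


def fake_expand_paths(total):
    """Stand-in for expand_paths: yields path dicts one at a time."""
    for index in range(total):
        yield {"path": f"/mmfs1/data/my-fileset/file{index}"}


def in_batches(paths, batch_size):
    """Yield lists of at most batch_size items from any iterable."""
    iterator = iter(paths)
    while True:
        # islice pulls only the next batch_size items from the generator.
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def process_all(paths, batch_size=100):
    """Handle every path while keeping only one batch in memory."""
    handled = 0
    for batch in in_batches(paths, batch_size):
        # Replace this line with the real per-batch work.
        handled += len(batch)
    return handled
```

At any moment only one batch is materialised, so peak memory use depends on batch_size rather than on the total number of paths. Calling list() on the whole generator would defeat this.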