4.3.3. Custom Tasks

Warning

This feature is currently in alpha

In addition to the predefined tasks, Ngenea Worker plugins allow custom tasks to be defined.

Once created, custom tasks can be included in Custom Workflows.

Custom tasks must accept the same standard arguments, and return a payload in the same format, as predefined tasks do.

4.3.3.1. The Structure of a Task

4.3.3.1.1. Arguments

Tasks must accept the following keyword arguments:

  • jobid
    Description: ID of the job the task is associated with.
    Example: 100
    Default: None

  • paths
    Description: List of paths (may be files, folders or symlinks) to be processed. Each path is given as a dict with a "path" key, and can also have other keys such as "size" and "message".
    Example: [{"path": "/mmfs1/data/my-fileset/file1", "size": 264321}, {"path": "/mmfs1/data/my-fileset/file2", "size": 15}]
    Default: None

To pass specific keyword arguments to a task, include these in the workflow definition as hardcoded values or Runtime fields.

In addition, tasks should accept arbitrary other keyword arguments via a **kwargs parameter, but do not need to process them.

Here is an example function signature, from the definition of the predefined task dynamo.tasks.migrate:

def migrate(self, *args, jobid=None, paths=None, skip_modified_during_migrate=False, **kwargs)
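
A minimal custom task skeleton accepting the standard arguments might therefore look like the following sketch (the function name and body are illustrative):

def my_custom_task(*args, jobid=None, paths=None, **kwargs):
    # jobid and paths are supplied by Ngenea Hub; any extra keyword arguments
    # from the workflow definition arrive via **kwargs and can be ignored.
    for path in paths or []:
        ...  # process path["path"]
    # ... build and return the payload described in the next section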

4.3.3.1.2. Return Payload

Ngenea Hub expects task results to be returned in a specific format.

There is a class FileActionStatus that the worker code uses as a convenience for constructing the return payload. This may be used as an alternative to constructing the return payload from scratch.

Using the FileActionStatus class

The class is imported like this:

from arcapix.dynamo.server.status import FileActionStatus

A status object is instantiated like this:

status = FileActionStatus(taskname, input_paths, jobid, queue_name)
  • taskname is the name of the current task

  • input_paths is the list of path objects as passed to the task in the paths parameter

  • jobid is the job's numerical id. This is provided as an input to the task.

  • queue_name is the queue the task is currently running on. This can be retrieved with get_current_task_queue_name() from arcapix.dynamo.server.queue

For each path operated on by the task, call the .add(key, path) method on the status object, where key is the state of the file:

status.add("processed", "/mmfs1/data/my-fileset/file1")

The status object keeps track of files as they are added. At the completion of the task, return the results like this:

return status.dump()
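
Putting these pieces together, a minimal sketch of a custom task built around FileActionStatus might look like this (the task name and the processing step are illustrative):

from arcapix.dynamo.server.queue import get_current_task_queue_name
from arcapix.dynamo.server.status import FileActionStatus


def my_custom_task(*args, jobid=None, paths=None, **kwargs):
    paths = paths or []
    status = FileActionStatus("my-custom-task", paths, jobid, get_current_task_queue_name())

    for path in paths:
        # Illustrative processing step; record the resulting state of each file.
        status.add("processed", path["path"])

    return status.dump()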

Constructing a return payload from scratch

Tasks return a dict containing at least these 3 keys:

{
    "jobid": <job_id>,
    "paths": <list of path objects that were processed or skipped>,
    "status": {
        "task": <task_name>,
        "input_paths": <list of path objects input to the task>,
        "input_total": <number of paths input to the task>,
        "summary": <dict giving number of files keyed by state>,
        "details": <dict giving list of file objects keyed by state>,
        "started": <timezone-aware datetime for when the task started>
    }
}

The paths key lists the paths to be passed to the next task in the workflow.

Here is an example payload:

{
    "jobid": 100,
    "paths": [{"path": "/mmfs1/data/my-fileset/file1"}],
    "status": {
        "task": "my-plugin",
        "input_paths": [{"path": "/mmfs1/data/my-fileset/file1"}, {"path": "/mmfs1/data/my-fileset/file2"}],
        "input_total": 2,
        "summary": {
            "failures": 1,
            "processed": 1,
            "skipped": 0
        },
        "details": {
            "failures": [{"path": "/mmfs1/data/my-fileset/file2", "message": ["Reason xyz for the failure"]}],
            "processed": [{"path": "/mmfs1/data/my-fileset/file1"}],
            "skipped": []
        },
        "started": "2023-04-25T14:56:31.253043",
    }
}
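
A minimal sketch of building such a payload by hand is shown below, assuming the processed, skipped and failures lists of path objects have been accumulated during processing (the helper function itself is illustrative):

from datetime import datetime, timezone


def build_payload(jobid, input_paths, processed, skipped, failures, started=None):
    # Illustrative helper assembling the payload described above; started
    # should be a timezone-aware datetime captured when the task began.
    started = started or datetime.now(timezone.utc)
    return {
        "jobid": jobid,
        # Only processed and skipped paths are forwarded to the next task.
        "paths": processed + skipped,
        "status": {
            "task": "my-plugin",
            "input_paths": input_paths,
            "input_total": len(input_paths),
            "summary": {
                "processed": len(processed),
                "skipped": len(skipped),
                "failures": len(failures),
            },
            "details": {
                "processed": processed,
                "skipped": skipped,
                "failures": failures,
            },
            "started": started,
        },
    }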

Supported file states

These are the supported file states for tasks that perform operations on files:

  • "processed": file was successfully processed

  • "skipped": file was not processed because they were already in the desired state

  • "aborted": file was not processed and should not be processed by subsequent tasks in the workflow

  • "failures": file could not be processed because of an error

  • "inprogress": file is still being processed (this state is rarely used)

Generally, the files that should be passed to the next task are those that were processed or skipped.

Streaming task results

This applies to delete and move handlers in a snapdiff workflow.

Rather than being returned only at the end of the task, results for delete and move handlers in snapdiff workflows must be streamed back to Ngenea Hub while the task runs.

In this case, Ngenea Hub passes the keyword argument stream_results=True to the task to indicate that it expects streamed results.

Important

If your task will be used as a move or delete handler in a snapdiff workflow, it must accept stream_results either as an explicit parameter or via **kwargs.

A StreamTaskResults class is provided to facilitate this.

from arcapix.dynamo.server.streamstatus import StreamTaskResults

The class behaves in the same way as FileActionStatus:

if stream_results:
    status = StreamTaskResults(taskname, input_paths, jobid, queue_name, taskid)
else:
    status = FileActionStatus(taskname, input_paths, jobid, queue_name)

The parameters are the same as for FileActionStatus, described above, with the addition of taskid. The task must be defined with bind=True; the taskid can then be retrieved from self.request.id.

For each path operated on by the task, call the .add(key, path) method on the status object, where key is the state of the file:

status.add("processed", "/mmfs1/data/my-fileset/file1")

The status object keeps track of files as they are added, and in the background sends them to Ngenea Hub in batches.

At the completion of the task, return the results like this:

return status.dump()

Calling status.dump() also flushes any outstanding files to Ngenea Hub.
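
Putting this together, a sketch of a handler that supports both streamed and non-streamed results might look like this (the function name and processing step are illustrative; the function must be registered as a bound task, i.e. with bind=True, so that self.request.id is available):

from arcapix.dynamo.server.queue import get_current_task_queue_name
from arcapix.dynamo.server.status import FileActionStatus
from arcapix.dynamo.server.streamstatus import StreamTaskResults


def my_handler(self, *args, jobid=None, paths=None, stream_results=False, **kwargs):
    paths = paths or []
    queue_name = get_current_task_queue_name()

    if stream_results:
        # Streamed results require the celery task id, read from self.request.id.
        status = StreamTaskResults("my-handler", paths, jobid, queue_name, self.request.id)
    else:
        status = FileActionStatus("my-handler", paths, jobid, queue_name)

    for path in paths:
        # Illustrative processing step; record the outcome for each file.
        status.add("processed", path["path"])

    # For StreamTaskResults this also flushes any outstanding files to Ngenea Hub.
    return status.dump()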

4.3.3.2. Support for Streaming Paths from API

This applies to delete and move handlers in a snapdiff workflow.

To fetch the streaming paths, use the expand_paths function from the module arcapix.dynamo.server.utils.inputs, imported as shown below.

   from arcapix.dynamo.server.utils.inputs import expand_paths

Warning

expand_paths is a generator function that fetches pages of paths on demand. To avoid starving Ngenea Hub of memory, consume it in the spirit of generator functions and avoid storing large numbers of values in memory concurrently.
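
For illustration, a sketch of the recommended consumption pattern follows; the handle_streamed_paths helper, the arguments forwarded to expand_paths, and the assumption that it yields path objects are all hypothetical:

from arcapix.dynamo.server.utils.inputs import expand_paths


def handle_streamed_paths(status, *expand_args, **expand_kwargs):
    # Hypothetical helper for illustration only; arguments are forwarded to
    # expand_paths unchanged, as its exact signature is not reproduced here.
    for path in expand_paths(*expand_args, **expand_kwargs):
        # Consume the generator lazily, one path at a time; avoid
        # list(expand_paths(...)), which would hold every path in memory.
        status.add("processed", path["path"])  # assumes path objects with a "path" key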

4.3.3.3. Queues

There are various functions available to get information about the celery queue a custom task is running on.

Note

The way queues are handled, and the way celery queue names are formatted, changed in Ngenea Hub 2.4.0.

For more information on Ngenea Hub queues, see Queues.

from arcapix.dynamo.server.queue import ...
  • get_current_task_queue_name() - returns the name of the celery queue the current task is running on

  • get_current_site() - returns the name of the site the current task is running on

  • get_formatted_queue_name(site, queue, function) - returns the celery queue name for a given site, queue, and function

  • ParsedQueueName - a class representing a celery queue name, parsed into site, queue, and function

parsed_queue = ParsedQueueName.current_task_queue()

print(parsed_queue.site)  # london
print(parsed_queue.queue)  # highpriority
print(parsed_queue.function)  # custom

print(str(parsed_queue))  # london.highpriority#custom
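
The simpler helpers can be called directly when only the raw queue name or site is needed; the values shown in the comments below are illustrative:

from arcapix.dynamo.server.queue import (
    get_current_site,
    get_current_task_queue_name,
    get_formatted_queue_name,
)

queue_name = get_current_task_queue_name()  # e.g. "london.highpriority#custom"
site = get_current_site()                   # e.g. "london"

# Build the celery queue name for a given site, queue, and function.
target = get_formatted_queue_name("london", "highpriority", "custom")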

Important

The format of celery queue names is subject to change. You should always use the above functions, rather than manually parsing and formatting queue names.