2.4. The Task Contract

Every plugin task must satisfy a strict contract so that Ngenea Hub’s DAG engine can schedule it, pass data, receive results, and update the UI.

2.4.1. Task Registration

Your function must be registered as a Celery task:

@app.task(bind=True, name="dynamo.custom.<your_task_name>")

| Parameter | Requirement |
|---|---|
| `bind=True` | Required. Gives access to `self` for task metadata (e.g., `self.name`, `self.request.id`). |
| `name` | Required. Must follow the pattern `dynamo.custom.<your_task_name>`. This exact string is used to reference the task in workflow definitions. |

2.4.2. Function Signature

def my_task(self, *args, paths=None, jobid=None, **kwargs):

| Parameter | Type | Description |
|---|---|---|
| `self` | Task instance | Provided by `bind=True`. Use `self.name` for the task name, `self.request.id` for the Celery task ID. |
| `*args` | positional | Accept but generally ignore. Present for forward compatibility. |
| `paths` | List[Dict] | The list of path objects to process. Each dict has at minimum a `"path"` key with an absolute filesystem path. May also include `"size"`, `"state"`, `"message"`, and other keys. |
| `jobid` | int | The numeric ID of the parent Job in Ngenea Hub’s database. |
| `**kwargs` | keyword args | Required. Captures workflow fields and other arguments passed by Ngenea Hub. |

Warning

Omitting **kwargs from the function signature will cause the task to raise a TypeError if Ngenea Hub passes any unexpected keyword arguments (e.g., new system parameters in a future version, or workflow fields). This will fail the entire task, not just individual paths, and halt the workflow chain. Always include **kwargs even if you do not currently use it.

To pass specific keyword arguments to a task, include these in the workflow definition as hardcoded values or runtime fields (see Workflows).
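The effect of omitting `**kwargs` can be demonstrated in plain Python. The function bodies and the extra keyword below are illustrative only, not part of the Ngenea Hub API:

```python
# A task signature without **kwargs raises TypeError as soon as the
# caller passes an unexpected keyword argument.

def rigid_task(paths=None, jobid=None):
    # No **kwargs: any unknown keyword crashes the whole task.
    return {"jobid": jobid, "count": len(paths or [])}

def tolerant_task(paths=None, jobid=None, **kwargs):
    # Unknown keywords (e.g. workflow fields added in a future version)
    # are absorbed by **kwargs instead of crashing the task.
    return {"jobid": jobid, "count": len(paths or [])}

# Simulate Ngenea Hub passing a keyword the task does not know about.
call = {"paths": [{"path": "/data/a"}], "jobid": 7, "future_field": "x"}

try:
    rigid_task(**call)
except TypeError as e:
    print("rigid task failed:", e)

print(tolerant_task(**call))  # absorbs future_field and runs normally
```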

2.4.3. Path Object Structure

Each element in the paths list is a dictionary. The guaranteed key is "path". Other keys are optional:

{
    "path": "/mmfs1/data/project/file1.dat",   # Always present — absolute path
    "size": 264321,                              # File size in bytes (optional)
    "state": "created",                          # File state from discovery (optional)
    "message": "...",                            # Status message from upstream (optional)
    "site": "london"                             # Origin site (optional)
}
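Because only `"path"` is guaranteed, optional keys should be read defensively. A minimal sketch (the sample data is made up):

```python
# Read the guaranteed "path" key directly; read optional keys with
# .get() and a default so absent keys never raise KeyError.
paths = [
    {"path": "/mmfs1/data/project/file1.dat", "size": 264321, "state": "created"},
    {"path": "/mmfs1/data/project/file2.dat"},  # optional keys absent
]

for item in paths:
    abspath = item["path"]                # safe: always present
    size = item.get("size", 0)            # optional: default when missing
    state = item.get("state", "unknown")  # optional: default when missing
    print(abspath, size, state)
```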

2.4.4. Return Payload

Your task must return a dictionary with a specific structure. The recommended approach is to use the FileActionStatus helper class. The raw structure is documented here for reference.
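As an orientation aid only, here is a sketch of the payload shape inferred from the keys this contract refers to elsewhere (`"jobid"`, `"summary"`, `"details"` in section 2.4.7). The exact layout produced by `FileActionStatus.dump()` may differ, so prefer the helper in real code:

```python
# Hypothetical payload shape, inferred from the keys named in 2.4.7.
payload = {
    "jobid": 42,                                   # must match the parent Job ID
    "summary": {"processed": 2, "failures": 1},    # per-state counts
    "details": {                                   # per-state path lists
        "processed": ["/data/a", "/data/b"],
        "failures": ["/data/c"],
    },
}

# The summary counts must match the lengths of the details lists,
# otherwise Job Progress statistics will be inaccurate (see 2.4.7).
for state, count in payload["summary"].items():
    assert count == len(payload["details"][state])
```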

2.4.5. Supported File States

| State | Meaning | Forwarded to next task? |
|---|---|---|
| `processed` | File was successfully processed. | Yes |
| `skipped` | File was already in the desired state; no action taken. | Yes |
| `failures` | An error occurred processing this file. | No |
| `aborted` | File was intentionally excluded and should not be processed by subsequent tasks. | No |
| `inprogress` | File is still being processed asynchronously. Rarely used. | No |

Danger

Any input path not categorised into a state (processed, skipped, failures, or aborted) will silently disappear from the job. It will not be forwarded to downstream tasks, not appear in reports, and not be retried. In workflows involving file migration or deletion, this means files can be permanently orphaned — never moved, never cleaned up, and never flagged as an error. Always iterate over every path in the input list and assign each one a state.
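A cheap defence is to reconcile inputs against outputs before returning. This is a sketch, not part of the Ngenea Hub API; `categorised` stands in for whatever per-state bookkeeping your task keeps:

```python
# Verify that every input path was assigned a state, so no file
# silently disappears from the job.
def check_all_paths_accounted(paths, categorised):
    seen = set()
    for state_paths in categorised.values():
        seen.update(state_paths)
    missing = [p["path"] for p in paths if p["path"] not in seen]
    if missing:
        raise RuntimeError(f"paths not assigned a state: {missing}")

paths = [{"path": "/data/a"}, {"path": "/data/b"}]
categorised = {"processed": ["/data/a"], "skipped": ["/data/b"]}
check_all_paths_accounted(paths, categorised)  # passes: nothing missing
```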

2.4.6. Complete Pattern

from arcapix.dynamo.server.celeryapp import app
from arcapix.dynamo.server.status import FileActionStatus
from arcapix.dynamo.server.queue import get_current_task_queue_name

@app.task(bind=True, name="dynamo.custom.my_task")
def my_task(self, *args, paths=None, jobid=None, **kwargs):
    status = FileActionStatus(self.name, paths, jobid, get_current_task_queue_name())

    for item in paths:
        try:
            do_something(item["path"])
            status.add("processed", item["path"])
        except FileNotFoundError:
            status.add("skipped", item["path"], message="File no longer exists")
        except Exception as e:
            status.add("failures", item["path"], message=str(e))

    return status.dump()

2.4.7. What Happens If the Contract Is Violated

| Problem | Symptom in Ngenea Hub |
|---|---|
| Task doesn’t return `status.dump()` or equivalent dict | Task stuck in Pending state; traceback in Worker logs. |
| Missing `"jobid"` in return payload | Job progress does not update. |
| `"summary"` counts don’t match `"details"` lists | Inaccurate statistics in the Job Progress UI. |
| Unhandled exception in task function | Task marked as Failed; downstream tasks may not run. See warning below. |
| Paths not accounted for | “Missing” files in the job report. |
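Several of these violations can be caught with a pre-return sanity check. A sketch under the same assumed key names (`"jobid"`, `"summary"`, `"details"`); adapt it to your actual payload layout:

```python
# Guard against the contract violations listed above before returning.
def validate_payload(payload):
    if not isinstance(payload, dict):
        raise TypeError("task must return a dict")
    if "jobid" not in payload:
        raise ValueError("payload missing 'jobid'; job progress will not update")
    for state, count in payload.get("summary", {}).items():
        details = payload.get("details", {}).get(state, [])
        if count != len(details):
            raise ValueError(f"summary/details mismatch for state {state!r}")
    return payload

validate_payload({"jobid": 1,
                  "summary": {"processed": 1},
                  "details": {"processed": ["/data/a"]}})
```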

Danger

An unhandled exception aborts the entire task, not just the current file. All remaining input paths — and all downstream tasks in the chain — will not be processed. In a multi-step workflow (e.g., migrate then delete source), this can leave the system in an inconsistent state where some files have been migrated but cleanup was never performed. Always catch exceptions per-path inside your processing loop and categorize failed paths with status.add("failures", ...).