2.4. The Task Contract
Every plugin task must satisfy a strict contract so that Ngenea Hub’s DAG engine can schedule it, pass data, receive results, and update the UI.
2.4.1. Task Registration
Your function must be registered as a Celery task:
@app.task(bind=True, name="dynamo.custom.<your_task_name>")
| Parameter | Requirement |
|---|---|
| bind=True | Required. Gives access to self, the task instance. |
| name | Required. Must follow the pattern dynamo.custom.<your_task_name>. |
2.4.2. Function Signature
def my_task(self, *args, paths=None, jobid=None, **kwargs):
| Parameter | Type | Description |
|---|---|---|
| self | Task instance | Provided by bind=True. |
| *args | positional | Accept but generally ignore. Present for forward compatibility. |
| paths | list of dicts | The list of path objects to process. Each dict has at minimum a "path" key. |
| jobid | integer | The numeric ID of the parent Job in Ngenea Hub’s database. |
| **kwargs | keyword args | Required. Captures workflow fields and other arguments passed by Ngenea Hub. |
Warning
Omitting **kwargs from the function signature will cause the task to raise a TypeError if Ngenea Hub passes any unexpected keyword arguments (e.g., new system parameters in a future version, or workflow fields). This will fail the entire task, not just individual paths, and halt the workflow chain. Always include **kwargs even if you do not currently use it.
To pass specific keyword arguments to a task, include these in the workflow definition as hardcoded values or runtime fields (see Workflows).
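The effect of omitting **kwargs can be reproduced with plain Python functions, without Celery or Ngenea Hub. The sketch below is illustrative only: strict_task, tolerant_task, and the threshold workflow field are hypothetical names, not part of the Ngenea Hub API.

```python
def strict_task(self, *args, paths=None, jobid=None):
    """Signature without **kwargs: any extra keyword argument is rejected."""
    return "ok"


def tolerant_task(self, *args, paths=None, jobid=None, **kwargs):
    """Signature with **kwargs: extra keyword arguments are captured."""
    # 'threshold' is a hypothetical workflow field; fall back to a default
    # when the workflow does not supply it.
    return kwargs.get("threshold", 10)


# Simulate a call that carries one extra keyword argument.
call_kwargs = {"paths": [], "jobid": 1, "threshold": 25}

try:
    strict_task(None, **call_kwargs)
    strict_result = "accepted"
except TypeError:
    strict_result = "TypeError"

tolerant_result = tolerant_task(None, **call_kwargs)
print(strict_result, tolerant_result)
```

Reading optional fields with kwargs.get() and a default keeps the task working whether or not the workflow supplies the field.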
2.4.3. Path Object Structure
Each element in the paths list is a dictionary. The guaranteed key is "path". Other keys are optional:
{
    "path": "/mmfs1/data/project/file1.dat",  # Always present; absolute path
    "size": 264321,                           # File size in bytes (optional)
    "state": "created",                       # File state from discovery (optional)
    "message": "...",                         # Status message from upstream (optional)
    "site": "london"                          # Origin site (optional)
}
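Because only "path" is guaranteed, code that reads the other keys should tolerate their absence. A minimal sketch of that pattern follows; summarise_path is a hypothetical helper written for this example, and the fallback values are arbitrary choices.

```python
def summarise_path(item):
    """Return a short description of one path object.

    Only "path" is read with direct indexing; every other key is
    optional, so it is read with .get() and a fallback value.
    """
    path = item["path"]
    size = item.get("size")  # None when the key is absent
    site = item.get("site", "unknown")
    size_text = f"{size} bytes" if size is not None else "size unknown"
    return f"{path} ({size_text}, site={site})"


full = {"path": "/mmfs1/data/project/file1.dat", "size": 264321, "site": "london"}
minimal = {"path": "/mmfs1/data/project/file2.dat"}

print(summarise_path(full))
print(summarise_path(minimal))
```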
2.4.4. Return Payload
Your task must return a dictionary with a specific structure. The recommended approach is to use the FileActionStatus helper class. The raw structure is documented here for reference.
2.4.4.1. Using FileActionStatus (Recommended)
from arcapix.dynamo.server.status import FileActionStatus
from arcapix.dynamo.server.queue import get_current_task_queue_name

status = FileActionStatus(
    taskname=self.name,
    input_paths=paths,
    jobid=jobid,
    queue_name=get_current_task_queue_name()
)
For each path your task processes, call status.add() with the appropriate state:
# File processed successfully
status.add("processed", "/mmfs1/data/project/file1.dat")
# File skipped (already in desired state)
status.add("skipped", "/mmfs1/data/project/file2.dat")
# File failed with an error message
status.add("failures", "/mmfs1/data/project/file3.dat", message="Permission denied")
# File intentionally excluded from further processing
status.add("aborted", "/mmfs1/data/project/file4.dat", message="Missing required metadata")
You can also add log messages visible in the Ngenea Hub job detail view:
status.add_log("Connected to external API successfully")
Always end your task function with:
return status.dump()
2.4.5. Supported File States
| State | Meaning | Forwarded to next task? |
|---|---|---|
| processed | File was successfully processed. | Yes |
| skipped | File was already in the desired state; no action taken. | Yes |
| failures | An error occurred processing this file. | No |
| aborted | File was intentionally excluded and should not be processed by subsequent tasks. | No |
| | File is still being processed asynchronously. Rarely used. | No |
Danger
Any input path not categorised into a state (processed, skipped, failures, or aborted) will silently disappear from the job. It will not be forwarded to downstream tasks, not appear in reports, and not be retried. In workflows involving file migration or deletion, this means files can be permanently orphaned — never moved, never cleaned up, and never flagged as an error. Always iterate over every path in the input list and assign each one a state.
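One way to guard against uncategorised paths is to track which paths received a state and mark the remainder as failures before returning. The sketch below assumes nothing about FileActionStatus beyond the status.add(state, path, message=...) call shown earlier. RecordingStatus is a hypothetical stand-in used so the example runs on its own, and mark_unaccounted is an illustrative helper, not part of the Ngenea Hub API.

```python
class RecordingStatus:
    """Hypothetical stand-in for FileActionStatus, for illustration only."""

    def __init__(self):
        self.entries = []  # list of (state, path, message) tuples

    def add(self, state, path, message=None):
        self.entries.append((state, path, message))


def mark_unaccounted(status, paths, seen):
    """Add a 'failures' entry for every input path not present in `seen`."""
    missing = [item["path"] for item in paths if item["path"] not in seen]
    for path in missing:
        status.add("failures", path, message="No state assigned by task")
    return missing


paths = [{"path": "/mmfs1/a.dat"}, {"path": "/mmfs1/b.dat"}, {"path": "/mmfs1/c.dat"}]
status = RecordingStatus()
seen = set()

# Simulate a task whose loop only handles the first two paths.
for item in paths[:2]:
    status.add("processed", item["path"])
    seen.add(item["path"])

leftover = mark_unaccounted(status, paths, seen)
print(leftover)
```

Whether a leftover path should be recorded as failures or aborted depends on the workflow; failures is used here only so that the gap is visible in the job report.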
2.4.6. Complete Pattern
from arcapix.dynamo.server.celeryapp import app
from arcapix.dynamo.server.status import FileActionStatus
from arcapix.dynamo.server.queue import get_current_task_queue_name


@app.task(bind=True, name="dynamo.custom.my_task")
def my_task(self, *args, paths=None, jobid=None, **kwargs):
    status = FileActionStatus(self.name, paths, jobid, get_current_task_queue_name())

    # paths defaults to None, so fall back to an empty list before iterating
    for item in paths or []:
        try:
            do_something(item["path"])  # placeholder for your processing logic
            status.add("processed", item["path"])
        except FileNotFoundError:
            status.add("skipped", item["path"], message="File no longer exists")
        except Exception as e:
            # Catch per path so one bad file does not fail the whole task
            status.add("failures", item["path"], message=str(e))

    return status.dump()
2.4.7. What Happens If the Contract Is Violated
| Problem | Symptom in Ngenea Hub |
|---|---|
| Task doesn’t return | Task stuck in Pending state; traceback in Worker logs. |
| Missing | Job progress does not update. |
| | Inaccurate statistics in the Job Progress UI. |
| Unhandled exception in task function | Task marked as Failed; downstream tasks may not run. See warning below. |
| Paths not accounted for | “Missing” files in the job report. |
Danger
An unhandled exception aborts the entire task, not just the current file. All remaining input paths, and all downstream tasks in the chain, will not be processed. In a multi-step workflow (e.g., migrate then delete source), this can leave the system in an inconsistent state where some files have been migrated but cleanup was never performed. Always catch exceptions per-path inside your processing loop and categorise failed paths with status.add("failures", ...).