2.5. Workflows
Once your plugin is installed and the Worker restarted, you reference it in workflow definitions on Ngenea Hub. Workflow definitions are created via the REST API at POST /api/workflows/ (see Workflows API). They are not currently editable via the Ngenea Hub UI.
2.5.1. Workflow Structure
A workflow definition is a JSON object with these top-level fields:
{
"name": "my_workflow",
"label": "My Workflow",
"icon_classes": ["fa fa-cog fa-stack-2x text-primary"],
"discovery": null,
"discovery_options": null,
"enabled": true,
"visible": true,
"fields": [],
"filter_rules": []
}
| Field | Type | Description |
|---|---|---|
| name | string | Unique internal identifier (max 100 chars). Used in API calls and policies. |
| label | string | Human-readable name displayed in the UI (max 100 chars). |
| icon_classes | list[string] | Font Awesome CSS classes for the workflow icon in the UI. |
| discovery | string or null | Discovery method: … |
| discovery_options | object or null | Options for the discovery method (e.g., …). |
| enabled | bool | Whether the workflow can be triggered. |
| visible | bool | Whether the workflow appears in the UI file browser. |
| fields | list[object] | Runtime parameter definitions. See Workflow Fields below. |
| filter_rules | list[object] | Rules that match paths and define task chains. See Filter Rules below. |
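Before posting a definition, it can help to check it locally against the table above. The following is a minimal sketch, not Ngenea Hub's own validation: the function name is hypothetical, and it assumes all nine top-level keys must be present, which the table does not explicitly state.

```python
from typing import Any

# Expected top-level keys and accepted Python types, taken from the table above.
# Treating every key as required is an assumption of this sketch.
EXPECTED = {
    "name": (str,),
    "label": (str,),
    "icon_classes": (list,),
    "discovery": (str, type(None)),
    "discovery_options": (dict, type(None)),
    "enabled": (bool,),
    "visible": (bool,),
    "fields": (list,),
    "filter_rules": (list,),
}


def check_workflow_definition(definition: dict[str, Any]) -> list[str]:
    """Return a list of problems found; an empty list means none were found."""
    problems = []
    for key, types in EXPECTED.items():
        if key not in definition:
            problems.append(f"missing key: {key}")
        elif not isinstance(definition[key], types):
            problems.append(f"wrong type for {key}")
    # The table documents a 100-character limit for name and label.
    for key in ("name", "label"):
        value = definition.get(key)
        if isinstance(value, str) and len(value) > 100:
            problems.append(f"{key} exceeds 100 characters")
    return problems
```

The check only covers the top-level shape. It does not inspect the contents of "fields" or "filter_rules".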
2.5.2. Filter Rules
Filter rules are the core of a workflow definition. Each rule matches a subset of input paths and routes them through an ordered chain of task actions.
{
"type": "all",
"state": "all",
"action": [
{"name": "dynamo.tasks.migrate"},
{"name": "dynamo.custom.my_plugin", "my_param": "value"}
]
}
2.5.2.1. Required Keys
| Key | Values | Description |
|---|---|---|
| type | … | Filters paths by filesystem object type. |
| state | … | Filters paths by their state (relevant for discovery-based workflows). |
| action | list of action objects | Ordered chain of tasks to execute on the matched paths. |
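2.5.2.2. Optional Keys
| Key | Type | Description |
|---|---|---|
| include | list of glob strings | Glob patterns to limit actions to matching paths only. |
| exclude | list of glob strings | Glob patterns to exclude matching paths from the action. |
| ignore_site_includes | bool | If true, the site’s global include patterns are not appended to this rule’s include patterns. |
| ignore_site_excludes | bool | If true, the site’s global exclude patterns are not appended to this rule’s exclude patterns. |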
2.5.2.3. Action Objects
Each action object must have a "name" key with the task’s registered name. It may also include "site" and "queue" keys to control where the task executes. Any other key-value pairs are passed to the task function as keyword arguments.
{
"name": "dynamo.custom.my_plugin",
"site": "london",
"queue": "highpriority",
"destination_path": "/archive/project",
"notify": true
}
| Key | Required | Description |
|---|---|---|
| name | Yes | The registered task name (e.g., …). |
| site | No | Target site for task execution. Overrides the job’s default site. |
| queue | No | Target queue for task execution. Overrides the job’s default queue. |
Important
site and queue are consumed by the DAG engine and are not passed to your task function. All other key-value pairs arrive in the task’s **kwargs.
Warning
If "site" or "queue" is set incorrectly, the task will execute on a Worker at the wrong location. That Worker may not have access to the target filesystem, causing the task to fail — or worse, it may have access to a different filesystem mounted at the same path, causing file operations to affect the wrong data. Always verify that the target site has access to the paths your workflow will process.
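The split between routing keys and task keyword arguments described in this section can be pictured with a short sketch. This illustrates the documented behaviour only and is not the DAG engine's code. The helper name `split_action` is hypothetical.

```python
# Keys the documentation says are consumed by the DAG engine rather than the task.
ROUTING_KEYS = ("site", "queue")


def split_action(action):
    """Split an action object into (task name, routing options, task kwargs)."""
    if "name" not in action:
        raise ValueError('action object needs a "name" key')
    routing = {k: action[k] for k in ROUTING_KEYS if k in action}
    kwargs = {
        k: v for k, v in action.items()
        if k != "name" and k not in ROUTING_KEYS
    }
    return action["name"], routing, kwargs
```

Applied to the example action above, the task would receive only "destination_path" and "notify" as keyword arguments, while "site" and "queue" decide where it runs.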
2.5.3. Task Chaining
Actions within a rule’s "action" list are executed in order, forming a chain. The output "paths" from one task become the input paths for the next:
"action": [
{"name": "dynamo.tasks.migrate"},
{"name": "dynamo.custom.validate_checksums"},
{"name": "dynamo.custom.email_staff", "staff_members": ["admin@org.com"]}
]
Execution flow:
1. migrate processes the input paths and returns the processed/skipped paths.
2. validate_checksums receives only the paths that migrate marked as processed or skipped.
3. email_staff receives only the paths that validate_checksums passed through.
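The hand-off between tasks can be modelled in a few lines. This is a simplified sketch: the functions below are plain Python stand-ins rather than real Celery tasks, and the result is assumed to be a dict whose only key of interest is "paths". Real task results may carry more information.

```python
def run_chain(tasks, paths):
    """Feed each task the "paths" returned by the previous one."""
    current = list(paths)
    for task in tasks:
        result = task(current)
        current = list(result["paths"])
    return current


# Hypothetical stand-ins for the tasks named in the example above.
def migrate(paths):
    # Pretend every path was processed or skipped, so all are passed on.
    return {"paths": paths}


def validate_checksums(paths):
    # Pretend files ending in ".bad" fail validation and are not passed on.
    return {"paths": [p for p in paths if not p.endswith(".bad")]}
```

With this model, a path dropped by one task never reaches the tasks after it, which matches the execution flow listed above.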
2.5.4. Path Filtering with Include/Exclude
Rules can use glob patterns to limit which paths reach the action chain:
{
"type": "file",
"state": "created",
"include": ["/mmfs1/data/project_a/*"],
"exclude": ["*.tmp", "*.log"],
"action": [
{"name": "dynamo.tasks.migrate"},
{"name": "dynamo.custom.my_plugin"}
]
}
A path must match at least one include pattern (if defined) and not match any exclude pattern. If no include is defined, all paths are considered included unless excluded.
Sites can also define global include/exclude patterns. These are appended to per-rule patterns by default. Set "ignore_site_includes": true or "ignore_site_excludes": true on a rule to override this behaviour.
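The selection logic described above can be sketched as follows. It assumes shell-style matching as implemented by Python's `fnmatch` module, where `*` also matches `/`. Ngenea Hub's actual glob semantics may differ, and both function names are hypothetical.

```python
from fnmatch import fnmatchcase


def effective_patterns(rule, site_includes=(), site_excludes=()):
    """Combine per-rule patterns with site-level ones unless the rule opts out."""
    include = list(rule.get("include", []))
    exclude = list(rule.get("exclude", []))
    if not rule.get("ignore_site_includes", False):
        include += list(site_includes)
    if not rule.get("ignore_site_excludes", False):
        exclude += list(site_excludes)
    return include, exclude


def path_selected(path, include, exclude):
    """True if path matches an include (when any are defined) and no exclude."""
    if include and not any(fnmatchcase(path, pat) for pat in include):
        return False
    return not any(fnmatchcase(path, pat) for pat in exclude)
```

For the rule shown earlier, /mmfs1/data/project_a/report.dat would be selected, while /mmfs1/data/project_a/scratch.tmp would be excluded by the "*.tmp" pattern.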
2.5.5. Multiple Rules (Branching)
Warning
In discovery workflows, paths can have various states (created, deleted, updated, modified, moved). If your filter rules only cover some states, paths with unmatched states are silently dropped — they will not be processed, reported, or flagged. Add a "state": "default" rule as a catch-all to ensure no paths are silently lost.
You can define multiple filter rules to create branching workflows. A common pattern with discovery workflows:
{
"filter_rules": [
{
"type": "all",
"state": "created",
"action": [
{"name": "dynamo.tasks.migrate"},
{"name": "dynamo.custom.email_staff", "subject": "New files migrated"}
]
},
{
"type": "all",
"state": "deleted",
"action": [
{"name": "dynamo.tasks.delete_paths_from_gpfs"}
]
}
]
}
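Because unmatched states are dropped silently, a quick check for coverage gaps can be worthwhile before deploying a definition. The sketch below is a hypothetical lint helper, not part of Ngenea Hub. It takes the list of states from the warning above (which may not be exhaustive), assumes a rule with state "all" or "default" covers every state, and ignores "type" and include/exclude filtering.

```python
# States named in the warning above; this list may not be exhaustive.
DISCOVERY_STATES = ("created", "deleted", "updated", "modified", "moved")


def uncovered_states(filter_rules, states=DISCOVERY_STATES):
    """Return the states that no filter rule appears to handle."""
    declared = {rule.get("state") for rule in filter_rules}
    # Assumption: "all" and "default" rules act as catch-alls.
    if "default" in declared or "all" in declared:
        return []
    return [s for s in states if s not in declared]
```

For the two-rule example above, the helper reports "updated", "modified", and "moved" as uncovered. Adding a "state": "default" rule makes the list empty.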
2.5.6. Referencing Your Plugin
The task name you use in workflow definitions must exactly match the name parameter in your @app.task decorator:
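# In your plugin code (function name and signature are illustrative):
@app.task(bind=True, name="dynamo.custom.file_mover")
def file_mover(self, *args, **kwargs):
    ...

# In your workflow definition:
{"name": "dynamo.custom.file_mover", "destination_path": "/archive"}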
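Since the match must be exact, a typo or a case difference in a task name is easy to miss. The following sketch lists action names in a definition that are absent from a set of known task names. The helper is hypothetical, and how you obtain the set of registered names is outside its scope.

```python
def unknown_task_names(definition, registered_names):
    """Return action names used in the definition that are not registered."""
    registered = set(registered_names)
    missing = []
    for rule in definition.get("filter_rules", []):
        for action in rule.get("action", []):
            name = action.get("name")
            # Comparison is exact and case-sensitive, mirroring the rule above.
            if name not in registered and name not in missing:
                missing.append(name)
    return missing
```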
2.5.7. Workflow Fields
Fields let you define runtime parameters that users provide when triggering a workflow. Field values are passed to your task function as keyword arguments.
2.5.7.1. Defining Fields
Fields are defined in the workflow’s "fields" array:
| Key | Type | Description |
|---|---|---|
| name | string | Required. Internal name, used to reference the field in action definitions and received as a kwarg in your task. |
| label | string | Required. Human-readable label shown in the UI. |
| type | string | Required. One of: … |
| default | varies | Optional. Default value if the user doesn’t provide one. |
| choices | list of objects | Required if type is "choices". |
2.5.7.2. Field Type Reference
| Type | Description |
|---|---|
| string | Free-text input. Value arrives as a Python … |
| … | Numeric input. Value arrives as a Python … |
| bool | Checkbox. Value arrives as a Python … |
| choices | Dropdown menu. Requires a … |
| … | Multi-value input. Value arrives as a Python … |
| … | Dropdown populated with configured sites. Value is the site name string. |
| … | Dropdown populated with configured queues. Value is the queue name string. |
Warning
String fields used as filesystem paths are passed to your task without validation or sanitization. A user could enter paths like /, ../../../etc, or paths with special characters. Since plugins typically run as root, this can result in file operations targeting any location on the system. Always validate user-supplied paths in your task code: check that they are absolute, within an expected base directory, and do not contain traversal sequences.
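The checks recommended in the warning above might look like the sketch below. It is a minimal, purely lexical example: the function name and the idea of a single allowed base directory are assumptions, and it does not resolve symlinks. A stricter plugin could additionally compare `os.path.realpath` results on the Worker.

```python
import posixpath


def validate_user_path(raw, base):
    """Return the normalised path if it is acceptable, else raise ValueError."""
    if not isinstance(raw, str) or not raw:
        raise ValueError("path must be a non-empty string")
    if "\x00" in raw:
        raise ValueError("path contains a NUL byte")
    if not posixpath.isabs(raw):
        raise ValueError("path must be absolute")
    if ".." in raw.split("/"):
        raise ValueError("path contains a traversal sequence")
    norm = posixpath.normpath(raw)
    base_norm = posixpath.normpath(base)
    # commonpath compares whole components, so /archivex is not inside /archive.
    if posixpath.commonpath([norm, base_norm]) != base_norm:
        raise ValueError("path is outside the allowed base directory")
    return norm
```

Calling the check at the top of the task, before any file operation, keeps a bad value from reaching code that runs with elevated privileges.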
2.5.7.3. Referencing Fields in Actions
To pass a field’s runtime value to a task, use the *field_name syntax in the action definition:
{
"fields": [
{"name": "dest", "label": "Destination Path", "type": "string"},
{"name": "notify", "label": "Send Notification", "type": "bool", "default": false}
],
"filter_rules": [
{
"type": "all",
"state": "all",
"action": [
{
"name": "dynamo.custom.file_mover",
"destination_path": "*dest"
},
{
"name": "dynamo.custom.email_staff",
"send_notification": "*notify",
"staff_members": ["admin@org.com"]
}
]
}
]
}
The *dest syntax tells Ngenea Hub to substitute the runtime value of the dest field. Hardcoded values (like "staff_members") are passed directly.
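To make the substitution concrete, here is a small model of it. This is a sketch of the documented behaviour, not Ngenea Hub's implementation: the function name is hypothetical, and raising an error when a field has neither a supplied value nor a default is an assumption of the sketch.

```python
def resolve_action_kwargs(action, field_defs, supplied):
    """Replace "*field_name" values in an action with runtime field values."""
    known = {f["name"] for f in field_defs}
    defaults = {f["name"]: f["default"] for f in field_defs if "default" in f}
    resolved = {}
    for key, value in action.items():
        is_ref = isinstance(value, str) and value.startswith("*") and value[1:] in known
        if not is_ref:
            # Hardcoded values are passed through unchanged.
            resolved[key] = value
            continue
        name = value[1:]
        if name in supplied:
            resolved[key] = supplied[name]
        elif name in defaults:
            resolved[key] = defaults[name]
        else:
            # Assumption: a missing value with no default is treated as an error.
            raise KeyError(f"no value supplied for field {name!r}")
    return resolved
```

With the definition above and a user-supplied "dest" of /archive/project, the file_mover action would resolve to "destination_path": "/archive/project", and the email_staff action would fall back to the default of false for "send_notification".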
Choices field example:
{
"fields": [
{
"name": "compression",
"label": "Compression Mode",
"type": "choices",
"choices": [
{"label": "Fast", "value": "fast"},
{"label": "Balanced", "value": "balanced"},
{"label": "Maximum", "value": "max"}
],
"default": "balanced"
}
],
"filter_rules": [
{
"type": "all",
"state": "all",
"action": [
{
"name": "dynamo.custom.my_plugin",
"compression_mode": "*compression"
}
]
}
]
}
2.5.8. Submitting a Workflow via API
To execute a workflow programmatically, send the following JSON body to POST /api/file/workflow:
{
"paths": ["/mmfs1/data/sample_data/file1.dat"],
"site": "london",
"workflow": "my_workflow",
"fields": {
"dest": "/archive/project",
"notify": true
}
}
This endpoint requires authentication. See API Endpoints - Workflows, Automation & Jobs for full details.
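From Python, the request could be assembled with the standard library as sketched below. The function name, the base URL, and the way authentication headers are supplied are all assumptions here. Consult the API reference for the authentication scheme your deployment uses.

```python
import json
import urllib.request


def build_submit_request(base_url, workflow, site, paths, fields, headers=None):
    """Build (but do not send) a POST request that submits a workflow."""
    body = {
        "paths": list(paths),
        "site": site,
        "workflow": workflow,
        "fields": dict(fields),
    }
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/api/file/workflow",
        data=json.dumps(body).encode("utf-8"),
        method="POST",
        # Caller-supplied headers (for example authentication) are merged in.
        headers={"Content-Type": "application/json", **(headers or {})},
    )
```

The returned object can be passed to `urllib.request.urlopen` once the appropriate authentication header has been added through the `headers` argument.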