2.1. Quick Start¶
This page gets you from zero to a working plugin in minutes. For full details on each topic, follow the cross-references to the relevant sections.
2.1.1. Prerequisites¶
SSH access to a Ngenea Worker host
Write access to
/var/lib/ngenea-worker/pluginsPermission to restart the ngenea-worker service
enable_plugins=trueset in/etc/ngenea/ngenea-worker.conf
2.1.2. 1. Scaffold¶
ngenea-worker plugins create my_plugin
Creates:
/var/lib/ngenea-worker/plugins/my_plugin/
├── setup.py
├── constraints.txt
└── my_plugin/
├── __init__.py
└── my_plugin.py <-- your code goes here
2.1.3. 2. Write Your Task¶
Edit my_plugin/my_plugin.py:
import logging
from typing import List, Dict, Any
from arcapix.dynamo.server.celeryapp import app
from arcapix.dynamo.server.status import FileActionStatus
from arcapix.dynamo.server.queue import get_current_task_queue_name
logger = logging.getLogger(__name__)
@app.task(bind=True, name="dynamo.custom.my_plugin")
def my_plugin(
self, *args,
paths: List[Dict[str, Any]] = None,
jobid: int = None,
**kwargs,
):
# 1. Create a status tracker
status = FileActionStatus(
self.name, paths, jobid,
get_current_task_queue_name()
)
# 2. Process each path
for item in paths:
try:
logger.info("Processing %s", item["path"])
# --- YOUR LOGIC HERE ---
status.add("processed", item["path"])
except Exception as e:
status.add("failures", item["path"], message=str(e))
# 3. Return results to Hub
return status.dump()
2.1.3.1. The Rules¶
Rule |
Why |
|---|---|
Decorator must have |
Gives access to |
Name must be |
Ngenea Hub uses this exact string to route the task. |
Must accept |
This is the data contract with the DAG engine. See The Task Contract. |
Must return |
Ngenea Hub needs this structure to update the Job Progress UI. |
Every path must be categorised |
Use |
2.1.4. 3. Configure the Entry Point¶
Edit setup.py:
from setuptools import setup, find_packages
setup(
name="my_plugin",
version="1.0.0",
packages=find_packages(),
install_requires=[
# The template may include celery here — do not modify its version.
# Add your external dependencies below.
],
entry_points={
"worker_plugin": [
"my_plugin = my_plugin.my_plugin:my_plugin",
],
},
)
Entry point format: alias = package.module:function
2.1.5. 4. Install and Restart¶
ngenea-worker plugins install my_plugin
systemctl restart ngenea-worker
# Verify it's loaded
ngenea-worker plugins list
2.1.6. 5. Create a Workflow¶
Custom workflows are created via the REST API at POST /api/workflows/ (see API Endpoints - Workflows, Automation & Jobs). Minimal JSON:
{
"name": "my_workflow",
"label": "My Custom Workflow",
"icon_classes": ["fa fa-plug fa-stack-2x text-primary"],
"discovery": null,
"enabled": true,
"visible": true,
"fields": [],
"filter_rules": [
{
"type": "all",
"state": "all",
"action": [
{"name": "dynamo.custom.my_plugin"}
]
}
]
}
The workflow appears in the Ngenea Hub UI. Select files, choose your workflow, and run it. Or execute via the API with POST /api/file/workflow.
2.1.7. Passing Parameters¶
Define fields in the workflow and reference them with *field_name syntax:
{
"fields": [
{"name": "dest", "label": "Destination", "type": "string"}
],
"filter_rules": [
{
"type": "all",
"state": "all",
"action": [
{
"name": "dynamo.custom.my_plugin",
"destination": "*dest"
}
]
}
]
}
The value arrives in your task’s **kwargs:
def my_plugin(self, *args, paths=None, jobid=None, **kwargs):
destination = kwargs.get("destination")
You can also hardcode values directly in the action object:
{"name": "dynamo.custom.my_plugin", "retries": 3, "mode": "fast"}
For all field types and advanced workflow configuration, see Workflows.
2.1.8. Chaining Tasks¶
List multiple tasks in the action array. Output paths from one task feed into the next:
"action": [
{"name": "dynamo.tasks.migrate"},
{"name": "dynamo.custom.my_plugin"},
{"name": "dynamo.custom.email_staff",
"staff_members": ["admin@org.com"]}
]
Flow: migrate passes succeeded paths to my_plugin, which passes its succeeded paths to email_staff.
2.1.9. File States Reference¶
State |
Meaning |
Passed to next task? |
|---|---|---|
|
Successfully handled. |
Yes |
|
Already in desired state. |
Yes |
|
Error occurred. |
No |
|
Intentionally excluded. |
No |
2.1.10. Updating Your Plugin¶
Edit the source code.
Bump the version in
setup.py(the Worker skips unchanged versions).Reinstall and restart:
ngenea-worker plugins install my_plugin
systemctl restart ngenea-worker
2.1.11. Troubleshooting Cheat Sheet¶
Problem |
Fix |
|---|---|
Task stuck in Pending |
Ensure your function returns |
Plugin not in |
Check entry point format in |
Workflow says task not found |
Verify the task |
Dependency conflict on install |
Do not modify the |
View Worker logs:
journalctl -u ngenea-worker
For the full troubleshooting guide, see Troubleshooting.
2.1.12. What’s Next?¶
This quick start covers the essentials. The rest of this guide covers:
Architecture Overview — architecture and key concepts
The Task Contract — full task contract and return payload details
Workflows — filter rules, branching, fields, and the
*field_namesyntaxAdvanced Topics — dynamic tasks, sub-DAGs, streaming results, queue management
Examples — complete worked examples with workflow definitions