2.1. Quick Start

This page gets you from zero to a working plugin in minutes. For full details on each topic, follow the cross-references to the relevant sections.

2.1.1. Prerequisites

  • SSH access to a Ngenea Worker host

  • Write access to /var/lib/ngenea-worker/plugins

  • Permission to restart the ngenea-worker service

  • enable_plugins=true set in /etc/ngenea/ngenea-worker.conf

2.1.2. 1. Scaffold

ngenea-worker plugins create my_plugin

Creates:

/var/lib/ngenea-worker/plugins/my_plugin/
├── setup.py
├── constraints.txt
└── my_plugin/
    ├── __init__.py
    └── my_plugin.py       <-- your code goes here

2.1.3. 2. Write Your Task

Edit my_plugin/my_plugin.py:

import logging
from typing import List, Dict, Any

from arcapix.dynamo.server.celeryapp import app
from arcapix.dynamo.server.status import FileActionStatus
from arcapix.dynamo.server.queue import get_current_task_queue_name

logger = logging.getLogger(__name__)


@app.task(bind=True, name="dynamo.custom.my_plugin")
def my_plugin(
    self, *args,
    paths: List[Dict[str, Any]] = None,
    jobid: int = None,
    **kwargs,
):
    # 1. Create a status tracker
    status = FileActionStatus(
        self.name, paths, jobid,
        get_current_task_queue_name()
    )

    # 2. Process each path
    for item in paths:
        try:
            logger.info("Processing %s", item["path"])
            # --- YOUR LOGIC HERE ---
            status.add("processed", item["path"])
        except Exception as e:
            status.add("failures", item["path"], message=str(e))

    # 3. Return results to Hub
    return status.dump()

2.1.3.1. The Rules

Rule

Why

Decorator must have bind=True

Gives access to self for task metadata.

Name must be dynamo.custom.<name>

Ngenea Hub uses this exact string to route the task.

Must accept paths, jobid, **kwargs

This is the data contract with the DAG engine. See The Task Contract.

Must return status.dump()

Ngenea Hub needs this structure to update the Job Progress UI.

Every path must be categorised

Use processed, skipped, failures, or aborted.

2.1.4. 3. Configure the Entry Point

Edit setup.py:

from setuptools import setup, find_packages

setup(
    name="my_plugin",
    version="1.0.0",
    packages=find_packages(),
    install_requires=[
        # The template may include celery here — do not modify its version.
        # Add your external dependencies below.
    ],
    entry_points={
        "worker_plugin": [
            "my_plugin = my_plugin.my_plugin:my_plugin",
        ],
    },
)

Entry point format: alias = package.module:function

2.1.5. 4. Install and Restart

ngenea-worker plugins install my_plugin
systemctl restart ngenea-worker

# Verify it's loaded
ngenea-worker plugins list

2.1.6. 5. Create a Workflow

Custom workflows are created via the REST API at POST /api/workflows/ (see API Endpoints - Workflows, Automation & Jobs). Minimal JSON:

{
    "name": "my_workflow",
    "label": "My Custom Workflow",
    "icon_classes": ["fa fa-plug fa-stack-2x text-primary"],
    "discovery": null,
    "enabled": true,
    "visible": true,
    "fields": [],
    "filter_rules": [
        {
            "type": "all",
            "state": "all",
            "action": [
                {"name": "dynamo.custom.my_plugin"}
            ]
        }
    ]
}

The workflow appears in the Ngenea Hub UI. Select files, choose your workflow, and run it. Or execute via the API with POST /api/file/workflow.

2.1.7. Passing Parameters

Define fields in the workflow and reference them with *field_name syntax:

{
    "fields": [
        {"name": "dest", "label": "Destination", "type": "string"}
    ],
    "filter_rules": [
        {
            "type": "all",
            "state": "all",
            "action": [
                {
                    "name": "dynamo.custom.my_plugin",
                    "destination": "*dest"
                }
            ]
        }
    ]
}

The value arrives in your task’s **kwargs:

def my_plugin(self, *args, paths=None, jobid=None, **kwargs):
    destination = kwargs.get("destination")

You can also hardcode values directly in the action object:

{"name": "dynamo.custom.my_plugin", "retries": 3, "mode": "fast"}

For all field types and advanced workflow configuration, see Workflows.

2.1.8. Chaining Tasks

List multiple tasks in the action array. Output paths from one task feed into the next:

"action": [
    {"name": "dynamo.tasks.migrate"},
    {"name": "dynamo.custom.my_plugin"},
    {"name": "dynamo.custom.email_staff",
     "staff_members": ["admin@org.com"]}
]

Flow: migrate passes succeeded paths to my_plugin, which passes its succeeded paths to email_staff.

2.1.9. File States Reference

State

Meaning

Passed to next task?

processed

Successfully handled.

Yes

skipped

Already in desired state.

Yes

failures

Error occurred.

No

aborted

Intentionally excluded.

No

2.1.10. Updating Your Plugin

  1. Edit the source code.

  2. Bump the version in setup.py (the Worker skips unchanged versions).

  3. Reinstall and restart:

ngenea-worker plugins install my_plugin
systemctl restart ngenea-worker

2.1.11. Troubleshooting Cheat Sheet

Problem

Fix

Task stuck in Pending

Ensure your function returns status.dump(). Check Worker logs for tracebacks.

Plugin not in ngenea-worker plugins list

Check entry point format in setup.py. Re-run install and restart. Check the plugin is importable. This can be done by running a python interpreter in the worker environment by calling ngenea-worker -d and then trying to import your plugin import my.worker.plugin

Workflow says task not found

Verify the task name in your decorator matches the workflow JSON exactly. Restart the Worker.

Dependency conflict on install

Do not modify the celery version in setup.py. Check constraints.txt.

View Worker logs:

journalctl -u ngenea-worker

For the full troubleshooting guide, see Troubleshooting.

2.1.12. What’s Next?

This quick start covers the essentials. The rest of this guide covers: