2.3. Getting Started

This section walks through creating, installing, and running a plugin end to end.

2.3.1. Prerequisites

  • SSH access to a Ngenea Worker host

  • Write permissions to /var/lib/ngenea-worker/plugins

  • Permission to restart the ngenea-worker service

  • enable_plugins=true set in /etc/ngenea/ngenea-worker.conf

  • Access to the Ngenea Hub REST API for creating workflows
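
The two file-level prerequisites can be checked from a script before you begin. The following is a minimal sketch, assuming the configuration file uses plain key=value lines with "#" comments (the file format is not specified in this section). The helper names plugins_enabled and check_prerequisites are hypothetical and not part of any Ngenea tool.

```python
import os

PLUGIN_DIR = "/var/lib/ngenea-worker/plugins"
CONF_PATH = "/etc/ngenea/ngenea-worker.conf"


def plugins_enabled(conf_text: str) -> bool:
    """Return True if the text contains an uncommented enable_plugins=true line.

    Assumption: settings are plain key=value lines and '#' starts a comment.
    """
    for raw in conf_text.splitlines():
        line = raw.split("#", 1)[0].strip()
        if "=" not in line:
            continue  # section headers, blank lines, comments
        key, value = (part.strip() for part in line.split("=", 1))
        if key == "enable_plugins":
            return value.lower() == "true"
    return False


def check_prerequisites(plugin_dir: str = PLUGIN_DIR, conf_path: str = CONF_PATH) -> dict:
    """Report which file-level prerequisites hold on this host."""
    try:
        with open(conf_path) as fh:
            enabled = plugins_enabled(fh.read())
    except OSError:
        enabled = False  # missing or unreadable config counts as "not enabled"
    return {
        "plugin_dir_writable": os.access(plugin_dir, os.W_OK),
        "plugins_enabled": enabled,
    }
```

Calling check_prerequisites() on the Worker host returns a dictionary with one boolean per check. SSH access, service restart rights, and API access still need to be confirmed separately.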

2.3.2. Scaffold the Plugin

On a Worker host, run:

ngenea-worker plugins create hello_world

This creates the following structure:

/var/lib/ngenea-worker/plugins/hello_world/
├── setup.py                    # Package metadata and entry points
├── constraints.txt             # Critical package version constraints (read-only reference)
└── hello_world/
    ├── __init__.py
    └── hello_world.py          # Your task implementation goes here

2.3.3. Implement the Task

Edit /var/lib/ngenea-worker/plugins/hello_world/hello_world/hello_world.py:

import logging
from typing import Any, Dict, List, Optional

from arcapix.dynamo.server.celeryapp import app
from arcapix.dynamo.server.status import FileActionStatus
from arcapix.dynamo.server.queue import get_current_task_queue_name

logger = logging.getLogger(__name__)


# The task name registered here is the name referenced by workflow actions.
@app.task(bind=True, name="dynamo.custom.hello_world")
def hello_world(
    self,
    *args,
    paths: Optional[List[Dict[str, Any]]] = None,
    jobid: Optional[int] = None,
    **kwargs,
):
    """A minimal plugin that logs each path and marks it as processed."""
    status = FileActionStatus(self.name, paths, jobid, get_current_task_queue_name())

    # Each item is a dict with at least a "path" key; tolerate the None default.
    for item in paths or []:
        logger.info("Hello from plugin! Processing: %s", item["path"])
        status.add("processed", item["path"])

    return status.dump()
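
The task reads item["path"] from every entry, so paths is expected to be a list of dictionaries that each carry at least a "path" key. To exercise that loop on a machine without the Worker libraries, something like the sketch below may help. RecordingStatus is a hypothetical stand-in that only mimics the add and dump calls used above. It is not the real FileActionStatus, and the structure it returns is an assumption made for illustration.

```python
import logging
from typing import Any, Dict, List

logger = logging.getLogger(__name__)


class RecordingStatus:
    """Hypothetical stand-in for FileActionStatus; NOT the real class.

    It only records (state, path) pairs so the loop can be run locally.
    """

    def __init__(self) -> None:
        self.entries: List[Dict[str, str]] = []

    def add(self, state: str, path: str) -> None:
        self.entries.append({"state": state, "path": path})

    def dump(self) -> List[Dict[str, str]]:
        return list(self.entries)


def process_paths(paths: List[Dict[str, Any]]) -> List[Dict[str, str]]:
    """Mirror the task body: log each path, then mark it as processed."""
    status = RecordingStatus()
    for item in paths:
        logger.info("Hello from plugin! Processing: %s", item["path"])
        status.add("processed", item["path"])
    return status.dump()
```

For example, process_paths([{"path": "/mmfs1/data/sample_data/file1.dat"}]) records a single "processed" entry for that file.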

2.3.4. Configure the Entry Point

Edit /var/lib/ngenea-worker/plugins/hello_world/setup.py:

from setuptools import setup, find_packages

setup(
    name="hello_world",
    version="1.0.0",
    packages=find_packages(),
    install_requires=[
        # The template may include celery here — do not modify its version.
        # Add your external dependencies below.
    ],
    entry_points={
        "worker_plugin": [
            "hello_world = hello_world.hello_world:hello_world",
        ],
    },
)

The entry point format is: alias = package.module:function

  • alias: A human-readable name used by the Worker to register the task.

  • package.module: The dotted Python import path to the module containing the task function.

  • function: The name of the decorated task function.
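
To make the three parts concrete, the sketch below splits an entry point string into alias, module, and function. parse_entry_point and EntryPointSpec are hypothetical helpers written for illustration. They are not part of setuptools or the Worker, which do their own parsing.

```python
from typing import NamedTuple


class EntryPointSpec(NamedTuple):
    alias: str
    module: str
    function: str


def parse_entry_point(spec: str) -> EntryPointSpec:
    """Split 'alias = package.module:function' into its three parts."""
    alias, sep, target = spec.partition("=")
    module, colon, function = target.partition(":")
    parts = EntryPointSpec(alias.strip(), module.strip(), function.strip())
    # Both separators must be present and no part may be empty.
    if not (sep and colon and all(parts)):
        raise ValueError(f"malformed entry point: {spec!r}")
    return parts


spec = parse_entry_point("hello_world = hello_world.hello_world:hello_world")
```

Here spec.alias is "hello_world", spec.module is "hello_world.hello_world" (the hello_world.py file inside the hello_world package), and spec.function is "hello_world" (the decorated task function).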

2.3.5. Install and Activate

# Install the plugin into the Worker's Python environment
ngenea-worker plugins install hello_world

# Restart the Worker to load the new task
# IMPORTANT: Check for active jobs first. The restart waits up to 900 s for running tasks to complete.
systemctl restart ngenea-worker

# Verify it appears in the plugin list
ngenea-worker plugins list

2.3.6. Create a Workflow

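Custom workflow definitions are created via the REST API at POST /api/workflows/ (see Workflows API). They are not currently editable via the Ngenea Hub UI. The request body below defines a workflow whose single action is the dynamo.custom.hello_world task: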

{
    "name": "hello_test",
    "label": "Hello World Test",
    "icon_classes": [
        "fa fa-plug fa-stack-2x text-primary"
    ],
    "discovery": null,
    "enabled": true,
    "visible": true,
    "fields": [],
    "filter_rules": [
        {
            "type": "all",
            "state": "all",
            "action": [
                {
                    "name": "dynamo.custom.hello_world"
                }
            ]
        }
    ]
}
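
As one way of submitting this definition, the sketch below prepares the POST request with Python's standard library. The host hub.example.com is a placeholder, and build_workflow_request is a hypothetical helper. Authentication is not covered in this section, so any required header is left to the caller through the headers argument.

```python
import json
import urllib.request
from typing import Dict, Optional


def build_workflow_request(
    hub_url: str, workflow: dict, headers: Optional[Dict[str, str]] = None
) -> urllib.request.Request:
    """Prepare (but do not send) a POST to /api/workflows/."""
    all_headers = {"Content-Type": "application/json"}
    all_headers.update(headers or {})  # assumption: caller supplies any auth header
    return urllib.request.Request(
        url=hub_url.rstrip("/") + "/api/workflows/",
        data=json.dumps(workflow).encode("utf-8"),
        headers=all_headers,
        method="POST",
    )


workflow = {
    "name": "hello_test",
    "label": "Hello World Test",
    "icon_classes": ["fa fa-plug fa-stack-2x text-primary"],
    "discovery": None,
    "enabled": True,
    "visible": True,
    "fields": [],
    "filter_rules": [
        {"type": "all", "state": "all", "action": [{"name": "dynamo.custom.hello_world"}]}
    ],
}

# hub.example.com is a placeholder host, not a real endpoint.
request = build_workflow_request("https://hub.example.com", workflow)
# To send it: urllib.request.urlopen(request)
```

The action name in filter_rules must match the name passed to @app.task in the plugin, here dynamo.custom.hello_world.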

2.3.7. Run It

Once created, the workflow appears in the Ngenea Hub UI. Select files in the file browser, choose the “Hello World Test” workflow, and submit.

Alternatively, execute via the API with POST /api/file/workflow:

{
    "paths": ["/mmfs1/data/sample_data/file1.dat"],
    "site": "london",
    "workflow": "hello_test",
    "fields": {}
}
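
The same request body can be assembled in a script. The sketch below is illustrative only: build_submission is a hypothetical helper, and the empty-paths check is a local sanity check rather than a documented API rule. The workflow value must be the name given in the workflow definition ("hello_test"), not its label.

```python
import json
from typing import Any, Dict, List, Optional


def build_submission(
    paths: List[str],
    site: str,
    workflow: str,
    fields: Optional[Dict[str, Any]] = None,
) -> str:
    """Serialise the request body for POST /api/file/workflow."""
    if not paths:
        # Local sanity check (assumption): submitting no paths is treated as a mistake.
        raise ValueError("at least one path is required")
    body = {
        "paths": list(paths),
        "site": site,
        "workflow": workflow,
        "fields": fields or {},
    }
    return json.dumps(body)


payload = build_submission(
    ["/mmfs1/data/sample_data/file1.dat"], site="london", workflow="hello_test"
)
```

The resulting payload string can be sent as the body of the POST request with whichever HTTP client and credentials your Hub deployment uses.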

The job progress view will show your paths being marked as “processed”.