5.3.1. Creating your plugin

Ngenea Hub allows the integration of custom Celery tasks through a plugin system. These plugins are Python modules recognized by the Ngenea Worker and can be used within defined workflows in Ngenea Hub.

5.3.1.1. Creating a Custom Plugin: echo_args

Ngenea Hub supports plugin development through its CLI, allowing users to extend functionality via custom task plugins. This section walks through creating a sample plugin named echo_args, which simply returns the arguments provided to it.

Plugin Directory

All custom plugins must reside in the following directory for correct installation and execution:

/var/lib/ngenea-worker/plugins

Each plugin will persist in this location, and Ngenea Hub will reference this path to discover and load available plugins.

Step 1: Create the Plugin Template

To generate the basic structure for the echo_args plugin, use the following command:

ngenea-worker plugins create echo_args

This command creates the plugin at: /var/lib/ngenea-worker/plugins/echo_args.

Inside this directory, you will find a Python module at: /var/lib/ngenea-worker/plugins/echo_args/echo_args/echo_args.py.

This module contains a default task function named example_task, which we will customize in the next step.

Step 2: Implement the Plugin Logic

The default example_task function is provided as a template. Here’s the original sample code:

from arcapix.dynamo.server.celeryapp import app
from arcapix.dynamo.server.status import FileActionStatus
from arcapix.dynamo.server.queue import get_current_task_queue_name

@app.task(bind=True, name="dynamo.custom.example_task")
def example_task(self, *args, paths: List = None, jobid: int = None, **kwargs):
    print(paths)
    print(jobid)
    status = FileActionStatus(self.name, paths, jobid, get_current_task_queue_name())
    for path in paths:
        status.add("processed", path["path"])
    return status.dump()

For plugins that may spawn dynamic children, also consider including the will_spawn_children field in your task results to ensure proper job completion tracking. See the “Dynamic Children and Job Completion” section in the Custom Tasks documentation.

Important

Custom tasks must return their results using the FileActionStatus object, as shown above. Failure to follow this return structure will result in the task being stuck in a Pending state, with tracebacks visible in the Ngenea Hub logs.

Refer to Custom Tasks > Return Payload for required return formats.

Step 3: Modify the Task for echo_args

Update the function and task name to reflect the custom behavior of our plugin. The revised implementation should look like this:

Note

The name provided to app.task will be the task name that is used to call the task within a workflow.

from arcapix.dynamo.server.celeryapp import app
from arcapix.dynamo.server.status import FileActionStatus
from arcapix.dynamo.server.queue import get_current_task_queue_name

@app.task(bind=True, name="dynamo.custom.echo_args")
def echo_args(self, *args, paths: List = None, jobid: int = None, **kwargs):
    status = FileActionStatus(self.name, paths, jobid, get_current_task_queue_name())
    for path in paths:
        status.add("processed", path["path"])
    return status.dump()

Note

The value provided to the name parameter in the @shared_task decorator determines the task name that will be used within workflow definitions.

Step 4: Update the setup.py Entry Point

For Ngenea Hub to recognize and register your custom task, you must define an appropriate entry point in your plugin’s setup.py file. Entry points for custom tasks should be declared under the worker_plugin group.

Original setup:

"worker_plugin": [
    "echo_args=echo_args.echo_args:example_task",
]

Updated setup (to reflect the new function name):

"worker_plugin": [
    "echo_args=echo_args.echo_args:echo_args",
]

5.3.1.2. External Dependencies

If your plugin relies on external Python packages, you can manage these dependencies through the setup.py file, which is automatically included in the plugin template created using:

ngenea-worker plugins create

To specify additional dependencies, add them to the install_requires list within setup.py. This ensures the required packages are installed into the Ngenea Worker environment when the plugin is deployed.

Example:

install_requires = [
    "celery<=5.4",
    "example-module==0.1.0"
]

Note

Do not modify the version or presence of the celery package in this list.

Ngenea Worker enforces constraints on critical packages, such as celery. Altering these packages could lead to compatibility issues or disrupt core Ngenea Worker functionality.

If your plugin requests a dependency which is incompatible with any of these critical packages, you will see an error like below

ERROR: Cannot install example_plugin==0.1.0 because these package versions have conflicting dependencies.

The conflict is caused by:
    example_plugin 0.1.0 depends on celery==5.4
    The user requested (constraint) celery==5.3.6

The user requested (constraint) refers to Ngenea Worker.

Your plugin directory includes a file called constraints.txt, which lists the required critical package versions. This is included for information and testing. Note, editing this file will not change the constraints which are applied when you install a plugin.

By managing dependencies in this way, you ensure plugin compatibility, reduce runtime errors, and maintain seamless integration with the Ngenea Worker framework.